From 88e1523e403520f6741a5f47113433dc94019233 Mon Sep 17 00:00:00 2001 From: silent-cipher Date: Wed, 22 Apr 2026 16:35:29 +0530 Subject: [PATCH 01/12] feat: add network column to schema --- apps/backend/src/common/constants.ts | 3 + .../data-retention.service.spec.ts | 2 +- .../data-retention/data-retention.service.ts | 2 +- .../data-retention-baseline.entity.ts | 4 + .../src/database/entities/deal.entity.ts | 9 +- .../entities/job-schedule-state.entity.ts | 4 + .../entities/storage-provider.entity.ts | 4 + .../1776790420000-AddNetworkColumn.ts | 183 ++++++++++++++++++ .../repositories/job-schedule.repository.ts | 2 +- .../src/wallet-sdk/wallet-sdk.service.spec.ts | 2 +- .../src/wallet-sdk/wallet-sdk.service.ts | 2 +- 11 files changed, 211 insertions(+), 6 deletions(-) create mode 100644 apps/backend/src/database/migrations/1776790420000-AddNetworkColumn.ts diff --git a/apps/backend/src/common/constants.ts b/apps/backend/src/common/constants.ts index 57416ae0..0b73afb9 100644 --- a/apps/backend/src/common/constants.ts +++ b/apps/backend/src/common/constants.ts @@ -7,3 +7,6 @@ export const ZERO_ADDRESS = "0x0000000000000000000000000000000000000000"; export const MAX_BLOCK_SIZE = 5 * 1024 * 1024; export const DEV_TAG = stringToHex("dev"); + +// First network will be used as default in absence of NETWORKS config +export const SUPPORTED_NETWORKS = ["calibration", "mainnet"] as const; diff --git a/apps/backend/src/data-retention/data-retention.service.spec.ts b/apps/backend/src/data-retention/data-retention.service.spec.ts index 17151bd1..b30ed0aa 100644 --- a/apps/backend/src/data-retention/data-retention.service.spec.ts +++ b/apps/backend/src/data-retention/data-retention.service.spec.ts @@ -210,7 +210,7 @@ describe("DataRetentionService", () => { successPeriods: "90", lastBlockNumber: "1200", }, - ["providerAddress"], + ["providerAddress", "network"], ); }); diff --git a/apps/backend/src/data-retention/data-retention.service.ts b/apps/backend/src/data-retention/data-retention.service.ts index f4d7ec6d..1a824f0f 100644 --- a/apps/backend/src/data-retention/data-retention.service.ts +++ b/apps/backend/src/data-retention/data-retention.service.ts @@ -457,7 +457,7 @@ export class DataRetentionService { successPeriods: baseline.successPeriods.toString(), lastBlockNumber: blockNumber.toString(), }, - ["providerAddress"], + ["providerAddress", "network"], ); } diff --git a/apps/backend/src/database/entities/data-retention-baseline.entity.ts b/apps/backend/src/database/entities/data-retention-baseline.entity.ts index e3b8169a..c6d857e5 100644 --- a/apps/backend/src/database/entities/data-retention-baseline.entity.ts +++ b/apps/backend/src/database/entities/data-retention-baseline.entity.ts @@ -1,10 +1,14 @@ import { Column, Entity, PrimaryColumn, UpdateDateColumn } from "typeorm"; +import type { Network } from "../../common/types.js"; @Entity("data_retention_baselines") export class DataRetentionBaseline { @PrimaryColumn({ name: "provider_address", type: "text" }) providerAddress!: string; + @PrimaryColumn({ name: "network", type: "text", default: "calibration" }) + network!: Network; + @Column({ name: "faulted_periods", type: "bigint" }) faultedPeriods!: string; // bigint stored as string diff --git a/apps/backend/src/database/entities/deal.entity.ts b/apps/backend/src/database/entities/deal.entity.ts index f03a9ea8..c435bede 100644 --- a/apps/backend/src/database/entities/deal.entity.ts +++ b/apps/backend/src/database/entities/deal.entity.ts @@ -8,6 +8,7 @@ import { PrimaryGeneratedColumn, UpdateDateColumn, } from "typeorm"; +import type { Network } from "../../common/types.js"; import { BigIntColumn } from "../helpers/bigint-column.js"; import { type DealMetadata, DealStatus, IpniStatus, type ServiceType } from "../types.js"; import type { Retrieval } from "./retrieval.entity.js"; @@ -21,6 +22,9 @@ export class Deal { @Column({ name: "sp_address" }) spAddress: string; + @Column({ name: "network", type: "text", default: "calibration" }) + network: Network; + @Column({ name: "wallet_address" }) walletAddress: string; @@ -151,7 +155,10 @@ export class Deal { (sp) => sp.deals, { onDelete: "CASCADE" }, ) - @JoinColumn({ name: "sp_address" }) + @JoinColumn([ + { name: "sp_address", referencedColumnName: "address" }, + { name: "network", referencedColumnName: "network" }, + ]) storageProvider: StorageProvider | null; @OneToMany("Retrieval", "deal") diff --git a/apps/backend/src/database/entities/job-schedule-state.entity.ts b/apps/backend/src/database/entities/job-schedule-state.entity.ts index d1758ae9..12390cb4 100644 --- a/apps/backend/src/database/entities/job-schedule-state.entity.ts +++ b/apps/backend/src/database/entities/job-schedule-state.entity.ts @@ -1,4 +1,5 @@ import { Column, CreateDateColumn, Entity, Index, PrimaryGeneratedColumn, UpdateDateColumn } from "typeorm"; +import type { Network } from "../../common/types.js"; // `job_type` is stored as TEXT in Postgres, so legacy rows may still contain // values that are no longer scheduled for new work. Keep them in the entity @@ -26,6 +27,9 @@ export class JobScheduleState { @Column({ name: "sp_address", type: "text", default: "" }) spAddress!: string; + @Column({ name: "network", type: "text", default: "calibration" }) + network!: Network; + @Column({ name: "interval_seconds" }) intervalSeconds!: number; diff --git a/apps/backend/src/database/entities/storage-provider.entity.ts b/apps/backend/src/database/entities/storage-provider.entity.ts index 3c130094..d151cac2 100644 --- a/apps/backend/src/database/entities/storage-provider.entity.ts +++ b/apps/backend/src/database/entities/storage-provider.entity.ts @@ -1,4 +1,5 @@ import { Column, CreateDateColumn, Entity, Index, OneToMany, PrimaryColumn, UpdateDateColumn } from "typeorm"; +import type { Network } from "../../common/types.js"; import { BigIntColumn } from "../helpers/bigint-column.js"; import { Deal } from "./deal.entity.js"; @@ -8,6 +9,9 @@ export class StorageProvider { @PrimaryColumn() address!: string; + @PrimaryColumn({ name: "network", type: "text", default: "calibration" }) + network!: Network; + @BigIntColumn({ nullable: true }) providerId: bigint | null; diff --git a/apps/backend/src/database/migrations/1776790420000-AddNetworkColumn.ts b/apps/backend/src/database/migrations/1776790420000-AddNetworkColumn.ts new file mode 100644 index 00000000..74c88661 --- /dev/null +++ b/apps/backend/src/database/migrations/1776790420000-AddNetworkColumn.ts @@ -0,0 +1,183 @@ +import type { MigrationInterface, QueryRunner } from "typeorm"; +import { SUPPORTED_NETWORKS } from "../../common/constants.js"; +import { Network } from "../../common/types.js"; + +/** + * Add a `network` column to runtime tables so records from mainnet and calibration + * are isolated correctly when a single dealbot instance operates on both networks. + * + * Backfill strategy: existing rows are assigned 'calibration' because all + * currently running dealbot deployments target calibration. Operators switching a + * previously single-network deployment to mainnet must ensure their NETWORKS env + * var reflects the correct value and re-run a providers_refresh to populate the + * correct network-scoped rows. + */ +export class AddNetworkColumn1776790420000 implements MigrationInterface { + name = "AddNetworkColumn1776790420000"; + + public async up(queryRunner: QueryRunner): Promise { + const backfillNetwork = (process.env.DEALBOT_LEGACY_NETWORK_BACKFILL ?? process.env.NETWORK ?? "").trim(); + if (!SUPPORTED_NETWORKS.includes(backfillNetwork as Network)) { + throw new Error( + `AddNetworkColumn migration requires DEALBOT_LEGACY_NETWORK_BACKFILL (or legacy NETWORK) ` + + `to be set to one of: ${SUPPORTED_NETWORKS.join(", ")}. Got: "${backfillNetwork}"`, + ); + } + + // ------------------------------------------------------------------------- + // Add `network` columns first so composite PK/FK can reference them. + // ------------------------------------------------------------------------- + await queryRunner.query(` + ALTER TABLE storage_providers + ADD COLUMN IF NOT EXISTS network TEXT NOT NULL DEFAULT '${backfillNetwork}' + `); + + await queryRunner.query(` + ALTER TABLE deals + ADD COLUMN IF NOT EXISTS network TEXT NOT NULL DEFAULT '${backfillNetwork}' + `); + + // ------------------------------------------------------------------------- + // Indexes on storage_providers + // ------------------------------------------------------------------------- + await queryRunner.query(` + DROP INDEX IF EXISTS "IDX_storage_providers_region_is_active" + `); + + await queryRunner.query(` + CREATE INDEX IF NOT EXISTS "IDX_storage_providers_location_is_active" + ON storage_providers (location, is_active) + `); + + await queryRunner.query(` + CREATE INDEX IF NOT EXISTS "IDX_storage_providers_network_is_active" + ON storage_providers (network, is_active) + `); + + await queryRunner.query(` + CREATE INDEX IF NOT EXISTS "IDX_deals_network_sp_address" + ON deals (network, sp_address) + `); + + // ------------------------------------------------------------------------- + // Drop FK before dropping the PK it depends on. + // ------------------------------------------------------------------------- + await queryRunner.query(` + ALTER TABLE deals DROP CONSTRAINT IF EXISTS "FK_deals_storage_providers" + `); + + // ------------------------------------------------------------------------- + // storage_providers: change single-column PK to composite (address, network) + // ------------------------------------------------------------------------- + await queryRunner.query(` + ALTER TABLE storage_providers DROP CONSTRAINT IF EXISTS "PK_4edd0e54ccdb29b54a3ef1e2547" + `); + await queryRunner.query(` + ALTER TABLE storage_providers DROP CONSTRAINT IF EXISTS storage_providers_pkey + `); + + await queryRunner.query(` + ALTER TABLE storage_providers ADD PRIMARY KEY (address, network) + `); + + // ------------------------------------------------------------------------- + // Recreate deals FK against the new composite PK. + // ------------------------------------------------------------------------- + await queryRunner.query(` + ALTER TABLE deals + ADD CONSTRAINT "FK_deals_storage_providers" + FOREIGN KEY (sp_address, network) + REFERENCES storage_providers(address, network) + ON DELETE CASCADE + `); + + // ------------------------------------------------------------------------- + // job_schedule_state: replace unique constraint to include network + // ------------------------------------------------------------------------- + await queryRunner.query(` + ALTER TABLE job_schedule_state + ADD COLUMN IF NOT EXISTS network TEXT NOT NULL DEFAULT '${backfillNetwork}' + `); + + await queryRunner.query(` + ALTER TABLE job_schedule_state + DROP CONSTRAINT IF EXISTS job_schedule_state_job_type_sp_unique + `); + + await queryRunner.query(` + ALTER TABLE job_schedule_state + ADD CONSTRAINT job_schedule_state_job_type_sp_network_unique + UNIQUE (job_type, sp_address, network) + `); + + // ------------------------------------------------------------------------- + // data_retention_baselines: change single-column PK to composite (provider_address, network) + // ------------------------------------------------------------------------- + await queryRunner.query(` + ALTER TABLE data_retention_baselines + ADD COLUMN IF NOT EXISTS network TEXT NOT NULL DEFAULT '${backfillNetwork}' + `); + + await queryRunner.query(` + ALTER TABLE data_retention_baselines + DROP CONSTRAINT IF EXISTS data_retention_baselines_pkey + `); + + await queryRunner.query(` + ALTER TABLE data_retention_baselines + ADD PRIMARY KEY (provider_address, network) + `); + } + + public async down(queryRunner: QueryRunner): Promise { + // data_retention_baselines + await queryRunner.query(` + ALTER TABLE data_retention_baselines DROP CONSTRAINT IF EXISTS data_retention_baselines_pkey + `); + await queryRunner.query(` + ALTER TABLE data_retention_baselines ADD PRIMARY KEY (provider_address) + `); + await queryRunner.query(` + ALTER TABLE data_retention_baselines DROP COLUMN IF EXISTS network + `); + + // job_schedule_state + await queryRunner.query(` + ALTER TABLE job_schedule_state + DROP CONSTRAINT IF EXISTS job_schedule_state_job_type_sp_network_unique + `); + await queryRunner.query(` + ALTER TABLE job_schedule_state + ADD CONSTRAINT job_schedule_state_job_type_sp_unique + UNIQUE (job_type, sp_address) + `); + await queryRunner.query(` + ALTER TABLE job_schedule_state DROP COLUMN IF EXISTS network + `); + + // deals + await queryRunner.query(` + ALTER TABLE deals DROP CONSTRAINT IF EXISTS "FK_deals_storage_providers" + `); + await queryRunner.query(` + ALTER TABLE deals + ADD CONSTRAINT "FK_deals_storage_providers" + FOREIGN KEY (sp_address) + REFERENCES storage_providers(address) + ON DELETE CASCADE + `); + await queryRunner.query(`DROP INDEX IF EXISTS "IDX_deals_network_sp_address"`); + await queryRunner.query(`ALTER TABLE deals DROP COLUMN IF EXISTS network`); + + // storage_providers + await queryRunner.query(` + ALTER TABLE storage_providers DROP CONSTRAINT IF EXISTS storage_providers_pkey + `); + await queryRunner.query(` + ALTER TABLE storage_providers ADD PRIMARY KEY (address) + `); + await queryRunner.query(`DROP INDEX IF EXISTS "IDX_storage_providers_network_is_active"`); + await queryRunner.query(`DROP INDEX IF EXISTS "IDX_storage_providers_location_is_active"`); + await queryRunner.query(`ALTER TABLE storage_providers DROP COLUMN IF EXISTS network`); + } +} diff --git a/apps/backend/src/jobs/repositories/job-schedule.repository.ts b/apps/backend/src/jobs/repositories/job-schedule.repository.ts index f0da6a80..e698c2d2 100644 --- a/apps/backend/src/jobs/repositories/job-schedule.repository.ts +++ b/apps/backend/src/jobs/repositories/job-schedule.repository.ts @@ -44,7 +44,7 @@ export class JobScheduleRepository { ` INSERT INTO job_schedule_state (job_type, sp_address, interval_seconds, next_run_at) VALUES ($1, $2, $3, $4) - ON CONFLICT (job_type, sp_address) DO UPDATE + ON CONFLICT (job_type, sp_address, network) DO UPDATE SET interval_seconds = EXCLUDED.interval_seconds, paused = job_schedule_state.paused, updated_at = NOW() diff --git a/apps/backend/src/wallet-sdk/wallet-sdk.service.spec.ts b/apps/backend/src/wallet-sdk/wallet-sdk.service.spec.ts index 9b0a7070..982d441b 100644 --- a/apps/backend/src/wallet-sdk/wallet-sdk.service.spec.ts +++ b/apps/backend/src/wallet-sdk/wallet-sdk.service.spec.ts @@ -146,7 +146,7 @@ describe("WalletSdkService", () => { expect(loggerMock.error).not.toHaveBeenCalled(); const [entities, options] = repoMock.upsert.mock.calls[0]; - expect(options).toEqual(expect.objectContaining({ conflictPaths: ["address"] })); + expect(options).toEqual(expect.objectContaining({ conflictPaths: ["address", "network"] })); expect(entities).toEqual( expect.arrayContaining([ expect.objectContaining({ address: "0xdup", providerId: 21n, name: "new" }), diff --git a/apps/backend/src/wallet-sdk/wallet-sdk.service.ts b/apps/backend/src/wallet-sdk/wallet-sdk.service.ts index 81c4288b..530833ed 100644 --- a/apps/backend/src/wallet-sdk/wallet-sdk.service.ts +++ b/apps/backend/src/wallet-sdk/wallet-sdk.service.ts @@ -488,7 +488,7 @@ export class WalletSdkService implements OnModuleInit { ); await this.spRepository.upsert(entities, { - conflictPaths: ["address"], + conflictPaths: ["address", "network"], skipUpdateIfNoValuesChanged: true, }); } catch (error) { From 492caf1d9cb1487495afa7ee7bc4bd76cc9bc38f Mon Sep 17 00:00:00 2001 From: silent-cipher Date: Wed, 22 Apr 2026 16:42:22 +0530 Subject: [PATCH 02/12] fix: typecheck --- apps/backend/src/providers/providers.controller.spec.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/apps/backend/src/providers/providers.controller.spec.ts b/apps/backend/src/providers/providers.controller.spec.ts index 28ec15c4..967cfae0 100644 --- a/apps/backend/src/providers/providers.controller.spec.ts +++ b/apps/backend/src/providers/providers.controller.spec.ts @@ -6,6 +6,7 @@ import { ProvidersService } from "./providers.service.js"; function makeProvider(overrides: Partial = {}): StorageProvider { return { + network: "calibration", address: "f01234", providerId: 99n, name: "Test SP", From 4c163ce8181af752ee15a3a500ae7899303c85bf Mon Sep 17 00:00:00 2001 From: silent-cipher Date: Wed, 22 Apr 2026 17:13:03 +0530 Subject: [PATCH 03/12] fix: align typeorm with migration --- .../entities/data-retention-baseline.entity.ts | 2 +- apps/backend/src/database/entities/deal.entity.ts | 4 +++- .../src/database/entities/job-schedule-state.entity.ts | 4 ++-- .../src/database/entities/storage-provider.entity.ts | 3 ++- apps/backend/src/wallet-sdk/wallet-sdk.service.spec.ts | 10 +++++----- apps/backend/src/wallet-sdk/wallet-sdk.service.ts | 6 ++++-- 6 files changed, 17 insertions(+), 12 deletions(-) diff --git a/apps/backend/src/database/entities/data-retention-baseline.entity.ts b/apps/backend/src/database/entities/data-retention-baseline.entity.ts index c6d857e5..56dec47a 100644 --- a/apps/backend/src/database/entities/data-retention-baseline.entity.ts +++ b/apps/backend/src/database/entities/data-retention-baseline.entity.ts @@ -6,7 +6,7 @@ export class DataRetentionBaseline { @PrimaryColumn({ name: "provider_address", type: "text" }) providerAddress!: string; - @PrimaryColumn({ name: "network", type: "text", default: "calibration" }) + @PrimaryColumn({ name: "network", type: "text" }) network!: Network; @Column({ name: "faulted_periods", type: "bigint" }) diff --git a/apps/backend/src/database/entities/deal.entity.ts b/apps/backend/src/database/entities/deal.entity.ts index c435bede..252570f4 100644 --- a/apps/backend/src/database/entities/deal.entity.ts +++ b/apps/backend/src/database/entities/deal.entity.ts @@ -2,6 +2,7 @@ import { Column, CreateDateColumn, Entity, + Index, JoinColumn, ManyToOne, OneToMany, @@ -15,6 +16,7 @@ import type { Retrieval } from "./retrieval.entity.js"; import { StorageProvider } from "./storage-provider.entity.js"; @Entity("deals") +@Index(["network", "spAddress"]) export class Deal { @PrimaryGeneratedColumn("uuid") id: string; @@ -22,7 +24,7 @@ export class Deal { @Column({ name: "sp_address" }) spAddress: string; - @Column({ name: "network", type: "text", default: "calibration" }) + @Column({ name: "network", type: "text" }) network: Network; @Column({ name: "wallet_address" }) diff --git a/apps/backend/src/database/entities/job-schedule-state.entity.ts b/apps/backend/src/database/entities/job-schedule-state.entity.ts index 12390cb4..761fbe43 100644 --- a/apps/backend/src/database/entities/job-schedule-state.entity.ts +++ b/apps/backend/src/database/entities/job-schedule-state.entity.ts @@ -15,7 +15,7 @@ export type JobType = | "piece_cleanup"; @Entity("job_schedule_state") -@Index("job_schedule_state_job_type_sp_unique", ["jobType", "spAddress"], { unique: true }) +@Index("job_schedule_state_job_type_sp_network_unique", ["jobType", "spAddress", "network"], { unique: true }) @Index("idx_job_schedule_state_next_run", ["nextRunAt"]) export class JobScheduleState { @PrimaryGeneratedColumn("increment", { type: "bigint" }) @@ -27,7 +27,7 @@ export class JobScheduleState { @Column({ name: "sp_address", type: "text", default: "" }) spAddress!: string; - @Column({ name: "network", type: "text", default: "calibration" }) + @Column({ name: "network", type: "text" }) network!: Network; @Column({ name: "interval_seconds" }) diff --git a/apps/backend/src/database/entities/storage-provider.entity.ts b/apps/backend/src/database/entities/storage-provider.entity.ts index d151cac2..e22844c6 100644 --- a/apps/backend/src/database/entities/storage-provider.entity.ts +++ b/apps/backend/src/database/entities/storage-provider.entity.ts @@ -5,11 +5,12 @@ import { Deal } from "./deal.entity.js"; @Entity("storage_providers") @Index(["location", "isActive"]) +@Index(["network", "isActive"]) export class StorageProvider { @PrimaryColumn() address!: string; - @PrimaryColumn({ name: "network", type: "text", default: "calibration" }) + @PrimaryColumn({ name: "network", type: "text" }) network!: Network; @BigIntColumn({ nullable: true }) diff --git a/apps/backend/src/wallet-sdk/wallet-sdk.service.spec.ts b/apps/backend/src/wallet-sdk/wallet-sdk.service.spec.ts index 982d441b..9888e25a 100644 --- a/apps/backend/src/wallet-sdk/wallet-sdk.service.spec.ts +++ b/apps/backend/src/wallet-sdk/wallet-sdk.service.spec.ts @@ -125,7 +125,7 @@ describe("WalletSdkService", () => { }); const other = makeProvider({ id: 22n, serviceProvider: "0xother" }); - await service.syncProvidersToDatabase([inactive, active, other]); + await service.syncProvidersToDatabase([inactive, active, other], "calibration"); expect(loggerMock.warn).toHaveBeenCalledWith( expect.objectContaining({ @@ -149,8 +149,8 @@ describe("WalletSdkService", () => { expect(options).toEqual(expect.objectContaining({ conflictPaths: ["address", "network"] })); expect(entities).toEqual( expect.arrayContaining([ - expect.objectContaining({ address: "0xdup", providerId: 21n, name: "new" }), - expect.objectContaining({ address: "0xother", providerId: 22n }), + expect.objectContaining({ network: "calibration", address: "0xdup", providerId: 21n, name: "new" }), + expect.objectContaining({ network: "calibration", address: "0xother", providerId: 22n }), ]), ); }); @@ -169,7 +169,7 @@ describe("WalletSdkService", () => { name: "inactive", }); - await service.syncProvidersToDatabase([active, inactive]); + await service.syncProvidersToDatabase([active, inactive], "calibration"); expect(loggerMock.warn).toHaveBeenCalledWith( expect.objectContaining({ @@ -202,7 +202,7 @@ describe("WalletSdkService", () => { name: "second", }); - await service.syncProvidersToDatabase([first, second]); + await service.syncProvidersToDatabase([first, second], "calibration"); expect(loggerMock.error).toHaveBeenCalledWith( expect.objectContaining({ diff --git a/apps/backend/src/wallet-sdk/wallet-sdk.service.ts b/apps/backend/src/wallet-sdk/wallet-sdk.service.ts index 530833ed..a46d58b4 100644 --- a/apps/backend/src/wallet-sdk/wallet-sdk.service.ts +++ b/apps/backend/src/wallet-sdk/wallet-sdk.service.ts @@ -10,6 +10,7 @@ import type { Repository } from "typeorm"; import type { Hex } from "viem"; import { toStructuredError } from "../common/logging.js"; import { createSynapseFromConfig } from "../common/synapse-factory.js"; +import { Network } from "../common/types.js"; import type { IBlockchainConfig, IConfig } from "../config/app.config.js"; import { StorageProvider } from "../database/entities/storage-provider.entity.js"; import type { PDPProviderEx, WalletServices } from "./wallet-sdk.types.js"; @@ -177,7 +178,7 @@ export class WalletSdkService implements OnModuleInit { }; }); - this.syncProvidersToDatabase(extendedProviders).catch((err) => + this.syncProvidersToDatabase(extendedProviders, this.blockchainConfig.network).catch((err) => this.logger.error({ event: "providers_sync_to_db_failed", message: "Failed to sync providers to DB", @@ -398,7 +399,7 @@ export class WalletSdkService implements OnModuleInit { /** * Create or update provider in database */ - async syncProvidersToDatabase(providerInfos: PDPProviderEx[]): Promise { + async syncProvidersToDatabase(providerInfos: PDPProviderEx[], network: Network): Promise { try { const dedupedProviders = new Map(); const duplicatesByAddress = new Map>(); @@ -474,6 +475,7 @@ export class WalletSdkService implements OnModuleInit { const entities = Array.from(dedupedProviders.values()).map((info) => this.spRepository.create({ + network, address: info.serviceProvider as Hex, providerId: info.id, name: info.name, From 166a1b31f288cf8f5e9ef7bb1b89270eeb581823 Mon Sep 17 00:00:00 2001 From: silent-cipher Date: Wed, 22 Apr 2026 17:26:04 +0530 Subject: [PATCH 04/12] fix: correct migration rollback order --- .../1776790420000-AddNetworkColumn.ts | 54 +++++++++++++++---- 1 file changed, 45 insertions(+), 9 deletions(-) diff --git a/apps/backend/src/database/migrations/1776790420000-AddNetworkColumn.ts b/apps/backend/src/database/migrations/1776790420000-AddNetworkColumn.ts index 74c88661..11af78aa 100644 --- a/apps/backend/src/database/migrations/1776790420000-AddNetworkColumn.ts +++ b/apps/backend/src/database/migrations/1776790420000-AddNetworkColumn.ts @@ -130,6 +130,26 @@ export class AddNetworkColumn1776790420000 implements MigrationInterface { } public async down(queryRunner: QueryRunner): Promise { + // Reverting to a single-network schema is destructive when rows for multiple + // networks exist: collapsing storage_providers' PK back to (address) would + // fail on duplicate addresses that live under different networks. The + // operator must declare which network's data to keep; rows belonging to any + // other network are deleted before the schema is collapsed. + const keepNetwork = (process.env.DEALBOT_LEGACY_NETWORK_BACKFILL ?? process.env.NETWORK ?? "").trim(); + if (!SUPPORTED_NETWORKS.includes(keepNetwork as Network)) { + throw new Error( + `AddNetworkColumn.down migration requires DEALBOT_LEGACY_NETWORK_BACKFILL (or legacy NETWORK) ` + + `to declare which network's rows to preserve. Got: "${keepNetwork}". Allowed: ${SUPPORTED_NETWORKS.join(", ")}`, + ); + } + + // Delete non-kept-network rows. The composite FK on deals has ON DELETE + // CASCADE, so removing storage_providers rows also removes their deals. + await queryRunner.query(`DELETE FROM data_retention_baselines WHERE network <> $1`, [keepNetwork]); + await queryRunner.query(`DELETE FROM job_schedule_state WHERE network <> $1`, [keepNetwork]); + await queryRunner.query(`DELETE FROM deals WHERE network <> $1`, [keepNetwork]); + await queryRunner.query(`DELETE FROM storage_providers WHERE network <> $1`, [keepNetwork]); + // data_retention_baselines await queryRunner.query(` ALTER TABLE data_retention_baselines DROP CONSTRAINT IF EXISTS data_retention_baselines_pkey @@ -155,10 +175,21 @@ export class AddNetworkColumn1776790420000 implements MigrationInterface { ALTER TABLE job_schedule_state DROP COLUMN IF EXISTS network `); - // deals + // Drop composite FK on deals before altering the PK it depends on. await queryRunner.query(` ALTER TABLE deals DROP CONSTRAINT IF EXISTS "FK_deals_storage_providers" `); + + // Restore single-column PK on storage_providers so (address) is unique again + // before any FK targeting it is recreated. + await queryRunner.query(` + ALTER TABLE storage_providers DROP CONSTRAINT IF EXISTS storage_providers_pkey + `); + await queryRunner.query(` + ALTER TABLE storage_providers ADD PRIMARY KEY (address) + `); + + // Recreate the original single-column FK on deals. await queryRunner.query(` ALTER TABLE deals ADD CONSTRAINT "FK_deals_storage_providers" @@ -166,18 +197,23 @@ export class AddNetworkColumn1776790420000 implements MigrationInterface { REFERENCES storage_providers(address) ON DELETE CASCADE `); + + // Drop indexes that referenced the network column. await queryRunner.query(`DROP INDEX IF EXISTS "IDX_deals_network_sp_address"`); - await queryRunner.query(`ALTER TABLE deals DROP COLUMN IF EXISTS network`); + await queryRunner.query(`DROP INDEX IF EXISTS "IDX_storage_providers_network_is_active"`); - // storage_providers - await queryRunner.query(` - ALTER TABLE storage_providers DROP CONSTRAINT IF EXISTS storage_providers_pkey - `); + // Restore the pre-migration index name on storage_providers. The column is + // still named `location` here; migration 1761500000004 (rename region -> location) + // runs its own down() later in the revert chain and will rename this index's + // underlying column back to `region` transparently. + await queryRunner.query(`DROP INDEX IF EXISTS "IDX_storage_providers_location_is_active"`); await queryRunner.query(` - ALTER TABLE storage_providers ADD PRIMARY KEY (address) + CREATE INDEX IF NOT EXISTS "IDX_storage_providers_region_is_active" + ON storage_providers (location, is_active) `); - await queryRunner.query(`DROP INDEX IF EXISTS "IDX_storage_providers_network_is_active"`); - await queryRunner.query(`DROP INDEX IF EXISTS "IDX_storage_providers_location_is_active"`); + + // Finally, drop the network columns now that no constraint or index depends on them. + await queryRunner.query(`ALTER TABLE deals DROP COLUMN IF EXISTS network`); await queryRunner.query(`ALTER TABLE storage_providers DROP COLUMN IF EXISTS network`); } } From b4e9f0ceef032bb9d1436a300b49ce41bed0a1c5 Mon Sep 17 00:00:00 2001 From: silent-cipher Date: Wed, 22 Apr 2026 17:53:21 +0530 Subject: [PATCH 05/12] chore: fix migration comment --- .../migrations/1776790420000-AddNetworkColumn.ts | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/apps/backend/src/database/migrations/1776790420000-AddNetworkColumn.ts b/apps/backend/src/database/migrations/1776790420000-AddNetworkColumn.ts index 11af78aa..8f98b6a3 100644 --- a/apps/backend/src/database/migrations/1776790420000-AddNetworkColumn.ts +++ b/apps/backend/src/database/migrations/1776790420000-AddNetworkColumn.ts @@ -6,11 +6,13 @@ import { Network } from "../../common/types.js"; * Add a `network` column to runtime tables so records from mainnet and calibration * are isolated correctly when a single dealbot instance operates on both networks. * - * Backfill strategy: existing rows are assigned 'calibration' because all - * currently running dealbot deployments target calibration. Operators switching a - * previously single-network deployment to mainnet must ensure their NETWORKS env - * var reflects the correct value and re-run a providers_refresh to populate the - * correct network-scoped rows. + * Backfill strategy: existing rows are assigned the network declared by the + * operator via `DEALBOT_LEGACY_NETWORK_BACKFILL` (preferred) or the legacy + * `NETWORK` env var. The migration fails fast if neither is set to a supported + * network, so backfill is never silently wrong. Operators later expanding a + * single-network deployment to an additional network must update their + * `NETWORKS` config and re-run a providers_refresh to populate the + * network-scoped rows for the newly added network. */ export class AddNetworkColumn1776790420000 implements MigrationInterface { name = "AddNetworkColumn1776790420000"; From 4178902be68666071c329a3823467d5b96b0d9fe Mon Sep 17 00:00:00 2001 From: silent-cipher Date: Wed, 6 May 2026 12:11:23 +0530 Subject: [PATCH 06/12] chore: address pr comments --- .../data-retention.service.spec.ts | 103 ++++++++++---- .../data-retention/data-retention.service.ts | 83 +++++++---- .../data-retention-baseline.entity.ts | 8 +- .../src/database/entities/deal.entity.ts | 8 +- .../entities/job-schedule-state.entity.ts | 8 +- .../entities/storage-provider.entity.ts | 8 +- .../1776790420000-AddNetworkColumn.ts | 32 ++++- apps/backend/src/jobs/jobs.service.spec.ts | 10 +- apps/backend/src/jobs/jobs.service.ts | 15 +- .../repositories/job-schedule.repository.ts | 24 +++- .../pdp-subgraph/pdp-subgraph.service.spec.ts | 107 +++++--------- .../src/pdp-subgraph/pdp-subgraph.service.ts | 30 ++-- docs/runbooks/multi-network-migration.md | 131 ++++++++++++++++++ 13 files changed, 408 insertions(+), 159 deletions(-) create mode 100644 docs/runbooks/multi-network-migration.md diff --git a/apps/backend/src/data-retention/data-retention.service.spec.ts b/apps/backend/src/data-retention/data-retention.service.spec.ts index b30ed0aa..b9b2c870 100644 --- a/apps/backend/src/data-retention/data-retention.service.spec.ts +++ b/apps/backend/src/data-retention/data-retention.service.spec.ts @@ -62,7 +62,7 @@ describe("DataRetentionService", () => { configServiceMock = { get: vi.fn((key: keyof IConfig) => { if (key === "blockchain") { - return { pdpSubgraphEndpoint: "https://example.com/subgraph" }; + return { pdpSubgraphEndpoint: "https://example.com/subgraph", network: "calibration" }; } if (key === "spBlocklists") { return { ids: new Set(), addresses: new Set() }; @@ -153,7 +153,7 @@ describe("DataRetentionService", () => { it("returns early when all providers are blocked for data-retention", async () => { (configServiceMock.get as ReturnType).mockImplementation((key: string) => { - if (key === "blockchain") return { pdpSubgraphEndpoint: "https://example.com/subgraph" }; + if (key === "blockchain") return { pdpSubgraphEndpoint: "https://example.com/subgraph", network: "calibration" }; if (key === "spBlocklists") return { ids: new Set(), addresses: new Set([PROVIDER_A, PROVIDER_B]) }; }); @@ -164,7 +164,7 @@ describe("DataRetentionService", () => { it("excludes blocked providers from data-retention polling while retaining unblocked ones", async () => { (configServiceMock.get as ReturnType).mockImplementation((key: string) => { - if (key === "blockchain") return { pdpSubgraphEndpoint: "https://example.com/subgraph" }; + if (key === "blockchain") return { pdpSubgraphEndpoint: "https://example.com/subgraph", network: "calibration" }; if (key === "spBlocklists") return { ids: new Set(), addresses: new Set([PROVIDER_A]) }; }); pdpSubgraphServiceMock.fetchProvidersWithDatasets.mockResolvedValueOnce([makeProvider({ address: PROVIDER_B })]); @@ -172,8 +172,8 @@ describe("DataRetentionService", () => { await service.pollDataRetention(); const allAddressesPolled: string[] = ( - pdpSubgraphServiceMock.fetchProvidersWithDatasets.mock.calls as [{ addresses: string[] }][] - ).flatMap(([{ addresses }]) => addresses); + pdpSubgraphServiceMock.fetchProvidersWithDatasets.mock.calls as [string, { addresses: string[] }][] + ).flatMap(([, { addresses }]) => addresses); expect(allAddressesPolled).toContain(PROVIDER_B.toLowerCase()); expect(allAddressesPolled).not.toContain(PROVIDER_A.toLowerCase()); }); @@ -192,7 +192,7 @@ describe("DataRetentionService", () => { await service.pollDataRetention(); expect(pdpSubgraphServiceMock.fetchSubgraphMeta).toHaveBeenCalled(); - expect(pdpSubgraphServiceMock.fetchProvidersWithDatasets).toHaveBeenCalledWith({ + expect(pdpSubgraphServiceMock.fetchProvidersWithDatasets).toHaveBeenCalledWith("https://example.com/subgraph", { blockNumber: 1200, addresses: [PROVIDER_A, PROVIDER_B], }); @@ -206,6 +206,7 @@ describe("DataRetentionService", () => { expect(mockBaselineRepository.upsert).toHaveBeenCalledWith( { providerAddress: PROVIDER_A, + network: "calibration", faultedPeriods: "10", successPeriods: "90", lastBlockNumber: "1200", @@ -259,8 +260,20 @@ describe("DataRetentionService", () => { it("handles multiple providers independently", async () => { // Seed DB baselines so first poll emits deltas mockBaselineRepository.find.mockResolvedValueOnce([ - { providerAddress: PROVIDER_A, faultedPeriods: "0", successPeriods: "0", lastBlockNumber: "1000" }, - { providerAddress: PROVIDER_B, faultedPeriods: "0", successPeriods: "0", lastBlockNumber: "1000" }, + { + providerAddress: PROVIDER_A, + network: "calibration", + faultedPeriods: "0", + successPeriods: "0", + lastBlockNumber: "1000", + }, + { + providerAddress: PROVIDER_B, + network: "calibration", + faultedPeriods: "0", + successPeriods: "0", + lastBlockNumber: "1000", + }, ]); const providerA = makeProvider({ address: PROVIDER_A, totalFaultedPeriods: 5n }); @@ -283,7 +296,13 @@ describe("DataRetentionService", () => { it("uses subgraph-confirmed totals directly without overdue estimation", async () => { // Seed baseline so we can verify the computed values via deltas mockBaselineRepository.find.mockResolvedValueOnce([ - { providerAddress: PROVIDER_A, faultedPeriods: "0", successPeriods: "0", lastBlockNumber: "1000" }, + { + providerAddress: PROVIDER_A, + network: "calibration", + faultedPeriods: "0", + successPeriods: "0", + lastBlockNumber: "1000", + }, ]); const provider = makeProvider(); @@ -320,7 +339,13 @@ describe("DataRetentionService", () => { it("emits both faulted and success counters from subgraph totals", async () => { // Seed baseline so we can verify the computed values via deltas mockBaselineRepository.find.mockResolvedValueOnce([ - { providerAddress: PROVIDER_A, faultedPeriods: "0", successPeriods: "0", lastBlockNumber: "1000" }, + { + providerAddress: PROVIDER_A, + network: "calibration", + faultedPeriods: "0", + successPeriods: "0", + lastBlockNumber: "1000", + }, ]); const provider = makeProvider(); @@ -386,7 +411,13 @@ describe("DataRetentionService", () => { // Seed baseline at zero so the full largeValue becomes the delta mockBaselineRepository.find.mockResolvedValueOnce([ - { providerAddress: PROVIDER_A, faultedPeriods: "0", successPeriods: "0", lastBlockNumber: "1000" }, + { + providerAddress: PROVIDER_A, + network: "calibration", + faultedPeriods: "0", + successPeriods: "0", + lastBlockNumber: "1000", + }, ]); pdpSubgraphServiceMock.fetchProvidersWithDatasets.mockResolvedValueOnce([ @@ -410,7 +441,13 @@ describe("DataRetentionService", () => { // Seed baseline at zero so the full value becomes the delta mockBaselineRepository.find.mockResolvedValueOnce([ - { providerAddress: PROVIDER_A, faultedPeriods: "0", successPeriods: "0", lastBlockNumber: "1000" }, + { + providerAddress: PROVIDER_A, + network: "calibration", + faultedPeriods: "0", + successPeriods: "0", + lastBlockNumber: "1000", + }, ]); pdpSubgraphServiceMock.fetchProvidersWithDatasets.mockResolvedValueOnce([ @@ -426,7 +463,13 @@ describe("DataRetentionService", () => { it("uses only subgraph-confirmed provider-level totals", async () => { // Seed baseline at zero so subgraph totals are visible as delta mockBaselineRepository.find.mockResolvedValueOnce([ - { providerAddress: PROVIDER_A, faultedPeriods: "0", successPeriods: "0", lastBlockNumber: "1000" }, + { + providerAddress: PROVIDER_A, + network: "calibration", + faultedPeriods: "0", + successPeriods: "0", + lastBlockNumber: "1000", + }, ]); const provider = makeProvider({ @@ -458,12 +501,16 @@ describe("DataRetentionService", () => { // Should be called twice: once for first 50, once for remaining 25 expect(pdpSubgraphServiceMock.fetchProvidersWithDatasets).toHaveBeenCalledTimes(2); - expect(pdpSubgraphServiceMock.fetchProvidersWithDatasets).toHaveBeenNthCalledWith(1, { - addresses: expect.arrayContaining([expect.any(String)]), - blockNumber: 1200, - }); - expect(pdpSubgraphServiceMock.fetchProvidersWithDatasets.mock.calls[0][0].addresses).toHaveLength(50); - expect(pdpSubgraphServiceMock.fetchProvidersWithDatasets.mock.calls[1][0].addresses).toHaveLength(25); + expect(pdpSubgraphServiceMock.fetchProvidersWithDatasets).toHaveBeenNthCalledWith( + 1, + "https://example.com/subgraph", + { + addresses: expect.arrayContaining([expect.any(String)]), + blockNumber: 1200, + }, + ); + expect(pdpSubgraphServiceMock.fetchProvidersWithDatasets.mock.calls[0][1].addresses).toHaveLength(50); + expect(pdpSubgraphServiceMock.fetchProvidersWithDatasets.mock.calls[1][1].addresses).toHaveLength(25); }); it("continues processing next batch if one batch fails", async () => { @@ -539,9 +586,9 @@ describe("DataRetentionService", () => { await service.pollDataRetention(); - // Should fetch stale provider info from database + // Should fetch stale provider info from database (network-scoped) expect(mockSPRepository.find).toHaveBeenCalledWith({ - where: { address: expect.anything() }, + where: { address: expect.anything(), network: "calibration" }, select: ["address", "providerId", "name", "isApproved"], }); @@ -780,9 +827,9 @@ describe("DataRetentionService", () => { await service.pollDataRetention(); - // Should fetch both stale providers in one query + // Should fetch both stale providers in one query (network-scoped) expect(mockSPRepository.find).toHaveBeenCalledWith({ - where: { address: expect.anything() }, + where: { address: expect.anything(), network: "calibration" }, select: ["address", "providerId", "name", "isApproved"], }); @@ -853,6 +900,7 @@ describe("DataRetentionService", () => { mockBaselineRepository.find.mockResolvedValueOnce([ { providerAddress: PROVIDER_A, + network: "calibration", faultedPeriods: "10", successPeriods: "90", lastBlockNumber: "1100", @@ -875,6 +923,7 @@ describe("DataRetentionService", () => { mockBaselineRepository.find.mockResolvedValueOnce([ { providerAddress: PROVIDER_A, + network: "calibration", faultedPeriods: "8", successPeriods: "85", lastBlockNumber: "1000", @@ -913,6 +962,7 @@ describe("DataRetentionService", () => { mockBaselineRepository.find.mockRejectedValueOnce(new Error("DB connection failed")).mockResolvedValueOnce([ { providerAddress: PROVIDER_A, + network: "calibration", faultedPeriods: "10", successPeriods: "90", lastBlockNumber: "1100", @@ -976,8 +1026,11 @@ describe("DataRetentionService", () => { await service.pollDataRetention(); - // Should delete the baseline from DB - expect(mockBaselineRepository.delete).toHaveBeenCalledWith({ providerAddress: PROVIDER_A }); + // Should delete the baseline from DB (network-scoped) + expect(mockBaselineRepository.delete).toHaveBeenCalledWith({ + providerAddress: PROVIDER_A, + network: "calibration", + }); }); }); diff --git a/apps/backend/src/data-retention/data-retention.service.ts b/apps/backend/src/data-retention/data-retention.service.ts index 1a824f0f..b0fb445b 100644 --- a/apps/backend/src/data-retention/data-retention.service.ts +++ b/apps/backend/src/data-retention/data-retention.service.ts @@ -6,7 +6,8 @@ import { Counter, Gauge } from "prom-client"; import { Raw, Repository } from "typeorm"; import { toStructuredError } from "../common/logging.js"; import { isSpBlocked } from "../common/sp-blocklist.js"; -import { IConfig } from "../config/app.config.js"; +import type { Network } from "../common/types.js"; +import { IBlockchainConfig, IConfig } from "../config/app.config.js"; import { DataRetentionBaseline } from "../database/entities/data-retention-baseline.entity.js"; import { StorageProvider } from "../database/entities/storage-provider.entity.js"; import { buildCheckMetricLabels, CheckMetricLabels } from "../metrics-prometheus/check-metric-labels.js"; @@ -24,7 +25,7 @@ export class DataRetentionService { private static readonly CHALLENGES_PER_PROVING_PERIOD = 5n; /** - * Tracks cumulative faulted/success period totals per provider address. + * Tracks cumulative faulted/success period totals keyed by "network:providerAddress". * Used to compute deltas between consecutive polls for Prometheus counter increments. * Populated from the database on first poll, then kept in sync. * Note: Baselines are stored in periods, but emitted metrics are converted to challenges @@ -38,8 +39,8 @@ export class DataRetentionService { } >; - /** Whether baselines have been loaded from the database */ - private baselinesLoaded = false; + /** Per-network baseline load flags */ + private readonly baselinesLoadedByNetwork: Map = new Map(); constructor( private readonly configService: ConfigService, @@ -57,30 +58,39 @@ export class DataRetentionService { this.providerCumulativeTotals = new Map(); } + private cumulativeTotalsKey(network: Network, address: string): string { + return `${network}:${address}`; + } + /** * Polls the PDP subgraph for provider proof-set data, computes proving period deltas, * converts them to challenge counts, and increments Prometheus counters with the * challenge delta since the last poll. */ async pollDataRetention(): Promise { - const pdpSubgraphEndpoint = this.configService.get("blockchain").pdpSubgraphEndpoint; + const blockchainCfg = this.configService.get("blockchain"); + const { network, pdpSubgraphEndpoint } = blockchainCfg; if (!pdpSubgraphEndpoint) { this.logger.warn({ event: "pdp_subgraph_endpoint_not_configured", message: "No PDP subgraph endpoint configured", + network, }); return; } - await this.loadBaselinesFromDb(); + await this.loadBaselinesFromDb(network); - if (!this.baselinesLoaded) { + if (!this.baselinesLoadedByNetwork.get(network)) { // Cannot safely compute deltas without baselines — would emit full cumulative history + this.logger.log({ + event: "failed_to_load_baselines_by_network", + }); return; } try { - const subgraphMeta = await this.pdpSubgraphService.fetchSubgraphMeta(); + const subgraphMeta = await this.pdpSubgraphService.fetchSubgraphMeta(pdpSubgraphEndpoint); const allProviderInfos = this.walletSdkService.getTestingProviders(); const spBlocklists = this.configService.get("spBlocklists"); const providerInfos = allProviderInfos?.filter((p) => !isSpBlocked(spBlocklists, p.serviceProvider, p.id)); @@ -109,7 +119,7 @@ export class DataRetentionService { ); try { - const providersFromSubgraph = await this.pdpSubgraphService.fetchProvidersWithDatasets({ + const providersFromSubgraph = await this.pdpSubgraphService.fetchProvidersWithDatasets(pdpSubgraphEndpoint, { blockNumber, addresses: batchAddresses, }); @@ -125,7 +135,7 @@ export class DataRetentionService { ), ); } - return this.processProvider(provider, providerInfo, blockNumberBigInt); + return this.processProvider(provider, providerInfo, blockNumberBigInt, network); }), ); @@ -142,11 +152,12 @@ export class DataRetentionService { providerAddress: addr, providerId: providerInfo?.id, providerName: providerInfo?.name, + network, error: toStructuredError(result.reason), }); } else { const addr = providersFromSubgraph[index].address.toLowerCase(); - upsertPromises.push(this.persistBaseline(addr, result.value, blockNumberBigInt)); + upsertPromises.push(this.persistBaseline(addr, result.value, blockNumberBigInt, network)); } }); @@ -173,19 +184,21 @@ export class DataRetentionService { } } - // Only cleanup stale providers after successful poll to preserve baselines during transient failures if (!hasProcessingErrors) { - await this.cleanupStaleProviders(providerAddresses); + // Only cleanup stale providers after successful poll to preserve baselines during transient failures + await this.cleanupStaleProviders(providerAddresses, network); } else { this.logger.warn({ event: "stale_provider_cleanup_skipped", message: "Skipping stale provider cleanup due to processing errors", + network, }); } } catch (error) { this.logger.error({ event: "data_retention_poll_failed", message: "Failed to poll data retention", + network, error: toStructuredError(error), }); } @@ -201,12 +214,13 @@ export class DataRetentionService { * * @param activeProviderAddresses - Array of currently active provider addresses (normalized to lowercase) */ - private async cleanupStaleProviders(activeProviderAddresses: string[]): Promise { + private async cleanupStaleProviders(activeProviderAddresses: string[], network: Network): Promise { const activeAddressSet = new Set(activeProviderAddresses); const staleAddresses: string[] = []; - for (const [address] of this.providerCumulativeTotals) { - if (!activeAddressSet.has(address)) { + for (const [key] of this.providerCumulativeTotals) { + const [keyNetwork, address] = key.split(":", 2); + if (keyNetwork === network && address && !activeAddressSet.has(address)) { staleAddresses.push(address); } } @@ -224,7 +238,10 @@ export class DataRetentionService { let staleProviders: StorageProvider[] = []; try { staleProviders = await this.storageProviderRepository.find({ - where: { address: Raw((alias) => `LOWER(${alias}) IN (:...addresses)`, { addresses: staleAddresses }) }, + where: { + network, + address: Raw((alias) => `LOWER(${alias}) IN (:...addresses)`, { addresses: staleAddresses }), + }, select: ["address", "providerId", "name", "isApproved"], }); } catch (error) { @@ -240,6 +257,7 @@ export class DataRetentionService { const providerLookup = new Map(staleProviders.map((p) => [p.address.toLowerCase(), p])); for (const address of staleAddresses) { + const totalsKey = this.cumulativeTotalsKey(network, address); try { const provider = providerLookup.get(address); @@ -266,14 +284,15 @@ export class DataRetentionService { this.estimatedOverduePeriodsGauge.remove(unapprovedLabels); // Only delete local memory if Prometheus removal succeeded without throwing - this.providerCumulativeTotals.delete(address); + this.providerCumulativeTotals.delete(totalsKey); // Also remove persisted baseline from DB - this.baselineRepository.delete({ providerAddress: address }).catch((err) => { + this.baselineRepository.delete({ providerAddress: address, network }).catch((err) => { this.logger.warn({ event: "baseline_db_delete_failed", message: "Failed to delete persisted baseline for stale provider", providerAddress: address, + network, error: toStructuredError(err), }); }); @@ -320,6 +339,7 @@ export class DataRetentionService { provider: ProviderDataSetResponse["providers"][number], pdpProvider: PDPProviderEx, currentBlock: bigint, + network: Network, ): Promise<{ faultedPeriods: bigint; successPeriods: bigint }> { const { address, totalFaultedPeriods, totalProvingPeriods, proofSets } = provider; // Note: Query filters proofSets with nextDeadline_lt: $blockNumber, so all deadlines are in the past @@ -333,7 +353,8 @@ export class DataRetentionService { const confirmedTotalSuccess = totalProvingPeriods - totalFaultedPeriods; const normalizedAddress = address.toLowerCase(); - const previous = this.providerCumulativeTotals.get(normalizedAddress); + const totalsKey = this.cumulativeTotalsKey(network, normalizedAddress); + const previous = this.providerCumulativeTotals.get(totalsKey); const newBaseline = { faultedPeriods: totalFaultedPeriods, @@ -371,7 +392,7 @@ export class DataRetentionService { faultedPeriods: totalFaultedPeriods.toString(), successPeriods: confirmedTotalSuccess.toString(), }); - this.providerCumulativeTotals.set(normalizedAddress, newBaseline); + this.providerCumulativeTotals.set(totalsKey, newBaseline); return newBaseline; } @@ -393,7 +414,7 @@ export class DataRetentionService { successChallengesDelta: successChallengesDelta.toString(), }); // Reset baseline without incrementing counters - this.providerCumulativeTotals.set(normalizedAddress, newBaseline); + this.providerCumulativeTotals.set(totalsKey, newBaseline); return newBaseline; } @@ -405,38 +426,40 @@ export class DataRetentionService { this.safeIncrementCounter(this.dataSetChallengeStatusCounter, providerLabels, "success", successChallengesDelta); } - this.providerCumulativeTotals.set(normalizedAddress, newBaseline); + this.providerCumulativeTotals.set(totalsKey, newBaseline); return newBaseline; } /** * Loads persisted baselines from the database into the in-memory map. - * Only runs once; if the DB read fails, retries on the next poll. + * Only runs once per network; if the DB read fails, retries on the next poll. */ - private async loadBaselinesFromDb(): Promise { - if (this.baselinesLoaded) { + private async loadBaselinesFromDb(network: Network): Promise { + if (this.baselinesLoadedByNetwork.get(network)) { return; } try { - const rows = await this.baselineRepository.find(); + const rows = await this.baselineRepository.find({ where: { network } }); for (const row of rows) { - this.providerCumulativeTotals.set(row.providerAddress, { + this.providerCumulativeTotals.set(this.cumulativeTotalsKey(network, row.providerAddress), { faultedPeriods: BigInt(row.faultedPeriods), successPeriods: BigInt(row.successPeriods), }); } - this.baselinesLoaded = true; + this.baselinesLoadedByNetwork.set(network, true); this.logger.log({ event: "baselines_loaded_from_db", message: "Loaded baseline(s) from database", + network, baselineCount: rows.length, }); } catch (error) { this.logger.error({ event: "baseline_load_failed", message: "Failed to load baselines from database. Will retry on next poll.", + network, error: toStructuredError(error), }); } @@ -449,10 +472,12 @@ export class DataRetentionService { providerAddress: string, baseline: { faultedPeriods: bigint; successPeriods: bigint }, blockNumber: bigint, + network: Network, ): Promise { await this.baselineRepository.upsert( { providerAddress, + network, faultedPeriods: baseline.faultedPeriods.toString(), successPeriods: baseline.successPeriods.toString(), lastBlockNumber: blockNumber.toString(), diff --git a/apps/backend/src/database/entities/data-retention-baseline.entity.ts b/apps/backend/src/database/entities/data-retention-baseline.entity.ts index 56dec47a..365a2a26 100644 --- a/apps/backend/src/database/entities/data-retention-baseline.entity.ts +++ b/apps/backend/src/database/entities/data-retention-baseline.entity.ts @@ -1,4 +1,5 @@ import { Column, Entity, PrimaryColumn, UpdateDateColumn } from "typeorm"; +import { SUPPORTED_NETWORKS } from "../../common/constants.js"; import type { Network } from "../../common/types.js"; @Entity("data_retention_baselines") @@ -6,7 +7,12 @@ export class DataRetentionBaseline { @PrimaryColumn({ name: "provider_address", type: "text" }) providerAddress!: string; - @PrimaryColumn({ name: "network", type: "text" }) + @PrimaryColumn({ + name: "network", + type: "enum", + enum: [...SUPPORTED_NETWORKS], + enumName: "network_enum", + }) network!: Network; @Column({ name: "faulted_periods", type: "bigint" }) diff --git a/apps/backend/src/database/entities/deal.entity.ts b/apps/backend/src/database/entities/deal.entity.ts index 252570f4..1bbf357f 100644 --- a/apps/backend/src/database/entities/deal.entity.ts +++ b/apps/backend/src/database/entities/deal.entity.ts @@ -9,6 +9,7 @@ import { PrimaryGeneratedColumn, UpdateDateColumn, } from "typeorm"; +import { SUPPORTED_NETWORKS } from "../../common/constants.js"; import type { Network } from "../../common/types.js"; import { BigIntColumn } from "../helpers/bigint-column.js"; import { type DealMetadata, DealStatus, IpniStatus, type ServiceType } from "../types.js"; @@ -24,7 +25,12 @@ export class Deal { @Column({ name: "sp_address" }) spAddress: string; - @Column({ name: "network", type: "text" }) + @Column({ + name: "network", + type: "enum", + enum: [...SUPPORTED_NETWORKS], + enumName: "network_enum", + }) network: Network; @Column({ name: "wallet_address" }) diff --git a/apps/backend/src/database/entities/job-schedule-state.entity.ts b/apps/backend/src/database/entities/job-schedule-state.entity.ts index 761fbe43..6146da02 100644 --- a/apps/backend/src/database/entities/job-schedule-state.entity.ts +++ b/apps/backend/src/database/entities/job-schedule-state.entity.ts @@ -1,4 +1,5 @@ import { Column, CreateDateColumn, Entity, Index, PrimaryGeneratedColumn, UpdateDateColumn } from "typeorm"; +import { SUPPORTED_NETWORKS } from "../../common/constants.js"; import type { Network } from "../../common/types.js"; // `job_type` is stored as TEXT in Postgres, so legacy rows may still contain @@ -27,7 +28,12 @@ export class JobScheduleState { @Column({ name: "sp_address", type: "text", default: "" }) spAddress!: string; - @Column({ name: "network", type: "text" }) + @Column({ + name: "network", + type: "enum", + enum: [...SUPPORTED_NETWORKS], + enumName: "network_enum", + }) network!: Network; @Column({ name: "interval_seconds" }) diff --git a/apps/backend/src/database/entities/storage-provider.entity.ts b/apps/backend/src/database/entities/storage-provider.entity.ts index e22844c6..71f7d256 100644 --- a/apps/backend/src/database/entities/storage-provider.entity.ts +++ b/apps/backend/src/database/entities/storage-provider.entity.ts @@ -1,4 +1,5 @@ import { Column, CreateDateColumn, Entity, Index, OneToMany, PrimaryColumn, UpdateDateColumn } from "typeorm"; +import { SUPPORTED_NETWORKS } from "../../common/constants.js"; import type { Network } from "../../common/types.js"; import { BigIntColumn } from "../helpers/bigint-column.js"; import { Deal } from "./deal.entity.js"; @@ -10,7 +11,12 @@ export class StorageProvider { @PrimaryColumn() address!: string; - @PrimaryColumn({ name: "network", type: "text" }) + @PrimaryColumn({ + name: "network", + type: "enum", + enum: [...SUPPORTED_NETWORKS], + enumName: "network_enum", + }) network!: Network; @BigIntColumn({ nullable: true }) diff --git a/apps/backend/src/database/migrations/1776790420000-AddNetworkColumn.ts b/apps/backend/src/database/migrations/1776790420000-AddNetworkColumn.ts index 8f98b6a3..cd9629d8 100644 --- a/apps/backend/src/database/migrations/1776790420000-AddNetworkColumn.ts +++ b/apps/backend/src/database/migrations/1776790420000-AddNetworkColumn.ts @@ -13,6 +13,11 @@ import { Network } from "../../common/types.js"; * single-network deployment to an additional network must update their * `NETWORKS` config and re-run a providers_refresh to populate the * network-scoped rows for the newly added network. + * + * Operator runbook: `docs/runbooks/multi-network-migration.md` covers the + * pre-migration checklist (backup, env vars, stopping writers), running the + * migration, post-migration verification, expanding to a second network, and + * rollback semantics. */ export class AddNetworkColumn1776790420000 implements MigrationInterface { name = "AddNetworkColumn1776790420000"; @@ -26,17 +31,32 @@ export class AddNetworkColumn1776790420000 implements MigrationInterface { ); } + // ------------------------------------------------------------------------- + // Create the shared Postgres enum type for network values. All four + // network-bearing tables reference this single type (mirrors TypeORM's + // `enumName: "network_enum"` on the entities), so adding/removing + // supported networks happens in one place. + // ------------------------------------------------------------------------- + await queryRunner.query(` + DO $$ + BEGIN + IF NOT EXISTS (SELECT 1 FROM pg_type WHERE typname = 'network_enum') THEN + CREATE TYPE network_enum AS ENUM (${SUPPORTED_NETWORKS.map((n) => `'${n}'`).join(", ")}); + END IF; + END$$; + `); + // ------------------------------------------------------------------------- // Add `network` columns first so composite PK/FK can reference them. // ------------------------------------------------------------------------- await queryRunner.query(` ALTER TABLE storage_providers - ADD COLUMN IF NOT EXISTS network TEXT NOT NULL DEFAULT '${backfillNetwork}' + ADD COLUMN IF NOT EXISTS network network_enum NOT NULL DEFAULT '${backfillNetwork}' `); await queryRunner.query(` ALTER TABLE deals - ADD COLUMN IF NOT EXISTS network TEXT NOT NULL DEFAULT '${backfillNetwork}' + ADD COLUMN IF NOT EXISTS network network_enum NOT NULL DEFAULT '${backfillNetwork}' `); // ------------------------------------------------------------------------- @@ -98,7 +118,7 @@ export class AddNetworkColumn1776790420000 implements MigrationInterface { // ------------------------------------------------------------------------- await queryRunner.query(` ALTER TABLE job_schedule_state - ADD COLUMN IF NOT EXISTS network TEXT NOT NULL DEFAULT '${backfillNetwork}' + ADD COLUMN IF NOT EXISTS network network_enum NOT NULL DEFAULT '${backfillNetwork}' `); await queryRunner.query(` @@ -117,7 +137,7 @@ export class AddNetworkColumn1776790420000 implements MigrationInterface { // ------------------------------------------------------------------------- await queryRunner.query(` ALTER TABLE data_retention_baselines - ADD COLUMN IF NOT EXISTS network TEXT NOT NULL DEFAULT '${backfillNetwork}' + ADD COLUMN IF NOT EXISTS network network_enum NOT NULL DEFAULT '${backfillNetwork}' `); await queryRunner.query(` @@ -217,5 +237,9 @@ export class AddNetworkColumn1776790420000 implements MigrationInterface { // Finally, drop the network columns now that no constraint or index depends on them. await queryRunner.query(`ALTER TABLE deals DROP COLUMN IF EXISTS network`); await queryRunner.query(`ALTER TABLE storage_providers DROP COLUMN IF EXISTS network`); + + // Drop the shared enum type once no column references it. We guard with + // IF EXISTS so a partial revert is idempotent. + await queryRunner.query(`DROP TYPE IF EXISTS network_enum`); } } diff --git a/apps/backend/src/jobs/jobs.service.spec.ts b/apps/backend/src/jobs/jobs.service.spec.ts index 5b8c58bc..0d196f66 100644 --- a/apps/backend/src/jobs/jobs.service.spec.ts +++ b/apps/backend/src/jobs/jobs.service.spec.ts @@ -116,7 +116,11 @@ describe("JobsService schedule rows", () => { baseConfigValues = { app: { runMode: "both" } as IConfig["app"], - blockchain: { useOnlyApprovedProviders: false, minNumDataSetsForChecks: 1 } as IConfig["blockchain"], + blockchain: { + useOnlyApprovedProviders: false, + minNumDataSetsForChecks: 1, + network: "calibration", + } as IConfig["blockchain"], scheduling: { providersRefreshIntervalSeconds: 4 * 3600, dataRetentionPollIntervalSeconds: 3600, @@ -643,7 +647,7 @@ describe("JobsService schedule rows", () => { it("uses approved-only filter when configured", async () => { baseConfigValues = { ...baseConfigValues, - blockchain: { useOnlyApprovedProviders: true } as IConfig["blockchain"], + blockchain: { useOnlyApprovedProviders: true, network: "calibration" } as IConfig["blockchain"], }; configService = { get: vi.fn((key: keyof IConfig) => baseConfigValues[key]), @@ -668,12 +672,14 @@ describe("JobsService schedule rows", () => { expect(jobScheduleRepositoryMock.upsertSchedule).toHaveBeenCalledWith( "providers_refresh", "", + "calibration", expect.any(Number), expect.any(Date), ); expect(jobScheduleRepositoryMock.upsertSchedule).toHaveBeenCalledWith( "data_retention_poll", "", + "calibration", expect.any(Number), expect.any(Date), ); diff --git a/apps/backend/src/jobs/jobs.service.ts b/apps/backend/src/jobs/jobs.service.ts index 01357225..b24dca55 100644 --- a/apps/backend/src/jobs/jobs.service.ts +++ b/apps/backend/src/jobs/jobs.service.ts @@ -963,6 +963,7 @@ export class JobsService implements OnModuleInit, OnApplicationShutdown { const providersRefreshStartAt = new Date(now.getTime() + phaseMs); const minDataSets = this.configService.get("blockchain").minNumDataSetsForChecks; + const network = this.configService.get("blockchain").network; const cleanupStartAt = new Date(now.getTime() + phaseMs); const spBlocklistsCfg = this.configService.get("spBlocklists"); @@ -979,12 +980,19 @@ export class JobsService implements OnModuleInit, OnApplicationShutdown { } for (const address of unblockedAddresses) { - await this.jobScheduleRepository.upsertSchedule("deal", address, dealIntervalSeconds, dealStartAt); - await this.jobScheduleRepository.upsertSchedule("retrieval", address, retrievalIntervalSeconds, retrievalStartAt); + await this.jobScheduleRepository.upsertSchedule("deal", address, network, dealIntervalSeconds, dealStartAt); + await this.jobScheduleRepository.upsertSchedule( + "retrieval", + address, + network, + retrievalIntervalSeconds, + retrievalStartAt, + ); if (minDataSets >= 1) { await this.jobScheduleRepository.upsertSchedule( "data_set_creation", address, + network, dataSetCreationIntervalSeconds, dataSetCreationStartAt, ); @@ -992,6 +1000,7 @@ export class JobsService implements OnModuleInit, OnApplicationShutdown { await this.jobScheduleRepository.upsertSchedule( "piece_cleanup", address, + network, pieceCleanupIntervalSeconds, cleanupStartAt, ); @@ -1018,12 +1027,14 @@ export class JobsService implements OnModuleInit, OnApplicationShutdown { await this.jobScheduleRepository.upsertSchedule( "data_retention_poll", "", + network, dataRetentionPollIntervalSeconds, dataRetentionPollStartAt, ); await this.jobScheduleRepository.upsertSchedule( "providers_refresh", "", + network, providersRefreshIntervalSeconds, providersRefreshStartAt, ); diff --git a/apps/backend/src/jobs/repositories/job-schedule.repository.ts b/apps/backend/src/jobs/repositories/job-schedule.repository.ts index e698c2d2..5a7bb47d 100644 --- a/apps/backend/src/jobs/repositories/job-schedule.repository.ts +++ b/apps/backend/src/jobs/repositories/job-schedule.repository.ts @@ -2,6 +2,7 @@ import { Injectable, Logger } from "@nestjs/common"; import { InjectDataSource } from "@nestjs/typeorm"; import type { DataSource } from "typeorm"; import { toStructuredError } from "../../common/logging.js"; +import type { Network } from "../../common/types.js"; import type { JobType } from "../../database/entities/job-schedule-state.entity.js"; import { DATA_RETENTION_POLL_QUEUE, @@ -31,25 +32,36 @@ export class JobScheduleRepository { ) {} /** - * Inserts or updates a schedule row for a specific job type and provider. - * If the row exists, it updates the interval and ensures the job is not paused. + * Inserts or updates a schedule row for a specific job type, provider, and + * network. If the row exists, it updates the interval and ensures the job + * is not paused. + * + * Schedules are scoped per network so the same provider running on multiple + * networks (e.g. mainnet + calibration) gets independent cadence rows. * * @param jobType - The type of job (deal, retrieval, metrics, etc.) * @param spAddress - The storage provider address (or empty string for global jobs) + * @param network - The blockchain network this schedule belongs to * @param intervalSeconds - The frequency of the job in seconds * @param nextRunAt - The scheduled time for the next run */ - async upsertSchedule(jobType: JobType, spAddress: string, intervalSeconds: number, nextRunAt: Date): Promise { + async upsertSchedule( + jobType: JobType, + spAddress: string, + network: Network, + intervalSeconds: number, + nextRunAt: Date, + ): Promise { await this.dataSource.query( ` - INSERT INTO job_schedule_state (job_type, sp_address, interval_seconds, next_run_at) - VALUES ($1, $2, $3, $4) + INSERT INTO job_schedule_state (job_type, sp_address, network, interval_seconds, next_run_at) + VALUES ($1, $2, $3, $4, $5) ON CONFLICT (job_type, sp_address, network) DO UPDATE SET interval_seconds = EXCLUDED.interval_seconds, paused = job_schedule_state.paused, updated_at = NOW() `, - [jobType, spAddress, intervalSeconds, nextRunAt], + [jobType, spAddress, network, intervalSeconds, nextRunAt], ); } diff --git a/apps/backend/src/pdp-subgraph/pdp-subgraph.service.spec.ts b/apps/backend/src/pdp-subgraph/pdp-subgraph.service.spec.ts index cd3a1ea8..7337b3d7 100644 --- a/apps/backend/src/pdp-subgraph/pdp-subgraph.service.spec.ts +++ b/apps/backend/src/pdp-subgraph/pdp-subgraph.service.spec.ts @@ -1,6 +1,4 @@ -import type { ConfigService } from "@nestjs/config"; import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; -import type { IConfig } from "../config/app.config.js"; import { PDPSubgraphService } from "./pdp-subgraph.service.js"; const VALID_ADDRESS = "0xd8da6bf26964af9d7eed9e03e53415d37aa96045" as const; @@ -40,16 +38,7 @@ describe("PDPSubgraphService", () => { let fetchMock: ReturnType; beforeEach(() => { - const configService = { - get: vi.fn((key: keyof IConfig) => { - if (key === "blockchain") { - return { pdpSubgraphEndpoint: SUBGRAPH_ENDPOINT }; - } - return undefined; - }), - } as unknown as ConfigService; - - service = new PDPSubgraphService(configService); + service = new PDPSubgraphService(); fetchMock = vi.fn(); vi.stubGlobal("fetch", fetchMock); @@ -69,7 +58,7 @@ describe("PDPSubgraphService", () => { json: async () => makeSubgraphResponse([makeValidProvider()]), }); - const providers = await service.fetchProvidersWithDatasets({ + const providers = await service.fetchProvidersWithDatasets(SUBGRAPH_ENDPOINT, { blockNumber: 5000, addresses: [VALID_ADDRESS], }); @@ -93,7 +82,7 @@ describe("PDPSubgraphService", () => { json: async () => makeSubgraphResponse([]), }); - const providers = await service.fetchProvidersWithDatasets({ + const providers = await service.fetchProvidersWithDatasets(SUBGRAPH_ENDPOINT, { blockNumber: 5000, addresses: [VALID_ADDRESS], }); @@ -101,7 +90,7 @@ describe("PDPSubgraphService", () => { }); it("returns empty array when addresses array is empty", async () => { - const providers = await service.fetchProvidersWithDatasets({ + const providers = await service.fetchProvidersWithDatasets(SUBGRAPH_ENDPOINT, { blockNumber: 5000, addresses: [], }); @@ -116,7 +105,7 @@ describe("PDPSubgraphService", () => { status: 500, }); - const promise = service.fetchProvidersWithDatasets({ + const promise = service.fetchProvidersWithDatasets(SUBGRAPH_ENDPOINT, { blockNumber: 5000, addresses: [VALID_ADDRESS], }); @@ -139,7 +128,7 @@ describe("PDPSubgraphService", () => { }), }); - const promise = service.fetchProvidersWithDatasets({ + const promise = service.fetchProvidersWithDatasets(SUBGRAPH_ENDPOINT, { blockNumber: 5000, addresses: [VALID_ADDRESS], }); @@ -155,7 +144,7 @@ describe("PDPSubgraphService", () => { it("throws on network failure", async () => { fetchMock.mockRejectedValueOnce(new Error("Network error")); - const promise = service.fetchProvidersWithDatasets({ + const promise = service.fetchProvidersWithDatasets(SUBGRAPH_ENDPOINT, { blockNumber: 5000, addresses: [VALID_ADDRESS], }); @@ -177,10 +166,7 @@ describe("PDPSubgraphService", () => { }); await expect( - service.fetchProvidersWithDatasets({ - blockNumber: 5000, - addresses: [VALID_ADDRESS], - }), + service.fetchProvidersWithDatasets(SUBGRAPH_ENDPOINT, { blockNumber: 5000, addresses: [VALID_ADDRESS] }), ).rejects.toThrow("Data validation failed"); // Should only be called once - no retries for validation errors @@ -196,10 +182,7 @@ describe("PDPSubgraphService", () => { }); await expect( - service.fetchProvidersWithDatasets({ - blockNumber: 5000, - addresses: [VALID_ADDRESS], - }), + service.fetchProvidersWithDatasets(SUBGRAPH_ENDPOINT, { blockNumber: 5000, addresses: [VALID_ADDRESS] }), ).rejects.toThrow("Data validation failed"); // Should only be called once - no retries for validation errors @@ -212,10 +195,7 @@ describe("PDPSubgraphService", () => { json: async () => makeSubgraphResponse([makeValidProvider()]), }); - await service.fetchProvidersWithDatasets({ - blockNumber: 12345, - addresses: [VALID_ADDRESS], - }); + await service.fetchProvidersWithDatasets(SUBGRAPH_ENDPOINT, { blockNumber: 12345, addresses: [VALID_ADDRESS] }); const body = JSON.parse(fetchMock.mock.calls[0][1].body); expect(body.variables.blockNumber).toBe("12345"); @@ -233,7 +213,7 @@ describe("PDPSubgraphService", () => { }), }); - const promise = service.fetchProvidersWithDatasets({ + const promise = service.fetchProvidersWithDatasets(SUBGRAPH_ENDPOINT, { blockNumber: 5000, addresses: [VALID_ADDRESS], }); @@ -255,10 +235,7 @@ describe("PDPSubgraphService", () => { }); const addresses = [VALID_ADDRESS, "0xAb5801a7D398351b8bE11C439e05C5B3259aeC9B"]; - await service.fetchProvidersWithDatasets({ - blockNumber: 5000, - addresses, - }); + await service.fetchProvidersWithDatasets(SUBGRAPH_ENDPOINT, { blockNumber: 5000, addresses }); const body = JSON.parse(fetchMock.mock.calls[0][1].body); expect(body.variables.addresses).toEqual(addresses); @@ -273,10 +250,7 @@ describe("PDPSubgraphService", () => { json: async () => makeSubgraphResponse([]), }); - await service.fetchProvidersWithDatasets({ - blockNumber: 5000, - addresses, - }); + await service.fetchProvidersWithDatasets(SUBGRAPH_ENDPOINT, { blockNumber: 5000, addresses }); // Should make 2 requests expect(fetchMock).toHaveBeenCalledTimes(2); @@ -289,7 +263,7 @@ describe("PDPSubgraphService", () => { json: async () => makeSubgraphResponse([makeValidProvider()]), }); - const promise = service.fetchProvidersWithDatasets({ + const promise = service.fetchProvidersWithDatasets(SUBGRAPH_ENDPOINT, { blockNumber: 5000, addresses: [VALID_ADDRESS], }); @@ -321,10 +295,7 @@ describe("PDPSubgraphService", () => { }; }); - const fetchPromise = service.fetchProvidersWithDatasets({ - blockNumber: 5000, - addresses, - }); + const fetchPromise = service.fetchProvidersWithDatasets(SUBGRAPH_ENDPOINT, { blockNumber: 5000, addresses }); await vi.runAllTimersAsync(); @@ -343,7 +314,7 @@ describe("PDPSubgraphService", () => { json: async () => makeSubgraphMetaResponse(12345), }); - const meta = await service.fetchSubgraphMeta(); + const meta = await service.fetchSubgraphMeta(SUBGRAPH_ENDPOINT); expect(fetchMock).toHaveBeenCalledWith(SUBGRAPH_ENDPOINT, { method: "POST", @@ -361,13 +332,9 @@ describe("PDPSubgraphService", () => { }); it("throws when PDP subgraph endpoint is not configured", async () => { - const configService = { - get: vi.fn(() => ({ pdpSubgraphEndpoint: "" })), - } as unknown as ConfigService; - - const serviceWithoutEndpoint = new PDPSubgraphService(configService); + const serviceWithoutEndpoint = new PDPSubgraphService(); - await expect(serviceWithoutEndpoint.fetchSubgraphMeta()).rejects.toThrow("No PDP subgraph endpoint configured"); + await expect(serviceWithoutEndpoint.fetchSubgraphMeta("")).rejects.toThrow("No PDP subgraph endpoint configured"); }); it("throws on HTTP error response", async () => { @@ -377,7 +344,7 @@ describe("PDPSubgraphService", () => { statusText: "Internal Server Error", }); - const promise = service.fetchSubgraphMeta(); + const promise = service.fetchSubgraphMeta(SUBGRAPH_ENDPOINT); promise.catch(() => {}); await vi.runAllTimersAsync(); @@ -394,7 +361,7 @@ describe("PDPSubgraphService", () => { }), }); - const promise = service.fetchSubgraphMeta(); + const promise = service.fetchSubgraphMeta(SUBGRAPH_ENDPOINT); promise.catch(() => {}); await vi.runAllTimersAsync(); @@ -417,7 +384,7 @@ describe("PDPSubgraphService", () => { }), }); - await expect(service.fetchSubgraphMeta()).rejects.toThrow("Data validation failed"); + await expect(service.fetchSubgraphMeta(SUBGRAPH_ENDPOINT)).rejects.toThrow("Data validation failed"); expect(fetchMock).toHaveBeenCalledTimes(1); // Should not retry validation errors }); @@ -435,7 +402,7 @@ describe("PDPSubgraphService", () => { }), }); - await expect(service.fetchSubgraphMeta()).rejects.toThrow("Data validation failed"); + await expect(service.fetchSubgraphMeta(SUBGRAPH_ENDPOINT)).rejects.toThrow("Data validation failed"); expect(fetchMock).toHaveBeenCalledTimes(1); }); @@ -445,7 +412,7 @@ describe("PDPSubgraphService", () => { json: async () => makeSubgraphMetaResponse(12345), }); - const promise = service.fetchSubgraphMeta(); + const promise = service.fetchSubgraphMeta(SUBGRAPH_ENDPOINT); await vi.runAllTimersAsync(); @@ -459,7 +426,7 @@ describe("PDPSubgraphService", () => { it("throws after MAX_RETRIES attempts on persistent network errors", async () => { fetchMock.mockRejectedValue(new Error("Network timeout")); - const promise = service.fetchSubgraphMeta(); + const promise = service.fetchSubgraphMeta(SUBGRAPH_ENDPOINT); promise.catch(() => {}); await vi.runAllTimersAsync(); @@ -480,7 +447,7 @@ describe("PDPSubgraphService", () => { const startTime = Date.now(); // Make 5 requests - should all go through immediately - const promises = Array.from({ length: 5 }, () => service.fetchSubgraphMeta()); + const promises = Array.from({ length: 5 }, () => service.fetchSubgraphMeta(SUBGRAPH_ENDPOINT)); await Promise.all(promises); @@ -499,13 +466,13 @@ describe("PDPSubgraphService", () => { }); // Fill up the rate limit window with 50 requests - const initialPromises = Array.from({ length: 50 }, () => service.fetchSubgraphMeta()); + const initialPromises = Array.from({ length: 50 }, () => service.fetchSubgraphMeta(SUBGRAPH_ENDPOINT)); await Promise.all(initialPromises); fetchMock.mockClear(); // Try to make one more request - should wait for oldest to expire - const promise = service.fetchSubgraphMeta(); + const promise = service.fetchSubgraphMeta(SUBGRAPH_ENDPOINT); // Advance past the 10 second window + buffer await vi.advanceTimersByTimeAsync(10010); @@ -528,7 +495,7 @@ describe("PDPSubgraphService", () => { }); // Fill 48 slots - const initialPromises = Array.from({ length: 48 }, () => service.fetchSubgraphMeta()); + const initialPromises = Array.from({ length: 48 }, () => service.fetchSubgraphMeta(SUBGRAPH_ENDPOINT)); await vi.runAllTimersAsync(); await Promise.all(initialPromises); @@ -556,7 +523,7 @@ describe("PDPSubgraphService", () => { }); // Make 30 requests at t=0 - const batch1 = Array.from({ length: 30 }, () => service.fetchSubgraphMeta()); + const batch1 = Array.from({ length: 30 }, () => service.fetchSubgraphMeta(SUBGRAPH_ENDPOINT)); await vi.runAllTimersAsync(); await Promise.all(batch1); @@ -564,7 +531,7 @@ describe("PDPSubgraphService", () => { await vi.advanceTimersByTimeAsync(5000); // Make 20 more requests at t=5000 - const batch2 = Array.from({ length: 20 }, () => service.fetchSubgraphMeta()); + const batch2 = Array.from({ length: 20 }, () => service.fetchSubgraphMeta(SUBGRAPH_ENDPOINT)); await vi.runAllTimersAsync(); await Promise.all(batch2); @@ -575,7 +542,7 @@ describe("PDPSubgraphService", () => { fetchMock.mockClear(); // Should be able to make 30 more requests immediately - const batch3 = Array.from({ length: 30 }, () => service.fetchSubgraphMeta()); + const batch3 = Array.from({ length: 30 }, () => service.fetchSubgraphMeta(SUBGRAPH_ENDPOINT)); await vi.runAllTimersAsync(); await Promise.all(batch3); @@ -589,13 +556,13 @@ describe("PDPSubgraphService", () => { }); // Fill the window - const initialPromises = Array.from({ length: 50 }, () => service.fetchSubgraphMeta()); + const initialPromises = Array.from({ length: 50 }, () => service.fetchSubgraphMeta(SUBGRAPH_ENDPOINT)); await vi.runAllTimersAsync(); await Promise.all(initialPromises); fetchMock.mockClear(); - const promise = service.fetchSubgraphMeta(); + const promise = service.fetchSubgraphMeta(SUBGRAPH_ENDPOINT); // Advance past the window + buffer await vi.advanceTimersByTimeAsync(10010); @@ -611,7 +578,7 @@ describe("PDPSubgraphService", () => { }); // Fill window with 50 requests - const batch1 = Array.from({ length: 50 }, () => service.fetchSubgraphMeta()); + const batch1 = Array.from({ length: 50 }, () => service.fetchSubgraphMeta(SUBGRAPH_ENDPOINT)); await vi.runAllTimersAsync(); await Promise.all(batch1); @@ -642,7 +609,7 @@ describe("PDPSubgraphService", () => { }); // Fill 47 slots - const initial = Array.from({ length: 47 }, () => service.fetchSubgraphMeta()); + const initial = Array.from({ length: 47 }, () => service.fetchSubgraphMeta(SUBGRAPH_ENDPOINT)); await vi.runAllTimersAsync(); await Promise.all(initial); @@ -674,7 +641,7 @@ describe("PDPSubgraphService", () => { }); // Make 20 requests - const batch1 = Array.from({ length: 20 }, () => service.fetchSubgraphMeta()); + const batch1 = Array.from({ length: 20 }, () => service.fetchSubgraphMeta(SUBGRAPH_ENDPOINT)); await vi.runAllTimersAsync(); await Promise.all(batch1); @@ -684,7 +651,7 @@ describe("PDPSubgraphService", () => { fetchMock.mockClear(); // Make another request - should have full window available - await service.fetchSubgraphMeta(); + await service.fetchSubgraphMeta(SUBGRAPH_ENDPOINT); const timestamps = (service as any).requestTimestamps; // Should only have 1 timestamp (the new one), old ones filtered out diff --git a/apps/backend/src/pdp-subgraph/pdp-subgraph.service.ts b/apps/backend/src/pdp-subgraph/pdp-subgraph.service.ts index aedd8bce..88407e24 100644 --- a/apps/backend/src/pdp-subgraph/pdp-subgraph.service.ts +++ b/apps/backend/src/pdp-subgraph/pdp-subgraph.service.ts @@ -1,7 +1,5 @@ import { Injectable, Logger } from "@nestjs/common"; -import { ConfigService } from "@nestjs/config"; import { toStructuredError } from "../common/logging.js"; -import type { IBlockchainConfig, IConfig } from "../config/app.config.js"; import { Queries } from "./queries.js"; import type { GraphQLResponse, ProviderDataSetResponse, ProvidersWithDataSetsOptions, SubgraphMeta } from "./types.js"; import { validateProviderDataSetResponse, validateSubgraphMetaResponse } from "./types.js"; @@ -23,7 +21,6 @@ class ValidationError extends Error { @Injectable() export class PDPSubgraphService { private readonly logger: Logger = new Logger(PDPSubgraphService.name); - private readonly blockchainConfig: IBlockchainConfig; private static readonly MAX_PROVIDERS_PER_QUERY = 100; private static readonly MAX_CONCURRENT_REQUESTS = 50; @@ -33,10 +30,6 @@ export class PDPSubgraphService { private requestTimestamps: number[] = []; - constructor(private readonly configService: ConfigService) { - this.blockchainConfig = this.configService.get("blockchain"); - } - /** * Fetch subgraph metadata including the latest indexed block number * @@ -44,15 +37,15 @@ export class PDPSubgraphService { * @returns Subgraph metadata with block number * @throws Error if endpoint is not configured or after MAX_RETRIES attempts */ - async fetchSubgraphMeta(attempt: number = 1): Promise { - if (!this.blockchainConfig.pdpSubgraphEndpoint) { + async fetchSubgraphMeta(endpoint: string, attempt: number = 1): Promise { + if (!endpoint) { throw new Error("No PDP subgraph endpoint configured"); } try { await this.enforceRateLimit(); - const response = await fetch(this.blockchainConfig.pdpSubgraphEndpoint, { + const response = await fetch(endpoint, { method: "POST", headers: { "Content-Type": "application/json", @@ -106,7 +99,7 @@ export class PDPSubgraphService { error: toStructuredError(error), }); await new Promise((resolve) => setTimeout(resolve, delay)); - return this.fetchSubgraphMeta(attempt + 1); + return this.fetchSubgraphMeta(endpoint, attempt + 1); } this.logger.error({ @@ -128,6 +121,7 @@ export class PDPSubgraphService { * @returns Array of providers with their data sets currently proving */ async fetchProvidersWithDatasets( + endpoint: string, options: ProvidersWithDataSetsOptions, ): Promise { const { blockNumber, addresses } = options; @@ -137,16 +131,17 @@ export class PDPSubgraphService { } if (addresses.length <= PDPSubgraphService.MAX_PROVIDERS_PER_QUERY) { - return this.fetchWithRetry(blockNumber, addresses); + return this.fetchWithRetry(endpoint, blockNumber, addresses); } - return this.fetchMultipleBatchesWithRateLimit(blockNumber, addresses); + return this.fetchMultipleBatchesWithRateLimit(endpoint, blockNumber, addresses); } /** * Fetch multiple batches with rate limiting and concurrency control */ private async fetchMultipleBatchesWithRateLimit( + endpoint: string, blockNumber: number, addresses: string[], ): Promise { @@ -161,7 +156,7 @@ export class PDPSubgraphService { for (let i = 0; i < batches.length; i += PDPSubgraphService.MAX_CONCURRENT_REQUESTS) { const batchGroup = batches.slice(i, i + PDPSubgraphService.MAX_CONCURRENT_REQUESTS); - const results = await Promise.all(batchGroup.map((batch) => this.fetchWithRetry(blockNumber, batch))); + const results = await Promise.all(batchGroup.map((batch) => this.fetchWithRetry(endpoint, blockNumber, batch))); allProviders.push(...results.flat()); } @@ -174,11 +169,12 @@ export class PDPSubgraphService { * Assuming initial request to be first attempt */ private async fetchWithRetry( + endpoint: string, blockNumber: number, addresses: string[], attempt: number = 1, ): Promise { - if (!this.blockchainConfig.pdpSubgraphEndpoint) { + if (!endpoint) { throw new Error("No PDP subgraph endpoint configured"); } @@ -190,7 +186,7 @@ export class PDPSubgraphService { try { await this.enforceRateLimit(); - const response = await fetch(this.blockchainConfig.pdpSubgraphEndpoint, { + const response = await fetch(endpoint, { method: "POST", headers: { "Content-Type": "application/json", @@ -247,7 +243,7 @@ export class PDPSubgraphService { error: toStructuredError(error), }); await new Promise((resolve) => setTimeout(resolve, delay)); - return this.fetchWithRetry(blockNumber, addresses, attempt + 1); + return this.fetchWithRetry(endpoint, blockNumber, addresses, attempt + 1); } this.logger.error({ diff --git a/docs/runbooks/multi-network-migration.md b/docs/runbooks/multi-network-migration.md new file mode 100644 index 00000000..f4cde7e0 --- /dev/null +++ b/docs/runbooks/multi-network-migration.md @@ -0,0 +1,131 @@ +# Multi-Network Migration Runbook + +This runbook walks operators through enabling the `network` column added by +`AddNetworkColumn1776790420000` (`apps/backend/src/database/migrations/1776790420000-AddNetworkColumn.ts`). +The migration partitions runtime state (storage providers, deals, job schedules, +data-retention baselines) by blockchain network so a single dealbot instance — +or two cooperating instances — can safely operate on multiple networks +(e.g. `mainnet` and `calibration`) without rows colliding under shared keys. + +> Audience: operators upgrading an existing single-network deployment. Fresh +> deployments only need to set `NETWORK` (or `DEALBOT_LEGACY_NETWORK_BACKFILL`) +> to a supported value before first start; the migration runs automatically. + +## What the migration changes + +- Creates a shared Postgres `network_enum` type (`'calibration'`, `'mainnet'`). +- Adds a `network network_enum NOT NULL` column to: + - `storage_providers` (now part of the composite primary key with `address`) + - `deals` + - `job_schedule_state` (now part of the composite uniqueness with `job_type, sp_address`) + - `data_retention_baselines` (now part of the composite primary key with `provider_address`) +- Recreates the `deals → storage_providers` foreign key as a composite + `(sp_address, network) → (address, network)` reference. +- Replaces the unique `job_schedule_state_job_type_sp_unique` constraint with + `job_schedule_state_job_type_sp_network_unique`. + +The migration **fails fast** if the backfill network is not supplied or is not +in `SUPPORTED_NETWORKS` (see `apps/backend/src/common/constants.ts`). + +## Pre-migration checklist + +1. **Take a database backup.** This is a structural migration affecting four + tables and a foreign key. See `docs/runbooks/supabase-backup-restore.md`. +2. **Identify the network of all existing rows.** Pre-migration, the deployment + has been single-network. Confirm with operations which network's data + currently lives in the database. Allowed values: `calibration`, `mainnet`. +3. **Set `DEALBOT_LEGACY_NETWORK_BACKFILL`** (preferred) or rely on the legacy + `NETWORK` env var so the migration can backfill the new column. + + ```bash + export DEALBOT_LEGACY_NETWORK_BACKFILL=mainnet # or: calibration + ``` + +4. **Stop writers** (or scale to zero) for the duration of the migration so no + rows are inserted with the old default-only `network` column shape. + +## Running the migration + +The migration runs as part of the normal startup sequence +(`migrationsRun: true`). To run it explicitly: + +```bash +pnpm --filter @dealbot/backend run typeorm:migration:run +``` + +If the env var is missing or invalid, startup aborts with: + +``` +AddNetworkColumn migration requires DEALBOT_LEGACY_NETWORK_BACKFILL (or legacy NETWORK) +to be set to one of: calibration, mainnet. Got: "" +``` + +Set the env var and rerun. + +## Post-migration verification + +1. **Confirm the enum type exists**: + + ```sql + SELECT typname FROM pg_type WHERE typname = 'network_enum'; + ``` + +2. **Confirm every row has a network assigned**: + + ```sql + SELECT 'storage_providers' AS tbl, network, COUNT(*) + FROM storage_providers GROUP BY network + UNION ALL + SELECT 'deals', network, COUNT(*) FROM deals GROUP BY network + UNION ALL + SELECT 'job_schedule_state', network, COUNT(*) FROM job_schedule_state GROUP BY network + UNION ALL + SELECT 'data_retention_baselines', network, COUNT(*) FROM data_retention_baselines GROUP BY network; + ``` + + All groups should match the backfill network. + +3. **Restart the backend** and confirm the providers refresh job runs without + errors. The Prometheus `network` label on app metrics should reflect the + configured network. + +## Expanding to a second network + +Once the schema is migrated, adding a second network to a deployment is +purely an application-level change: + +1. Update the deployment configuration to point at the new network's RPC and + contracts (or run a second backend instance dedicated to it). +2. Trigger a `providers_refresh` job. The wallet SDK service writes new + `storage_providers` rows with the active `network` value, so the new + network's providers will not collide with existing rows even if SP + addresses overlap. +3. Job schedules, deals, and data-retention baselines created from this point + onward are automatically scoped to the new network. + +No database changes are required to onboard a new network — only the existing +`network_enum` values are accepted, so adding networks beyond +`SUPPORTED_NETWORKS` requires extending that constant and adding a follow-up +migration that calls `ALTER TYPE network_enum ADD VALUE 'newnet'`. + +## Rolling back + +The down migration is destructive when rows for multiple networks exist. The +operator must declare which network's data to preserve via +`DEALBOT_LEGACY_NETWORK_BACKFILL` (or legacy `NETWORK`); rows from any other +network are deleted before the schema collapses back to single-network shape. + +```bash +export DEALBOT_LEGACY_NETWORK_BACKFILL=mainnet +pnpm --filter @dealbot/backend run typeorm:migration:revert +``` + +After revert: + +- The `network_enum` type is dropped. +- `storage_providers` reverts to a single-column `(address)` primary key. +- The `deals → storage_providers` FK reverts to `(sp_address) → (address)`. +- The `job_schedule_state_job_type_sp_unique` constraint is restored. + +> Always take a fresh backup before reverting — deleted-other-network rows +> are not recoverable from the running database afterwards. From cdadd774b9818fa7b0ba5159aad8c8cd5f7da6d1 Mon Sep 17 00:00:00 2001 From: silent-cipher Date: Thu, 21 May 2026 18:25:54 +0530 Subject: [PATCH 07/12] fix: typecheck --- apps/backend/src/jobs/jobs.service.ts | 2 ++ 1 file changed, 2 insertions(+) diff --git a/apps/backend/src/jobs/jobs.service.ts b/apps/backend/src/jobs/jobs.service.ts index d9821bd2..360315d6 100644 --- a/apps/backend/src/jobs/jobs.service.ts +++ b/apps/backend/src/jobs/jobs.service.ts @@ -1096,6 +1096,7 @@ export class JobsService implements OnModuleInit, OnApplicationShutdown { await this.jobScheduleRepository.upsertSchedule( "pull_check", address, + network, pullCheckIntervalSeconds, pullCheckStartAt, ); @@ -1136,6 +1137,7 @@ export class JobsService implements OnModuleInit, OnApplicationShutdown { await this.jobScheduleRepository.upsertSchedule( "pull_piece_cleanup", "", + network, pullPieceCleanupIntervalSeconds, new Date(now.getTime() + phaseMs), ); From 0968395cfbec68b4ad66726951ebe3fb34470463 Mon Sep 17 00:00:00 2001 From: Russell Dempsey <1173416+SgtPooki@users.noreply.github.com> Date: Fri, 22 May 2026 03:55:57 -0400 Subject: [PATCH 08/12] fix(network): close multi-network gaps before #297 expansion (#510) * fix(network): close multi-network gaps before #297 expansion - NETWORK env now required (Joi). Migration's fail-fast was bypassed by Joi default writing back to process.env. - DealService + dev-tools set deal.network from config on every insert. Drop DB DEFAULT clauses post-backfill so missed write paths fail loudly. - Thread network through ScheduleRow and pg-boss payloads (SpJobData, ProvidersRefreshJobData, DataRetentionJobData) so workers can route by network instead of relying on the running pod's NETWORK env. * refactor: expand network filtering * refactor: use network filtering in job scheduling --------- Co-authored-by: silent-cipher --- apps/backend/src/config/app.config.ts | 4 +- .../1776790420000-AddNetworkColumn.ts | 12 +++++ apps/backend/src/deal/deal.service.ts | 3 +- .../src/dev-tools/dev-tools.service.ts | 4 ++ apps/backend/src/jobs/jobs.service.spec.ts | 48 +++++++++++------- apps/backend/src/jobs/jobs.service.ts | 46 +++++++++++------ .../repositories/job-schedule.repository.ts | 49 +++++++++++++++---- .../piece-cleanup/piece-cleanup.service.ts | 5 ++ .../src/retrieval/retrieval.service.ts | 10 ++-- .../src/wallet-sdk/wallet-sdk.service.spec.ts | 1 + 10 files changed, 132 insertions(+), 50 deletions(-) diff --git a/apps/backend/src/config/app.config.ts b/apps/backend/src/config/app.config.ts index 62300eaa..ffc70eff 100644 --- a/apps/backend/src/config/app.config.ts +++ b/apps/backend/src/config/app.config.ts @@ -45,7 +45,7 @@ export const configValidationSchema = Joi.object({ DATABASE_NAME: Joi.string().required(), // Blockchain - NETWORK: Joi.string().valid("mainnet", "calibration").default("calibration"), + NETWORK: Joi.string().valid("mainnet", "calibration").required(), WALLET_ADDRESS: Joi.string().required(), WALLET_PRIVATE_KEY: Joi.string().optional().empty(""), RPC_URL: Joi.string() @@ -440,7 +440,7 @@ export function loadConfig(): IConfig { database: process.env.DATABASE_NAME || "filecoin_dealbot", }, blockchain: { - network: (process.env.NETWORK || "calibration") as Network, + network: process.env.NETWORK as Network, rpcUrl: process.env.RPC_URL || undefined, sessionKeyPrivateKey: (process.env.SESSION_KEY_PRIVATE_KEY || undefined) as `0x${string}` | undefined, walletAddress: process.env.WALLET_ADDRESS || "0x0000000000000000000000000000000000000000", diff --git a/apps/backend/src/database/migrations/1776790420000-AddNetworkColumn.ts b/apps/backend/src/database/migrations/1776790420000-AddNetworkColumn.ts index cd9629d8..6bfa4fe9 100644 --- a/apps/backend/src/database/migrations/1776790420000-AddNetworkColumn.ts +++ b/apps/backend/src/database/migrations/1776790420000-AddNetworkColumn.ts @@ -149,6 +149,18 @@ export class AddNetworkColumn1776790420000 implements MigrationInterface { ALTER TABLE data_retention_baselines ADD PRIMARY KEY (provider_address, network) `); + + // ------------------------------------------------------------------------- + // Drop DEFAULT clauses now that backfill is complete. Future writes must + // set `network` explicitly so a missing app code path fails loudly with a + // NOT NULL violation instead of silently inheriting the migration-time + // default (which is frozen at the legacy backfill network and would + // mislabel rows after the deployment expands to a second network). + // ------------------------------------------------------------------------- + await queryRunner.query(`ALTER TABLE storage_providers ALTER COLUMN network DROP DEFAULT`); + await queryRunner.query(`ALTER TABLE deals ALTER COLUMN network DROP DEFAULT`); + await queryRunner.query(`ALTER TABLE job_schedule_state ALTER COLUMN network DROP DEFAULT`); + await queryRunner.query(`ALTER TABLE data_retention_baselines ALTER COLUMN network DROP DEFAULT`); } public async down(queryRunner: QueryRunner): Promise { diff --git a/apps/backend/src/deal/deal.service.ts b/apps/backend/src/deal/deal.service.ts index 2edf4026..e45fdcd7 100644 --- a/apps/backend/src/deal/deal.service.ts +++ b/apps/backend/src/deal/deal.service.ts @@ -311,6 +311,7 @@ export class DealService implements OnModuleInit, OnModuleDestroy { deal.fileName = dealInput.processedData.name; deal.fileSize = dealInput.processedData.size; deal.spAddress = providerAddress; + deal.network = this.blockchainConfig.network; deal.status = DealStatus.PENDING; deal.walletAddress = this.blockchainConfig.walletAddress; deal.metadata = dealInput.metadata; @@ -336,7 +337,7 @@ export class DealService implements OnModuleInit, OnModuleDestroy { try { // Load storageProvider relation deal.storageProvider = await this.storageProviderRepository.findOne({ - where: { address: deal.spAddress }, + where: { address: deal.spAddress, network: deal.network }, }); dealLogContext.providerId = deal.storageProvider?.providerId ?? dealLogContext.providerId; providerLabels = buildCheckMetricLabels({ diff --git a/apps/backend/src/dev-tools/dev-tools.service.ts b/apps/backend/src/dev-tools/dev-tools.service.ts index 7a837faf..87522b0f 100644 --- a/apps/backend/src/dev-tools/dev-tools.service.ts +++ b/apps/backend/src/dev-tools/dev-tools.service.ts @@ -1,8 +1,10 @@ import { BadRequestException, Injectable, Logger, NotFoundException } from "@nestjs/common"; +import { ConfigService } from "@nestjs/config"; import { InjectRepository } from "@nestjs/typeorm"; import type { Repository } from "typeorm"; import { DealJobTerminatedDataSetError } from "../common/errors.js"; import { type DealLogContext, toStructuredError } from "../common/logging.js"; +import type { IBlockchainConfig, IConfig } from "../config/app.config.js"; import { Deal } from "../database/entities/deal.entity.js"; import { DealStatus, RetrievalStatus } from "../database/types.js"; import { DealService } from "../deal/deal.service.js"; @@ -19,6 +21,7 @@ export class DevToolsService { private readonly walletSdkService: WalletSdkService, private readonly dealService: DealService, private readonly retrievalService: RetrievalService, + private readonly configService: ConfigService, @InjectRepository(Deal) private readonly dealRepository: Repository, ) {} @@ -93,6 +96,7 @@ export class DevToolsService { // Create a pending deal record first so we can return the ID immediately const pendingDeal = this.dealRepository.create({ spAddress, + network: this.configService.get("blockchain").network, walletAddress: this.dealService.getWalletAddress(), fileName: "pending", fileSize: 0, diff --git a/apps/backend/src/jobs/jobs.service.spec.ts b/apps/backend/src/jobs/jobs.service.spec.ts index 8630e3e8..87b270f4 100644 --- a/apps/backend/src/jobs/jobs.service.spec.ts +++ b/apps/backend/src/jobs/jobs.service.spec.ts @@ -317,6 +317,7 @@ describe("JobsService schedule rows", () => { data: { jobType: "deal", spAddress: "0xaaa", + network: "calibration", intervalSeconds: 60, }, }); @@ -375,6 +376,7 @@ describe("JobsService schedule rows", () => { data: { jobType: "retrieval", spAddress: "0xaaa", + network: "calibration", intervalSeconds: 60, }, }); @@ -410,6 +412,7 @@ describe("JobsService schedule rows", () => { data: { jobType: "retrieval", spAddress: "0xaaa", + network: "calibration", intervalSeconds: 60, }, }); @@ -451,6 +454,7 @@ describe("JobsService schedule rows", () => { data: { jobType: "retrieval", spAddress: "0xaaa", + network: "calibration", intervalSeconds: 60, }, }), @@ -657,7 +661,10 @@ describe("JobsService schedule rows", () => { await callPrivate(service, "ensureScheduleRows"); - expect(jobScheduleRepositoryMock.deleteSchedulesForInactiveProviders).toHaveBeenCalledWith([providerA.address]); + expect(jobScheduleRepositoryMock.deleteSchedulesForInactiveProviders).toHaveBeenCalledWith( + [providerA.address], + "calibration", + ); }); it("does not delete schedule rows when no active providers exist", async () => { @@ -684,7 +691,7 @@ describe("JobsService schedule rows", () => { expect(storageProviderRepositoryMock.find).toHaveBeenCalledWith({ select: { address: true, providerId: true }, - where: { isActive: true, isApproved: true }, + where: { isActive: true, isApproved: true, network: "calibration" }, }); }); @@ -884,7 +891,7 @@ describe("JobsService schedule rows", () => { service, "deferJobForMaintenance", "deal", - { jobType: "deal", spAddress: "0xaaa", intervalSeconds: 60 }, + { jobType: "deal", spAddress: "0xaaa", network: "calibration", intervalSeconds: 60 }, maintenance, now, ); @@ -893,7 +900,7 @@ describe("JobsService schedule rows", () => { expect(safeSend).toHaveBeenCalledWith( "deal", "sp.work", - { jobType: "deal", spAddress: "0xaaa", intervalSeconds: 60 }, + { jobType: "deal", spAddress: "0xaaa", network: "calibration", intervalSeconds: 60 }, { startAfter: expectedResumeAt }, ); }); @@ -923,7 +930,7 @@ describe("JobsService schedule rows", () => { service, "deferJobForMaintenance", "retrieval", - { jobType: "retrieval", spAddress: "0xbbb", intervalSeconds: 60 }, + { jobType: "retrieval", spAddress: "0xbbb", network: "calibration", intervalSeconds: 60 }, maintenance, now, ); @@ -932,7 +939,7 @@ describe("JobsService schedule rows", () => { expect(safeSend).toHaveBeenCalledWith( "retrieval", "sp.work", - { jobType: "retrieval", spAddress: "0xbbb", intervalSeconds: 60 }, + { jobType: "retrieval", spAddress: "0xbbb", network: "calibration", intervalSeconds: 60 }, { startAfter: expectedResumeAt }, ); }); @@ -958,7 +965,7 @@ describe("JobsService schedule rows", () => { await callPrivate(service, "handleDealJob", { id: "job-deal-1", - data: { jobType: "deal", spAddress: "0xaaa", intervalSeconds: 60 }, + data: { jobType: "deal", spAddress: "0xaaa", network: "calibration", intervalSeconds: 60 }, }); expect(dealService.createDealForProvider).toHaveBeenCalledTimes(1); @@ -998,7 +1005,7 @@ describe("JobsService schedule rows", () => { await callPrivate(service, "handleDealJob", { id: "job-deal-no-quota-gate", - data: { jobType: "deal", spAddress: "0xaaa", intervalSeconds: 60 }, + data: { jobType: "deal", spAddress: "0xaaa", network: "calibration", intervalSeconds: 60 }, }); expect(pieceCleanupService.cleanupPiecesForProvider).not.toHaveBeenCalled(); @@ -1030,7 +1037,7 @@ describe("JobsService schedule rows", () => { await callPrivate(service, "handleDealJob", { id: "job-deal-terminated", - data: { jobType: "deal", spAddress: "0xaaa", intervalSeconds: 60 }, + data: { jobType: "deal", spAddress: "0xaaa", network: "calibration", intervalSeconds: 60 }, }); expect(dealService.createDealForProvider).toHaveBeenCalledTimes(1); @@ -1059,7 +1066,7 @@ describe("JobsService schedule rows", () => { await callPrivate(service, "handleDataSetCreationJob", { id: "job-ds-1", - data: { jobType: "data_set_creation", spAddress: "0xaaa", intervalSeconds: 3600 }, + data: { jobType: "data_set_creation", spAddress: "0xaaa", network: "calibration", intervalSeconds: 3600 }, }); expect(dealService.createDataSetWithPiece).toHaveBeenCalledTimes(1); @@ -1101,7 +1108,7 @@ describe("JobsService schedule rows", () => { await callPrivate(service, "handleDataSetCreationJob", { id: "job-ds-2", - data: { jobType: "data_set_creation", spAddress: "0xaaa", intervalSeconds: 3600 }, + data: { jobType: "data_set_creation", spAddress: "0xaaa", network: "calibration", intervalSeconds: 3600 }, }); expect(dealService.createDataSetWithPiece).not.toHaveBeenCalled(); @@ -1142,7 +1149,7 @@ describe("JobsService schedule rows", () => { await callPrivate(service, "handleDataSetCreationJob", { id: "job-ds-3", - data: { jobType: "data_set_creation", spAddress: "0xaaa", intervalSeconds: 3600 }, + data: { jobType: "data_set_creation", spAddress: "0xaaa", network: "calibration", intervalSeconds: 3600 }, }); // Only the first missing data set (index 0) should be created @@ -1187,7 +1194,7 @@ describe("JobsService schedule rows", () => { await callPrivate(service, "handleDataSetCreationJob", { id: "job-ds-3b", - data: { jobType: "data_set_creation", spAddress: "0xaaa", intervalSeconds: 3600 }, + data: { jobType: "data_set_creation", spAddress: "0xaaa", network: "calibration", intervalSeconds: 3600 }, }); // Should skip index 0 (exists) and create only index 1 @@ -1334,7 +1341,7 @@ describe("JobsService schedule rows", () => { expect(jobTypes).not.toContain("retrieval"); // Blocked provider is excluded from the active-address list passed to cleanup, // so its existing schedule rows will be deleted. - expect(jobScheduleRepositoryMock.deleteSchedulesForInactiveProviders).toHaveBeenCalledWith([]); + expect(jobScheduleRepositoryMock.deleteSchedulesForInactiveProviders).toHaveBeenCalledWith([], "calibration"); }); it("deal job is skipped at runtime when provider is blocked", async () => { @@ -1360,7 +1367,7 @@ describe("JobsService schedule rows", () => { await callPrivate(service, "handleDealJob", { id: "job-blocked-deal", - data: { jobType: "deal", spAddress: "0xaaa", intervalSeconds: 60 }, + data: { jobType: "deal", spAddress: "0xaaa", network: "calibration", intervalSeconds: 60 }, }); expect(dealService.createDealForProvider).not.toHaveBeenCalled(); @@ -1384,7 +1391,7 @@ describe("JobsService schedule rows", () => { await callPrivate(service, "handleRetrievalJob", { id: "job-blocked-retrieval", - data: { jobType: "retrieval", spAddress: "0xaaa", intervalSeconds: 60 }, + data: { jobType: "retrieval", spAddress: "0xaaa", network: "calibration", intervalSeconds: 60 }, }); expect(retrievalService.performRandomRetrievalForProvider).not.toHaveBeenCalled(); @@ -1413,7 +1420,7 @@ describe("JobsService schedule rows", () => { await callPrivate(service, "handleDataSetCreationJob", { id: "job-blocked-ds", - data: { jobType: "data_set_creation", spAddress: "0xaaa", intervalSeconds: 3600 }, + data: { jobType: "data_set_creation", spAddress: "0xaaa", network: "calibration", intervalSeconds: 3600 }, }); expect(dealService.createDataSetWithPiece).not.toHaveBeenCalled(); @@ -1479,7 +1486,12 @@ describe("JobsService schedule rows", () => { for (const testCase of cases) { await callPrivate(testCase.service, testCase.handler, { id: `job-address-blocked-${testCase.jobType}`, - data: { jobType: testCase.jobType, spAddress: "0xaaa", intervalSeconds: testCase.intervalSeconds }, + data: { + jobType: testCase.jobType, + spAddress: "0xaaa", + network: "calibration", + intervalSeconds: testCase.intervalSeconds, + }, }); testCase.expectCheckNotRun(); diff --git a/apps/backend/src/jobs/jobs.service.ts b/apps/backend/src/jobs/jobs.service.ts index 360315d6..1915bf78 100644 --- a/apps/backend/src/jobs/jobs.service.ts +++ b/apps/backend/src/jobs/jobs.service.ts @@ -9,6 +9,7 @@ import { DealJobTerminatedDataSetError } from "../common/errors.js"; import { type JobLogContext, type ProviderJobContext, toStructuredError } from "../common/logging.js"; import { getMaintenanceWindowStatus } from "../common/maintenance-window.js"; import { isSpBlocked } from "../common/sp-blocklist.js"; +import type { Network } from "../common/types.js"; import type { IConfig, ISpBlocklistConfig } from "../config/app.config.js"; import { DataRetentionService } from "../data-retention/data-retention.service.js"; import type { JobType } from "../database/entities/job-schedule-state.entity.js"; @@ -39,16 +40,17 @@ function isSpJobType(jobType: string): jobType is SpJobType { return SP_JOB_TYPES.has(jobType); } -type SpJobData = { jobType: SpJobType; spAddress: string; intervalSeconds: number }; -type ProvidersRefreshJobData = { intervalSeconds: number }; +type SpJobData = { jobType: SpJobType; spAddress: string; network: Network; intervalSeconds: number }; +type ProvidersRefreshJobData = { network: Network; intervalSeconds: number }; type SpJob = Job; -type DataRetentionJobData = { intervalSeconds: number }; -type PullPieceCleanupJobData = { intervalSeconds: number }; +type DataRetentionJobData = { network: Network; intervalSeconds: number }; +type PullPieceCleanupJobData = { network: Network; intervalSeconds: number }; type ScheduleRow = { id: number; job_type: JobType; sp_address: string; + network: Network; interval_seconds: number; next_run_at: string; }; @@ -1035,12 +1037,15 @@ export class JobsService implements OnModuleInit, OnApplicationShutdown { pullPieceCleanupIntervalSeconds, } = this.getIntervalSecondsForRates(); - const useOnlyApprovedProviders = this.configService.get("blockchain").useOnlyApprovedProviders; + const blockchainCfg = this.configService.get("blockchain", { infer: true }); + const network = blockchainCfg.network; + + const useOnlyApprovedProviders = blockchainCfg.useOnlyApprovedProviders; // Active providers are guaranteed to support ipniIpfs // as validated by WalletSdkService.loadProvidersInternal() const providers = await this.storageProviderRepository.find({ select: { address: true, providerId: true }, - where: useOnlyApprovedProviders ? { isActive: true, isApproved: true } : { isActive: true }, + where: { network, isActive: true, ...(useOnlyApprovedProviders && { isApproved: true }) }, }); const phaseMs = this.schedulePhaseSeconds() * 1000; @@ -1050,8 +1055,7 @@ export class JobsService implements OnModuleInit, OnApplicationShutdown { const dataRetentionPollStartAt = new Date(now.getTime() + phaseMs); const providersRefreshStartAt = new Date(now.getTime() + phaseMs); - const minDataSets = this.configService.get("blockchain").minNumDataSetsForChecks; - const network = this.configService.get("blockchain").network; + const minDataSets = blockchainCfg.minNumDataSetsForChecks; const cleanupStartAt = new Date(now.getTime() + phaseMs); const pullCheckStartAt = new Date(now.getTime() + phaseMs); @@ -1103,7 +1107,10 @@ export class JobsService implements OnModuleInit, OnApplicationShutdown { } if (providers.length > 0) { - const deletedAddresses = await this.jobScheduleRepository.deleteSchedulesForInactiveProviders(unblockedAddresses); + const deletedAddresses = await this.jobScheduleRepository.deleteSchedulesForInactiveProviders( + unblockedAddresses, + network, + ); if (deletedAddresses.length > 0) { this.logger.warn({ event: "job_schedules_deleted", @@ -1171,6 +1178,7 @@ export class JobsService implements OnModuleInit, OnApplicationShutdown { private async enqueueDueJobs(): Promise { if (!this.boss) return; + const network = this.configService.get("blockchain", { infer: true }).network; const now = new Date(); const maintenance = this.getMaintenanceWindowStatus(now); @@ -1181,7 +1189,7 @@ export class JobsService implements OnModuleInit, OnApplicationShutdown { } await this.jobScheduleRepository.runTransaction(async (manager) => { - const rows = await this.jobScheduleRepository.findDueSchedulesWithManager(manager, now); + const rows = await this.jobScheduleRepository.findDueSchedulesWithManager(manager, now, network); for (const row of rows) { const timing = this.getScheduleTiming(row, now); @@ -1258,9 +1266,14 @@ export class JobsService implements OnModuleInit, OnApplicationShutdown { row.job_type === "piece_cleanup" || row.job_type === "pull_check" ) { - return { jobType: row.job_type, spAddress: row.sp_address, intervalSeconds: row.interval_seconds }; + return { + jobType: row.job_type, + spAddress: row.sp_address, + network: row.network, + intervalSeconds: row.interval_seconds, + }; } - return { intervalSeconds: row.interval_seconds }; + return { network: row.network, intervalSeconds: row.interval_seconds }; } /** @@ -1324,6 +1337,7 @@ export class JobsService implements OnModuleInit, OnApplicationShutdown { * Refreshes queue depth and age gauges from pg-boss tables. */ private async updateQueueMetrics(): Promise { + const network = this.configService.get("blockchain", { infer: true }).network; const jobTypes: JobType[] = [ "deal", "retrieval", @@ -1343,7 +1357,7 @@ export class JobsService implements OnModuleInit, OnApplicationShutdown { this.oldestInFlightAgeGauge.set({ job_type: jobType }, 0); } - const rows = await this.jobScheduleRepository.countBossJobStates(["created", "retry", "active"]); + const rows = await this.jobScheduleRepository.countBossJobStates(["created", "retry", "active"], network); if (rows.length > 0) { for (const row of rows) { const jobType = row.job_type as JobType; @@ -1365,20 +1379,20 @@ export class JobsService implements OnModuleInit, OnApplicationShutdown { }); } - const pausedSchedules = await this.jobScheduleRepository.countPausedSchedules(); + const pausedSchedules = await this.jobScheduleRepository.countPausedSchedules(network); for (const row of pausedSchedules) { this.jobsPausedGauge.set({ job_type: row.job_type }, row.count); } const now = new Date(); - const queuedAges = await this.jobScheduleRepository.minBossJobAgeSecondsByState("created", now); + const queuedAges = await this.jobScheduleRepository.minBossJobAgeSecondsByState("created", now, network); for (const row of queuedAges) { const jobType = row.job_type as JobType; if (!jobTypes.includes(jobType)) continue; this.oldestQueuedAgeGauge.set({ job_type: jobType }, Math.max(0, row.min_age_seconds ?? 0)); } - const activeAges = await this.jobScheduleRepository.minBossJobAgeSecondsByState("active", now); + const activeAges = await this.jobScheduleRepository.minBossJobAgeSecondsByState("active", now, network); for (const row of activeAges) { const jobType = row.job_type as JobType; if (!jobTypes.includes(jobType)) continue; diff --git a/apps/backend/src/jobs/repositories/job-schedule.repository.ts b/apps/backend/src/jobs/repositories/job-schedule.repository.ts index edc7c11f..2ea494b5 100644 --- a/apps/backend/src/jobs/repositories/job-schedule.repository.ts +++ b/apps/backend/src/jobs/repositories/job-schedule.repository.ts @@ -15,6 +15,7 @@ export type ScheduleRow = { id: number; job_type: JobType; sp_address: string; + network: Network; interval_seconds: number; next_run_at: string; }; @@ -72,21 +73,24 @@ export class JobScheduleRepository { * @param activeAddresses - List of currently active provider addresses to keep. * @returns Array of storage provider addresses whose schedules were deleted. */ - async deleteSchedulesForInactiveProviders(activeAddresses: string[]): Promise { + async deleteSchedulesForInactiveProviders(activeAddresses: string[], network: Network): Promise { try { if (activeAddresses.length === 0) { this.logger.warn({ event: "delete_all_provider_schedules_warning", message: "Deleting all provider schedules because activeAddresses is empty. Ensure this is intended to avoid mass deletion.", + network, }); const [rows] = (await this.dataSource.query( ` DELETE FROM job_schedule_state WHERE job_type IN ('deal', 'retrieval', 'data_set_creation', 'piece_cleanup', 'pull_check') AND sp_address <> '' + AND network = $1 RETURNING sp_address `, + [network], )) || [[]]; return rows.map((row: { sp_address: string }) => row.sp_address); } @@ -96,10 +100,11 @@ export class JobScheduleRepository { DELETE FROM job_schedule_state WHERE job_type IN ('deal', 'retrieval', 'data_set_creation', 'piece_cleanup', 'pull_check') AND sp_address <> '' - AND sp_address <> ALL($1::text[]) + AND network = $1 + AND sp_address <> ALL($2::text[]) RETURNING sp_address `, - [activeAddresses], + [network, activeAddresses], )) || [[]]; return rows.map((row: { sp_address: string }) => row.sp_address); } catch (error) { @@ -107,6 +112,7 @@ export class JobScheduleRepository { event: "delete_inactive_provider_schedules_failed", message: "Failed to delete schedules for inactive providers", error: toStructuredError(error), + network, }); throw error; } @@ -115,14 +121,16 @@ export class JobScheduleRepository { /** * Counts manually paused jobs by type. */ - async countPausedSchedules(): Promise<{ job_type: string; count: number }[]> { + async countPausedSchedules(network?: Network): Promise<{ job_type: string; count: number }[]> { return this.dataSource.query( ` SELECT job_type, COUNT(*)::int AS count FROM job_schedule_state WHERE paused = true + AND ($1::text IS NULL OR network = $1) GROUP BY job_type `, + [network ?? null], ); } @@ -136,17 +144,19 @@ export class JobScheduleRepository { async findDueSchedulesWithManager( manager: { query: (sql: string, params?: any[]) => Promise }, now: Date, + network?: Network, ): Promise { return manager.query( ` - SELECT id, job_type, sp_address, interval_seconds, next_run_at + SELECT id, job_type, sp_address, network, interval_seconds, next_run_at FROM job_schedule_state WHERE paused = false AND next_run_at <= $1 + AND ($2::text IS NULL OR network = $2) ORDER BY next_run_at ASC FOR UPDATE SKIP LOCKED `, - [now], + [now, network ?? null], ); } @@ -214,7 +224,10 @@ export class JobScheduleRepository { * Uses `data->>'jobType'` for the shared sp.work queue. * Casts state to text so drivers always return a string (pg-boss uses job_state enum). */ - async countBossJobStates(states: string[]): Promise<{ job_type: string; state: string; count: number }[]> { + async countBossJobStates( + states: string[], + network?: Network, + ): Promise<{ job_type: string; state: string; count: number }[]> { return this.dataSource.query( ` SELECT @@ -229,9 +242,17 @@ export class JobScheduleRepository { COUNT(*)::int AS count FROM pgboss.job WHERE state::text = ANY($1::text[]) + AND ($6::text IS NULL OR data->>'network' = $6) GROUP BY 1, 2 `, - [states, SP_WORK_QUEUE, DATA_RETENTION_POLL_QUEUE, PROVIDERS_REFRESH_QUEUE, PULL_PIECE_CLEANUP_QUEUE], + [ + states, + SP_WORK_QUEUE, + DATA_RETENTION_POLL_QUEUE, + PROVIDERS_REFRESH_QUEUE, + PULL_PIECE_CLEANUP_QUEUE, + network ?? null, + ], ); } @@ -242,6 +263,7 @@ export class JobScheduleRepository { async minBossJobAgeSecondsByState( state: "created" | "active", now: Date, + network?: Network, ): Promise<{ job_type: string; min_age_seconds: number | null }[]> { return this.dataSource.query( ` @@ -265,9 +287,18 @@ export class JobScheduleRepository { ) AS min_age_seconds FROM pgboss.job WHERE state::text = $2 + AND ($7::text IS NULL OR data->>'network' = $7) GROUP BY 1 `, - [now, state, SP_WORK_QUEUE, DATA_RETENTION_POLL_QUEUE, PROVIDERS_REFRESH_QUEUE, PULL_PIECE_CLEANUP_QUEUE], + [ + now, + state, + SP_WORK_QUEUE, + DATA_RETENTION_POLL_QUEUE, + PROVIDERS_REFRESH_QUEUE, + PULL_PIECE_CLEANUP_QUEUE, + network ?? null, + ], ); } } diff --git a/apps/backend/src/piece-cleanup/piece-cleanup.service.ts b/apps/backend/src/piece-cleanup/piece-cleanup.service.ts index 45454347..990793a3 100644 --- a/apps/backend/src/piece-cleanup/piece-cleanup.service.ts +++ b/apps/backend/src/piece-cleanup/piece-cleanup.service.ts @@ -321,10 +321,13 @@ export class PieceCleanupService implements OnModuleInit, OnModuleDestroy { */ async getStoredBytesForProvider(spAddress: string): Promise { const walletAddress = this.blockchainConfig.walletAddress; + const network = this.blockchainConfig.network; + const result = await this.dealRepository .createQueryBuilder("deal") .select("COALESCE(SUM(deal.piece_size), 0)", "totalBytes") .where("deal.sp_address = :spAddress", { spAddress }) + .andWhere("deal.network = :network", { network }) .andWhere("deal.wallet_address = :walletAddress", { walletAddress }) .andWhere("deal.status = :status", { status: DealStatus.DEAL_CREATED }) .andWhere("deal.piece_id IS NOT NULL") @@ -340,10 +343,12 @@ export class PieceCleanupService implements OnModuleInit, OnModuleDestroy { */ async getCleanupCandidates(spAddress: string, limit: number): Promise { const walletAddress = this.blockchainConfig.walletAddress; + const network = this.blockchainConfig.network; return this.dealRepository.find({ where: { spAddress, walletAddress, + network, status: DealStatus.DEAL_CREATED, pieceId: Not(IsNull()), dataSetId: Not(IsNull()), diff --git a/apps/backend/src/retrieval/retrieval.service.ts b/apps/backend/src/retrieval/retrieval.service.ts index c148ccec..972596d7 100644 --- a/apps/backend/src/retrieval/retrieval.service.ts +++ b/apps/backend/src/retrieval/retrieval.service.ts @@ -5,8 +5,8 @@ import { CID } from "multiformats/cid"; import type { Repository } from "typeorm"; import { ClickhouseService } from "../clickhouse/clickhouse.service.js"; import { type ProviderJobContext, type RetrievalLogContext, toStructuredError } from "../common/logging.js"; -import type { Hex } from "../common/types.js"; -import type { IConfig } from "../config/app.config.js"; +import type { Hex, Network } from "../common/types.js"; +import type { IBlockchainConfig, IConfig } from "../config/app.config.js"; import { Deal } from "../database/entities/deal.entity.js"; import { Retrieval } from "../database/entities/retrieval.entity.js"; import { StorageProvider } from "../database/entities/storage-provider.entity.js"; @@ -58,7 +58,8 @@ export class RetrievalService { signal?: AbortSignal, logContext?: ProviderJobContext, ): Promise { - const deal = await this.selectRandomSuccessfulDealForProvider(spAddress); + const blockchainCfg = this.configService.get("blockchain"); + const deal = await this.selectRandomSuccessfulDealForProvider(spAddress, blockchainCfg.network); if (!deal) { this.logger.warn({ ...logContext, @@ -445,12 +446,13 @@ export class RetrievalService { * We select a random successful deal (DEAL_CREATED only) for a given provider. * Uses Postgres ORDER BY RANDOM() since Dealbot is Postgres-only. */ - private async selectRandomSuccessfulDealForProvider(spAddress: string): Promise { + private async selectRandomSuccessfulDealForProvider(spAddress: string, network: Network): Promise { const randomDatasetSizes = this.getRandomDatasetSizes(); const query = this.dealRepository .createQueryBuilder("deal") .innerJoin("deal.storageProvider", "sp", "sp.isActive = :isActive", { isActive: true }) .where("deal.sp_address = :spAddress", { spAddress }) + .andWhere("deal.network = :network", { network }) .andWhere("deal.status IN (:...statuses)", { statuses: [DealStatus.DEAL_CREATED], }) diff --git a/apps/backend/src/wallet-sdk/wallet-sdk.service.spec.ts b/apps/backend/src/wallet-sdk/wallet-sdk.service.spec.ts index 765f8f93..ed114f53 100644 --- a/apps/backend/src/wallet-sdk/wallet-sdk.service.spec.ts +++ b/apps/backend/src/wallet-sdk/wallet-sdk.service.spec.ts @@ -44,6 +44,7 @@ describe("config validation", () => { DATABASE_PASSWORD: "test", DATABASE_NAME: "test", WALLET_ADDRESS: "0x1234567890123456789012345678901234567890", + NETWORK: "calibration", }; it("requires WALLET_PRIVATE_KEY when SESSION_KEY_PRIVATE_KEY is absent", () => { From 69bc4655184fa65e804982d7d7ce6f2f5ea9179f Mon Sep 17 00:00:00 2001 From: silent-cipher Date: Fri, 22 May 2026 13:35:23 +0530 Subject: [PATCH 09/12] fix: include network in job singleton keys --- apps/backend/src/jobs/jobs.service.ts | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/apps/backend/src/jobs/jobs.service.ts b/apps/backend/src/jobs/jobs.service.ts index 1915bf78..ffbc6066 100644 --- a/apps/backend/src/jobs/jobs.service.ts +++ b/apps/backend/src/jobs/jobs.service.ts @@ -1292,11 +1292,12 @@ export class JobsService implements OnModuleInit, OnApplicationShutdown { if (isSpJobType(jobType)) { const spData = data as SpJobData; if (!finalOptions.singletonKey) { - finalOptions.singletonKey = spData.spAddress; + // Include network in singleton key to prevent cross-network deduplication collisions + finalOptions.singletonKey = `${spData.network}:${spData.spAddress}`; } } else { - // Global jobs: use job type as singleton key. - finalOptions.singletonKey = jobType; + // Global jobs: include network in singleton key for per-network isolation + finalOptions.singletonKey = `${data.network}:${jobType}`; } await this.boss.send(name, data, finalOptions); this.jobsEnqueueAttemptsCounter.inc({ job_type: jobType, outcome: "success" }); From 625abeb7457f0d2f27539a1e3fc276ade9a1b29e Mon Sep 17 00:00:00 2001 From: silent-cipher Date: Fri, 22 May 2026 13:37:52 +0530 Subject: [PATCH 10/12] test: singleton key --- apps/backend/src/jobs/jobs.service.spec.ts | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/apps/backend/src/jobs/jobs.service.spec.ts b/apps/backend/src/jobs/jobs.service.spec.ts index 87b270f4..31011cba 100644 --- a/apps/backend/src/jobs/jobs.service.spec.ts +++ b/apps/backend/src/jobs/jobs.service.spec.ts @@ -742,6 +742,7 @@ describe("JobsService schedule rows", () => { id: 1, job_type: "deal", sp_address: "0xaaa", + network: "calibration", interval_seconds: 1, next_run_at: "2024-01-01T00:00:00Z", }, @@ -752,8 +753,8 @@ describe("JobsService schedule rows", () => { expect(send).toHaveBeenCalledTimes(3); for (const call of send.mock.calls) { expect(call[0]).toBe("sp.work"); - expect(call[1]).toMatchObject({ jobType: "deal", spAddress: "0xaaa" }); - expect(call[2]).toMatchObject({ singletonKey: "0xaaa", retryLimit: 0 }); + expect(call[1]).toMatchObject({ jobType: "deal", spAddress: "0xaaa", network: "calibration" }); + expect(call[2]).toMatchObject({ singletonKey: "calibration:0xaaa", retryLimit: 0 }); expect(call[2]?.startAfter).toBeUndefined(); } @@ -809,6 +810,7 @@ describe("JobsService schedule rows", () => { id: 11, job_type: "providers_refresh", sp_address: "", + network: "calibration", interval_seconds: 14400, next_run_at: "2024-01-01T00:00:00Z", }, @@ -818,7 +820,7 @@ describe("JobsService schedule rows", () => { expect(send).toHaveBeenCalledTimes(1); expect(send.mock.calls[0][2]).toMatchObject({ - singletonKey: "providers_refresh", + singletonKey: "calibration:providers_refresh", retryLimit: 0, }); }); From c1948eab454a8c50107f65d4d5a0b0bfad1ba9e3 Mon Sep 17 00:00:00 2001 From: silent-cipher Date: Sat, 23 May 2026 14:00:28 +0530 Subject: [PATCH 11/12] fix: config access pattern --- apps/backend/src/retrieval/retrieval.service.ts | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/apps/backend/src/retrieval/retrieval.service.ts b/apps/backend/src/retrieval/retrieval.service.ts index b0b1bde3..fbb5879c 100644 --- a/apps/backend/src/retrieval/retrieval.service.ts +++ b/apps/backend/src/retrieval/retrieval.service.ts @@ -503,8 +503,7 @@ export class RetrievalService { * Uses Postgres ORDER BY RANDOM() since Dealbot is Postgres-only. */ private async selectRandomSuccessfulDealForProvider(spAddress: string): Promise { - const network = this.configService.get("blockchain.network", { infer: true }); - const walletAddress = this.configService.get("blockchain.walletAddress", { infer: true }); + const { network, walletAddress } = this.configService.get("blockchain", { infer: true }); const randomDatasetSizes = this.getRandomDatasetSizes(); const query = this.dealRepository From 070e11ade62b7c18ddae599e529c08ad1adb57c4 Mon Sep 17 00:00:00 2001 From: silent-cipher Date: Mon, 25 May 2026 21:31:18 +0530 Subject: [PATCH 12/12] fix(jobs): cast network enum to text in SQL queries --- .../src/jobs/repositories/job-schedule.repository.ts | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/apps/backend/src/jobs/repositories/job-schedule.repository.ts b/apps/backend/src/jobs/repositories/job-schedule.repository.ts index 2ea494b5..7d2f2426 100644 --- a/apps/backend/src/jobs/repositories/job-schedule.repository.ts +++ b/apps/backend/src/jobs/repositories/job-schedule.repository.ts @@ -87,7 +87,7 @@ export class JobScheduleRepository { DELETE FROM job_schedule_state WHERE job_type IN ('deal', 'retrieval', 'data_set_creation', 'piece_cleanup', 'pull_check') AND sp_address <> '' - AND network = $1 + AND network::text = $1 RETURNING sp_address `, [network], @@ -100,7 +100,7 @@ export class JobScheduleRepository { DELETE FROM job_schedule_state WHERE job_type IN ('deal', 'retrieval', 'data_set_creation', 'piece_cleanup', 'pull_check') AND sp_address <> '' - AND network = $1 + AND network::text = $1 AND sp_address <> ALL($2::text[]) RETURNING sp_address `, @@ -127,7 +127,7 @@ export class JobScheduleRepository { SELECT job_type, COUNT(*)::int AS count FROM job_schedule_state WHERE paused = true - AND ($1::text IS NULL OR network = $1) + AND ($1::text IS NULL OR network::text = $1) GROUP BY job_type `, [network ?? null], @@ -152,7 +152,7 @@ export class JobScheduleRepository { FROM job_schedule_state WHERE paused = false AND next_run_at <= $1 - AND ($2::text IS NULL OR network = $2) + AND ($2::text IS NULL OR network::text = $2) ORDER BY next_run_at ASC FOR UPDATE SKIP LOCKED `,