Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
ed2e041
feat: anon piece selection and retrieval
dennis-tra Apr 21, 2026
dd47e6b
feat(subgraph): add @dealbot/subgraph package
dennis-tra Apr 22, 2026
a1c38dd
refactor(subgraph): trim schema to dealbot-queried fields
dennis-tra Apr 22, 2026
09de7a8
refactor(subgraph): prune handlers to surviving schema
dennis-tra Apr 22, 2026
fa30fd4
test(subgraph): trim to surviving handlers, wire CI, update env docs
dennis-tra Apr 22, 2026
256e0a4
refactor(subgraph): rename calibnet to calibration
dennis-tra Apr 22, 2026
300b5c9
refactor(subgraph): consolidate helper methods
dennis-tra Apr 22, 2026
713bd96
rename: pdp-subgraph to just subgraph
dennis-tra Apr 22, 2026
55b9187
refactor(retrieval-anon): random piece selection
dennis-tra Apr 22, 2026
743ec17
fix: failing tests from rebasing
dennis-tra Apr 22, 2026
a673613
fix(ci): pnpm script handling
dennis-tra Apr 23, 2026
198174e
refactor: pull request self review
dennis-tra Apr 23, 2026
b214a03
change: decrease size bucket limits
dennis-tra Apr 23, 2026
ee65c7e
refactor(retrieval-anon): use dedicated anonymous retrieval table
dennis-tra Apr 23, 2026
744b3a4
add(retrieval-anon): raw size column
dennis-tra Apr 23, 2026
3791f4c
add(retrieval-anon): job timeout configuration
dennis-tra Apr 23, 2026
8890c70
fix(retrieval-anon): track partial retrieval data
dennis-tra Apr 23, 2026
06320f7
fix: don't fold connect and transfer timeout signals
dennis-tra Apr 23, 2026
caab4e0
refactor(subgraph): collapse SAMPLE_ANON_PIECE_INDEXED/_ANY into builder
dennis-tra Apr 23, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,9 @@ jobs:
- name: Install dependencies
run: pnpm install --frozen-lockfile

- name: Codegen subgraph
run: pnpm --filter @dealbot/subgraph codegen

- name: Run unit tests
run: pnpm test

Expand Down
17 changes: 13 additions & 4 deletions apps/backend/.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ WALLET_ADDRESS=0x0000000000000000000000000000000000000000
WALLET_PRIVATE_KEY=your_private_key_here
CHECK_DATASET_CREATION_FEES=true
USE_ONLY_APPROVED_PROVIDERS=true
PDP_SUBGRAPH_ENDPOINT=https://api.thegraph.com/subgraphs/filecoin/pdp
# Point at the dealbot-owned subgraph on Goldsky (see apps/subgraph/README.md).
SUBGRAPH_ENDPOINT=https://api.goldsky.com/api/public/<project>/subgraphs/dealbot-subgraph/<version>/gn

# Minimum number of datasets per SP (default: 1). When > 1, a separate data_set_creation job provisions extra datasets.
MIN_NUM_DATASETS_FOR_CHECKS=1
Expand Down Expand Up @@ -52,6 +53,9 @@ DEALBOT_MAINTENANCE_WINDOW_MINUTES=20
DEALS_PER_SP_PER_HOUR=2
DATASET_CREATIONS_PER_SP_PER_HOUR=1
RETRIEVALS_PER_SP_PER_HOUR=1
RETRIEVALS_ANON_PER_SP_PER_HOUR=
ANON_RETRIEVAL_BLOCK_SAMPLE_COUNT=5
METRICS_PER_HOUR=2
PG_BOSS_LOCAL_CONCURRENCY=20
JOB_SCHEDULER_POLL_SECONDS=300
JOB_WORKER_POLL_SECONDS=60
Expand All @@ -60,6 +64,7 @@ JOB_SCHEDULE_PHASE_SECONDS=0
JOB_ENQUEUE_JITTER_SECONDS=0
DEAL_JOB_TIMEOUT_SECONDS=360 # 6m: Max runtime for deal jobs (TODO: reduce default to 3m)
RETRIEVAL_JOB_TIMEOUT_SECONDS=60 # 1m: Max runtime for retrieval jobs (TODO: reduce default to 30s)
ANON_RETRIEVAL_JOB_TIMEOUT_SECONDS=360 # 6m: Max runtime for anon retrieval jobs (pieces up to ~70 MiB)
IPFS_BLOCK_FETCH_CONCURRENCY=6 # Parallel block fetches when validating IPFS DAGs
DEALBOT_PGBOSS_POOL_MAX=1
DEALBOT_PGBOSS_SCHEDULER_ENABLED=true
Expand All @@ -73,9 +78,13 @@ PROXY_LIST=http://username:password@host:port,http://username:password@host:port
PROXY_LOCATIONS=l1,l2

# Timeout Configuration (in milliseconds)
CONNECT_TIMEOUT_MS=10000 # 10s: Initial connection timeout
HTTP_REQUEST_TIMEOUT_MS=240000 # 4m: Total transfer timeout for HTTP/1.1 (10MiB @ 170KB/s + overhead)
HTTP2_REQUEST_TIMEOUT_MS=240000 # 4m: Total transfer timeout for HTTP/2 (10MiB @ 170KB/s + overhead)
CONNECT_TIMEOUT_MS=10000 # 10s: Connection + response-headers timeout (scoped to the header phase only)
# HTTP_REQUEST_TIMEOUT_MS and HTTP2_REQUEST_TIMEOUT_MS default to the longest job timeout, i.e.
# 1000 ms * max(DEAL_JOB_TIMEOUT_SECONDS, RETRIEVAL_JOB_TIMEOUT_SECONDS, ANON_RETRIEVAL_JOB_TIMEOUT_SECONDS,
# DATA_SET_CREATION_JOB_TIMEOUT_SECONDS, MAX_PIECE_CLEANUP_RUNTIME_SECONDS), so the HTTP-level ceiling
# never pre-empts a job-scoped AbortSignal. Only override when you have a non-job caller of
# HttpClientService that needs a specific deadline.
# HTTP_REQUEST_TIMEOUT_MS=360000
# HTTP2_REQUEST_TIMEOUT_MS=360000

# SP Blocklists configuration
# BLOCKED_SP_IDS=1234,5678
Expand Down
2 changes: 2 additions & 0 deletions apps/backend/.tool-versions
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
nodejs 25.8.1
pnpm 10.33.0
2 changes: 1 addition & 1 deletion apps/backend/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ All configuration is done via environment variables in `.env`.
| `CHECK_DATASET_CREATION_FEES` | Check fees before dataset creation | `true` |
| `ENABLE_IPNI_TESTING` | IPNI testing mode (`disabled`/`random`/`always`) | `always` |
| `USE_ONLY_APPROVED_PROVIDERS` | Only use approved storage providers | `true` |
| `PDP_SUBGRAPH_ENDPOINT` | PDP subgraph API endpoint for PDP proof-set/data-retention | `https://api.thegraph.com/subgraphs/filecoin/pdp` |
| `SUBGRAPH_ENDPOINT` | Subgraph GraphQL endpoint for PDP proof-set/data-retention and anon-retrieval queries | `https://api.goldsky.com/api/public/<project>/subgraphs/dealbot-subgraph/<version>/gn` |

### Scheduling Configuration (pg-boss)

Expand Down
2 changes: 2 additions & 0 deletions apps/backend/src/app.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import { JobsModule } from "./jobs/jobs.module.js";
import { MetricsPrometheusModule } from "./metrics-prometheus/metrics-prometheus.module.js";
import { ProvidersModule } from "./providers/providers.module.js";
import { RetrievalModule } from "./retrieval/retrieval.module.js";
import { RetrievalAnonModule } from "./retrieval-anon/retrieval-anon.module.js";

@Module({
imports: [
Expand All @@ -26,6 +27,7 @@ import { RetrievalModule } from "./retrieval/retrieval.module.js";
JobsModule,
DealModule,
RetrievalModule,
RetrievalAnonModule,
DataSourceModule,
ProvidersModule,
...(process.env.ENABLE_DEV_MODE === "true" ? [DevToolsModule] : []),
Expand Down
86 changes: 75 additions & 11 deletions apps/backend/src/config/app.config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ export const configValidationSchema = Joi.object({
USE_ONLY_APPROVED_PROVIDERS: Joi.boolean().default(true),
DEALBOT_DATASET_VERSION: Joi.string().optional(),
MIN_NUM_DATASETS_FOR_CHECKS: Joi.number().integer().min(1).default(1),
PDP_SUBGRAPH_ENDPOINT: Joi.string().uri().optional().allow(""),
SUBGRAPH_ENDPOINT: Joi.string().uri().optional().allow(""),

// Scheduling
PROVIDERS_REFRESH_INTERVAL_SECONDS: Joi.number().default(4 * 3600),
Expand All @@ -80,6 +80,7 @@ export const configValidationSchema = Joi.object({
DEALS_PER_SP_PER_HOUR: Joi.number().min(0.001).max(20).default(4),
DATASET_CREATIONS_PER_SP_PER_HOUR: Joi.number().min(0.001).max(20).default(1),
RETRIEVALS_PER_SP_PER_HOUR: Joi.number().min(0.001).max(20).default(2),
RETRIEVALS_ANON_PER_SP_PER_HOUR: Joi.number().min(0.001).max(20).optional(),
// Polling interval for pg-boss scheduler (lower = more responsive, higher = less DB chatter).
JOB_SCHEDULER_POLL_SECONDS: Joi.number().min(60).default(300),
JOB_WORKER_POLL_SECONDS: Joi.number().min(5).default(60),
Expand All @@ -91,8 +92,10 @@ export const configValidationSchema = Joi.object({
JOB_ENQUEUE_JITTER_SECONDS: Joi.number().min(0).default(0),
DEAL_JOB_TIMEOUT_SECONDS: Joi.number().min(120).default(360), // 6 minutes max runtime for data storage jobs (TODO: reduce default to 3 minutes)
RETRIEVAL_JOB_TIMEOUT_SECONDS: Joi.number().min(60).default(60), // 1 minute max runtime for retrieval jobs (TODO: reduce default to 30 seconds)
ANON_RETRIEVAL_JOB_TIMEOUT_SECONDS: Joi.number().min(60).default(360), // 6 minutes max runtime for anon retrieval jobs (pieces can be up to ~70 MiB)
DATA_SET_CREATION_JOB_TIMEOUT_SECONDS: Joi.number().min(60).default(300), // 5 minutes max runtime for dataset creation jobs
IPFS_BLOCK_FETCH_CONCURRENCY: Joi.number().integer().min(1).max(32).default(6),
ANON_RETRIEVAL_BLOCK_SAMPLE_COUNT: Joi.number().integer().min(1).max(50).default(5),

// Piece Cleanup
MAX_DATASET_STORAGE_SIZE_BYTES: Joi.number()
Expand Down Expand Up @@ -124,8 +127,9 @@ export const configValidationSchema = Joi.object({

// Timeouts (in milliseconds)
CONNECT_TIMEOUT_MS: Joi.number().min(1000).default(10000), // 10 seconds to establish connection/receive headers
HTTP_REQUEST_TIMEOUT_MS: Joi.number().min(1000).default(240000), // 4 minutes total for HTTP requests (10MiB @ 170KB/s + overhead)
HTTP2_REQUEST_TIMEOUT_MS: Joi.number().min(1000).default(240000), // 4 minutes total for HTTP/2 requests (10MiB @ 170KB/s + overhead)
// Defaults intentionally omitted so loadConfig can derive them from the longest job timeout.
HTTP_REQUEST_TIMEOUT_MS: Joi.number().min(1000).optional(),
HTTP2_REQUEST_TIMEOUT_MS: Joi.number().min(1000).optional(),
IPNI_VERIFICATION_TIMEOUT_MS: Joi.number().min(1000).default(60000), // 60 seconds max time to wait for IPNI verification
IPNI_VERIFICATION_POLLING_MS: Joi.number().min(250).default(2000), // 2 seconds between IPNI verification polls

Expand Down Expand Up @@ -165,7 +169,7 @@ export interface IBlockchainConfig {
useOnlyApprovedProviders: boolean;
dealbotDataSetVersion?: string;
minNumDataSetsForChecks: number;
pdpSubgraphEndpoint?: string;
subgraphEndpoint?: string;
}

export interface ISchedulingConfig {
Expand Down Expand Up @@ -256,6 +260,14 @@ export interface IJobsConfig {
* Uses AbortController to actively cancel job execution.
*/
retrievalJobTimeoutSeconds: number;
/**
* Maximum runtime (seconds) for anonymous retrieval jobs before forced abort.
*
* Anonymous retrievals fetch arbitrary pieces (up to ~70 MiB), so this is
* typically larger than `retrievalJobTimeoutSeconds`. Uses AbortController
* to actively cancel job execution while still persisting partial metrics.
*/
anonRetrievalJobTimeoutSeconds: number;
/**
* Target number of piece cleanup runs per storage provider per hour.
*
Expand All @@ -270,6 +282,12 @@ export interface IJobsConfig {
* Only used when `DEALBOT_JOBS_MODE=pgboss`.
*/
maxPieceCleanupRuntimeSeconds: number;

/**
* Target number of anonymous retrieval tests per storage provider per hour.
* Read from `RETRIEVALS_ANON_PER_SP_PER_HOUR`; when that is unset, falls back to
* `RETRIEVALS_PER_SP_PER_HOUR`, and finally to 2.
*/
retrievalsAnonPerSpPerHour: number;
}

export interface IDatasetConfig {
Expand All @@ -287,6 +305,10 @@ export interface ITimeoutConfig {

export interface IRetrievalConfig {
ipfsBlockFetchConcurrency: number;
/**
* Number of CAR blocks to sample for IPNI + block-fetch validation in anonymous
* retrievals. Read from `ANON_RETRIEVAL_BLOCK_SAMPLE_COUNT` (default 5).
*/
anonBlockSampleCount: number;
}

export interface IPieceCleanupConfig {
Expand Down Expand Up @@ -315,6 +337,43 @@ export interface IConfig {
}

export function loadConfig(): IConfig {
const jobTimeoutSeconds = {
deal: Number.parseInt(process.env.DEAL_JOB_TIMEOUT_SECONDS || "360", 10),
retrieval: Number.parseInt(process.env.RETRIEVAL_JOB_TIMEOUT_SECONDS || "60", 10),
anonRetrieval: Number.parseInt(process.env.ANON_RETRIEVAL_JOB_TIMEOUT_SECONDS || "360", 10),
dataSetCreation: Number.parseInt(process.env.DATA_SET_CREATION_JOB_TIMEOUT_SECONDS || "300", 10),
pieceCleanup: Number.parseInt(process.env.MAX_PIECE_CLEANUP_RUNTIME_SECONDS || "300", 10),
};

// HTTP-level request timeouts default to the longest job timeout so the
// per-request ceiling never caps below the per-job budget. Any job-scoped
// AbortSignal fires first and is authoritative; the HTTP timer only kicks
// in for callers that do not pass a parent signal.
const longestJobTimeoutMs = Math.max(...Object.values(jobTimeoutSeconds)) * 1000;

const httpRequestTimeoutMs = Number.parseInt(process.env.HTTP_REQUEST_TIMEOUT_MS || String(longestJobTimeoutMs), 10);
const http2RequestTimeoutMs = Number.parseInt(
process.env.HTTP2_REQUEST_TIMEOUT_MS || String(longestJobTimeoutMs),
10,
);

// Misconfiguration guard: if someone explicitly sets an HTTP timeout below
// the longest job timeout, the HTTP-level timer will abort in-flight work
// before the job signal has a chance to report it. Warn loudly so this is
// caught at boot rather than inferred from short-timeout incidents later.
for (const [name, value] of [
["HTTP_REQUEST_TIMEOUT_MS", httpRequestTimeoutMs],
["HTTP2_REQUEST_TIMEOUT_MS", http2RequestTimeoutMs],
] as const) {
if (value < longestJobTimeoutMs) {
// eslint-disable-next-line no-console
console.warn(
`[config] ${name}=${value}ms is lower than the longest job timeout (${longestJobTimeoutMs}ms). ` +
`HTTP requests may abort before the job signal fires, producing short, unexplained timeouts.`,
);
}
}

return {
app: {
env: process.env.NODE_ENV || "development",
Expand Down Expand Up @@ -356,7 +415,7 @@ export function loadConfig(): IConfig {
useOnlyApprovedProviders: process.env.USE_ONLY_APPROVED_PROVIDERS !== "false",
dealbotDataSetVersion: process.env.DEALBOT_DATASET_VERSION,
minNumDataSetsForChecks: Number.parseInt(process.env.MIN_NUM_DATASETS_FOR_CHECKS || "1", 10),
pdpSubgraphEndpoint: process.env.PDP_SUBGRAPH_ENDPOINT || "",
subgraphEndpoint: process.env.SUBGRAPH_ENDPOINT || "",
},
scheduling: {
providersRefreshIntervalSeconds: Number.parseInt(process.env.PROVIDERS_REFRESH_INTERVAL_SECONDS || "14400", 10),
Expand All @@ -379,11 +438,15 @@ export function loadConfig(): IConfig {
catchupMaxEnqueue: Number.parseInt(process.env.JOB_CATCHUP_MAX_ENQUEUE || "10", 10),
schedulePhaseSeconds: Number.parseInt(process.env.JOB_SCHEDULE_PHASE_SECONDS || "0", 10),
enqueueJitterSeconds: Number.parseInt(process.env.JOB_ENQUEUE_JITTER_SECONDS || "0", 10),
dealJobTimeoutSeconds: Number.parseInt(process.env.DEAL_JOB_TIMEOUT_SECONDS || "360", 10),
retrievalJobTimeoutSeconds: Number.parseInt(process.env.RETRIEVAL_JOB_TIMEOUT_SECONDS || "60", 10),
dataSetCreationJobTimeoutSeconds: Number.parseInt(process.env.DATA_SET_CREATION_JOB_TIMEOUT_SECONDS || "300", 10),
dealJobTimeoutSeconds: jobTimeoutSeconds.deal,
retrievalJobTimeoutSeconds: jobTimeoutSeconds.retrieval,
anonRetrievalJobTimeoutSeconds: jobTimeoutSeconds.anonRetrieval,
retrievalsAnonPerSpPerHour: Number.parseFloat(
process.env.RETRIEVALS_ANON_PER_SP_PER_HOUR || process.env.RETRIEVALS_PER_SP_PER_HOUR || "2",
),
dataSetCreationJobTimeoutSeconds: jobTimeoutSeconds.dataSetCreation,
pieceCleanupPerSpPerHour: Number.parseFloat(process.env.JOB_PIECE_CLEANUP_PER_SP_PER_HOUR || String(1 / 24)),
maxPieceCleanupRuntimeSeconds: Number.parseInt(process.env.MAX_PIECE_CLEANUP_RUNTIME_SECONDS || "300", 10),
maxPieceCleanupRuntimeSeconds: jobTimeoutSeconds.pieceCleanup,
},
dataset: {
localDatasetsPath: process.env.DEALBOT_LOCAL_DATASETS_PATH || DEFAULT_LOCAL_DATASETS_PATH,
Expand All @@ -405,13 +468,14 @@ export function loadConfig(): IConfig {
},
timeouts: {
connectTimeoutMs: Number.parseInt(process.env.CONNECT_TIMEOUT_MS || "10000", 10),
httpRequestTimeoutMs: Number.parseInt(process.env.HTTP_REQUEST_TIMEOUT_MS || "240000", 10),
http2RequestTimeoutMs: Number.parseInt(process.env.HTTP2_REQUEST_TIMEOUT_MS || "240000", 10),
httpRequestTimeoutMs,
http2RequestTimeoutMs,
ipniVerificationTimeoutMs: Number.parseInt(process.env.IPNI_VERIFICATION_TIMEOUT_MS || "60000", 10),
ipniVerificationPollingMs: Number.parseInt(process.env.IPNI_VERIFICATION_POLLING_MS || "2000", 10),
},
retrieval: {
ipfsBlockFetchConcurrency: Number.parseInt(process.env.IPFS_BLOCK_FETCH_CONCURRENCY || "6", 10),
anonBlockSampleCount: Number.parseInt(process.env.ANON_RETRIEVAL_BLOCK_SAMPLE_COUNT || "5", 10),
},
pieceCleanup: {
maxDatasetStorageSizeBytes: Number.parseInt(
Expand Down
4 changes: 2 additions & 2 deletions apps/backend/src/data-retention/data-retention.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@ import { Module } from "@nestjs/common";
import { TypeOrmModule } from "@nestjs/typeorm";
import { DataRetentionBaseline } from "../database/entities/data-retention-baseline.entity.js";
import { StorageProvider } from "../database/entities/storage-provider.entity.js";
import { PdpSubgraphModule } from "../pdp-subgraph/pdp-subgraph.module.js";
import { SubgraphModule } from "../subgraph/subgraph.module.js";
import { WalletSdkModule } from "../wallet-sdk/wallet-sdk.module.js";
import { DataRetentionService } from "./data-retention.service.js";

@Module({
imports: [WalletSdkModule, PdpSubgraphModule, TypeOrmModule.forFeature([DataRetentionBaseline, StorageProvider])],
imports: [WalletSdkModule, SubgraphModule, TypeOrmModule.forFeature([DataRetentionBaseline, StorageProvider])],
providers: [DataRetentionService],
exports: [DataRetentionService],
})
Expand Down
Loading