From 62d337428e74cae9caeaafd5338a933acb7dac54 Mon Sep 17 00:00:00 2001 From: MoonBoi9001 Date: Tue, 17 Mar 2026 20:24:46 -0500 Subject: [PATCH 01/11] feat(dips): dedicated fast loop with offer-existence gate DIPs acceptance rode the 120s reconcile cycle with no slack for the 300s RCA deadline. A 5s loop now polls and accepts, gated on the offer existing on-chain (missing offer waits, doesn't fail). Bundles the signedRca unpack and pre-accept rule creation the loop needs. Co-Authored-By: Claude Opus 4.7 (1M context) --- packages/indexer-agent/src/agent.ts | 9 +- .../src/indexer-management/allocations.ts | 1 + .../__tests__/accept-proposals.test.ts | 189 +++++++++++- .../__tests__/offer-monitor.test.ts | 64 ++++ .../__tests__/pending-rca-consumer.test.ts | 2 +- .../indexer-common/src/indexing-fees/dips.ts | 279 +++++++++++++----- .../indexer-common/src/indexing-fees/index.ts | 1 + .../src/indexing-fees/offer-monitor.ts | 61 ++++ .../src/network-specification.ts | 4 + 9 files changed, 537 insertions(+), 73 deletions(-) create mode 100644 packages/indexer-common/src/indexing-fees/__tests__/offer-monitor.test.ts create mode 100644 packages/indexer-common/src/indexing-fees/offer-monitor.ts diff --git a/packages/indexer-agent/src/agent.ts b/packages/indexer-agent/src/agent.ts index 093144cb8..36100ab7c 100644 --- a/packages/indexer-agent/src/agent.ts +++ b/packages/indexer-agent/src/agent.ts @@ -713,10 +713,11 @@ export class Agent { activeAllocations, async ({ network, operator }, activeAllocations: Allocation[]) => { if (network.specification.indexerOptions.enableDips) { - await operator.dipsManager!.acceptPendingProposals( - activeAllocations, - ) - await operator.dipsManager!.collectAgreementPayments() + if (!operator.dipsManager) { + throw new Error('DipsManager is not available') + } + + await operator.dipsManager.collectAgreementPayments() } }, ) diff --git a/packages/indexer-common/src/indexer-management/allocations.ts b/packages/indexer-common/src/indexer-management/allocations.ts index 5449a8e2f..c650d657d 100644 --- a/packages/indexer-common/src/indexer-management/allocations.ts +++ b/packages/indexer-common/src/indexer-management/allocations.ts @@ -175,6 +175,7 @@ export class AllocationManager { this, this.pendingRcaModel, ) + this.dipsManager.startProposalAcceptanceLoop() } } diff --git a/packages/indexer-common/src/indexing-fees/__tests__/accept-proposals.test.ts b/packages/indexer-common/src/indexing-fees/__tests__/accept-proposals.test.ts index bcd33115d..23cf4bf4a 100644 --- a/packages/indexer-common/src/indexing-fees/__tests__/accept-proposals.test.ts +++ b/packages/indexer-common/src/indexing-fees/__tests__/accept-proposals.test.ts @@ -4,9 +4,12 @@ import { PendingRcaConsumer } from '../pending-rca-consumer' import { DecodedRcaProposal } from '../types' import { Allocation, + AllocationManager, AllocationStatus, IndexerManagementModels, + IndexingDecisionBasis, Network, + SubgraphIdentifierType, } from '@graphprotocol/indexer-common' let logger: Logger @@ -108,10 +111,17 @@ function createMockModels() { findOne: jest.fn().mockResolvedValue(null), findAll: jest.fn().mockResolvedValue([]), destroy: jest.fn().mockResolvedValue(1), + upsert: jest.fn().mockResolvedValue([{ id: 1 }, true]), }, } as unknown as IndexerManagementModels } +function createMockParent() { + return { + matchingRuleExists: jest.fn().mockResolvedValue(false), + } as unknown as AllocationManager +} + function createMockNetwork() { return { contracts: { @@ -168,11 +178,18 @@ function createDipsManager( network: Network, models: IndexerManagementModels, consumer: PendingRcaConsumer, + parent: AllocationManager = createMockParent(), + offerMonitor?: { offerExists: jest.Mock }, ): DipsManager { + const graphNode = { ensure: jest.fn().mockResolvedValue(undefined) } // eslint-disable-next-line @typescript-eslint/no-explicit-any - const dm = new DipsManager(logger, models, network, {} as any, null, {} as any) - // eslint-disable-next-line @typescript-eslint/no-explicit-any + const dm = new DipsManager(logger, models, network, graphNode as any, parent, {} as any) +// eslint-disable-next-line @typescript-eslint/no-explicit-any ;(dm as any).pendingRcaConsumer = consumer + if (offerMonitor !== undefined) { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + ;(dm as any).offerMonitor = offerMonitor + } return dm } @@ -583,4 +600,172 @@ describe('DipsManager.acceptPendingProposals', () => { expect(consumer.markAccepted).toHaveBeenCalledWith(proposal.id) }) }) + + describe('rule creation ordering (race condition fix)', () => { + test('upserts the DIPS indexing rule before broadcasting acceptIndexingAgreement', async () => { + const proposal = createMockProposal() + const allocation = createMockAllocation() + const consumer = createMockConsumer([proposal]) + const models = createMockModels() + const network = createMockNetwork() + ;(network.transactionManager.executeTransaction as jest.Mock).mockResolvedValue({ + hash: '0xtx', + status: 1, + }) + + const dm = createDipsManager(network, models, consumer) + + await dm.acceptPendingProposals([allocation]) + + const upsertOrder = (models.IndexingRule.upsert as jest.Mock).mock + .invocationCallOrder[0] + const executeOrder = (network.transactionManager.executeTransaction as jest.Mock) + .mock.invocationCallOrder[0] + + expect(upsertOrder).toBeDefined() + expect(executeOrder).toBeDefined() + expect(upsertOrder).toBeLessThan(executeOrder) + }) + + test('skips rule upsert and rejects proposal when deployment is blocklisted', async () => { + const proposal = createMockProposal() + const allocation = createMockAllocation() + const consumer = createMockConsumer([proposal]) + ;(consumer.getPendingProposalsForDeployment as jest.Mock).mockResolvedValue([]) + const models = createMockModels() + ;(models.IndexingRule.findAll as jest.Mock).mockResolvedValue([ + { + identifier: proposal.subgraphDeploymentId.ipfsHash, + identifierType: SubgraphIdentifierType.DEPLOYMENT, + decisionBasis: IndexingDecisionBasis.NEVER, + }, + ]) + const network = createMockNetwork() + + const dm = createDipsManager(network, models, consumer) + + await dm.acceptPendingProposals([allocation]) + + expect(consumer.markRejected).toHaveBeenCalledWith( + proposal.id, + 'deployment blocklisted', + ) + expect(models.IndexingRule.upsert).not.toHaveBeenCalled() + expect(network.transactionManager.executeTransaction).not.toHaveBeenCalled() + }) + + test('skips rule upsert when parent reports a matching rule already exists', async () => { + const proposal = createMockProposal() + const allocation = createMockAllocation() + const consumer = createMockConsumer([proposal]) + const models = createMockModels() + const network = createMockNetwork() + ;(network.transactionManager.executeTransaction as jest.Mock).mockResolvedValue({ + hash: '0xtx', + status: 1, + }) + + const parent = { + matchingRuleExists: jest.fn().mockResolvedValue(true), + } as unknown as AllocationManager + + const dm = createDipsManager(network, models, consumer, parent) + + await dm.acceptPendingProposals([allocation]) + + expect(models.IndexingRule.upsert).not.toHaveBeenCalled() + expect(network.transactionManager.executeTransaction).toHaveBeenCalled() + expect(consumer.markAccepted).toHaveBeenCalledWith(proposal.id) + }) + }) + + describe('offer-existence gate', () => { + test('stays pending when offer absent and deadline > now + safety margin', async () => { + const proposal = createMockProposal({ + // 5 minutes from now — well beyond the 30s safety margin + deadline: BigInt(Math.floor(Date.now() / 1000) + 300), + }) + const allocation = createMockAllocation() + const consumer = createMockConsumer([proposal]) + const models = createMockModels() + const network = createMockNetwork() + const offerMonitor = { offerExists: jest.fn().mockResolvedValue(false) } + + const dm = createDipsManager(network, models, consumer, undefined, offerMonitor) + + await dm.acceptPendingProposals([allocation]) + + expect(offerMonitor.offerExists).toHaveBeenCalledWith(proposal.id) + expect(network.transactionManager.executeTransaction).not.toHaveBeenCalled() + expect(consumer.markRejected).not.toHaveBeenCalled() + expect(consumer.markAccepted).not.toHaveBeenCalled() + }) + + test('marks rejected when offer absent and deadline within safety margin', async () => { + const proposal = createMockProposal({ + // 10 seconds from now — inside the 30s safety margin + deadline: BigInt(Math.floor(Date.now() / 1000) + 10), + }) + const allocation = createMockAllocation() + const consumer = createMockConsumer([proposal]) + const models = createMockModels() + const network = createMockNetwork() + const offerMonitor = { offerExists: jest.fn().mockResolvedValue(false) } + + const dm = createDipsManager(network, models, consumer, undefined, offerMonitor) + + await dm.acceptPendingProposals([allocation]) + + expect(offerMonitor.offerExists).toHaveBeenCalledWith(proposal.id) + expect(consumer.markRejected).toHaveBeenCalledWith( + proposal.id, + 'offer_never_landed', + ) + expect(network.transactionManager.executeTransaction).not.toHaveBeenCalled() + }) + + test('proceeds to acceptIndexingAgreement when offer is present', async () => { + const proposal = createMockProposal() + const allocation = createMockAllocation() + const consumer = createMockConsumer([proposal]) + const models = createMockModels() + const network = createMockNetwork() + ;(network.transactionManager.executeTransaction as jest.Mock).mockResolvedValue({ + hash: '0xtxhash', + status: 1, + }) + const offerMonitor = { offerExists: jest.fn().mockResolvedValue(true) } + + const dm = createDipsManager(network, models, consumer, undefined, offerMonitor) + + await dm.acceptPendingProposals([allocation]) + + expect(offerMonitor.offerExists).toHaveBeenCalledWith(proposal.id) + expect(network.transactionManager.executeTransaction).toHaveBeenCalled() + expect(consumer.markAccepted).toHaveBeenCalledWith(proposal.id) + }) + + test('bypasses gate when indexingPaymentsSubgraph is not configured', async () => { + const proposal = createMockProposal() + const allocation = createMockAllocation() + const consumer = createMockConsumer([proposal]) + const models = createMockModels() + const network = createMockNetwork() + ;(network.transactionManager.executeTransaction as jest.Mock).mockResolvedValue({ + hash: '0xtxhash', + status: 1, + }) + + // No offerMonitor passed — DipsManager constructor sets it to null + // because createMockNetwork() does not define indexingPaymentsSubgraph. + const dm = createDipsManager(network, models, consumer) + // eslint-disable-next-line @typescript-eslint/no-explicit-any + expect((dm as any).offerMonitor).toBeNull() + + await dm.acceptPendingProposals([allocation]) + + expect(network.transactionManager.executeTransaction).toHaveBeenCalled() + expect(consumer.markAccepted).toHaveBeenCalledWith(proposal.id) + }) + }) }) diff --git a/packages/indexer-common/src/indexing-fees/__tests__/offer-monitor.test.ts b/packages/indexer-common/src/indexing-fees/__tests__/offer-monitor.test.ts new file mode 100644 index 000000000..7fe9c723c --- /dev/null +++ b/packages/indexer-common/src/indexing-fees/__tests__/offer-monitor.test.ts @@ -0,0 +1,64 @@ +import { createLogger } from '@graphprotocol/common-ts' +import { OfferMonitor } from '../offer-monitor' + +const logger = createLogger({ + name: 'OfferMonitor.test', + async: false, + level: 'error', +}) + +describe('OfferMonitor', () => { + it('converts UUID-format agreement ids to bytes16 hex before querying', async () => { + const query = jest.fn().mockResolvedValue({ data: { offer: { id: '0xabc' } } }) + const subgraph = { query } as never + const monitor = new OfferMonitor(logger, subgraph) + + const exists = await monitor.offerExists( + 'bea99452-e465-e9d9-8a79-2356edcc7e92', + ) + + expect(exists).toBe(true) + expect(query).toHaveBeenCalledTimes(1) + expect(query.mock.calls[0][1]).toEqual({ + id: '0xbea99452e465e9d98a792356edcc7e92', + }) + }) + + it('passes through already-hex ids unchanged (lowercased)', async () => { + const query = jest.fn().mockResolvedValue({ data: { offer: { id: '0xabc' } } }) + const subgraph = { query } as never + const monitor = new OfferMonitor(logger, subgraph) + + await monitor.offerExists('0xBEA99452E465E9D98A792356EDCC7E92') + + expect(query.mock.calls[0][1]).toEqual({ + id: '0xbea99452e465e9d98a792356edcc7e92', + }) + }) + + it('returns false when the subgraph reports the offer is missing', async () => { + const query = jest.fn().mockResolvedValue({ data: { offer: null } }) + const subgraph = { query } as never + const monitor = new OfferMonitor(logger, subgraph) + + const exists = await monitor.offerExists( + 'bea99452-e465-e9d9-8a79-2356edcc7e92', + ) + + expect(exists).toBe(false) + }) + + it('treats subgraph errors as transient (not yet on-chain)', async () => { + const query = jest + .fn() + .mockResolvedValue({ error: new Error('subgraph hiccup') }) + const subgraph = { query } as never + const monitor = new OfferMonitor(logger, subgraph) + + const exists = await monitor.offerExists( + 'bea99452-e465-e9d9-8a79-2356edcc7e92', + ) + + expect(exists).toBe(false) + }) +}) diff --git a/packages/indexer-common/src/indexing-fees/__tests__/pending-rca-consumer.test.ts b/packages/indexer-common/src/indexing-fees/__tests__/pending-rca-consumer.test.ts index c2bd732bc..980be336b 100644 --- a/packages/indexer-common/src/indexing-fees/__tests__/pending-rca-consumer.test.ts +++ b/packages/indexer-common/src/indexing-fees/__tests__/pending-rca-consumer.test.ts @@ -42,7 +42,7 @@ function encodeTestPayload(overrides?: { [ { subgraphDeploymentId: TEST_DEPLOYMENT_BYTES32, - version: 1n, + version: 0n, terms: termsEncoded, }, ], diff --git a/packages/indexer-common/src/indexing-fees/dips.ts b/packages/indexer-common/src/indexing-fees/dips.ts index a5470b43b..f26f36d8e 100644 --- a/packages/indexer-common/src/indexing-fees/dips.ts +++ b/packages/indexer-common/src/indexing-fees/dips.ts @@ -7,6 +7,8 @@ import { import { Allocation, AllocationManager, + AllocationStatus, + DipsReceiptStatus, GraphNode, IndexerManagementModels, IndexingDecisionBasis, @@ -17,6 +19,7 @@ import { } from '@graphprotocol/indexer-common' import { PendingRcaProposal } from '../indexer-management/models/pending-rca-proposal' +import { OfferMonitor } from './offer-monitor' import { PendingRcaConsumer } from './pending-rca-consumer' import { DecodedRcaProposal } from './types' import { tryParseCustomError } from '../utils' @@ -29,12 +32,18 @@ import { } from './agreement-monitor' import { CollectionTracker } from './collection-tracker' +const DIPS_ACCEPTANCE_INTERVAL = 5_000 // POIs are computed against a recent-but-not-tip block to avoid reorg edge cases. const RECENT_BLOCK_OFFSET = 10 +// When the offer hasn't landed on-chain yet, keep retrying until the RCA +// deadline is within this window. Inside the window, give up cleanly so +// reassessment can pick a replacement before the deadline lapses. +const OFFER_GATE_DEADLINE_SAFETY_MARGIN_SECONDS = 30n export class DipsManager { declare pendingRcaConsumer: PendingRcaConsumer declare collectionTracker: CollectionTracker + declare offerMonitor: OfferMonitor | null constructor( private logger: Logger, private models: IndexerManagementModels, @@ -44,6 +53,13 @@ export class DipsManager { pendingRcaModel: typeof PendingRcaProposal, ) { this.pendingRcaConsumer = new PendingRcaConsumer(this.logger, pendingRcaModel) + + // Null when no indexing-payments-subgraph is configured; processProposal + // skips the offer-existence gate in that case. + this.offerMonitor = this.network.indexingPaymentsSubgraph + ? new OfferMonitor(this.logger, this.network.indexingPaymentsSubgraph) + : null + this.collectionTracker = new CollectionTracker( this.network.specification.indexerOptions.dipsCollectionTarget, ) @@ -276,6 +292,75 @@ export class DipsManager { return } + // Create the dips rule eagerly here rather than leaving it to the reconcile + // loop: the accept tx can confirm and clear the pending row before the next + // reconcile tick, which would leave the rule uncreated and graph-node never + // told to deploy the subgraph. + const allDeploymentRules = await this.models.IndexingRule.findAll({ + where: { identifierType: SubgraphIdentifierType.DEPLOYMENT }, + }) + const blocklisted = allDeploymentRules.find((r) => + this.isOnChainOptOutRule(r, proposal.subgraphDeploymentId), + ) + if (blocklisted) { + this.logger.info( + `Blocklisted deployment ${proposal.subgraphDeploymentId.toString()}, rejecting proposal ${ + proposal.id + }`, + ) + await consumer.markRejected(proposal.id, 'deployment blocklisted') + return + } + await this.upsertDipsRuleFor(proposal.subgraphDeploymentId, { + allocationLifetime: Math.max( + Number(proposal.minSecondsPerCollection), + Number(proposal.maxSecondsPerCollection), + ), + }) + + // Gate accept on the on-chain offer existing. If dipper's offer() tx was + // evicted (nonce collision, gas spike), rcaOffers is empty and + // acceptIndexingAgreement reverts with RecurringCollectorInvalidSigner — + // a transient state, retry next tick. Inside the safety margin, give up + // so reassessment can pick a replacement before the deadline lapses. + if (this.offerMonitor) { + const offerOnChain = await this.offerMonitor.offerExists(proposal.id) + if (!offerOnChain) { + if (proposal.deadline > now + OFFER_GATE_DEADLINE_SAFETY_MARGIN_SECONDS) { + this.logger.debug( + 'Offer not yet on-chain, waiting for next acceptance-loop tick', + { + proposalId: proposal.id, + deadline: proposal.deadline.toString(), + now: now.toString(), + }, + ) + return + } + this.logger.warn( + 'Offer never landed on-chain within the RCA deadline, rejecting proposal', + { + proposalId: proposal.id, + deadline: proposal.deadline.toString(), + now: now.toString(), + }, + ) + await consumer.markRejected(proposal.id, 'offer_never_landed') + await this.cleanupDipsRule(consumer, proposal) + return + } + } + + // Deploy the subgraph to graph-node before the accept multicall creates the + // allocation on-chain. The main reconcile loop reads `graph_node.indexingStatus` + // for the deployment; if graph-node has never been told to deploy it, + // indexingStatus is undefined and `failsHealthCheck` triggers a spurious + // unallocate of the allocation we just created. `ensure` is idempotent. + await this.graphNode.ensure( + `indexer-agent/${proposal.subgraphDeploymentId.ipfsHash.slice(-10)}`, + proposal.subgraphDeploymentId, + ) + const allocation = activeAllocations.find( (a) => a.subgraphDeployment.id.bytes32 === proposal.subgraphDeploymentId.bytes32, ) @@ -571,82 +656,89 @@ export class DipsManager { async collectAgreementPayments(): Promise { const logger = this.logger.child({ function: 'collectAgreementPayments' }) - const indexerAddress = this.network.specification.indexerOptions.address + try { + const indexerAddress = this.network.specification.indexerOptions.address - if (!this.network.indexingPaymentsSubgraph) { - logger.warn( - 'Indexing payments subgraph not configured, skipping agreement collection', + if (!this.network.indexingPaymentsSubgraph) { + logger.warn( + 'Indexing payments subgraph not configured, skipping agreement collection', + ) + return + } + const agreements = await fetchCollectableAgreements( + this.network.indexingPaymentsSubgraph, + indexerAddress, ) - return - } - const agreements = await fetchCollectableAgreements( - this.network.indexingPaymentsSubgraph, - indexerAddress, - ) - - if (agreements.length === 0) { - logger.debug('No collectable agreements found') - return - } - // Cancel any agreements whose deployments are blocklisted - await this.cancelBlocklistedAgreements(agreements) + if (agreements.length === 0) { + logger.debug('No collectable agreements found') + return + } - // Use chain timestamp for consistency with contract timing and subgraph data - const blockNumber = await this.network.networkProvider.getBlockNumber() - const block = await this.network.networkProvider.getBlock(blockNumber) - const nowSeconds = block ? Number(block.timestamp) : Math.floor(Date.now() / 1000) + // Cancel any agreements whose deployments are blocklisted + await this.cancelBlocklistedAgreements(agreements) - // Sync tracker state from subgraph data - for (const agreement of agreements) { - this.collectionTracker.track(agreement.id, { - lastCollectedAt: Number(agreement.lastCollectionAt), - minSecondsPerCollection: agreement.minSecondsPerCollection, - maxSecondsPerCollection: agreement.maxSecondsPerCollection, - }) - } - - const readyIds = this.collectionTracker.getReadyAgreements(nowSeconds) - if (readyIds.length === 0) { - logger.debug('No agreements ready for collection', { - total: agreements.length, - }) - return - } + // Use chain timestamp for consistency with contract timing and subgraph data + const blockNumber = await this.network.networkProvider.getBlockNumber() + const block = await this.network.networkProvider.getBlock(blockNumber) + const nowSeconds = block ? Number(block.timestamp) : Math.floor(Date.now() / 1000) + + // Sync tracker state from subgraph data + for (const agreement of agreements) { + this.collectionTracker.track(agreement.id, { + lastCollectedAt: Number(agreement.lastCollectionAt), + minSecondsPerCollection: agreement.minSecondsPerCollection, + maxSecondsPerCollection: agreement.maxSecondsPerCollection, + }) + } - logger.info( - `${readyIds.length} of ${agreements.length} agreement(s) ready for collection`, - ) + const readyIds = this.collectionTracker.getReadyAgreements(nowSeconds) + if (readyIds.length === 0) { + logger.debug('No agreements ready for collection', { + total: agreements.length, + }) + return + } - const readyAgreements = agreements.filter((a) => readyIds.includes(a.id)) + logger.info( + `${readyIds.length} of ${agreements.length} agreement(s) ready for collection`, + ) - for (const agreement of readyAgreements) { - try { - const result = await this.tryCollectAgreement(agreement, blockNumber, logger) - if (result === 'collected') { - this.collectionTracker.updateAfterCollection(agreement.id, nowSeconds) - this.cleanupFinishedAgreement(agreement, nowSeconds, logger) + const readyAgreements = agreements.filter((a) => readyIds.includes(a.id)) + + for (const agreement of readyAgreements) { + try { + const result = await this.tryCollectAgreement(agreement, blockNumber, logger) + if (result === 'collected') { + this.collectionTracker.updateAfterCollection(agreement.id, nowSeconds) + this.cleanupFinishedAgreement(agreement, nowSeconds, logger) + } + // 'paused' / 'unauthorized' are pre-flight checks; no on-chain attempt was + // made, so don't bump the tracker. Next tick will retry immediately. + } catch (err) { + const isDeterministic = this.isDeterministicError(err) + const errorDetail = isDeterministic + ? tryParseCustomError(err) + : err instanceof Error + ? err.message + : String(err) + // Throttle the retry so we don't hammer the chain on every poll cycle. + // Deterministic errors during collection are typically recoverable + // (subgraph sync, allocation reconcile, provision changes), so we + // don't auto-cancel; we just slow down. + this.collectionTracker.markAttempted(agreement.id, nowSeconds) + logger.warn('Failed to collect agreement, will retry after throttle', { + agreementId: agreement.id, + error: errorDetail, + deterministic: isDeterministic, + }) } - // 'paused' / 'unauthorized' are pre-flight checks; no on-chain attempt was - // made, so don't bump the tracker. Next tick will retry immediately. - } catch (err) { - const isDeterministic = this.isDeterministicError(err) - const errorDetail = isDeterministic - ? tryParseCustomError(err) - : err instanceof Error - ? err.message - : String(err) - // Throttle the retry so we don't hammer the chain on every poll cycle. - // Deterministic errors during collection are typically recoverable - // (subgraph sync, allocation reconcile, provision changes), so we - // don't auto-cancel; we just slow down. - this.collectionTracker.markAttempted(agreement.id, nowSeconds) - logger.warn('Failed to collect agreement, will retry after throttle', { - agreementId: agreement.id, - error: errorDetail, - deterministic: isDeterministic, - }) } + } catch (err) { + // Catch outer fetch failures (subgraph query, RPC getBlockNumber/getBlock, + // cancelBlocklistedAgreements) so a transient failure skips this tick rather + // than aborting the entire reconcile cycle for every network. + logger.warn('Skipping DIPs collection tick due to fetch failure', { err }) } } @@ -767,9 +859,20 @@ export class DipsManager { ): Promise { if (this.isDeterministicError(error)) { const parsedError = tryParseCustomError(error) + const callException = error as { + reason?: string + data?: string + message?: string + transaction?: { to?: string; data?: string } + } this.logger.warn('Rejecting proposal: deterministic contract error', { proposalId: proposal.id, + deployment: proposal.subgraphDeploymentId.ipfsHash, error: parsedError, + revertReason: callException.reason ?? null, + revertData: callException.data ?? null, + errorMessage: callException.message ?? null, + contractTarget: callException.transaction?.to ?? null, }) await consumer.markRejected(proposal.id, String(parsedError)) await this.cleanupDipsRule(consumer, proposal) @@ -827,4 +930,48 @@ export class DipsManager { const { deployments } = await this.getDipsTargetDeployments() return deployments } + startProposalAcceptanceLoop() { + if (!this.pendingRcaConsumer) { + this.logger.debug('No pending RCA consumer configured, skipping acceptance loop') + return + } + const consumer = this.pendingRcaConsumer + + sequentialTimerMap( + { + logger: this.logger, + milliseconds: DIPS_ACCEPTANCE_INTERVAL, + }, + async () => { + const proposals = await consumer.getPendingProposals() + if (proposals.length === 0) { + return + } + + this.logger.info('Processing pending RCA proposals for on-chain acceptance', { + count: proposals.length, + }) + + const activeAllocations = await this.network.networkMonitor.allocations( + AllocationStatus.ACTIVE, + ) + + for (const proposal of proposals) { + try { + await this.processProposal(consumer, proposal, activeAllocations) + } catch (error) { + this.logger.error('Unexpected error processing proposal', { + proposalId: proposal.id, + error, + }) + } + } + }, + { + onError: (err) => { + this.logger.error('Failed to process pending RCA proposals', { err }) + }, + }, + ) + } } diff --git a/packages/indexer-common/src/indexing-fees/index.ts b/packages/indexer-common/src/indexing-fees/index.ts index 0f8dba604..eb03983ff 100644 --- a/packages/indexer-common/src/indexing-fees/index.ts +++ b/packages/indexer-common/src/indexing-fees/index.ts @@ -1,3 +1,4 @@ export * from './dips' +export * from './offer-monitor' export * from './types' export * from './pending-rca-consumer' diff --git a/packages/indexer-common/src/indexing-fees/offer-monitor.ts b/packages/indexer-common/src/indexing-fees/offer-monitor.ts new file mode 100644 index 000000000..05a8925b5 --- /dev/null +++ b/packages/indexer-common/src/indexing-fees/offer-monitor.ts @@ -0,0 +1,61 @@ +import { Logger } from '@graphprotocol/common-ts' +import gql from 'graphql-tag' +import { SubgraphClient } from '../subgraph-client' + +const OFFER_EXISTS_QUERY = gql` + query offerExists($id: ID!) { + offer(id: $id) { + id + } + } +` + +// The pending_rca_proposals table stores the agreementId as a UUID +// (`bea99452-e465-e9d9-8a79-2356edcc7e92`); the subgraph keys Offer +// entities by the on-chain bytes16 hex (`0xbea99452...92`). +function toBytes16Id(agreementId: string): string { + if (agreementId.startsWith('0x')) { + return agreementId.toLowerCase() + } + return `0x${agreementId.replace(/-/g, '').toLowerCase()}` +} + +/** + * Checks the indexing-payments-subgraph for the presence of an `Offer` + * entity. Used by the DIPs accept path to gate `acceptIndexingAgreement` + * on dipper's `offer()` tx having landed on-chain; without this gate the + * contract reverts with `RecurringCollectorInvalidSigner` whenever the + * agent's poll beats dipper's submission. + * + * Subgraph errors are treated as "not yet" (transient) — better to wait + * one more tick than to false-positive a rejection on a momentary + * subgraph hiccup. + */ +export class OfferMonitor { + constructor( + private readonly logger: Logger, + private readonly subgraph: SubgraphClient, + ) {} + + async offerExists(agreementId: string): Promise { + try { + const result = await this.subgraph.query(OFFER_EXISTS_QUERY, { + id: toBytes16Id(agreementId), + }) + if (result.error) { + this.logger.debug( + 'Offer existence check failed (will retry on next tick)', + { agreementId, err: result.error }, + ) + return false + } + return Boolean(result.data?.offer) + } catch (err) { + this.logger.debug( + 'Offer existence check threw (will retry on next tick)', + { agreementId, err }, + ) + return false + } + } +} diff --git a/packages/indexer-common/src/network-specification.ts b/packages/indexer-common/src/network-specification.ts index 5e941d4b3..868b1d8f7 100644 --- a/packages/indexer-common/src/network-specification.ts +++ b/packages/indexer-common/src/network-specification.ts @@ -153,6 +153,10 @@ export const ProtocolSubgraphs = z networkSubgraph: Subgraph, epochSubgraph: Subgraph, tapSubgraph: OptionalSubgraph, + // Source of truth for on-chain RCA offers. The DIPs accept path + // queries this before calling acceptIndexingAgreement so the + // contract's rcaOffers check doesn't revert on a race where the + // offer tx hasn't landed yet. indexingPaymentsSubgraph: OptionalSubgraph, }) .strict() From e5280ec89e4f423e17c7a17489fdc23dc8b2fb94 Mon Sep 17 00:00:00 2001 From: MoonBoi9001 Date: Wed, 29 Apr 2026 12:28:58 +0800 Subject: [PATCH 02/11] perf(dips): parallelise accept loop and hoist deploy ensure A single slow submit stalled the serial accept loop. pMap with concurrency 4 parallelises, while the wallet's nonce queue keeps submits ordered. graphNode.ensure hoisted to rule creation, phase timers added, and ABI-fragment errors marked deterministic. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../indexer-common/src/indexing-fees/dips.ts | 114 +++++++++++++++--- 1 file changed, 94 insertions(+), 20 deletions(-) diff --git a/packages/indexer-common/src/indexing-fees/dips.ts b/packages/indexer-common/src/indexing-fees/dips.ts index f26f36d8e..6171c596c 100644 --- a/packages/indexer-common/src/indexing-fees/dips.ts +++ b/packages/indexer-common/src/indexing-fees/dips.ts @@ -17,6 +17,7 @@ import { SubgraphIdentifierType, upsertIndexingRule, } from '@graphprotocol/indexer-common' +import pMap from 'p-map' import { PendingRcaProposal } from '../indexer-management/models/pending-rca-proposal' import { OfferMonitor } from './offer-monitor' @@ -35,11 +36,18 @@ import { CollectionTracker } from './collection-tracker' const DIPS_ACCEPTANCE_INTERVAL = 5_000 // POIs are computed against a recent-but-not-tip block to avoid reorg edge cases. const RECENT_BLOCK_OFFSET = 10 +// Per-tick parallelism cap. Proposals target distinct agreementIds and the +// wallet's nonce queue serialises submissions, so concurrent processProposal +// calls are safe; small enough that a stuck call doesn't head-of-line others. +const DIPS_ACCEPT_CONCURRENCY = 4 // When the offer hasn't landed on-chain yet, keep retrying until the RCA // deadline is within this window. Inside the window, give up cleanly so // reassessment can pick a replacement before the deadline lapses. const OFFER_GATE_DEADLINE_SAFETY_MARGIN_SECONDS = 30n +const elapsedMs = (start: bigint): number => + Number(process.hrtime.bigint() - start) / 1_000_000 + export class DipsManager { declare pendingRcaConsumer: PendingRcaConsumer declare collectionTracker: CollectionTracker @@ -156,6 +164,15 @@ export class DipsManager { return } + // Deploy must precede the on-chain allocation: reconcile reads + // graph_node.indexingStatus, and an undefined status triggers + // failsHealthCheck → spurious unallocate. Idempotent; graph-node + // dedupes redundant calls across proposals sharing a deployment. + await this.graphNode.ensure( + `indexer-agent/${deploymentId.ipfsHash.slice(-10)}`, + deploymentId, + ) + const { amount } = await this.getDipsAllocationAmount(deploymentId) this.logger.info( `Creating DIPS indexing rule for deployment ${deploymentId.toString()}`, @@ -280,6 +297,17 @@ export class DipsManager { activeAllocations: Allocation[], ): Promise { const now = BigInt(Math.floor(Date.now() / 1000)) + const t0 = process.hrtime.bigint() + const phases: Record = {} + const logSummary = (outcome: string) => { + this.logger.info('processProposal completed', { + proposalId: proposal.id, + deployment: proposal.subgraphDeploymentId.ipfsHash, + outcome, + phases, + totalMs: elapsedMs(t0), + }) + } if (proposal.deadline <= now) { this.logger.info('Rejecting proposal: deadline expired', { @@ -289,6 +317,7 @@ export class DipsManager { }) await consumer.markRejected(proposal.id, 'deadline_expired') await this.cleanupDipsRule(consumer, proposal) + logSummary('rejected_deadline_expired') return } @@ -296,6 +325,7 @@ export class DipsManager { // loop: the accept tx can confirm and clear the pending row before the next // reconcile tick, which would leave the rule uncreated and graph-node never // told to deploy the subgraph. + const tRule = process.hrtime.bigint() const allDeploymentRules = await this.models.IndexingRule.findAll({ where: { identifierType: SubgraphIdentifierType.DEPLOYMENT }, }) @@ -309,6 +339,8 @@ export class DipsManager { }`, ) await consumer.markRejected(proposal.id, 'deployment blocklisted') + phases.ruleMs = elapsedMs(tRule) + logSummary('rejected_blocklisted') return } await this.upsertDipsRuleFor(proposal.subgraphDeploymentId, { @@ -317,6 +349,7 @@ export class DipsManager { Number(proposal.maxSecondsPerCollection), ), }) + phases.ruleMs = elapsedMs(tRule) // Gate accept on the on-chain offer existing. If dipper's offer() tx was // evicted (nonce collision, gas spike), rcaOffers is empty and @@ -324,7 +357,9 @@ export class DipsManager { // a transient state, retry next tick. Inside the safety margin, give up // so reassessment can pick a replacement before the deadline lapses. if (this.offerMonitor) { + const tOffer = process.hrtime.bigint() const offerOnChain = await this.offerMonitor.offerExists(proposal.id) + phases.offerMs = elapsedMs(tOffer) if (!offerOnChain) { if (proposal.deadline > now + OFFER_GATE_DEADLINE_SAFETY_MARGIN_SECONDS) { this.logger.debug( @@ -335,6 +370,7 @@ export class DipsManager { now: now.toString(), }, ) + logSummary('waiting_for_offer') return } this.logger.warn( @@ -347,29 +383,25 @@ export class DipsManager { ) await consumer.markRejected(proposal.id, 'offer_never_landed') await this.cleanupDipsRule(consumer, proposal) + logSummary('rejected_offer_never_landed') return } } - // Deploy the subgraph to graph-node before the accept multicall creates the - // allocation on-chain. The main reconcile loop reads `graph_node.indexingStatus` - // for the deployment; if graph-node has never been told to deploy it, - // indexingStatus is undefined and `failsHealthCheck` triggers a spurious - // unallocate of the allocation we just created. `ensure` is idempotent. - await this.graphNode.ensure( - `indexer-agent/${proposal.subgraphDeploymentId.ipfsHash.slice(-10)}`, - proposal.subgraphDeploymentId, - ) - const allocation = activeAllocations.find( (a) => a.subgraphDeployment.id.bytes32 === proposal.subgraphDeploymentId.bytes32, ) + const tAccept = process.hrtime.bigint() if (allocation) { await this.acceptWithExistingAllocation(consumer, proposal, allocation) } else { await this.acceptWithNewAllocation(consumer, proposal, activeAllocations) } + phases.acceptMs = elapsedMs(tAccept) + // The accept helpers swallow errors via handleAcceptError; per-outcome + // log lines from inside them tell the actual story. + logSummary('accept_attempted') } private async acceptWithExistingAllocation( @@ -857,6 +889,24 @@ export class DipsManager { proposal: DecodedRcaProposal, error: unknown, ): Promise { + // ABI-level mismatches are deterministic; retrying for the full RCA + // deadline only burns the budget. Mark rejected immediately so dipper + // reassessment can pick a working candidate. + const abiMismatchReason = this.classifyAbiMismatch(error) + if (abiMismatchReason !== null) { + const callException = error as { code?: string; message?: string } + this.logger.warn('Rejecting proposal: ABI mismatch (non-recoverable)', { + proposalId: proposal.id, + deployment: proposal.subgraphDeploymentId.ipfsHash, + reason: abiMismatchReason, + ethersCode: callException.code ?? null, + errorMessage: callException.message ?? null, + }) + await consumer.markRejected(proposal.id, abiMismatchReason) + await this.cleanupDipsRule(consumer, proposal) + return + } + if (this.isDeterministicError(error)) { const parsedError = tryParseCustomError(error) const callException = error as { @@ -884,6 +934,20 @@ export class DipsManager { } } + private classifyAbiMismatch(error: unknown): string | null { + const typedError = error as { code?: string; operation?: string } + if ( + typedError?.code === 'UNSUPPORTED_OPERATION' && + typedError?.operation === 'fragment' + ) { + return 'abi_fragment_mismatch' + } + if (typedError?.code === 'INVALID_ARGUMENT') { + return 'abi_invalid_argument' + } + return null + } + private isDeterministicError(error: unknown): boolean { const typedError = error as { code?: string } return typedError?.code === 'CALL_EXCEPTION' @@ -950,22 +1014,32 @@ export class DipsManager { this.logger.info('Processing pending RCA proposals for on-chain acceptance', { count: proposals.length, + concurrency: DIPS_ACCEPT_CONCURRENCY, }) const activeAllocations = await this.network.networkMonitor.allocations( AllocationStatus.ACTIVE, ) - for (const proposal of proposals) { - try { - await this.processProposal(consumer, proposal, activeAllocations) - } catch (error) { - this.logger.error('Unexpected error processing proposal', { - proposalId: proposal.id, - error, - }) - } - } + // Run up to DIPS_ACCEPT_CONCURRENCY proposals in parallel. Each + // processProposal call targets a distinct agreementId and has no + // shared mutable state with the others. Per-proposal failures are + // already isolated by handleAcceptError; the explicit try/catch + // here defends against any unexpected throw escaping that. + await pMap( + proposals, + async (proposal) => { + try { + await this.processProposal(consumer, proposal, activeAllocations) + } catch (error) { + this.logger.error('Unexpected error processing proposal', { + proposalId: proposal.id, + error, + }) + } + }, + { concurrency: DIPS_ACCEPT_CONCURRENCY, stopOnError: false }, + ) }, { onError: (err) => { From 53136e4b3ffb5135c9d42df66a695fc8552a2b2e Mon Sep 17 00:00:00 2001 From: MoonBoi9001 Date: Wed, 29 Apr 2026 12:37:33 +0800 Subject: [PATCH 03/11] feat(dips): self-reconcile allocations against indexing-payments If dipper expires an agreement or the agent restarts mid-flow, the local dips rule survives but no agreement backs it, so the indexer keeps indexing without payment. A 60s sweep removes dips rules with no matching Accepted agreement; reconcile then closes the allocation. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../src/indexer-management/allocations.ts | 1 + .../__tests__/sweep-allocations.test.ts | 188 ++++++++++++++++++ .../indexer-common/src/indexing-fees/dips.ts | 173 ++++++++++++++++ 3 files changed, 362 insertions(+) create mode 100644 packages/indexer-common/src/indexing-fees/__tests__/sweep-allocations.test.ts diff --git a/packages/indexer-common/src/indexer-management/allocations.ts b/packages/indexer-common/src/indexer-management/allocations.ts index c650d657d..d19e50047 100644 --- a/packages/indexer-common/src/indexer-management/allocations.ts +++ b/packages/indexer-common/src/indexer-management/allocations.ts @@ -176,6 +176,7 @@ export class AllocationManager { this.pendingRcaModel, ) this.dipsManager.startProposalAcceptanceLoop() + this.dipsManager.startAllocationSweepLoop() } } diff --git a/packages/indexer-common/src/indexing-fees/__tests__/sweep-allocations.test.ts b/packages/indexer-common/src/indexing-fees/__tests__/sweep-allocations.test.ts new file mode 100644 index 000000000..9b0f56619 --- /dev/null +++ b/packages/indexer-common/src/indexing-fees/__tests__/sweep-allocations.test.ts @@ -0,0 +1,188 @@ +import { createLogger, SubgraphDeploymentID } from '@graphprotocol/common-ts' +import { DipsManager } from '../dips' +import { + IndexerManagementModels, + Network, + AllocationManager, +} from '@graphprotocol/indexer-common' + +const logger = createLogger({ + name: 'DipsManager.sweep.test', + async: false, + level: 'error', +}) + +// SubgraphDeploymentID maps an IPFS hash to a known bytes32; we capture +// that mapping for the subgraph fixture and use the same hash on the rule. +const TEST_DEPLOYMENT_IPFS = 'QmPdbQaRCMhgouSZSW3sHZxU3M8KwcngWASvreAexzmmrh' +const RESOLVED_BYTES32 = new SubgraphDeploymentID(TEST_DEPLOYMENT_IPFS).bytes32 +const OTHER_DEPLOYMENT_IPFS = 'QmTzQ1JRkWErjk39mryYw2WVaphAZNAREyMchXzYQ7c15W' + +function nowSeconds(): number { + return Math.floor(Date.now() / 1000) +} + +function createMockModels(rules: Array<{ id: number; identifier: string }>) { + const destroy = jest.fn().mockResolvedValue(1) + return { + models: { + IndexingRule: { + findAll: jest.fn().mockResolvedValue(rules), + destroy, + }, + } as unknown as IndexerManagementModels, + destroy, + } +} + +function createMockNetwork( + subgraphResult: unknown, + indexerAddress = '0x5555555555555555555555555555555555555555', +) { + const query = jest.fn().mockResolvedValue(subgraphResult) + return { + network: { + specification: { + indexerOptions: { + address: indexerAddress, + dipsCollectionTarget: 1, + }, + networkIdentifier: 'eip155:1337', + }, + indexingPaymentsSubgraph: { query }, + } as unknown as Network, + query, + } +} + +function createDipsManager( + network: Network, + models: IndexerManagementModels, +): DipsManager { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + return new DipsManager(logger, models, network, {} as any, {} as AllocationManager) +} + +describe('DipsManager.sweepDipsAllocations', () => { + test('removes a dips rule that has no matching Accepted agreement', async () => { + const { models, destroy } = createMockModels([ + { id: 42, identifier: TEST_DEPLOYMENT_IPFS }, + ]) + const { network } = createMockNetwork({ + data: { + _meta: { block: { timestamp: nowSeconds() } }, + // Indexer has no accepted agreements at all + indexingAgreements: [], + }, + }) + + const dm = createDipsManager(network, models) + await dm.sweepDipsAllocations() + + expect(destroy).toHaveBeenCalledTimes(1) + expect(destroy).toHaveBeenCalledWith({ where: { id: 42 } }) + }) + + test('keeps a dips rule whose deployment has an Accepted agreement', async () => { + const { models, destroy } = createMockModels([ + { id: 7, identifier: TEST_DEPLOYMENT_IPFS }, + ]) + const { network } = createMockNetwork({ + data: { + _meta: { block: { timestamp: nowSeconds() } }, + indexingAgreements: [ + { + id: '0xagreement1', + subgraphDeploymentId: RESOLVED_BYTES32, + }, + ], + }, + }) + + const dm = createDipsManager(network, models) + await dm.sweepDipsAllocations() + + expect(destroy).not.toHaveBeenCalled() + }) + + test('disables only the unbacked rule, leaves backed rules intact', async () => { + const { models, destroy } = createMockModels([ + { id: 1, identifier: TEST_DEPLOYMENT_IPFS }, + { id: 2, identifier: OTHER_DEPLOYMENT_IPFS }, + ]) + const { network } = createMockNetwork({ + data: { + _meta: { block: { timestamp: nowSeconds() } }, + indexingAgreements: [ + { + id: '0xagreement1', + // Only backs the first rule + subgraphDeploymentId: RESOLVED_BYTES32, + }, + ], + }, + }) + + const dm = createDipsManager(network, models) + await dm.sweepDipsAllocations() + + expect(destroy).toHaveBeenCalledTimes(1) + expect(destroy).toHaveBeenCalledWith({ where: { id: 2 } }) + }) + + test('skips the sweep when subgraph block timestamp is stale', async () => { + const { models, destroy } = createMockModels([ + { id: 99, identifier: TEST_DEPLOYMENT_IPFS }, + ]) + // Timestamp 10 minutes behind wall clock — beyond the 300s threshold + const stale = nowSeconds() - 600 + const { network } = createMockNetwork({ + data: { + _meta: { block: { timestamp: stale } }, + indexingAgreements: [], + }, + }) + + const dm = createDipsManager(network, models) + await dm.sweepDipsAllocations() + + // Stale subgraph: do not act on its data + expect(destroy).not.toHaveBeenCalled() + }) + + test('skips the sweep when subgraph query fails', async () => { + const { models, destroy } = createMockModels([ + { id: 5, identifier: TEST_DEPLOYMENT_IPFS }, + ]) + const { network } = createMockNetwork({ + error: new Error('connection refused'), + }) + + const dm = createDipsManager(network, models) + await dm.sweepDipsAllocations() + + expect(destroy).not.toHaveBeenCalled() + }) + + test('is a no-op when indexingPaymentsSubgraph is not configured', async () => { + const { models, destroy } = createMockModels([ + { id: 1, identifier: TEST_DEPLOYMENT_IPFS }, + ]) + const network = { + specification: { + indexerOptions: { + address: '0x5555555555555555555555555555555555555555', + dipsCollectionTarget: 1, + }, + networkIdentifier: 'eip155:1337', + }, + indexingPaymentsSubgraph: null, + } as unknown as Network + + const dm = createDipsManager(network, models) + await dm.sweepDipsAllocations() + + expect(destroy).not.toHaveBeenCalled() + }) +}) + diff --git a/packages/indexer-common/src/indexing-fees/dips.ts b/packages/indexer-common/src/indexing-fees/dips.ts index 6171c596c..dd4f45b28 100644 --- a/packages/indexer-common/src/indexing-fees/dips.ts +++ b/packages/indexer-common/src/indexing-fees/dips.ts @@ -17,6 +17,7 @@ import { SubgraphIdentifierType, upsertIndexingRule, } from '@graphprotocol/indexer-common' +import gql from 'graphql-tag' import pMap from 'p-map' import { PendingRcaProposal } from '../indexer-management/models/pending-rca-proposal' @@ -36,6 +37,12 @@ import { CollectionTracker } from './collection-tracker' const DIPS_ACCEPTANCE_INTERVAL = 5_000 // POIs are computed against a recent-but-not-tip block to avoid reorg edge cases. const RECENT_BLOCK_OFFSET = 10 +const DIPS_SWEEP_INTERVAL = 60_000 +// If the indexing-payments-subgraph is more than this many seconds behind +// wall-clock, treat its data as unreliable and skip the sweep this tick. +// Normal indexing lag should never approach this; anything older indicates +// the subgraph is broken / paused / disconnected. +const DIPS_SWEEP_STALENESS_THRESHOLD_SECONDS = 300 // Per-tick parallelism cap. Proposals target distinct agreementIds and the // wallet's nonce queue serialises submissions, so concurrent processProposal // calls are safe; small enough that a stuck call doesn't head-of-line others. @@ -1048,4 +1055,170 @@ export class DipsManager { }, ) } + + /** + * Query the indexing-payments-subgraph for the agent's accepted agreements + * and the subgraph's current chain timestamp. Used by the allocation + * sweep to verify that each `dips`-basis indexing rule has a paying + * agreement backing it. + */ + async fetchAcceptedAgreementsForSelf(): Promise<{ + deployments: Set + blockTimestamp: number | null + }> { + if (!this.network.indexingPaymentsSubgraph) { + return { deployments: new Set(), blockTimestamp: null } + } + const indexer = + this.network.specification.indexerOptions.address.toLowerCase() + const result = await this.network.indexingPaymentsSubgraph.query( + gql` + query selfAgreements($indexer: String!) { + _meta { + block { + timestamp + } + } + indexingAgreements( + where: { indexer: $indexer, state: Accepted } + first: 1000 + ) { + id + subgraphDeploymentId + } + } + `, + { indexer }, + ) + if (result.error) { + throw new Error(`indexing-payments query failed: ${result.error}`) + } + const data = result.data ?? {} + const deployments = new Set( + (data.indexingAgreements ?? []).map( + (a: { subgraphDeploymentId: string }) => + a.subgraphDeploymentId.toLowerCase(), + ), + ) + const blockTimestamp = data._meta?.block?.timestamp ?? null + return { deployments, blockTimestamp } + } + + /** + * Reconcile local `dips`-basis indexing rules against the + * indexing-payments-subgraph. Each rule represents a deployment the + * agent allocated to as part of a DIPs agreement. If the subgraph + * cannot confirm an Accepted agreement for that deployment, the rule + * is stale (the agent is allocated without payment, e.g. because + * dipper marked the agreement Expired or the original on-chain accept + * never linked back). The rule is deleted; the agent's normal + * reconciliation closes the allocation through its existing path. + * + * The subgraph block timestamp is checked first: if the subgraph is + * far behind wall-clock, the sweep is skipped this tick so we never + * disable rules based on stale data. + */ + async sweepDipsAllocations(): Promise { + if (!this.network.indexingPaymentsSubgraph) { + return + } + const logger = this.logger.child({ function: 'sweepDipsAllocations' }) + + let acceptedDeployments: Set + let blockTimestamp: number | null + try { + const result = await this.fetchAcceptedAgreementsForSelf() + acceptedDeployments = result.deployments + blockTimestamp = result.blockTimestamp + } catch (err) { + logger.warn('Skipping DIPs allocation sweep: subgraph query failed', { + err, + }) + return + } + + if (blockTimestamp === null) { + logger.warn( + 'Skipping DIPs allocation sweep: indexing-payments subgraph returned no _meta timestamp', + ) + return + } + + const nowSeconds = Math.floor(Date.now() / 1000) + const lag = nowSeconds - Number(blockTimestamp) + if (lag > DIPS_SWEEP_STALENESS_THRESHOLD_SECONDS) { + logger.warn( + 'Skipping DIPs allocation sweep: indexing-payments subgraph is stale', + { + subgraphTimestamp: blockTimestamp, + nowSeconds, + lagSeconds: lag, + thresholdSeconds: DIPS_SWEEP_STALENESS_THRESHOLD_SECONDS, + }, + ) + return + } + + const dipsRules = await this.models.IndexingRule.findAll({ + where: { + decisionBasis: IndexingDecisionBasis.DIPS, + identifierType: SubgraphIdentifierType.DEPLOYMENT, + }, + }) + + let removed = 0 + for (const rule of dipsRules) { + const deploymentBytes32 = new SubgraphDeploymentID(rule.identifier).bytes32 + const deploymentLower = deploymentBytes32.toLowerCase() + if (acceptedDeployments.has(deploymentLower)) { + continue + } + logger.warn( + 'Removing DIPs indexing rule with no backing agreement in indexing-payments-subgraph', + { + deployment: rule.identifier, + subgraphTimestamp: blockTimestamp, + }, + ) + await this.models.IndexingRule.destroy({ where: { id: rule.id } }) + removed += 1 + } + + if (removed > 0) { + logger.info('DIPs allocation sweep removed stale rules', { + removed, + rulesChecked: dipsRules.length, + acceptedAgreements: acceptedDeployments.size, + }) + } else { + logger.debug('DIPs allocation sweep: all dips rules backed', { + rulesChecked: dipsRules.length, + acceptedAgreements: acceptedDeployments.size, + }) + } + } + + startAllocationSweepLoop() { + if (!this.network.indexingPaymentsSubgraph) { + this.logger.debug( + 'No indexing-payments-subgraph configured, skipping DIPs allocation sweep loop', + ) + return + } + + sequentialTimerMap( + { + logger: this.logger, + milliseconds: DIPS_SWEEP_INTERVAL, + }, + async () => { + await this.sweepDipsAllocations() + }, + { + onError: (err) => { + this.logger.error('DIPs allocation sweep tick failed', { err }) + }, + }, + ) + } } From a0dddd08477564f391562e6d6be97403f193c0f2 Mon Sep 17 00:00:00 2001 From: Rembrandt Kuipers <50174308+RembrandtK@users.noreply.github.com> Date: Fri, 1 May 2026 14:13:17 +0100 Subject: [PATCH 04/11] chore(indexer-common): bump @graphprotocol/toolshed to 1.2.1-dips.1 (#1208) --- .github/workflows/indexer-agent-image.yml | 1 + .github/workflows/indexer-cli-image.yml | 1 + .../__tests__/offer-monitor.test.ts | 16 +++-------- .../__tests__/sweep-allocations.test.ts | 1 - .../indexer-common/src/indexing-fees/dips.ts | 28 +++++++------------ .../src/indexing-fees/offer-monitor.ts | 16 +++++------ 6 files changed, 24 insertions(+), 39 deletions(-) diff --git a/.github/workflows/indexer-agent-image.yml b/.github/workflows/indexer-agent-image.yml index e36f6ba5d..f64db6441 100644 --- a/.github/workflows/indexer-agent-image.yml +++ b/.github/workflows/indexer-agent-image.yml @@ -1,6 +1,7 @@ name: Indexer Agent Image on: + workflow_dispatch: push: branches: - main diff --git a/.github/workflows/indexer-cli-image.yml b/.github/workflows/indexer-cli-image.yml index c3b36fb83..4ded8c3dc 100644 --- a/.github/workflows/indexer-cli-image.yml +++ b/.github/workflows/indexer-cli-image.yml @@ -1,6 +1,7 @@ name: Indexer CLI Image on: + workflow_dispatch: push: branches: - main diff --git a/packages/indexer-common/src/indexing-fees/__tests__/offer-monitor.test.ts b/packages/indexer-common/src/indexing-fees/__tests__/offer-monitor.test.ts index 7fe9c723c..f978bec08 100644 --- a/packages/indexer-common/src/indexing-fees/__tests__/offer-monitor.test.ts +++ b/packages/indexer-common/src/indexing-fees/__tests__/offer-monitor.test.ts @@ -13,9 +13,7 @@ describe('OfferMonitor', () => { const subgraph = { query } as never const monitor = new OfferMonitor(logger, subgraph) - const exists = await monitor.offerExists( - 'bea99452-e465-e9d9-8a79-2356edcc7e92', - ) + const exists = await monitor.offerExists('bea99452-e465-e9d9-8a79-2356edcc7e92') expect(exists).toBe(true) expect(query).toHaveBeenCalledTimes(1) @@ -41,23 +39,17 @@ describe('OfferMonitor', () => { const subgraph = { query } as never const monitor = new OfferMonitor(logger, subgraph) - const exists = await monitor.offerExists( - 'bea99452-e465-e9d9-8a79-2356edcc7e92', - ) + const exists = await monitor.offerExists('bea99452-e465-e9d9-8a79-2356edcc7e92') expect(exists).toBe(false) }) it('treats subgraph errors as transient (not yet on-chain)', async () => { - const query = jest - .fn() - .mockResolvedValue({ error: new Error('subgraph hiccup') }) + const query = jest.fn().mockResolvedValue({ error: new Error('subgraph hiccup') }) const subgraph = { query } as never const monitor = new OfferMonitor(logger, subgraph) - const exists = await monitor.offerExists( - 'bea99452-e465-e9d9-8a79-2356edcc7e92', - ) + const exists = await monitor.offerExists('bea99452-e465-e9d9-8a79-2356edcc7e92') expect(exists).toBe(false) }) diff --git a/packages/indexer-common/src/indexing-fees/__tests__/sweep-allocations.test.ts b/packages/indexer-common/src/indexing-fees/__tests__/sweep-allocations.test.ts index 9b0f56619..b405e2aea 100644 --- a/packages/indexer-common/src/indexing-fees/__tests__/sweep-allocations.test.ts +++ b/packages/indexer-common/src/indexing-fees/__tests__/sweep-allocations.test.ts @@ -185,4 +185,3 @@ describe('DipsManager.sweepDipsAllocations', () => { expect(destroy).not.toHaveBeenCalled() }) }) - diff --git a/packages/indexer-common/src/indexing-fees/dips.ts b/packages/indexer-common/src/indexing-fees/dips.ts index dd4f45b28..4c5270356 100644 --- a/packages/indexer-common/src/indexing-fees/dips.ts +++ b/packages/indexer-common/src/indexing-fees/dips.ts @@ -1069,8 +1069,7 @@ export class DipsManager { if (!this.network.indexingPaymentsSubgraph) { return { deployments: new Set(), blockTimestamp: null } } - const indexer = - this.network.specification.indexerOptions.address.toLowerCase() + const indexer = this.network.specification.indexerOptions.address.toLowerCase() const result = await this.network.indexingPaymentsSubgraph.query( gql` query selfAgreements($indexer: String!) { @@ -1079,10 +1078,7 @@ export class DipsManager { timestamp } } - indexingAgreements( - where: { indexer: $indexer, state: Accepted } - first: 1000 - ) { + indexingAgreements(where: { indexer: $indexer, state: Accepted }, first: 1000) { id subgraphDeploymentId } @@ -1095,9 +1091,8 @@ export class DipsManager { } const data = result.data ?? {} const deployments = new Set( - (data.indexingAgreements ?? []).map( - (a: { subgraphDeploymentId: string }) => - a.subgraphDeploymentId.toLowerCase(), + (data.indexingAgreements ?? []).map((a: { subgraphDeploymentId: string }) => + a.subgraphDeploymentId.toLowerCase(), ), ) const blockTimestamp = data._meta?.block?.timestamp ?? null @@ -1147,15 +1142,12 @@ export class DipsManager { const nowSeconds = Math.floor(Date.now() / 1000) const lag = nowSeconds - Number(blockTimestamp) if (lag > DIPS_SWEEP_STALENESS_THRESHOLD_SECONDS) { - logger.warn( - 'Skipping DIPs allocation sweep: indexing-payments subgraph is stale', - { - subgraphTimestamp: blockTimestamp, - nowSeconds, - lagSeconds: lag, - thresholdSeconds: DIPS_SWEEP_STALENESS_THRESHOLD_SECONDS, - }, - ) + logger.warn('Skipping DIPs allocation sweep: indexing-payments subgraph is stale', { + subgraphTimestamp: blockTimestamp, + nowSeconds, + lagSeconds: lag, + thresholdSeconds: DIPS_SWEEP_STALENESS_THRESHOLD_SECONDS, + }) return } diff --git a/packages/indexer-common/src/indexing-fees/offer-monitor.ts b/packages/indexer-common/src/indexing-fees/offer-monitor.ts index 05a8925b5..e6b95f9b9 100644 --- a/packages/indexer-common/src/indexing-fees/offer-monitor.ts +++ b/packages/indexer-common/src/indexing-fees/offer-monitor.ts @@ -43,18 +43,18 @@ export class OfferMonitor { id: toBytes16Id(agreementId), }) if (result.error) { - this.logger.debug( - 'Offer existence check failed (will retry on next tick)', - { agreementId, err: result.error }, - ) + this.logger.debug('Offer existence check failed (will retry on next tick)', { + agreementId, + err: result.error, + }) return false } return Boolean(result.data?.offer) } catch (err) { - this.logger.debug( - 'Offer existence check threw (will retry on next tick)', - { agreementId, err }, - ) + this.logger.debug('Offer existence check threw (will retry on next tick)', { + agreementId, + err, + }) return false } } From b00011566be8d7067da403cb2e6e0075eebbe5cf Mon Sep 17 00:00:00 2001 From: MoonBoi9001 Date: Tue, 5 May 2026 20:25:17 +0800 Subject: [PATCH 05/11] refactor(agent): drop unused activeAllocations from dips collect loop The reconcile-cycle callback iterated networks via mapNetworkMapped with activeAllocations as input, but the body only calls dipsManager.collectAgreementPayments which doesn't read it. Switch to the simpler map() that doesn't require a per-network value. Co-Authored-By: Claude Opus 4.7 (1M context) --- packages/indexer-agent/src/agent.ts | 19 ++++++++----------- 1 file changed, 8 insertions(+), 11 deletions(-) diff --git a/packages/indexer-agent/src/agent.ts b/packages/indexer-agent/src/agent.ts index 36100ab7c..0dc389dca 100644 --- a/packages/indexer-agent/src/agent.ts +++ b/packages/indexer-agent/src/agent.ts @@ -709,18 +709,15 @@ export class Agent { return } - await this.multiNetworks.mapNetworkMapped( - activeAllocations, - async ({ network, operator }, activeAllocations: Allocation[]) => { - if (network.specification.indexerOptions.enableDips) { - if (!operator.dipsManager) { - throw new Error('DipsManager is not available') - } - - await operator.dipsManager.collectAgreementPayments() + await this.multiNetworks.map(async ({ network, operator }) => { + if (network.specification.indexerOptions.enableDips) { + if (!operator.dipsManager) { + throw new Error('DipsManager is not available') } - }, - ) + + await operator.dipsManager.collectAgreementPayments() + } + }) }, ) } From 9061fd9dbd21a305a0f50a7ea07b4e31ab07784d Mon Sep 17 00:00:00 2001 From: MoonBoi9001 Date: Wed, 13 May 2026 17:57:26 +0800 Subject: [PATCH 06/11] fix(dips): release allocation protection on agreement cancel The pre-unallocate protection check treated both Accepted and CanceledByPayer as active, leaving allocations stranded on chain after dipper canceled the agreement. Narrow it to Accepted only so the agent closes the allocation as the cancel intends. Co-Authored-By: Claude Opus 4.7 (1M context) --- packages/indexer-common/src/indexer-management/monitor.ts | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/packages/indexer-common/src/indexer-management/monitor.ts b/packages/indexer-common/src/indexer-management/monitor.ts index 6de5f7e2f..aa793de97 100644 --- a/packages/indexer-common/src/indexer-management/monitor.ts +++ b/packages/indexer-common/src/indexer-management/monitor.ts @@ -63,11 +63,16 @@ export class NetworkMonitor { if (!this.indexingPaymentsSubgraph) { return false } + // Only `Accepted` blocks unallocation. Once the agreement is canceled + // (by payer or by us), the on-chain agreement is already gone, so + // protecting the allocation only strands it. The agent's separate + // collectAgreementPayments loop handles any final collect on + // `CanceledByPayer` agreements independently of allocation lifetime. const result = await this.indexingPaymentsSubgraph.checkedQuery( gql` query indexingAgreements($allocationId: Bytes!) { indexingAgreements( - where: { allocationId: $allocationId, state_in: [Accepted, CanceledByPayer] } + where: { allocationId: $allocationId, state: Accepted } first: 1 ) { id From d7ce1f929ada9a081a5fa7680084ba11ecf4979c Mon Sep 17 00:00:00 2001 From: MoonBoi9001 Date: Wed, 13 May 2026 18:04:31 +0800 Subject: [PATCH 07/11] fix(dips): drop unused DipsReceiptStatus import DipsReceiptStatus was only used by the DipsCollector class which was removed during the earlier rebase onto main-dips. ESLint's no-unused-vars rule flagged the leftover import. Drop it. Co-Authored-By: Claude Opus 4.7 (1M context) --- packages/indexer-common/src/indexing-fees/dips.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/packages/indexer-common/src/indexing-fees/dips.ts b/packages/indexer-common/src/indexing-fees/dips.ts index 4c5270356..51412da17 100644 --- a/packages/indexer-common/src/indexing-fees/dips.ts +++ b/packages/indexer-common/src/indexing-fees/dips.ts @@ -8,7 +8,6 @@ import { Allocation, AllocationManager, AllocationStatus, - DipsReceiptStatus, GraphNode, IndexerManagementModels, IndexingDecisionBasis, From b398e3c305d8d56a4c1c7c6c1a9a6f04469052d9 Mon Sep 17 00:00:00 2001 From: MoonBoi9001 Date: Wed, 13 May 2026 18:08:57 +0800 Subject: [PATCH 08/11] fix(dips): restore sequentialTimerMap import and sweep test arity Two compile errors surfaced after the rebase onto main-dips. dips.ts uses sequentialTimerMap in the acceptance and sweep loops but the import was dropped. sweep-allocations.test.ts constructs DipsManager with 5 args; the constructor on main-dips takes 6. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../indexing-fees/__tests__/sweep-allocations.test.ts | 9 ++++++++- packages/indexer-common/src/indexing-fees/dips.ts | 1 + 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/packages/indexer-common/src/indexing-fees/__tests__/sweep-allocations.test.ts b/packages/indexer-common/src/indexing-fees/__tests__/sweep-allocations.test.ts index b405e2aea..3e6026949 100644 --- a/packages/indexer-common/src/indexing-fees/__tests__/sweep-allocations.test.ts +++ b/packages/indexer-common/src/indexing-fees/__tests__/sweep-allocations.test.ts @@ -60,7 +60,14 @@ function createDipsManager( models: IndexerManagementModels, ): DipsManager { // eslint-disable-next-line @typescript-eslint/no-explicit-any - return new DipsManager(logger, models, network, {} as any, {} as AllocationManager) + return new DipsManager( + logger, + models, + network, + {} as any, // eslint-disable-line @typescript-eslint/no-explicit-any + {} as AllocationManager, + {} as any, // eslint-disable-line @typescript-eslint/no-explicit-any + ) } describe('DipsManager.sweepDipsAllocations', () => { diff --git a/packages/indexer-common/src/indexing-fees/dips.ts b/packages/indexer-common/src/indexing-fees/dips.ts index 51412da17..9dae1d83c 100644 --- a/packages/indexer-common/src/indexing-fees/dips.ts +++ b/packages/indexer-common/src/indexing-fees/dips.ts @@ -32,6 +32,7 @@ import { SubgraphIndexingAgreement, } from './agreement-monitor' import { CollectionTracker } from './collection-tracker' +import { sequentialTimerMap } from '../sequential-timer' const DIPS_ACCEPTANCE_INTERVAL = 5_000 // POIs are computed against a recent-but-not-tip block to avoid reorg edge cases. From 986146b1dea1d44dabda57f2cd70deb355687fa3 Mon Sep 17 00:00:00 2001 From: MoonBoi9001 Date: Wed, 13 May 2026 18:23:02 +0800 Subject: [PATCH 09/11] fix(agent): make migration 24 idempotent against pre-existing dips enum The 19-add-dips-to-decision-basis migration was deleted and folded into migration 24-add-dips-schema. DBs that already ran 19 carry the enum value but no 24 marker, so 24 re-runs and fails on duplicate enum. Switch to ADD VALUE IF NOT EXISTS to keep 24 idempotent. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../indexer-agent/src/db/migrations/24-add-dips-schema.ts | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/packages/indexer-agent/src/db/migrations/24-add-dips-schema.ts b/packages/indexer-agent/src/db/migrations/24-add-dips-schema.ts index d3bb6fbb4..a9b75f60b 100644 --- a/packages/indexer-agent/src/db/migrations/24-add-dips-schema.ts +++ b/packages/indexer-agent/src/db/migrations/24-add-dips-schema.ts @@ -16,11 +16,13 @@ export async function up({ context }: Context): Promise { // 1. Add 'dips' to the IndexingRules.decisionBasis enum. // Skipped on fresh DBs — sequelize.sync() will create the enum already // including 'dips' from the model definition. Existing prod DBs need this - // ALTER to add the value to a pre-existing enum type. + // ALTER to add the value to a pre-existing enum type. IF NOT EXISTS keeps + // the migration idempotent against DBs that already have the value from + // the now-deleted migration 19-add-dips-to-decision-basis. if (await queryInterface.tableExists('IndexingRules')) { logger.info(`Adding 'dips' to enum_IndexingRules_decisionBasis`) await queryInterface.sequelize.query( - `ALTER TYPE "enum_IndexingRules_decisionBasis" ADD VALUE 'dips'`, + `ALTER TYPE "enum_IndexingRules_decisionBasis" ADD VALUE IF NOT EXISTS 'dips'`, ) } else { logger.debug( From e15eb665695cae494201d7f6b99109d9a5c72a9b Mon Sep 17 00:00:00 2001 From: MoonBoi9001 Date: Fri, 15 May 2026 19:36:39 +0800 Subject: [PATCH 10/11] style(dips): apply prettier to accept-proposals test Run yarn format so the check step's git diff --exit-code no longer trips on trailing whitespace and a mis-indented eslint-disable comment. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../src/indexing-fees/__tests__/accept-proposals.test.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/indexer-common/src/indexing-fees/__tests__/accept-proposals.test.ts b/packages/indexer-common/src/indexing-fees/__tests__/accept-proposals.test.ts index 23cf4bf4a..a43587ceb 100644 --- a/packages/indexer-common/src/indexing-fees/__tests__/accept-proposals.test.ts +++ b/packages/indexer-common/src/indexing-fees/__tests__/accept-proposals.test.ts @@ -183,8 +183,8 @@ function createDipsManager( ): DipsManager { const graphNode = { ensure: jest.fn().mockResolvedValue(undefined) } // eslint-disable-next-line @typescript-eslint/no-explicit-any - const dm = new DipsManager(logger, models, network, graphNode as any, parent, {} as any) -// eslint-disable-next-line @typescript-eslint/no-explicit-any + const dm = new DipsManager(logger, models, network, graphNode as any, parent, {} as any) + // eslint-disable-next-line @typescript-eslint/no-explicit-any ;(dm as any).pendingRcaConsumer = consumer if (offerMonitor !== undefined) { // eslint-disable-next-line @typescript-eslint/no-explicit-any From 766e71deb2f10d287a041235424d2894040624bf Mon Sep 17 00:00:00 2001 From: MoonBoi9001 Date: Fri, 15 May 2026 22:39:47 +0800 Subject: [PATCH 11/11] refactor(dips): start accept and sweep loops only inside the agent Move the two background loops out of AllocationManager's constructor and into the agent's bootstrap. Short-lived consumers like the CLI and jest setups go through the same client path and previously leaked timers that survived test teardown. Co-Authored-By: Claude Opus 4.7 (1M context) --- packages/indexer-agent/src/commands/start.ts | 8 ++++++++ .../indexer-common/src/indexer-management/allocations.ts | 5 +++-- 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/packages/indexer-agent/src/commands/start.ts b/packages/indexer-agent/src/commands/start.ts index 746e84177..ccd241678 100644 --- a/packages/indexer-agent/src/commands/start.ts +++ b/packages/indexer-agent/src/commands/start.ts @@ -801,6 +801,14 @@ export async function run( new Operator(logger, indexerManagementClient, spec), ) + // Start DIPs background loops only on the long-running agent. The CLI and + // jest setups go through the same management-client path but don't want + // these timers running for their short-lived processes. + for (const operator of operators) { + operator.dipsManager?.startProposalAcceptanceLoop() + operator.dipsManager?.startAllocationSweepLoop() + } + // -------------------------------------------------------------------------------- // * The Agent itself // -------------------------------------------------------------------------------- diff --git a/packages/indexer-common/src/indexer-management/allocations.ts b/packages/indexer-common/src/indexer-management/allocations.ts index d19e50047..f744ab3b1 100644 --- a/packages/indexer-common/src/indexer-management/allocations.ts +++ b/packages/indexer-common/src/indexer-management/allocations.ts @@ -167,6 +167,9 @@ export class AllocationManager { private pendingRcaModel: typeof PendingRcaProposal, ) { if (this.network.specification.indexerOptions.enableDips) { + // Construction is intentionally side-effect-free: the agent owns + // when the accept/sweep loops start so short-lived consumers (CLI, + // jest setups) don't leak background timers. this.dipsManager = new DipsManager( this.logger, this.models, @@ -175,8 +178,6 @@ export class AllocationManager { this, this.pendingRcaModel, ) - this.dipsManager.startProposalAcceptanceLoop() - this.dipsManager.startAllocationSweepLoop() } }