Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/indexer-agent-image.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
name: Indexer Agent Image

on:
workflow_dispatch:
push:
branches:
- main
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/indexer-cli-image.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
name: Indexer CLI Image

on:
workflow_dispatch:
push:
branches:
- main
Expand Down
18 changes: 8 additions & 10 deletions packages/indexer-agent/src/agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -709,17 +709,15 @@ export class Agent {
return
}

await this.multiNetworks.mapNetworkMapped(
activeAllocations,
async ({ network, operator }, activeAllocations: Allocation[]) => {
if (network.specification.indexerOptions.enableDips) {
await operator.dipsManager!.acceptPendingProposals(
activeAllocations,
)
await operator.dipsManager!.collectAgreementPayments()
await this.multiNetworks.map(async ({ network, operator }) => {
if (network.specification.indexerOptions.enableDips) {
if (!operator.dipsManager) {
throw new Error('DipsManager is not available')
}
},
)

await operator.dipsManager.collectAgreementPayments()
}
})
},
)
}
Expand Down
8 changes: 8 additions & 0 deletions packages/indexer-agent/src/commands/start.ts
Original file line number Diff line number Diff line change
Expand Up @@ -801,6 +801,14 @@ export async function run(
new Operator(logger, indexerManagementClient, spec),
)

// Start DIPs background loops only on the long-running agent. The CLI and
// jest setups go through the same management-client path but don't want
// these timers running for their short-lived processes.
for (const operator of operators) {
operator.dipsManager?.startProposalAcceptanceLoop()
operator.dipsManager?.startAllocationSweepLoop()
}

// --------------------------------------------------------------------------------
// * The Agent itself
// --------------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,13 @@ export async function up({ context }: Context): Promise<void> {
// 1. Add 'dips' to the IndexingRules.decisionBasis enum.
// Skipped on fresh DBs — sequelize.sync() will create the enum already
// including 'dips' from the model definition. Existing prod DBs need this
// ALTER to add the value to a pre-existing enum type.
// ALTER to add the value to a pre-existing enum type. IF NOT EXISTS keeps
// the migration idempotent against DBs that already have the value from
// the now-deleted migration 19-add-dips-to-decision-basis.
if (await queryInterface.tableExists('IndexingRules')) {
logger.info(`Adding 'dips' to enum_IndexingRules_decisionBasis`)
await queryInterface.sequelize.query(
`ALTER TYPE "enum_IndexingRules_decisionBasis" ADD VALUE 'dips'`,
`ALTER TYPE "enum_IndexingRules_decisionBasis" ADD VALUE IF NOT EXISTS 'dips'`,
)
} else {
logger.debug(
Expand Down
3 changes: 3 additions & 0 deletions packages/indexer-common/src/indexer-management/allocations.ts
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,9 @@ export class AllocationManager {
private pendingRcaModel: typeof PendingRcaProposal,
) {
if (this.network.specification.indexerOptions.enableDips) {
// Construction is intentionally side-effect-free: the agent owns
// when the accept/sweep loops start so short-lived consumers (CLI,
// jest setups) don't leak background timers.
this.dipsManager = new DipsManager(
this.logger,
this.models,
Expand Down
7 changes: 6 additions & 1 deletion packages/indexer-common/src/indexer-management/monitor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,16 @@ export class NetworkMonitor {
if (!this.indexingPaymentsSubgraph) {
return false
}
// Only `Accepted` blocks unallocation. Once the agreement is canceled
// (by payer or by us), the on-chain agreement is already gone, so
// protecting the allocation only strands it. The agent's separate
// collectAgreementPayments loop handles any final collect on
// `CanceledByPayer` agreements independently of allocation lifetime.
const result = await this.indexingPaymentsSubgraph.checkedQuery(
gql`
query indexingAgreements($allocationId: Bytes!) {
indexingAgreements(
where: { allocationId: $allocationId, state_in: [Accepted, CanceledByPayer] }
where: { allocationId: $allocationId, state: Accepted }
first: 1
) {
id
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,12 @@ import { PendingRcaConsumer } from '../pending-rca-consumer'
import { DecodedRcaProposal } from '../types'
import {
Allocation,
AllocationManager,
AllocationStatus,
IndexerManagementModels,
IndexingDecisionBasis,
Network,
SubgraphIdentifierType,
} from '@graphprotocol/indexer-common'

let logger: Logger
Expand Down Expand Up @@ -108,10 +111,17 @@ function createMockModels() {
findOne: jest.fn().mockResolvedValue(null),
findAll: jest.fn().mockResolvedValue([]),
destroy: jest.fn().mockResolvedValue(1),
upsert: jest.fn().mockResolvedValue([{ id: 1 }, true]),
},
} as unknown as IndexerManagementModels
}

function createMockParent() {
return {
matchingRuleExists: jest.fn().mockResolvedValue(false),
} as unknown as AllocationManager
}

function createMockNetwork() {
return {
contracts: {
Expand Down Expand Up @@ -168,11 +178,18 @@ function createDipsManager(
network: Network,
models: IndexerManagementModels,
consumer: PendingRcaConsumer,
parent: AllocationManager = createMockParent(),
offerMonitor?: { offerExists: jest.Mock },
): DipsManager {
const graphNode = { ensure: jest.fn().mockResolvedValue(undefined) }
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const dm = new DipsManager(logger, models, network, {} as any, null, {} as any)
const dm = new DipsManager(logger, models, network, graphNode as any, parent, {} as any)
// eslint-disable-next-line @typescript-eslint/no-explicit-any
;(dm as any).pendingRcaConsumer = consumer
if (offerMonitor !== undefined) {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
;(dm as any).offerMonitor = offerMonitor
}
return dm
}

Expand Down Expand Up @@ -583,4 +600,172 @@ describe('DipsManager.acceptPendingProposals', () => {
expect(consumer.markAccepted).toHaveBeenCalledWith(proposal.id)
})
})

describe('rule creation ordering (race condition fix)', () => {
test('upserts the DIPS indexing rule before broadcasting acceptIndexingAgreement', async () => {
const proposal = createMockProposal()
const allocation = createMockAllocation()
const consumer = createMockConsumer([proposal])
const models = createMockModels()
const network = createMockNetwork()
;(network.transactionManager.executeTransaction as jest.Mock).mockResolvedValue({
hash: '0xtx',
status: 1,
})

const dm = createDipsManager(network, models, consumer)

await dm.acceptPendingProposals([allocation])

const upsertOrder = (models.IndexingRule.upsert as jest.Mock).mock
.invocationCallOrder[0]
const executeOrder = (network.transactionManager.executeTransaction as jest.Mock)
.mock.invocationCallOrder[0]

expect(upsertOrder).toBeDefined()
expect(executeOrder).toBeDefined()
expect(upsertOrder).toBeLessThan(executeOrder)
})

test('skips rule upsert and rejects proposal when deployment is blocklisted', async () => {
const proposal = createMockProposal()
const allocation = createMockAllocation()
const consumer = createMockConsumer([proposal])
;(consumer.getPendingProposalsForDeployment as jest.Mock).mockResolvedValue([])
const models = createMockModels()
;(models.IndexingRule.findAll as jest.Mock).mockResolvedValue([
{
identifier: proposal.subgraphDeploymentId.ipfsHash,
identifierType: SubgraphIdentifierType.DEPLOYMENT,
decisionBasis: IndexingDecisionBasis.NEVER,
},
])
const network = createMockNetwork()

const dm = createDipsManager(network, models, consumer)

await dm.acceptPendingProposals([allocation])

expect(consumer.markRejected).toHaveBeenCalledWith(
proposal.id,
'deployment blocklisted',
)
expect(models.IndexingRule.upsert).not.toHaveBeenCalled()
expect(network.transactionManager.executeTransaction).not.toHaveBeenCalled()
})

test('skips rule upsert when parent reports a matching rule already exists', async () => {
const proposal = createMockProposal()
const allocation = createMockAllocation()
const consumer = createMockConsumer([proposal])
const models = createMockModels()
const network = createMockNetwork()
;(network.transactionManager.executeTransaction as jest.Mock).mockResolvedValue({
hash: '0xtx',
status: 1,
})

const parent = {
matchingRuleExists: jest.fn().mockResolvedValue(true),
} as unknown as AllocationManager

const dm = createDipsManager(network, models, consumer, parent)

await dm.acceptPendingProposals([allocation])

expect(models.IndexingRule.upsert).not.toHaveBeenCalled()
expect(network.transactionManager.executeTransaction).toHaveBeenCalled()
expect(consumer.markAccepted).toHaveBeenCalledWith(proposal.id)
})
})

describe('offer-existence gate', () => {
test('stays pending when offer absent and deadline > now + safety margin', async () => {
const proposal = createMockProposal({
// 5 minutes from now — well beyond the 30s safety margin
deadline: BigInt(Math.floor(Date.now() / 1000) + 300),
})
const allocation = createMockAllocation()
const consumer = createMockConsumer([proposal])
const models = createMockModels()
const network = createMockNetwork()
const offerMonitor = { offerExists: jest.fn().mockResolvedValue(false) }

const dm = createDipsManager(network, models, consumer, undefined, offerMonitor)

await dm.acceptPendingProposals([allocation])

expect(offerMonitor.offerExists).toHaveBeenCalledWith(proposal.id)
expect(network.transactionManager.executeTransaction).not.toHaveBeenCalled()
expect(consumer.markRejected).not.toHaveBeenCalled()
expect(consumer.markAccepted).not.toHaveBeenCalled()
})

test('marks rejected when offer absent and deadline within safety margin', async () => {
const proposal = createMockProposal({
// 10 seconds from now — inside the 30s safety margin
deadline: BigInt(Math.floor(Date.now() / 1000) + 10),
})
const allocation = createMockAllocation()
const consumer = createMockConsumer([proposal])
const models = createMockModels()
const network = createMockNetwork()
const offerMonitor = { offerExists: jest.fn().mockResolvedValue(false) }

const dm = createDipsManager(network, models, consumer, undefined, offerMonitor)

await dm.acceptPendingProposals([allocation])

expect(offerMonitor.offerExists).toHaveBeenCalledWith(proposal.id)
expect(consumer.markRejected).toHaveBeenCalledWith(
proposal.id,
'offer_never_landed',
)
expect(network.transactionManager.executeTransaction).not.toHaveBeenCalled()
})

test('proceeds to acceptIndexingAgreement when offer is present', async () => {
const proposal = createMockProposal()
const allocation = createMockAllocation()
const consumer = createMockConsumer([proposal])
const models = createMockModels()
const network = createMockNetwork()
;(network.transactionManager.executeTransaction as jest.Mock).mockResolvedValue({
hash: '0xtxhash',
status: 1,
})
const offerMonitor = { offerExists: jest.fn().mockResolvedValue(true) }

const dm = createDipsManager(network, models, consumer, undefined, offerMonitor)

await dm.acceptPendingProposals([allocation])

expect(offerMonitor.offerExists).toHaveBeenCalledWith(proposal.id)
expect(network.transactionManager.executeTransaction).toHaveBeenCalled()
expect(consumer.markAccepted).toHaveBeenCalledWith(proposal.id)
})

test('bypasses gate when indexingPaymentsSubgraph is not configured', async () => {
const proposal = createMockProposal()
const allocation = createMockAllocation()
const consumer = createMockConsumer([proposal])
const models = createMockModels()
const network = createMockNetwork()
;(network.transactionManager.executeTransaction as jest.Mock).mockResolvedValue({
hash: '0xtxhash',
status: 1,
})

// No offerMonitor passed — DipsManager constructor sets it to null
// because createMockNetwork() does not define indexingPaymentsSubgraph.
const dm = createDipsManager(network, models, consumer)
// eslint-disable-next-line @typescript-eslint/no-explicit-any
expect((dm as any).offerMonitor).toBeNull()

await dm.acceptPendingProposals([allocation])

expect(network.transactionManager.executeTransaction).toHaveBeenCalled()
expect(consumer.markAccepted).toHaveBeenCalledWith(proposal.id)
})
})
})
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import { createLogger } from '@graphprotocol/common-ts'
import { OfferMonitor } from '../offer-monitor'

const logger = createLogger({
name: 'OfferMonitor.test',
async: false,
level: 'error',
})

describe('OfferMonitor', () => {
it('converts UUID-format agreement ids to bytes16 hex before querying', async () => {
const query = jest.fn().mockResolvedValue({ data: { offer: { id: '0xabc' } } })
const subgraph = { query } as never
const monitor = new OfferMonitor(logger, subgraph)

const exists = await monitor.offerExists('bea99452-e465-e9d9-8a79-2356edcc7e92')

expect(exists).toBe(true)
expect(query).toHaveBeenCalledTimes(1)
expect(query.mock.calls[0][1]).toEqual({
id: '0xbea99452e465e9d98a792356edcc7e92',
})
})

it('passes through already-hex ids unchanged (lowercased)', async () => {
const query = jest.fn().mockResolvedValue({ data: { offer: { id: '0xabc' } } })
const subgraph = { query } as never
const monitor = new OfferMonitor(logger, subgraph)

await monitor.offerExists('0xBEA99452E465E9D98A792356EDCC7E92')

expect(query.mock.calls[0][1]).toEqual({
id: '0xbea99452e465e9d98a792356edcc7e92',
})
})

it('returns false when the subgraph reports the offer is missing', async () => {
const query = jest.fn().mockResolvedValue({ data: { offer: null } })
const subgraph = { query } as never
const monitor = new OfferMonitor(logger, subgraph)

const exists = await monitor.offerExists('bea99452-e465-e9d9-8a79-2356edcc7e92')

expect(exists).toBe(false)
})

it('treats subgraph errors as transient (not yet on-chain)', async () => {
const query = jest.fn().mockResolvedValue({ error: new Error('subgraph hiccup') })
const subgraph = { query } as never
const monitor = new OfferMonitor(logger, subgraph)

const exists = await monitor.offerExists('bea99452-e465-e9d9-8a79-2356edcc7e92')

expect(exists).toBe(false)
})
})
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ function encodeTestPayload(overrides?: {
[
{
subgraphDeploymentId: TEST_DEPLOYMENT_BYTES32,
version: 1n,
version: 0n,
terms: termsEncoded,
},
],
Expand Down
Loading
Loading