diff --git a/apps/ensindexer/package.json b/apps/ensindexer/package.json index 1db3a2711..0f22cb35c 100644 --- a/apps/ensindexer/package.json +++ b/apps/ensindexer/package.json @@ -29,6 +29,7 @@ "@ensnode/ensnode-sdk": "workspace:*", "@ensnode/ensrainbow-sdk": "workspace:*", "@ensnode/ponder-metadata": "workspace:*", + "@ensnode/ponder-sdk": "workspace:*", "caip": "catalog:", "date-fns": "catalog:", "deepmerge-ts": "^7.1.5", diff --git a/apps/ensindexer/src/lib/indexing-status-builder/chain-block-refs.ts b/apps/ensindexer/src/lib/indexing-status-builder/chain-block-refs.ts new file mode 100644 index 000000000..b4d54e595 --- /dev/null +++ b/apps/ensindexer/src/lib/indexing-status-builder/chain-block-refs.ts @@ -0,0 +1,121 @@ +import type { PublicClient } from "viem"; + +import { bigIntToNumber, deserializeBlockRef } from "@ensnode/ensnode-sdk"; +import type { + BlockNumber, + BlockRef, + Blockrange, + ChainId, + ChainIndexingMetrics, +} from "@ensnode/ponder-sdk"; + +/** + * Fetch block ref from RPC. + * + * @param publicClient for a chain + * @param blockNumber + * + * @throws error if data validation fails. + */ +async function fetchBlockRef( + publicClient: PublicClient, + blockNumber: BlockNumber, +): Promise { + try { + const block = await publicClient.getBlock({ blockNumber: BigInt(blockNumber) }); + + return deserializeBlockRef({ + number: bigIntToNumber(block.number), + timestamp: bigIntToNumber(block.timestamp), + }); + } catch (error) { + const errorMessage = error instanceof Error ? error.message : "Unknown error"; + throw new Error(`Failed to fetch block ref for block number ${blockNumber}: ${errorMessage}`); + } +} + +/** + * Chain Block Refs + * + * Represents information about indexing scope for an indexed chain. + */ +export interface ChainBlockRefs { + /** + * Based on Ponder Configuration + */ + config: { + startBlock: BlockRef; + + endBlock: BlockRef | null; + }; + + /** + * Based on Ponder runtime metrics + */ + backfillEndBlock: BlockRef; +} + +/** + * Get {@link IndexedChainBlockRefs} for indexed chains. + * + * Guaranteed to include {@link ChainBlockRefs} for each indexed chain. + */ +export async function getChainsBlockRefs( + chainIds: ChainId[], + chainsConfigBlockrange: Record, + chainsIndexingMetrics: Map, + publicClients: Record, +): Promise> { + const chainsBlockRefs = new Map(); + + for (const chainId of chainIds) { + const blockrange = chainsConfigBlockrange[chainId]; + const startBlock = blockrange?.startBlock; + const endBlock = blockrange?.endBlock; + const publicClient = publicClients[chainId]; + const indexingMetrics = chainsIndexingMetrics.get(chainId); + + if (typeof startBlock !== "number") { + throw new Error(`startBlock not found for chain ${chainId}`); + } + + if (typeof publicClient === "undefined") { + throw new Error(`publicClient not found for chain ${chainId}`); + } + + if (typeof indexingMetrics === "undefined") { + throw new Error(`indexingMetrics not found for chain ${chainId}`); + } + + const historicalTotalBlocks = indexingMetrics.backfillSyncBlocksTotal; + + if (typeof historicalTotalBlocks !== "number") { + throw new Error(`No historical total blocks metric found for chain ${chainId}`); + } + + const backfillEndBlock = startBlock + historicalTotalBlocks - 1; + + try { + // fetch relevant block refs using RPC + const [startBlockRef, endBlockRef, backfillEndBlockRef] = await Promise.all([ + fetchBlockRef(publicClient, startBlock), + endBlock ? fetchBlockRef(publicClient, endBlock) : null, + fetchBlockRef(publicClient, backfillEndBlock), + ]); + + const chainBlockRef = { + config: { + startBlock: startBlockRef, + endBlock: endBlockRef, + }, + backfillEndBlock: backfillEndBlockRef, + } satisfies ChainBlockRefs; + + chainsBlockRefs.set(chainId, chainBlockRef); + } catch { + throw new Error(`Could not get BlockRefs for chain ${chainId}`); + } + } + + return chainsBlockRefs; +} diff --git a/apps/ensindexer/src/lib/indexing-status-builder/chain-indexing-status-snapshot.ts b/apps/ensindexer/src/lib/indexing-status-builder/chain-indexing-status-snapshot.ts new file mode 100644 index 000000000..376ac0115 --- /dev/null +++ b/apps/ensindexer/src/lib/indexing-status-builder/chain-indexing-status-snapshot.ts @@ -0,0 +1,154 @@ +import { + ChainIndexingConfigTypeIds, + ChainIndexingStatusIds, + type ChainIndexingStatusSnapshot, + type ChainIndexingStatusSnapshotBackfill, + type ChainIndexingStatusSnapshotCompleted, + type ChainIndexingStatusSnapshotFollowing, + type ChainIndexingStatusSnapshotQueued, + createIndexingConfig, +} from "@ensnode/ensnode-sdk"; +import { + type ChainId, + type ChainIndexingMetrics, + type ChainIndexingStatus, + isBlockRefEqualTo, +} from "@ensnode/ponder-sdk"; + +import type { ChainBlockRefs } from "./chain-block-refs"; +import { validateChainIndexingStatusSnapshot } from "./validate/chain-indexing-status-snapshot"; + +/** + * Build Chain Indexing Status Snapshot + * + * Builds {@link ChainIndexingStatusSnapshot} for a chain based on: + * - block refs based on chain configuration and RPC data, + * - current indexing status, + * - current indexing metrics. + */ +export function buildChainIndexingStatusSnapshot( + chainId: ChainId, + chainBlockRefs: ChainBlockRefs, + chainIndexingMetrics: ChainIndexingMetrics, + chainIndexingStatus: ChainIndexingStatus, +): ChainIndexingStatusSnapshot { + const { checkpointBlock } = chainIndexingStatus; + const config = createIndexingConfig( + chainBlockRefs.config.startBlock, + chainBlockRefs.config.endBlock, + ); + + // TODO: Use `ChainIndexingMetrics` data model from PR #1612. + // This updated data model includes `type` field to distinguish + // between different chain indexing phases, for example: + // Queued, Backfill, Realtime, Completed. + + // In omnichain ordering, if the startBlock is the same as the + // status block, the chain has not started yet. + if (isBlockRefEqualTo(chainBlockRefs.config.startBlock, checkpointBlock)) { + return validateChainIndexingStatusSnapshot({ + chainStatus: ChainIndexingStatusIds.Queued, + config, + } satisfies ChainIndexingStatusSnapshotQueued); + } + + if (chainIndexingMetrics.indexingCompleted) { + // TODO: move that invariant to validation schema + if (config.configType !== ChainIndexingConfigTypeIds.Definite) { + throw new Error( + `The '${ChainIndexingStatusIds.Completed}' indexing status for chain ID '${chainId}' can be only created with the '${ChainIndexingConfigTypeIds.Definite}' indexing config type.`, + ); + } + + return validateChainIndexingStatusSnapshot({ + chainStatus: ChainIndexingStatusIds.Completed, + latestIndexedBlock: checkpointBlock, + config, + } satisfies ChainIndexingStatusSnapshotCompleted); + } + + if (chainIndexingMetrics.indexingRealtime) { + // TODO: move that invariant to validation schema + if (config.configType !== ChainIndexingConfigTypeIds.Indefinite) { + throw new Error( + `The '${ChainIndexingStatusIds.Following}' indexing status for chain ID '${chainId}' can be only created with the '${ChainIndexingConfigTypeIds.Indefinite}' indexing config type.`, + ); + } + + return validateChainIndexingStatusSnapshot({ + chainStatus: ChainIndexingStatusIds.Following, + latestIndexedBlock: checkpointBlock, + latestKnownBlock: chainIndexingMetrics.latestSyncedBlock, + config: { + configType: config.configType, + startBlock: config.startBlock, + }, + } satisfies ChainIndexingStatusSnapshotFollowing); + } + + return validateChainIndexingStatusSnapshot({ + chainStatus: ChainIndexingStatusIds.Backfill, + latestIndexedBlock: checkpointBlock, + backfillEndBlock: chainBlockRefs.backfillEndBlock, + config, + } satisfies ChainIndexingStatusSnapshotBackfill); +} + +/** + * Build Chain Indexing Status Snapshots + * + * Builds {@link ChainIndexingStatusSnapshot} for each indexed chain based on: + * - block refs based on chain configuration and RPC data, + * - current indexing status, + * - current indexing metrics. + * + * @param indexedChainIds list of indexed chain IDs to build snapshots for. + * @param chainsBlockRefs block refs for indexed chains. + * @param chainsIndexingMetrics indexing metrics for indexed chains. + * @param chainsIndexingStatus indexing status for indexed chains. + * + * @returns record of {@link ChainIndexingStatusSnapshot} keyed by chain ID. + * + * @throws error if any of the required data is missing or if data validation fails. + */ +export function buildChainIndexingStatusSnapshots( + indexedChainIds: ChainId[], + chainsBlockRefs: Map, + chainsIndexingMetrics: Map, + chainsIndexingStatus: Map, +): Map { + const chainStatusSnapshots = new Map(); + + // Build chain indexing status snapshot for each indexed chain. + for (const chainId of indexedChainIds) { + const chainBlockRefs = chainsBlockRefs.get(chainId); + const chainIndexingStatus = chainsIndexingStatus.get(chainId); + const chainIndexingMetrics = chainsIndexingMetrics.get(chainId); + + // Invariant: block refs must be defined for the chain + if (!chainBlockRefs) { + throw new Error(`Block refs must be defined for chain ID ${chainId}`); + } + + // Invariant: chainIndexingStatus must be defined for the chain + if (!chainIndexingStatus) { + throw new Error(`Indexing status must be defined for chain ID ${chainId}`); + } + + // Invariant: chainIndexingMetrics must be defined for the chain + if (!chainIndexingMetrics) { + throw new Error(`Indexing metrics must be defined for chain ID ${chainId}`); + } + + const chainStatusSnapshot = buildChainIndexingStatusSnapshot( + chainId, + chainBlockRefs, + chainIndexingMetrics, + chainIndexingStatus, + ); + + chainStatusSnapshots.set(chainId, chainStatusSnapshot); + } + + return chainStatusSnapshots; +} diff --git a/apps/ensindexer/src/lib/indexing-status-builder/corss-chain-indexing-status-snapshot.ts b/apps/ensindexer/src/lib/indexing-status-builder/corss-chain-indexing-status-snapshot.ts new file mode 100644 index 000000000..6a06db9cd --- /dev/null +++ b/apps/ensindexer/src/lib/indexing-status-builder/corss-chain-indexing-status-snapshot.ts @@ -0,0 +1,18 @@ +import { + type CrossChainIndexingStatusSnapshotOmnichain, + CrossChainIndexingStrategyIds, + type OmnichainIndexingStatusSnapshot, +} from "@ensnode/ensnode-sdk"; +import type { UnixTimestamp } from "@ensnode/ponder-sdk"; + +export function buildCrossChainIndexingStatusSnapshotOmnichain( + omnichainSnapshot: OmnichainIndexingStatusSnapshot, + snapshotTime: UnixTimestamp, +): CrossChainIndexingStatusSnapshotOmnichain { + return { + strategy: CrossChainIndexingStrategyIds.Omnichain, + slowestChainIndexingCursor: omnichainSnapshot.omnichainIndexingCursor, + snapshotTime, + omnichainSnapshot, + }; +} diff --git a/apps/ensindexer/src/lib/indexing-status-builder/omnichain-indexing-status-snapshot.ts b/apps/ensindexer/src/lib/indexing-status-builder/omnichain-indexing-status-snapshot.ts new file mode 100644 index 000000000..ef167d000 --- /dev/null +++ b/apps/ensindexer/src/lib/indexing-status-builder/omnichain-indexing-status-snapshot.ts @@ -0,0 +1,68 @@ +import { + type ChainIndexingStatusSnapshotBackfill, + type ChainIndexingStatusSnapshotCompleted, + type ChainIndexingStatusSnapshotQueued, + getOmnichainIndexingCursor, + getOmnichainIndexingStatus, + OmnichainIndexingStatusIds, + type OmnichainIndexingStatusSnapshot, + type OmnichainIndexingStatusSnapshotBackfill, + type OmnichainIndexingStatusSnapshotCompleted, + type OmnichainIndexingStatusSnapshotFollowing, + type OmnichainIndexingStatusSnapshotUnstarted, +} from "@ensnode/ensnode-sdk"; +import type { ChainId, PonderIndexingMetrics, PonderIndexingStatus } from "@ensnode/ponder-sdk"; + +import type { ChainBlockRefs } from "./chain-block-refs"; +import { buildChainIndexingStatusSnapshots } from "./chain-indexing-status-snapshot"; +import { validateOmnichainIndexingStatusSnapshot } from "./validate/omnichain-indexing-status-snapshot"; + +export function buildOmnichainIndexingStatusSnapshot( + indexedChainIds: ChainId[], + chainsBlockRefs: Map, + ponderIndexingMetrics: PonderIndexingMetrics, + ponderIndexingStatus: PonderIndexingStatus, +): OmnichainIndexingStatusSnapshot { + const chainStatusSnapshots = buildChainIndexingStatusSnapshots( + indexedChainIds, + chainsBlockRefs, + ponderIndexingMetrics.chains, + ponderIndexingStatus.chains, + ); + const chains = Array.from(chainStatusSnapshots.values()); + const omnichainStatus = getOmnichainIndexingStatus(chains); + const omnichainIndexingCursor = getOmnichainIndexingCursor(chains); + + switch (omnichainStatus) { + case OmnichainIndexingStatusIds.Unstarted: { + return validateOmnichainIndexingStatusSnapshot({ + omnichainStatus: OmnichainIndexingStatusIds.Unstarted, + chains: chainStatusSnapshots as Map, // narrowing the type here, will be validated in the following 'check' step + omnichainIndexingCursor, + } satisfies OmnichainIndexingStatusSnapshotUnstarted); + } + + case OmnichainIndexingStatusIds.Backfill: { + return validateOmnichainIndexingStatusSnapshot({ + omnichainStatus: OmnichainIndexingStatusIds.Backfill, + chains: chainStatusSnapshots as Map, // narrowing the type here, will be validated in the following 'check' step + omnichainIndexingCursor, + } satisfies OmnichainIndexingStatusSnapshotBackfill); + } + + case OmnichainIndexingStatusIds.Completed: { + return validateOmnichainIndexingStatusSnapshot({ + omnichainStatus: OmnichainIndexingStatusIds.Completed, + chains: chainStatusSnapshots as Map, // narrowing the type here, will be validated in the following 'check' step + omnichainIndexingCursor, + } satisfies OmnichainIndexingStatusSnapshotCompleted); + } + + case OmnichainIndexingStatusIds.Following: + return validateOmnichainIndexingStatusSnapshot({ + omnichainStatus: OmnichainIndexingStatusIds.Following, + chains: chainStatusSnapshots, + omnichainIndexingCursor, + } satisfies OmnichainIndexingStatusSnapshotFollowing); + } +} diff --git a/apps/ensindexer/src/lib/indexing-status-builder/validate/chain-indexing-status-snapshot.ts b/apps/ensindexer/src/lib/indexing-status-builder/validate/chain-indexing-status-snapshot.ts new file mode 100644 index 000000000..0e4fd1e9c --- /dev/null +++ b/apps/ensindexer/src/lib/indexing-status-builder/validate/chain-indexing-status-snapshot.ts @@ -0,0 +1,7 @@ +import type { ChainIndexingStatusSnapshot } from "@ensnode/ensnode-sdk"; + +export function validateChainIndexingStatusSnapshot( + unvalidatedSnapshot: ChainIndexingStatusSnapshot, +): ChainIndexingStatusSnapshot { + return unvalidatedSnapshot; +} diff --git a/apps/ensindexer/src/lib/indexing-status-builder/validate/omnichain-indexing-status-snapshot.ts b/apps/ensindexer/src/lib/indexing-status-builder/validate/omnichain-indexing-status-snapshot.ts new file mode 100644 index 000000000..20fd23a92 --- /dev/null +++ b/apps/ensindexer/src/lib/indexing-status-builder/validate/omnichain-indexing-status-snapshot.ts @@ -0,0 +1,7 @@ +import type { OmnichainIndexingStatusSnapshot } from "@ensnode/ensnode-sdk"; + +export function validateOmnichainIndexingStatusSnapshot( + unvalidatedSnapshot: OmnichainIndexingStatusSnapshot, +): OmnichainIndexingStatusSnapshot { + return unvalidatedSnapshot; +} diff --git a/packages/ponder-sdk/src/blocks.ts b/packages/ponder-sdk/src/blocks.ts index 05d985073..7f072c855 100644 --- a/packages/ponder-sdk/src/blocks.ts +++ b/packages/ponder-sdk/src/blocks.ts @@ -25,3 +25,70 @@ export const schemaBlockRef = z.object({ * Reference to a block. */ export type BlockRef = z.infer; + +/** + * Compare two {@link BlockRef} object to check + * if blockA is before blockB. + */ +export function isBlockRefBefore(blockA: BlockRef, blockB: BlockRef) { + return blockA.number < blockB.number && blockA.timestamp < blockB.timestamp; +} + +/** + * Compare two {@link BlockRef} object to check + * if blockA is equal to blockB. + */ +export function isBlockRefEqualTo(blockA: BlockRef, blockB: BlockRef) { + return blockA.number === blockB.number && blockA.timestamp === blockB.timestamp; +} + +/** + * Compare two {@link BlockRef} object to check + * if blockA is before or equal to blockB. + */ +export function isBlockRefBeforeOrEqualTo(blockA: BlockRef, blockB: BlockRef) { + return isBlockRefBefore(blockA, blockB) || isBlockRefEqualTo(blockA, blockB); +} + +/** + * Block range + * + * Represents a range of blocks + */ +export interface Blockrange { + /** + * Start block number + * + * Guaranteed to be lower than `endBlock` when both are present. + */ + startBlock?: BlockNumber; + + /** + * End block number + * + * Guaranteed to be greater than `startBlock` when both are present. + */ + endBlock?: BlockNumber; +} + +/** + * Block range with required start block + * + * Represents a range of blocks where the start block is required and the end + * block is optional. + */ +export interface BlockrangeWithStartBlock { + /** + * Start block number + * + * Guaranteed to be lower than `endBlock` when both are present. + */ + startBlock: BlockNumber; + + /** + * End block number + * + * Guaranteed to be greater than `startBlock` when both are present. + */ + endBlock?: BlockNumber; +} diff --git a/packages/ponder-sdk/src/deserialize/indexing-status.ts b/packages/ponder-sdk/src/deserialize/indexing-status.ts index 9f998c160..fa85060e2 100644 --- a/packages/ponder-sdk/src/deserialize/indexing-status.ts +++ b/packages/ponder-sdk/src/deserialize/indexing-status.ts @@ -9,11 +9,10 @@ import { prettifyError, z } from "zod/v4"; import type { ParsePayload } from "zod/v4/core"; -import type { BlockRef } from "../blocks"; import { schemaBlockRef } from "../blocks"; import type { ChainId } from "../chains"; import { schemaChainId } from "../chains"; -import type { PonderIndexingStatus } from "../indexing-status"; +import type { ChainIndexingStatus, PonderIndexingStatus } from "../indexing-status"; const schemaSerializedChainName = z.string(); @@ -54,10 +53,10 @@ export type SerializedPonderIndexingStatus = z.infer(); + const chains = new Map(); for (const [, chainData] of Object.entries(data)) { - chains.set(chainData.id, chainData.block); + chains.set(chainData.id, { checkpointBlock: chainData.block }); } return { diff --git a/packages/ponder-sdk/src/index.ts b/packages/ponder-sdk/src/index.ts index 683a3b2e3..6c161e332 100644 --- a/packages/ponder-sdk/src/index.ts +++ b/packages/ponder-sdk/src/index.ts @@ -1,3 +1,7 @@ +export * from "./blocks"; +export * from "./chains"; export * from "./client"; export * from "./indexing-metrics"; export * from "./indexing-status"; +export * from "./numbers"; +export * from "./time"; diff --git a/packages/ponder-sdk/src/indexing-status.ts b/packages/ponder-sdk/src/indexing-status.ts index 8bd444b4d..8305e930c 100644 --- a/packages/ponder-sdk/src/indexing-status.ts +++ b/packages/ponder-sdk/src/indexing-status.ts @@ -1,6 +1,23 @@ import type { BlockRef } from "./blocks"; import type { ChainId } from "./chains"; +/** + * Chain Indexing Status + * + * Represents the indexing status for a specific chain in a Ponder app. + */ +export interface ChainIndexingStatus { + /** + * Checkpoint Block + * + * During omnichain indexing, a Ponder app indexes the chain and keeps track of the latest indexed block for each chain. + * This is represented by the `checkpointBlock` property, which is a reference to either: + * - the first block to be indexed for the chain (if indexing is queued), or + * - the last indexed block for the chain (if indexing is in progress). + */ + checkpointBlock: BlockRef; +} + /** * Ponder Indexing Status * @@ -12,9 +29,6 @@ export interface PonderIndexingStatus { * * Guarantees: * - Includes entry for at least one indexed chain. - * - BlockRef corresponds to either: - * - The first block to be indexed (when chain indexing is currently queued). - * - The last indexed block (when chain indexing is currently in progress). */ - chains: Map; + chains: Map; } diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 04a45864e..e85c8012e 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -459,6 +459,9 @@ importers: '@ensnode/ponder-metadata': specifier: workspace:* version: link:../../packages/ponder-metadata + '@ensnode/ponder-sdk': + specifier: workspace:* + version: link:../../packages/ponder-sdk caip: specifier: 'catalog:' version: 1.1.1