diff --git a/CHANGELOG.md b/CHANGELOG.md index c2eb240f2..a34052abc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) ### Added +- Added Mempool sorting [#395](https://github.com/proto-kit/framework/pull/395) - Introduced block explorer [#381](https://github.com/proto-kit/framework/pull/381) - Added CircuitAnalysisModule for easy analysis of protocol circuits [#379](https://github.com/proto-kit/framework/pull/379) - Separated settlement and bridging functionally, so now settlement can be used without bridging [#376](https://github.com/proto-kit/framework/pull/376) diff --git a/packages/indexer/src/tasks/IndexPendingTxTask.ts b/packages/indexer/src/tasks/IndexPendingTxTask.ts index db468660d..9d448c6a7 100644 --- a/packages/indexer/src/tasks/IndexPendingTxTask.ts +++ b/packages/indexer/src/tasks/IndexPendingTxTask.ts @@ -30,7 +30,7 @@ export class IndexPendingTxTask public async compute(input: PendingTransaction): Promise { try { - await this.transactionStorage.pushUserTransaction(input); + await this.transactionStorage.pushUserTransaction(input, 0); return ""; } catch (err) { log.error("Failed to process pending tx task", err); diff --git a/packages/persistance/prisma/migrations/20260117134313_transaction_priority/migration.sql b/packages/persistance/prisma/migrations/20260117134313_transaction_priority/migration.sql new file mode 100644 index 000000000..e04752925 --- /dev/null +++ b/packages/persistance/prisma/migrations/20260117134313_transaction_priority/migration.sql @@ -0,0 +1,10 @@ +-- CreateTable +CREATE TABLE "TransactionPriority" ( + "transactionHash" TEXT NOT NULL, + "priority" BIGINT NOT NULL, + + CONSTRAINT "TransactionPriority_pkey" PRIMARY KEY ("transactionHash") +); + +-- AddForeignKey +ALTER TABLE "TransactionPriority" ADD CONSTRAINT "TransactionPriority_transactionHash_fkey" FOREIGN KEY ("transactionHash") REFERENCES "Transaction"("hash") ON DELETE RESTRICT ON UPDATE CASCADE; diff --git a/packages/persistance/prisma/schema.prisma b/packages/persistance/prisma/schema.prisma index d0ed1472e..a5abc4338 100644 --- a/packages/persistance/prisma/schema.prisma +++ b/packages/persistance/prisma/schema.prisma @@ -54,6 +54,18 @@ model Transaction { executionResult TransactionExecutionResult? IncomingMessageBatchTransaction IncomingMessageBatchTransaction[] + + priority TransactionPriority? +} + +model TransactionPriority { + transactionHash String + + priority BigInt + + Transaction Transaction @relation(fields: [transactionHash], references: [hash]) + + @@id([transactionHash]) } model TransactionExecutionResult { diff --git a/packages/persistance/src/services/prisma/PrismaTransactionStorage.ts b/packages/persistance/src/services/prisma/PrismaTransactionStorage.ts index 49256eef0..8f49f0384 100644 --- a/packages/persistance/src/services/prisma/PrismaTransactionStorage.ts +++ b/packages/persistance/src/services/prisma/PrismaTransactionStorage.ts @@ -34,6 +34,11 @@ export class PrismaTransactionStorage implements TransactionStorage { equals: false, }, }, + orderBy: { + priority: { + priority: "desc", + }, + }, skip: offset, take: limit, }); @@ -56,13 +61,27 @@ export class PrismaTransactionStorage implements TransactionStorage { } } - public async pushUserTransaction(tx: PendingTransaction): Promise { + public async pushUserTransaction( + tx: PendingTransaction, + priority: number + ): Promise { const { prismaClient } = this.connection; - const result = await prismaClient.transaction.createMany({ - data: [this.transactionMapper.mapOut(tx)], - skipDuplicates: true, - }); + const transactionData = this.transactionMapper.mapOut(tx); + + const [result] = await prismaClient.$transaction([ + prismaClient.transaction.createMany({ + data: [transactionData], + skipDuplicates: true, + }), + + prismaClient.transactionPriority.create({ + data: { + priority, + transactionHash: transactionData.hash, + }, + }), + ]); return result.count === 1; } diff --git a/packages/persistance/test-integration/PrismaBlockProduction.test.ts b/packages/persistance/test-integration/PrismaBlockProduction.test.ts index f6299f6b1..e9d335156 100644 --- a/packages/persistance/test-integration/PrismaBlockProduction.test.ts +++ b/packages/persistance/test-integration/PrismaBlockProduction.test.ts @@ -251,7 +251,7 @@ describe("prisma integration", () => { PrismaTransactionStorage ); - const txs = await txResolver.getPendingUserTransactions(); + const txs = await txResolver.getPendingUserTransactions(0); expectDefined(transaction.transaction); diff --git a/packages/sdk/test/fees-multi-zkprograms.test.ts b/packages/sdk/test/fees-multi-zkprograms.test.ts index 53d4389f9..e6921b708 100644 --- a/packages/sdk/test/fees-multi-zkprograms.test.ts +++ b/packages/sdk/test/fees-multi-zkprograms.test.ts @@ -185,9 +185,7 @@ describe("check fee analyzer", () => { }, }, Sequencer: { - Mempool: { - validationEnabled: true, - }, + Mempool: {}, }, }); diff --git a/packages/sequencer/src/index.ts b/packages/sequencer/src/index.ts index a0fee0179..5821284bf 100644 --- a/packages/sequencer/src/index.ts +++ b/packages/sequencer/src/index.ts @@ -3,6 +3,8 @@ export * from "./mempool/Mempool"; export * from "./mempool/PendingTransaction"; export * from "./mempool/CompressedSignature"; export * from "./mempool/private/PrivateMempool"; +export * from "./mempool/sorting/MempoolSorting"; +export * from "./mempool/sorting/DefaultMempoolSorting"; export * from "./sequencer/executor/Sequencer"; export * from "./sequencer/executor/Sequenceable"; export * from "./sequencer/SequencerIdProvider"; diff --git a/packages/sequencer/src/mempool/private/PrivateMempool.ts b/packages/sequencer/src/mempool/private/PrivateMempool.ts index 3df1225c6..c2e3c4e14 100644 --- a/packages/sequencer/src/mempool/private/PrivateMempool.ts +++ b/packages/sequencer/src/mempool/private/PrivateMempool.ts @@ -12,20 +12,38 @@ import { TransactionValidator } from "../verification/TransactionValidator"; import { Tracer } from "../../logging/Tracer"; import { trace } from "../../logging/trace"; import { IncomingMessagesService } from "../../settlement/messages/IncomingMessagesService"; +import { MempoolSorting } from "../sorting/MempoolSorting"; +import { DefaultMempoolSorting } from "../sorting/DefaultMempoolSorting"; + +type PrivateMempoolConfig = { + type?: "hybrid" | "private" | "based"; +}; @sequencerModule() -export class PrivateMempool extends SequencerModule implements Mempool { +export class PrivateMempool + extends SequencerModule + implements Mempool +{ public readonly events = new EventEmitter(); + private readonly mempoolSorting: MempoolSorting; + public constructor( private readonly transactionValidator: TransactionValidator, @inject("TransactionStorage") private readonly transactionStorage: TransactionStorage, @inject("IncomingMessagesService", { isOptional: true }) private readonly messageService: IncomingMessagesService | undefined, - @inject("Tracer") public readonly tracer: Tracer + @inject("Tracer") public readonly tracer: Tracer, + @inject("MempoolSorting", { isOptional: true }) + mempoolSorting: MempoolSorting | undefined ) { super(); + this.mempoolSorting = mempoolSorting ?? new DefaultMempoolSorting(); + } + + private type() { + return this.config.type ?? "hybrid"; } public async length(): Promise { @@ -36,7 +54,12 @@ export class PrivateMempool extends SequencerModule implements Mempool { public async add(tx: PendingTransaction): Promise { const [txValid, error] = this.transactionValidator.validateTx(tx); if (txValid) { - const success = await this.transactionStorage.pushUserTransaction(tx); + const sortingValue = this.mempoolSorting!.presortingPriority(tx); + + const success = await this.transactionStorage.pushUserTransaction( + tx, + sortingValue + ); if (success) { this.events.emit("mempool-transaction-added", tx); log.trace(`Transaction added to mempool: ${tx.hash().toString()}`); @@ -69,14 +92,27 @@ export class PrivateMempool extends SequencerModule implements Mempool { offset?: number, limit?: number ): Promise { - return await this.transactionStorage.getPendingUserTransactions( + if (this.type() === "based") { + return []; + } + + let txs = await this.transactionStorage.getPendingUserTransactions( offset ?? 0, limit ); + + if (this.mempoolSorting.enablePostSorting()) { + txs = this.mempoolSorting.postSorting(txs); + } + + return txs; } @trace("mempool.get_mandatory_txs") public async getMandatoryTxs(): Promise { + if (this.type() === "private") { + return []; + } return (await this.messageService?.getPendingMessages()) ?? []; } diff --git a/packages/sequencer/src/mempool/sorting/DefaultMempoolSorting.ts b/packages/sequencer/src/mempool/sorting/DefaultMempoolSorting.ts new file mode 100644 index 000000000..d8666ad7e --- /dev/null +++ b/packages/sequencer/src/mempool/sorting/DefaultMempoolSorting.ts @@ -0,0 +1,32 @@ +import { noop } from "@proto-kit/common"; + +import { PendingTransaction } from "../PendingTransaction"; +import { + SequencerModule, + sequencerModule, +} from "../../sequencer/builder/SequencerModule"; + +import { MempoolSorting } from "./MempoolSorting"; + +@sequencerModule() +export class DefaultMempoolSorting + extends SequencerModule + implements MempoolSorting +{ + public async start() { + noop(); + } + + public enablePostSorting(): boolean { + return false; + } + + public postSorting(transactions: PendingTransaction[]): PendingTransaction[] { + return transactions; + } + + public presortingPriority(tx: PendingTransaction): number { + // This means we order by first in, first out in the db + return Date.UTC(2500, 0) - Date.now(); + } +} diff --git a/packages/sequencer/src/mempool/sorting/MempoolSorting.ts b/packages/sequencer/src/mempool/sorting/MempoolSorting.ts new file mode 100644 index 000000000..fde5370e9 --- /dev/null +++ b/packages/sequencer/src/mempool/sorting/MempoolSorting.ts @@ -0,0 +1,26 @@ +import { PendingTransaction } from "../PendingTransaction"; + +export interface MempoolSorting { + /** + * Presorting happens on the backend (i.e. the DB), before the data travels to the sequencer. + * It's very fast, but limited to only integer sorting. + * The value returned here has to be static per transaction, since it will be sorted and + * compared on the DB-side. + * + * @param tx + * @returns Priority of the transaction - larger is better (therefore will be + * put in the block first) + */ + presortingPriority(tx: PendingTransaction): number; + + /** + * Indicate whether to do pre-sorting (as it's expensive depending on your block size) + */ + enablePostSorting(): boolean; + + /** + * Postsorting happens on the sequencer-side. It's less fast but can take in any two + * transactions and directly compare them based on arbitrary logic + */ + postSorting(transactions: PendingTransaction[]): PendingTransaction[]; +} diff --git a/packages/sequencer/src/storage/inmemory/InMemoryTransactionStorage.ts b/packages/sequencer/src/storage/inmemory/InMemoryTransactionStorage.ts index 10cadc0c9..90732770c 100644 --- a/packages/sequencer/src/storage/inmemory/InMemoryTransactionStorage.ts +++ b/packages/sequencer/src/storage/inmemory/InMemoryTransactionStorage.ts @@ -9,7 +9,7 @@ import { InMemoryBatchStorage } from "./InMemoryBatchStorage"; @injectable() export class InMemoryTransactionStorage implements TransactionStorage { - private queue: PendingTransaction[] = []; + private queue: { tx: PendingTransaction; sortingValue: number }[] = []; private latestScannedBlock = -1; @@ -21,12 +21,17 @@ export class InMemoryTransactionStorage implements TransactionStorage { public async removeTx(hashes: string[]) { const hashSet = new Set(hashes); - this.queue = this.queue.filter((tx) => { + this.queue = this.queue.filter(({ tx }) => { const hash = tx.hash().toString(); return !hashSet.has(hash); }); } + private sortQueue() { + // Sort in-place and descending + this.queue.sort(({ sortingValue: a }, { sortingValue: b }) => b - a); + } + public async getPendingUserTransactions( offset: number, limit?: number @@ -42,25 +47,30 @@ export class InMemoryTransactionStorage implements TransactionStorage { if (block !== undefined) { const hashes = block.transactions.map((tx) => tx.tx.hash().toString()); this.queue = this.queue.filter( - (tx) => !hashes.includes(tx.hash().toString()) + ({ tx }) => !hashes.includes(tx.hash().toString()) ); } } this.latestScannedBlock = nextHeight - 1; + this.sortQueue(); + const from = offset ?? 0; const to = limit !== undefined ? from + limit : undefined; - return this.queue.slice(from, to); + return this.queue.slice(from, to).map(({ tx }) => tx); } - public async pushUserTransaction(tx: PendingTransaction): Promise { + public async pushUserTransaction( + tx: PendingTransaction, + priority: number + ): Promise { const notInQueue = this.queue.find( - (tx2) => tx2.hash().toString() === tx.hash().toString() + ({ tx: tx2 }) => tx2.hash().toString() === tx.hash().toString() ) === undefined; if (notInQueue) { - this.queue.push(tx); + this.queue.push({ tx, sortingValue: priority }); } return notInQueue; } diff --git a/packages/sequencer/src/storage/repositories/TransactionStorage.ts b/packages/sequencer/src/storage/repositories/TransactionStorage.ts index 522d63eb5..0a8696bcd 100644 --- a/packages/sequencer/src/storage/repositories/TransactionStorage.ts +++ b/packages/sequencer/src/storage/repositories/TransactionStorage.ts @@ -1,7 +1,10 @@ import { PendingTransaction } from "../../mempool/PendingTransaction"; export interface TransactionStorage { - pushUserTransaction: (tx: PendingTransaction) => Promise; + pushUserTransaction: ( + tx: PendingTransaction, + priority: number + ) => Promise; getPendingUserTransactions: ( offset: number, diff --git a/packages/sequencer/test-integration/benchmarks/tps.test.ts b/packages/sequencer/test-integration/benchmarks/tps.test.ts index b3eca0093..68e2dd5a0 100644 --- a/packages/sequencer/test-integration/benchmarks/tps.test.ts +++ b/packages/sequencer/test-integration/benchmarks/tps.test.ts @@ -104,9 +104,7 @@ export async function createAppChain() { maximumBlockSize: 100, }, BlockTrigger: {}, - Mempool: { - validationEnabled: false, - }, + Mempool: {}, }, Signer: { signer: PrivateKey.random(), diff --git a/packages/sequencer/test/integration/Mempool.test.ts b/packages/sequencer/test/integration/Block-order.test.ts similarity index 88% rename from packages/sequencer/test/integration/Mempool.test.ts rename to packages/sequencer/test/integration/Block-order.test.ts index 2b252f783..e13aaf566 100644 --- a/packages/sequencer/test/integration/Mempool.test.ts +++ b/packages/sequencer/test/integration/Block-order.test.ts @@ -1,4 +1,4 @@ -import { log, TypedClass } from "@proto-kit/common"; +import { expectDefined, log, TypedClass } from "@proto-kit/common"; import { Runtime } from "@proto-kit/module"; import { Protocol } from "@proto-kit/protocol"; import { Bool, PrivateKey, UInt64 } from "o1js"; @@ -14,6 +14,7 @@ import { StorageDependencyFactory, VanillaTaskWorkerModules, AppChain, + ManualBlockTrigger, } from "../../src"; import { DefaultTestingSequencerModules, @@ -23,19 +24,21 @@ import { import { Balance } from "./mocks/Balance"; import { createTransaction } from "./utils"; -// TODO Reenable with next PR -describe.skip.each([["InMemory", InMemoryDatabase]])( - "Mempool test", +describe.each([["InMemory", InMemoryDatabase]])( + "Block Ordering test: %s", ( testName, Database: TypedClass ) => { let appChain: ReturnType; let sequencer: Sequencer< - DefaultTestingSequencerModules & { Database: typeof Database } + DefaultTestingSequencerModules & { + Database: typeof Database; + } >; let runtime: Runtime<{ Balance: typeof Balance }>; let mempool: PrivateMempool; + let trigger: ManualBlockTrigger; async function mempoolAddTransactions( userPrivateKey: PrivateKey, @@ -95,9 +98,7 @@ describe.skip.each([["InMemory", InMemoryDatabase]])( Sequencer: { Database: {}, BlockTrigger: {}, - Mempool: { - validationEnabled: true, - }, + Mempool: {}, FeeStrategy: {}, BatchProducerModule: {}, BlockProducerModule: {}, @@ -116,6 +117,7 @@ describe.skip.each([["InMemory", InMemoryDatabase]])( sequencer = appChain.sequencer; mempool = sequencer.resolve("Mempool"); + trigger = sequencer.resolve("BlockTrigger"); }); afterEach(async () => { @@ -123,7 +125,7 @@ describe.skip.each([["InMemory", InMemoryDatabase]])( }); it("transactions are returned in right order - simple", async () => { - expect.assertions(13); + expect.assertions(14); await mempoolAddTransactions(user1PrivateKey, 0); await mempoolAddTransactions(user2PrivateKey, 0); @@ -132,7 +134,9 @@ describe.skip.each([["InMemory", InMemoryDatabase]])( await mempoolAddTransactions(user2PrivateKey, 1); await mempoolAddTransactions(user3PrivateKey, 1); - const txs = await mempool.getTxs(0); + const block = await trigger.produceBlock(); + expectDefined(block); + const txs = block.transactions.map((x) => x.tx); expect(txs).toHaveLength(6); expect(txs[0].nonce.toBigInt()).toStrictEqual(0n); @@ -150,7 +154,7 @@ describe.skip.each([["InMemory", InMemoryDatabase]])( }); it("transactions are returned in right order - medium", async () => { - expect.assertions(13); + expect.assertions(14); log.setLevel("TRACE"); @@ -161,7 +165,9 @@ describe.skip.each([["InMemory", InMemoryDatabase]])( await mempoolAddTransactions(user2PrivateKey, 1); await mempoolAddTransactions(user3PrivateKey, 0); - const txs = await mempool.getTxs(0); + const block = await trigger.produceBlock(); + expectDefined(block); + const txs = block.transactions.map((x) => x.tx); expect(txs).toHaveLength(6); expect(txs[0].nonce.toBigInt()).toStrictEqual(0n); @@ -179,7 +185,7 @@ describe.skip.each([["InMemory", InMemoryDatabase]])( }); it("transactions are returned in right order - harder", async () => { - expect.assertions(13); + expect.assertions(14); await mempoolAddTransactions(user1PrivateKey, 0); await mempoolAddTransactions(user2PrivateKey, 1); @@ -188,7 +194,9 @@ describe.skip.each([["InMemory", InMemoryDatabase]])( await mempoolAddTransactions(user3PrivateKey, 0); await mempoolAddTransactions(user1PrivateKey, 1); - const txs = await mempool.getTxs(0); + const block = await trigger.produceBlock(); + expectDefined(block); + const txs = block.transactions.map((x) => x.tx); expect(txs).toHaveLength(6); expect(txs[0].nonce.toBigInt()).toStrictEqual(0n); @@ -206,7 +214,7 @@ describe.skip.each([["InMemory", InMemoryDatabase]])( }); it("transactions are returned in right order - hardest", async () => { - expect.assertions(13); + expect.assertions(14); await mempoolAddTransactions(user1PrivateKey, 0); await mempoolAddTransactions(user1PrivateKey, 4); @@ -217,7 +225,9 @@ describe.skip.each([["InMemory", InMemoryDatabase]])( await mempoolAddTransactions(user3PrivateKey, 0); await mempoolAddTransactions(user1PrivateKey, 1); - const txs = await mempool.getTxs(0); + const block = await trigger.produceBlock(); + expectDefined(block); + const txs = block.transactions.map((x) => x.tx); expect(txs).toHaveLength(6); expect(txs[0].nonce.toBigInt()).toStrictEqual(0n); diff --git a/packages/sequencer/test/integration/BlockProductionSize.test.ts b/packages/sequencer/test/integration/BlockProductionSize.test.ts index 109b576c5..67d6d7ca6 100644 --- a/packages/sequencer/test/integration/BlockProductionSize.test.ts +++ b/packages/sequencer/test/integration/BlockProductionSize.test.ts @@ -70,9 +70,7 @@ describe("block limit", () => { Sequencer: { Database: {}, BlockTrigger: {}, - Mempool: { - validationEnabled: true, - }, + Mempool: {}, BatchProducerModule: {}, BlockProducerModule: { maximumBlockSize: maxBlockSize,