Skip to content
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)

### Added

- Added Mempool sorting [#395](https://github.com/proto-kit/framework/pull/395)
- Introduced block explorer [#381](https://github.com/proto-kit/framework/pull/381)
- Added CircuitAnalysisModule for easy analysis of protocol circuits [#379](https://github.com/proto-kit/framework/pull/379)
- Separated settlement and bridging functionally, so now settlement can be used without bridging [#376](https://github.com/proto-kit/framework/pull/376)
Expand Down
2 changes: 1 addition & 1 deletion packages/indexer/src/tasks/IndexPendingTxTask.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ export class IndexPendingTxTask

public async compute(input: PendingTransaction): Promise<string | void> {
try {
await this.transactionStorage.pushUserTransaction(input);
await this.transactionStorage.pushUserTransaction(input, 0);
return "";
} catch (err) {
log.error("Failed to process pending tx task", err);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
-- CreateTable
CREATE TABLE "TransactionPriority" (
"transactionHash" TEXT NOT NULL,
"priority" BIGINT NOT NULL,

CONSTRAINT "TransactionPriority_pkey" PRIMARY KEY ("transactionHash")
);

-- AddForeignKey
ALTER TABLE "TransactionPriority" ADD CONSTRAINT "TransactionPriority_transactionHash_fkey" FOREIGN KEY ("transactionHash") REFERENCES "Transaction"("hash") ON DELETE RESTRICT ON UPDATE CASCADE;
12 changes: 12 additions & 0 deletions packages/persistance/prisma/schema.prisma
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,18 @@ model Transaction {
executionResult TransactionExecutionResult?

IncomingMessageBatchTransaction IncomingMessageBatchTransaction[]

priority TransactionPriority?
}

model TransactionPriority {
transactionHash String

priority BigInt

Transaction Transaction @relation(fields: [transactionHash], references: [hash])

@@id([transactionHash])
}

model TransactionExecutionResult {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ export class PrismaTransactionStorage implements TransactionStorage {
equals: false,
},
},
orderBy: {
priority: {
priority: "desc",
},
},
skip: offset,
take: limit,
});
Expand All @@ -56,13 +61,27 @@ export class PrismaTransactionStorage implements TransactionStorage {
}
}

public async pushUserTransaction(tx: PendingTransaction): Promise<boolean> {
public async pushUserTransaction(
tx: PendingTransaction,
priority: number
): Promise<boolean> {
const { prismaClient } = this.connection;

const result = await prismaClient.transaction.createMany({
data: [this.transactionMapper.mapOut(tx)],
skipDuplicates: true,
});
const transactionData = this.transactionMapper.mapOut(tx);

const [result] = await prismaClient.$transaction([
prismaClient.transaction.createMany({
data: [transactionData],
skipDuplicates: true,
}),

prismaClient.transactionPriority.create({
data: {
priority,
transactionHash: transactionData.hash,
},
}),
]);

return result.count === 1;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ describe("prisma integration", () => {
PrismaTransactionStorage
);

const txs = await txResolver.getPendingUserTransactions();
const txs = await txResolver.getPendingUserTransactions(0);

expectDefined(transaction.transaction);

Expand Down
4 changes: 1 addition & 3 deletions packages/sdk/test/fees-multi-zkprograms.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -185,9 +185,7 @@ describe("check fee analyzer", () => {
},
},
Sequencer: {
Mempool: {
validationEnabled: true,
},
Mempool: {},
},
});

Expand Down
2 changes: 2 additions & 0 deletions packages/sequencer/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ export * from "./mempool/Mempool";
export * from "./mempool/PendingTransaction";
export * from "./mempool/CompressedSignature";
export * from "./mempool/private/PrivateMempool";
export * from "./mempool/sorting/MempoolSorting";
export * from "./mempool/sorting/DefaultMempoolSorting";
export * from "./sequencer/executor/Sequencer";
export * from "./sequencer/executor/Sequenceable";
export * from "./sequencer/SequencerIdProvider";
Expand Down
44 changes: 40 additions & 4 deletions packages/sequencer/src/mempool/private/PrivateMempool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,38 @@ import { TransactionValidator } from "../verification/TransactionValidator";
import { Tracer } from "../../logging/Tracer";
import { trace } from "../../logging/trace";
import { IncomingMessagesService } from "../../settlement/messages/IncomingMessagesService";
import { MempoolSorting } from "../sorting/MempoolSorting";
import { DefaultMempoolSorting } from "../sorting/DefaultMempoolSorting";

type PrivateMempoolConfig = {
type?: "hybrid" | "private" | "based";
};

@sequencerModule()
export class PrivateMempool extends SequencerModule implements Mempool {
export class PrivateMempool
extends SequencerModule<PrivateMempoolConfig>
implements Mempool
{
public readonly events = new EventEmitter<MempoolEvents>();

private readonly mempoolSorting: MempoolSorting;

public constructor(
private readonly transactionValidator: TransactionValidator,
@inject("TransactionStorage")
private readonly transactionStorage: TransactionStorage,
@inject("IncomingMessagesService", { isOptional: true })
private readonly messageService: IncomingMessagesService | undefined,
@inject("Tracer") public readonly tracer: Tracer
@inject("Tracer") public readonly tracer: Tracer,
@inject("MempoolSorting", { isOptional: true })
mempoolSorting: MempoolSorting | undefined
) {
super();
this.mempoolSorting = mempoolSorting ?? new DefaultMempoolSorting();
}

private type() {
return this.config.type ?? "hybrid";
}

public async length(): Promise<number> {
Expand All @@ -36,7 +54,12 @@ export class PrivateMempool extends SequencerModule implements Mempool {
public async add(tx: PendingTransaction): Promise<boolean> {
const [txValid, error] = this.transactionValidator.validateTx(tx);
if (txValid) {
const success = await this.transactionStorage.pushUserTransaction(tx);
const sortingValue = this.mempoolSorting!.presortingPriority(tx);

const success = await this.transactionStorage.pushUserTransaction(
tx,
sortingValue
);
if (success) {
this.events.emit("mempool-transaction-added", tx);
log.trace(`Transaction added to mempool: ${tx.hash().toString()}`);
Expand Down Expand Up @@ -69,14 +92,27 @@ export class PrivateMempool extends SequencerModule implements Mempool {
offset?: number,
limit?: number
): Promise<PendingTransaction[]> {
return await this.transactionStorage.getPendingUserTransactions(
if (this.type() === "based") {
return [];
}

let txs = await this.transactionStorage.getPendingUserTransactions(
offset ?? 0,
limit
);

if (this.mempoolSorting.enablePostSorting()) {
txs = this.mempoolSorting.postSorting(txs);
}

return txs;
}

@trace("mempool.get_mandatory_txs")
public async getMandatoryTxs(): Promise<PendingTransaction[]> {
if (this.type() === "private") {
return [];
}
return (await this.messageService?.getPendingMessages()) ?? [];
}

Expand Down
32 changes: 32 additions & 0 deletions packages/sequencer/src/mempool/sorting/DefaultMempoolSorting.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import { noop } from "@proto-kit/common";

import { PendingTransaction } from "../PendingTransaction";
import {
SequencerModule,
sequencerModule,
} from "../../sequencer/builder/SequencerModule";

import { MempoolSorting } from "./MempoolSorting";

@sequencerModule()
export class DefaultMempoolSorting
extends SequencerModule
implements MempoolSorting
{
public async start() {
noop();
}

public enablePostSorting(): boolean {
return false;
}

public postSorting(transactions: PendingTransaction[]): PendingTransaction[] {
return transactions;
}

public presortingPriority(tx: PendingTransaction): number {
// This means we order by first in, first out in the db
return Date.UTC(2500, 0) - Date.now();
}
}
26 changes: 26 additions & 0 deletions packages/sequencer/src/mempool/sorting/MempoolSorting.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import { PendingTransaction } from "../PendingTransaction";

export interface MempoolSorting {
/**
* Presorting happens on the backend (i.e. the DB), before the data travels to the sequencer.
* It's very fast, but limited to only integer sorting.
* The value returned here has to be static per transaction, since it will be sorted and
* compared on the DB-side.
*
* @param tx
* @returns Priority of the transaction - larger is better (therefore will be
* put in the block first)
*/
presortingPriority(tx: PendingTransaction): number;

/**
* Indicate whether to do pre-sorting (as it's expensive depending on your block size)
*/
enablePostSorting(): boolean;

/**
* Postsorting happens on the sequencer-side. It's less fast but can take in any two
* transactions and directly compare them based on arbitrary logic
*/
postSorting(transactions: PendingTransaction[]): PendingTransaction[];
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import { InMemoryBatchStorage } from "./InMemoryBatchStorage";

@injectable()
export class InMemoryTransactionStorage implements TransactionStorage {
private queue: PendingTransaction[] = [];
private queue: { tx: PendingTransaction; sortingValue: number }[] = [];

private latestScannedBlock = -1;

Expand All @@ -21,12 +21,17 @@ export class InMemoryTransactionStorage implements TransactionStorage {

public async removeTx(hashes: string[]) {
const hashSet = new Set(hashes);
this.queue = this.queue.filter((tx) => {
this.queue = this.queue.filter(({ tx }) => {
const hash = tx.hash().toString();
return !hashSet.has(hash);
});
}

private sortQueue() {
// Sort in-place and descending
this.queue.sort(({ sortingValue: a }, { sortingValue: b }) => b - a);
}

public async getPendingUserTransactions(
offset: number,
limit?: number
Expand All @@ -42,25 +47,30 @@ export class InMemoryTransactionStorage implements TransactionStorage {
if (block !== undefined) {
const hashes = block.transactions.map((tx) => tx.tx.hash().toString());
this.queue = this.queue.filter(
(tx) => !hashes.includes(tx.hash().toString())
({ tx }) => !hashes.includes(tx.hash().toString())
);
}
}
this.latestScannedBlock = nextHeight - 1;

this.sortQueue();

const from = offset ?? 0;
const to = limit !== undefined ? from + limit : undefined;

return this.queue.slice(from, to);
return this.queue.slice(from, to).map(({ tx }) => tx);
}

public async pushUserTransaction(tx: PendingTransaction): Promise<boolean> {
public async pushUserTransaction(
tx: PendingTransaction,
priority: number
): Promise<boolean> {
const notInQueue =
this.queue.find(
(tx2) => tx2.hash().toString() === tx.hash().toString()
({ tx: tx2 }) => tx2.hash().toString() === tx.hash().toString()
) === undefined;
if (notInQueue) {
this.queue.push(tx);
this.queue.push({ tx, sortingValue: priority });
}
return notInQueue;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import { PendingTransaction } from "../../mempool/PendingTransaction";

export interface TransactionStorage {
pushUserTransaction: (tx: PendingTransaction) => Promise<boolean>;
pushUserTransaction: (
tx: PendingTransaction,
priority: number
) => Promise<boolean>;

getPendingUserTransactions: (
offset: number,
Expand Down
4 changes: 1 addition & 3 deletions packages/sequencer/test-integration/benchmarks/tps.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,7 @@ export async function createAppChain() {
maximumBlockSize: 100,
},
BlockTrigger: {},
Mempool: {
validationEnabled: false,
},
Mempool: {},
},
Signer: {
signer: PrivateKey.random(),
Expand Down
Loading