Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions src/api/bootstrap.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import process from "node:process";
import { ChainsService } from "@/chains";
import { parsePort } from "@/common";
import { Logger } from "@/core/logger";
import { GasManagerService } from "@/gas-manager";
import { HealthCheckService } from "@/health-check";
import { NodeService } from "@/node";
import { RpcManagerService } from "@/rpc-manager";
Expand All @@ -25,6 +26,7 @@ export async function bootstrap() {
const healthCheckService = Container.get(HealthCheckService);
const nodeService = Container.get(NodeService);
const rpcManagerService = Container.get(RpcManagerService);
const gasManagerService = Container.get(GasManagerService);

await nodeService.readVersion();

Expand Down Expand Up @@ -76,6 +78,11 @@ export async function bootstrap() {
);
break;
}

case "gasInfoSync": {
gasManagerService.syncGasInfo(data.chainId, data.gasInfo);
break;
}
}
});
}
49 changes: 49 additions & 0 deletions src/master/bootstrap.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { BatcherService } from "@/batcher";
import { ChainsService } from "@/chains";
import { Logger } from "@/core/logger";
import { ExecutorService } from "@/executor";
import { GasManagerService } from "@/gas-manager";
import { HealthCheckService } from "@/health-check";
import { NodeService } from "@/node";
import { RpcChainConfig, RpcManagerService } from "@/rpc-manager";
Expand All @@ -16,12 +17,16 @@ export async function bootstrap() {
const logger = Container.get(Logger);
const batcherService = Container.get(BatcherService);
const chainsService = Container.get(ChainsService);
const gasManagerService = Container.get(GasManagerService);
const healthCheckService = Container.get(HealthCheckService);
const nodeService = Container.get(NodeService);
const workersService = Container.get(WorkersService);
const rpcManagerService = Container.get(RpcManagerService);

// Chain settings will be initialized
await chainsService.initialize();

// Workers will be spawned
await workersService.initialize();

const chainsSettings = chainsService.getChainsSettings();
Expand Down Expand Up @@ -98,9 +103,52 @@ export async function bootstrap() {
},
);
})
// RPC providers will be initialized
.setup(rpcConfigs, true);

const gasManagerConfigs = chainsSettings.map((chainSettings) => ({
chainId: chainSettings.chainId,
gasFetchInterval: chainSettings.gasCacheDuration,
}));

const threadGasInfoSync = true;

// Gas manager will be initialized
await gasManagerService.initialize(gasManagerConfigs, threadGasInfoSync);

gasManagerService.on("sync", (gasInfoByChain) => {
workersService.notifyClusterWorkers({
type: "gasInfoSync",
data: gasInfoByChain,
});

workersService.notifyThreadWorkers(
{
type: "gasInfoSync",
data: gasInfoByChain,
},
{
chainId: gasInfoByChain.chainId,
workerType: "simulator",
},
);

workersService.notifyThreadWorkers(
{
type: "gasInfoSync",
data: gasInfoByChain,
},
{
chainId: gasInfoByChain.chainId,
workerType: "executor",
},
);
});

// Batcher will be initialized and this can be initialzed anytime after chain service.
await batcherService.initialize();

// node service will be initialized which depends on RPC manager.
await nodeService.initialize();

await healthCheckService
Expand Down Expand Up @@ -153,5 +201,6 @@ export async function bootstrap() {
);
}
})
// Finally the health service is initialized and workers will be activated for execution processing
.initialize();
}
121 changes: 103 additions & 18 deletions src/modules/executor/executor.processor.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { ChainsService } from "@/chains";
import {
BadRequestException,
SomethingWentWrongException,
sanitizeUrl,
unixTimestamp,
Expand All @@ -12,6 +13,7 @@ import { EntryPointService } from "@/entry-point";
import { GasConditions } from "@/gas-estimator";
import { GasEstimatorServiceV2 } from "@/gas-estimator/gas-estimator-v2.service";
import { NODE_ACCOUNT_TOKEN, type NodeAccount } from "@/node";
import { NonceManagerService } from "@/nonce-manager";
import { RpcManagerService } from "@/rpc-manager";
import { trackSimulationTransactionData } from "@/simulator/utils";
import { StorageService } from "@/storage";
Expand All @@ -31,7 +33,6 @@ import {
stringify,
toHex,
} from "viem";
import { base } from "viem/chains";
import {
BLOCK_GAS_LIMIT_EXCEEDS_ERROR_MESSAGES,
GAS_LIMIT_ERROR_MESSAGES,
Expand Down Expand Up @@ -67,6 +68,7 @@ export class ExecutorProcessor implements Processor<ExecutorJob> {
private readonly userOpService: UserOpService,
private readonly entryPointService: EntryPointService,
private readonly rpcManagerService: RpcManagerService,
private readonly nonceManagerService: NonceManagerService,
) {
logger.setCaller(ExecutorProcessor);
}
Expand All @@ -76,6 +78,8 @@ export class ExecutorProcessor implements Processor<ExecutorJob> {
let { meeUserOps, batchGasLimit, previousTxHash } = data;
const { chainId } = this.chainsService;

let forceFetchNonce = false;

return await withTrace(
"executionPhase",
async () => {
Expand Down Expand Up @@ -118,6 +122,24 @@ export class ExecutorProcessor implements Processor<ExecutorJob> {
{ chainId, batchHash },
)();

if (transactionReceipt) {
// If the transaction receipt is being fetched for prevTxHash ? It means, the original execution has failed to get txReceipt due to
// RPC provider delay in synchronization. So the nonce might also never be marked as used in this case and increment might also never happened
// Getting the current nonce to mark it as used.
const prevUsedNonce = await this.nonceManagerService.getNonce(
this.nodeAccount.address,
chainId,
);

// If the transaction receipt is fetched. It guaruntees that the transaction has been mined irrespective of whether it is successful or reverted on chain.
// So we mark the nonce as used and increase the current nonce to sync the cache to have a fresh nonce for next executions
this.nonceManagerService.markNonceAsUsed(
this.nodeAccount.address,
chainId,
prevUsedNonce,
);
}

// This is non retriable scenario. As a result, the entire userOp list is simulated again
// and faulty userOps will be removed and retried with a new job.
if (transactionReceipt?.status === "reverted") {
Expand Down Expand Up @@ -148,6 +170,14 @@ export class ExecutorProcessor implements Processor<ExecutorJob> {
// This will mark the job as completed and end the job here
return true;
} catch (error) {
// If there is prevTxHash ? It means the transaction is either dropped or pending in the mempool.
// With nonce manager, if somehow we got a futuristic nonce eg: current nonce is 1 but we sent the tx with nonce 2
// In this case the tx will be stuck without errors. If this worst case happens, it will usually endup as tx receipt timeout error
// So it there is any tx receipt timeout error and it is never resolved ? It maybe something RPC side as well but for safety purpose
// and to handle this future nonce case. We forcefully fetch the latest nonce from RPC and retry the tx.
// In this case the stuct transaction will become invalid after the correct nonce based tx is executed due to SCA nonce duplication
forceFetchNonce = true;

this.logger.trace(
{
chainId,
Expand Down Expand Up @@ -328,12 +358,26 @@ export class ExecutorProcessor implements Processor<ExecutorJob> {
);

const [feeData, nonce] = await Promise.all([
this.gasEstimatorService.getCurrentGasConditions(chainId),
this.rpcManagerService.executeRequest(chainId, (chainClient) => {
return chainClient.getTransactionCount({
address: this.nodeAccount.address,
});
}),
withTrace(
"executorPhase.getCurrentGasConditions",
() => this.gasEstimatorService.getCurrentGasConditions(chainId),
{
chainId,
},
)(),
withTrace(
"executorPhase.getNonce",
() =>
this.nonceManagerService.getNonce(
this.nodeAccount.address,
chainId,
forceFetchNonce,
),
{
chainId,
workerAddress: this.nodeAccount.address,
},
)(),
]);

let executionOptions = {
Expand Down Expand Up @@ -502,6 +546,7 @@ export class ExecutorProcessor implements Processor<ExecutorJob> {
if (
error instanceof UnrecoverableError ||
error instanceof SomethingWentWrongException ||
error instanceof BadRequestException ||
error instanceof Error
) {
errorMessage = error.message;
Expand Down Expand Up @@ -871,6 +916,16 @@ export class ExecutorProcessor implements Processor<ExecutorJob> {
return { transactionReceipt: undefined, txHash };
}

if (transactionReceipt) {
// If the transaction receipt is fetched. It guaruntees that the transaction has been mined irrespective of whether it is successful or reverted on chain.
// So we mark the nonce as used and increase the current nonce to sync the cache to have a fresh nonce for next executions
this.nonceManagerService.markNonceAsUsed(
this.nodeAccount.address,
chainId,
nonce,
);
}

// This will end up as non retriable error. As a result, the entire userOp list is simulated again
// and faulty userOps will be removed and retried with a new job.
if (transactionReceipt.status === "reverted") {
Expand All @@ -884,6 +939,7 @@ export class ExecutorProcessor implements Processor<ExecutorJob> {
if (
error instanceof Error ||
error instanceof SomethingWentWrongException ||
error instanceof BadRequestException ||
error instanceof ContractFunctionExecutionError ||
error instanceof ContractFunctionRevertedError
) {
Expand Down Expand Up @@ -1125,7 +1181,7 @@ export class ExecutorProcessor implements Processor<ExecutorJob> {
if (errorMessage.includes(configuredErrorMessage.toLowerCase())) {
return {
isRetriableError: false,
errorType: USER_OP_EXECUTION_ERRORS.PRIORITY_FEE_TOO_HIGH,
errorType: USER_OP_EXECUTION_ERRORS.BLOCK_GAS_LIMIT_EXCEEDS_ERROR,
};
}
}
Expand Down Expand Up @@ -1195,23 +1251,52 @@ export class ExecutorProcessor implements Processor<ExecutorJob> {
switch (errorType) {
case USER_OP_EXECUTION_ERRORS.GAS_TOO_LOW:
case USER_OP_EXECUTION_ERRORS.MAX_FEE_TOO_LOW:
case USER_OP_EXECUTION_ERRORS.PRIORITY_FEE_TOO_HIGH:
case USER_OP_EXECUTION_ERRORS.NONCE_EXPIRED: {
const nonce = await this.rpcManagerService.executeRequest(
chainId,
(chainClient) => {
return chainClient.getTransactionCount({
address: this.nodeAccount.address,
});
// If it is a nonce error, we ignore cache and forcefully fetch the new nonce from RPC call
const forceFetchNonce =
errorType === USER_OP_EXECUTION_ERRORS.NONCE_EXPIRED;

const nonce = await withTrace(
"executorPhase.getNonceOnRetry",
async () =>
await this.nonceManagerService.getNonce(
this.nodeAccount.address,
chainId,
forceFetchNonce,
),
{
chainId,
workerAddress: this.nodeAccount.address,
forceFetchNonce,
},
);
)();

// If it is a gas related error, we ignore cache and forcefully fetch the new new gas info from RPC call
const forceFetchGasConditions =
errorType === USER_OP_EXECUTION_ERRORS.GAS_TOO_LOW ||
errorType === USER_OP_EXECUTION_ERRORS.PRIORITY_FEE_TOO_HIGH ||
errorType === USER_OP_EXECUTION_ERRORS.MAX_FEE_TOO_LOW;

const currentGasConditions = await withTrace(
"executorPhase.getCurrentGasConditionsOnRetry",
async () =>
await this.gasEstimatorService.getCurrentGasConditions(
chainId,
forceFetchGasConditions,
),
{
chainId,
forceFetchGasConditions,
},
)();

return {
maxFeePerGas:
(options.executeOptions.maxFeePerGas *
(100n + options.percentage)) /
(currentGasConditions.maxFeePerGas * (100n + options.percentage)) /
100n,
maxPriorityFeePerGas:
(options.executeOptions.maxPriorityFeePerGas *
(currentGasConditions.maxPriorityFeePerGas *
(100n + options.percentage)) /
100n,
nonce,
Expand Down
Loading