Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
122 commits
Select commit Hold shift + click to select a range
21db667
extract getContractInstance from PXEOracleInterface
Dec 16, 2025
077c625
Merge branch 'next' into martin/refactor-pxe-oracle-interface-away
Dec 16, 2025
511fb18
extract getFunctionArtifact from PXEOracleInterface
Dec 16, 2025
3510ad5
extract getDebugFunctionName from PXEOracleInterface
Dec 16, 2025
cec0289
extract getNotes from PXEOracleInterface
Dec 17, 2025
cef673b
merge
Dec 17, 2025
2ebef27
extract getKeyValidationRequest from PXEOracleInterface
Dec 17, 2025
61dec41
extract getCompleteAddress from PXEOracleInterface
Dec 17, 2025
d1beaca
calculateDirectionalAppTaggingSecret
Dec 17, 2025
13118f9
getSharedSecret
Dec 17, 2025
cad9544
getL1ToL2MembershipWitness
Dec 17, 2025
37ea1da
fix regression
Dec 17, 2025
aabc3a0
getMembershipWitness
Dec 17, 2025
8e79091
getLowNullifierMembershipWitness
Dec 17, 2025
6b9cd95
fix some regressions
Dec 17, 2025
6844a44
fix regression
Dec 17, 2025
31b5cea
getBlock
Dec 17, 2025
b8f8de3
getNulliferMembershipWitness and getNullifierMembershipWitnessAtLates…
Dec 17, 2025
7514360
getPublicDataWitness
Dec 17, 2025
19c6fd7
getPublicStorageAt
Dec 17, 2025
95918da
fix regressions
Dec 18, 2025
245af9a
assertCompatibleOracleVersion
Dec 18, 2025
cd0313d
remove getSenders
Dec 18, 2025
3bf3ac2
storeCapsule
Dec 18, 2025
a022b02
rest of capsule delegates
Dec 18, 2025
a94eb8e
getStats
Dec 18, 2025
ee24653
almost the rest of the frigging owl
Dec 18, 2025
c99cdf6
getNullifierIndex
Dec 18, 2025
86db9f5
remove aztec node getter from PXEOracleInterface
Dec 18, 2025
ba6c7ed
note and event validation
Dec 18, 2025
2b60f30
good bye PXEOracleInterface
Dec 18, 2025
46c5d3c
move getContractInstance to utility execution oracle
Dec 18, 2025
74fdb15
move getNotes to UtilityExecutionOracle
Dec 18, 2025
f4c1c6b
bring some more stuff inside oracles
Dec 18, 2025
2a3c9de
getMembershipWitness
Dec 18, 2025
8ae2fb2
more
Dec 18, 2025
79d4fda
common => utility_execution_oracle 1/
Dec 18, 2025
71d42be
common => utility_execution_oracle 2/
Dec 18, 2025
7b6e829
common => utility_execution_oracle 3/
Dec 18, 2025
9074c82
common => utility_execution_oracle 4/
Dec 18, 2025
fb87a5a
common => utility_execution_oracle 5/
Dec 18, 2025
014fca7
common => utility_execution_oracle 6/
Dec 18, 2025
b2cfbfb
common => utility_execution_oracle 7/
Dec 18, 2025
5b7d6cd
common => utility_execution_oracle 8/
Dec 18, 2025
78c8966
common => utility_execution_oracle 9/
Dec 18, 2025
750862b
common => utility_execution_oracle 10/
Dec 19, 2025
57362d2
introduce NoteSynchronizer
Dec 19, 2025
66c7836
deliverNote
Dec 19, 2025
eec0dd3
rename NoteSynchronizer => NoteService
Dec 19, 2025
375364b
EventService
Dec 19, 2025
6ac2b9c
refactor validateEnqueuedNotesAndEvents
Dec 19, 2025
4a0d9d4
getFunctionArtifactWithDebugMetadata
Dec 19, 2025
88e3f33
good bye common
Dec 19, 2025
9a6bcbd
simplify readCurrentClassId
Dec 19, 2025
455db79
help shaking trees
Dec 19, 2025
80708ce
fix weird lint issue
Dec 19, 2025
2fe98d5
LogService
Dec 19, 2025
0b97781
bulkRetrieveLogs
Dec 19, 2025
343adc2
MembershipWitnessService
Dec 19, 2025
3e4cca4
more MembershipWitnessService
Dec 19, 2025
20b47ff
getBlock
Dec 19, 2025
5f9a2e9
getNotes
Dec 19, 2025
dbc48fc
TreeMembershipService
Dec 19, 2025
9fd681e
PublicStorageService
Dec 19, 2025
719255e
port docs
Dec 19, 2025
ba5c24b
good bye ExecutionDataProvider
Dec 19, 2025
268c6dd
wanted
Dec 19, 2025
bb4422b
merge
Dec 19, 2025
5b0a17e
reduce barrel imports
Dec 19, 2025
e03955a
POC: optimize playground build
Dec 22, 2025
bd84fe7
remove report
Dec 22, 2025
2567d4c
Revert "remove report"
Dec 22, 2025
fb2c21e
Revert "POC: optimize playground build"
Dec 22, 2025
328a86b
hunt down barrel imports
Dec 22, 2025
c3d269b
went too far in the shaking
Dec 22, 2025
ef40d3b
sync from PXE instead of from contracts
Dec 22, 2025
91241dd
make it bootstrap
Dec 22, 2025
b33eb0f
Merge branch 'next' into martin/f-162-explicit-contract-sync
Dec 22, 2025
9cfbcc1
guard against external sync_private_state invocations
Dec 23, 2025
864a62c
more guards
Dec 23, 2025
ee980bf
decomplect sync in txe
Dec 23, 2025
33a4acd
refactor to unify where sync happens
Dec 23, 2025
43101f3
wip
Dec 23, 2025
b28b007
fix getPrivateEvents
Dec 23, 2025
814ea1d
Sync private state before simulating utility in TXE
Jan 2, 2026
88979d2
Merge branch 'next' into martin/f-162-explicit-contract-sync
Jan 2, 2026
a79eec1
Fix PXE unit test
Jan 2, 2026
beb5666
Call syncPrivateState on txePrivateCallNewFlow
Jan 2, 2026
dd934b7
Sync notes when entering private state in TXE
Jan 2, 2026
4ac3539
fix circuit recorder tests
Jan 2, 2026
9ef873e
fix multiple blobs test
Jan 2, 2026
d52fee0
first part of PXE db integrity through staged writes
Jan 2, 2026
16556e8
adapt providers to use staging phase
Jan 2, 2026
560aeaf
integrate with PXE main code
Jan 2, 2026
17934e4
final pieces of integration
Jan 2, 2026
2f4cc9c
remove context from block synchronizer
Jan 2, 2026
c814836
fix lint issues
Jan 2, 2026
bdb30a1
fix for IndexedDB serialization
Jan 2, 2026
39ce555
tidy up
Jan 5, 2026
1861653
fix dep cycle
Jan 5, 2026
63a0854
Merge branch 'martin/f-162-explicit-contract-sync' into martin/pxe-db…
Jan 5, 2026
3d74dc9
rename
Jan 5, 2026
a603749
handle store transaction at commitJob level
Jan 5, 2026
b11c1c0
less granularity on commit blast
Jan 5, 2026
fde4d08
rename provider => store
Jan 5, 2026
52aa605
refactor PrivateEventDataProvider to use in-memory data
Jan 5, 2026
e6da298
make e2e changes more minimal
Jan 5, 2026
766b859
remove outdated comment
Jan 5, 2026
014c614
tweak e2e test
Jan 5, 2026
1c6fa2a
Merge branch 'next' into martin/f-162-explicit-contract-sync
Jan 5, 2026
397b0d5
Merge branch 'martin/f-162-explicit-contract-sync' into martin/pxe-db…
Jan 5, 2026
da7e500
fix RecipientTaggingDataProvider
Jan 5, 2026
bfe2d1d
merge
Jan 6, 2026
ea2a7f5
more renames
Jan 6, 2026
027cfe1
make anchor block store work in memory
Jan 6, 2026
7592652
rename commitStaged => commit
Jan 6, 2026
89eca7c
make sender tagging store committable
Jan 6, 2026
bb241d4
another pass at note store
Jan 6, 2026
42c3ac4
polish job_coordinator
Jan 6, 2026
ca369a6
little refactors to JobCoordinator
Jan 6, 2026
768b61f
make getNotes support staged writes
Jan 6, 2026
c70fbe3
remove recover feature
Jan 6, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion yarn-project/end-to-end/src/e2e_multiple_blobs.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ describe('e2e_multiple_blobs', () => {
await aztecNodeAdmin.setConfig({ minTxsPerBlock: TX_COUNT });

const provenTxs = [
// 1 contract deployment tx.
// 1 contract deployment tx (publishes public_dispatch bytecode: ~1,931 fields).
await publishContractClass(wallet, AvmTestContract.artifact),
// 2 private function broadcast txs. We pick [2] because it has large bytecode (~1,807 fields),
// which combined with the contract class publication exceeds FIELDS_PER_BLOB (4,096).
Expand Down
4 changes: 4 additions & 0 deletions yarn-project/pxe/src/block_synchronizer/block_synchronizer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ export class BlockSynchronizer implements L2BlockStreamEventHandler {
public async handleBlockStreamEvent(event: L2BlockStreamEvent): Promise<void> {
await this.l2TipsStore.handleBlockStreamEvent(event);

// NOTE: Sync operations write directly to main storage (no staging) because they update
// our local view to match chain state. This is idempotent - if a job fails mid-sync,
// the next sync will bring us back to the correct state.

switch (event.type) {
case 'blocks-added': {
const lastBlock = event.blocks.at(-1)!.block;
Expand Down
2 changes: 2 additions & 0 deletions yarn-project/pxe/src/job_coordinator/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
export { JobContext } from './job_context.js';
export { JobCoordinator, type StagedStore } from './job_coordinator.js';
46 changes: 46 additions & 0 deletions yarn-project/pxe/src/job_coordinator/job_context.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/**
* JobContext represents an active job in the PXE.
*
* During a job, all database writes go to a staging area (prefixed keys).
* Only on job success are they promoted to main storage. This provides
* job-level atomicity without requiring cross-store transactions.
*/
export class JobContext {
/** Unique identifier for this job */
readonly jobId: string;

/** Type of job (for logging/debugging) */
readonly jobType: string;

/** Prefix for staging keys: "job_{jobId}:" */
readonly stagingPrefix: string;

/** Timestamp when job started */
readonly startedAt: number;

constructor(jobId: string, jobType: string) {
this.jobId = jobId;
this.jobType = jobType;
this.stagingPrefix = `job_${jobId}:`;
this.startedAt = Date.now();
}

/**
* Generates a staging key from a main key.
* Prepends the staging prefix to the key.
*/
stagingKey(mainKey: string): string {
return `${this.stagingPrefix}${mainKey}`;
}

/**
* Extracts the main key from a staging key.
* Removes the staging prefix from the key.
*/
mainKey(stagingKey: string): string {
if (!stagingKey.startsWith(this.stagingPrefix)) {
throw new Error(`Key "${stagingKey}" does not have staging prefix "${this.stagingPrefix}"`);
}
return stagingKey.substring(this.stagingPrefix.length);
}
}
135 changes: 135 additions & 0 deletions yarn-project/pxe/src/job_coordinator/job_coordinator.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
import type { AztecAsyncKVStore } from '@aztec/kv-store';
import { openTmpStore } from '@aztec/kv-store/lmdb-v2';

import { jest } from '@jest/globals';

import { JobContext } from './job_context.js';
import { JobCoordinator, type StagedStore } from './job_coordinator.js';

describe('JobCoordinator', () => {
let store: AztecAsyncKVStore;
let coordinator: JobCoordinator;

beforeEach(async () => {
store = await openTmpStore('job_coordinator_test');
coordinator = new JobCoordinator(store);
});

describe('beginJob', () => {
it('creates a new job context', () => {
const context = coordinator.beginJob('test_job');

expect(context).toBeInstanceOf(JobContext);
expect(context.jobType).toBe('test_job');
expect(context.jobId).toBeDefined();
expect(context.stagingPrefix).toContain('job_');
});

// Note: we could eventually be relax this if we want more concurrency,
// but it's good to start with this guardrail
it('throws if job already in progress', () => {
coordinator.beginJob('first_job');
expect(() => coordinator.beginJob('second_job')).toThrow(/already in progress/);
});

it('tracks job in progress', () => {
coordinator.beginJob('test_job');
expect(coordinator.hasJobInProgress()).toBe(true);
});
});

describe('commitJob', () => {
it('clears job marker on commit', async () => {
const context = coordinator.beginJob('test_job');
await coordinator.commitJob(context);
expect(coordinator.hasJobInProgress()).toBe(false);
});

it('throws if no matching job in progress', async () => {
const context = coordinator.beginJob('test_job');
await coordinator.commitJob(context);
await expect(coordinator.commitJob(context)).rejects.toThrow(/no matching job/);
});

it('calls commit on registered stores', async () => {
const commitMock = jest.fn<() => Promise<void>>().mockResolvedValue(undefined);
const discardStagedMock = jest.fn<() => Promise<void>>().mockResolvedValue(undefined);
const mockStore: StagedStore = {
storeName: 'mock_store',
commit: commitMock,
discardStaged: discardStagedMock,
};

coordinator.registerStore(mockStore);

const context = coordinator.beginJob('test_job');

await coordinator.commitJob(context);

expect(commitMock).toHaveBeenCalledWith(context);
});
});

describe('abortJob', () => {
it('clears job marker on abort', async () => {
const context = coordinator.beginJob('test_job');

await coordinator.abortJob(context);

expect(coordinator.hasJobInProgress()).toBe(false);
});

it('calls discardStaged on all registered stores', async () => {
const commitMock = jest.fn<() => Promise<void>>().mockResolvedValue(undefined);
const discardStagedMock = jest.fn<() => Promise<void>>().mockResolvedValue(undefined);
const mockStore: StagedStore = {
storeName: 'mock_store',
commit: commitMock,
discardStaged: discardStagedMock,
};

coordinator.registerStore(mockStore);

const context = coordinator.beginJob('test_job');

await coordinator.abortJob(context);

expect(discardStagedMock).toHaveBeenCalledWith(context.stagingPrefix);
});
});

describe('registerStore', () => {
it('throws on duplicate registration', () => {
const commitMock = jest.fn<() => Promise<void>>().mockResolvedValue(undefined);
const discardStagedMock = jest.fn<() => Promise<void>>().mockResolvedValue(undefined);
const mockStore: StagedStore = {
storeName: 'mock_store',
commit: commitMock,
discardStaged: discardStagedMock,
};

coordinator.registerStore(mockStore);

expect(() => coordinator.registerStore(mockStore)).toThrow(/already registered/);
});
});
});

describe('JobContext', () => {
it('generates staging keys', () => {
const context = new JobContext('abc123', 'test');
expect(context.stagingKey('notes')).toBe('job_abc123:notes');
expect(context.stagingKey('data:key')).toBe('job_abc123:data:key');
});

it('extracts main keys from staging keys', () => {
const context = new JobContext('abc123', 'test');
expect(context.mainKey('job_abc123:notes')).toBe('notes');
expect(context.mainKey('job_abc123:data:key')).toBe('data:key');
});

it('throws when extracting main key from non-staging key', () => {
const context = new JobContext('abc123', 'test');
expect(() => context.mainKey('notes')).toThrow(/does not have staging prefix/);
});
});
160 changes: 160 additions & 0 deletions yarn-project/pxe/src/job_coordinator/job_coordinator.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
import { randomBytes } from '@aztec/foundation/crypto/random';
import { createLogger } from '@aztec/foundation/log';
import type { AztecAsyncKVStore } from '@aztec/kv-store';

import { JobContext } from './job_context.js';

/**
* Interface that data providers must implement to support staged writes.
*/
export interface StagedStore {
/** Unique name identifying this provider (used for tracking affected stores) */
readonly storeName: string;

/**
* Commits staged data to main storage.
* Should be called within a transaction for atomicity.
*
* @param context - The job context containing the staging prefix
*/
commit(context: JobContext): Promise<void>;

/**
* Discards staged data without committing.
* Called on abort or during recovery.
*
* @param stagingPrefix - The prefix used for staging keys
*/
discardStaged(stagingPrefix: string): Promise<void>;
}

/**
* JobCoordinator manages job lifecycle and provides crash resilience for PXE operations.
*
* It uses a staged writes pattern:
* 1. When a job begins, a context is created with a unique staging prefix
* 2. During the job, all writes go to staging (prefixed keys)
* 3. On commit, staging is promoted to main storage
* 4. On abort, staged data is discarded
*
* Note: PXE should only rely on a single JobCoordinator instance, so it can eventually
* orchestrate concurrent jobs. Right now it doesn't make a difference because we're
* using a job queue with concurrency=1.
*/
export class JobCoordinator {
private readonly log = createLogger('pxe:job_coordinator');

/** The underlying KV store */
kvStore: AztecAsyncKVStore;

/** Current job context (in-memory only) */
#currentJob: JobContext | undefined;

/** All registered staged stores */
#stores: Map<string, StagedStore> = new Map();

constructor(kvStore: AztecAsyncKVStore) {
this.kvStore = kvStore;
}

/**
* Registers a staged store.
* Must be called during initialization for all stores that need staging support.
*/
registerStore(store: StagedStore): void {
if (this.#stores.has(store.storeName)) {
throw new Error(`Store "${store.storeName}" is already registered`);
}
this.#stores.set(store.storeName, store);
this.log.debug(`Registered staged store: ${store.storeName}`);
}

/**
* Registers multiple staged stores.
*/
registerStores(stores: StagedStore[]): void {
for (const store of stores) {
this.registerStore(store);
}
}

/**
* Begins a new job and returns a context for staged writes.
*
* @param jobType - Type of job (for logging/debugging)
* @returns JobContext to pass to write operations
*/
beginJob(jobType: string): JobContext {
if (this.#currentJob) {
throw new Error(
`Cannot begin job "${jobType}": job "${this.#currentJob.jobType}" (${this.#currentJob.jobId}) is already in progress. ` +
`This should not happen - ensure jobs are properly committed or aborted.`,
);
}

const jobId = randomBytes(8).toString('hex');
const context = new JobContext(jobId, jobType);
this.#currentJob = context;

this.log.debug(`Started job ${jobId} (type: ${jobType})`);
return context;
}

/**
* Commits a job by promoting all staged data to main storage.
*
* @param context - The job context returned from beginJob
*/
async commitJob(context: JobContext): Promise<void> {
if (!this.#currentJob || this.#currentJob.jobId !== context.jobId) {
throw new Error(
`Cannot commit job ${context.jobId}: no matching job in progress. ` +
`Current job: ${this.#currentJob ? this.#currentJob.jobId : 'none'}`,
);
}

this.log.debug(`Committing job ${context.jobId}`);

// Commit all stores atomically in a single transaction.
// Each store's commit is a no-op if it has no staged data (but that's up to each store to handle).
await this.kvStore.transactionAsync(async () => {
for (const store of this.#stores.values()) {
await store.commit(context);
}
});

this.#currentJob = undefined;
this.log.debug(`Job ${context.jobId} committed successfully`);
}

/**
* Aborts a job by discarding all staged data.
*
* @param context - The job context returned from beginJob
*/
async abortJob(context: JobContext): Promise<void> {
if (!this.#currentJob || this.#currentJob.jobId !== context.jobId) {
// Job may have already been aborted or never started properly
this.log.warn(`Abort called for job ${context.jobId} but current job is ${this.#currentJob?.jobId ?? 'none'}`);
}

this.log.debug(`Aborting job ${context.jobId}`);

// Discard staging atomically
await this.kvStore.transactionAsync(async () => {
for (const store of this.#stores.values()) {
await store.discardStaged(context.stagingPrefix);
}
});

this.#currentJob = undefined;
this.log.debug(`Job ${context.jobId} aborted`);
}

/**
* Checks if there's a job currently in progress.
*/
hasJobInProgress(): boolean {
return this.#currentJob !== undefined;
}
}
Loading
Loading