From b1d2906e1d6ae6eb084899896936ca43d3025785 Mon Sep 17 00:00:00 2001 From: just-mitch <68168980+just-mitch@users.noreply.github.com> Date: Thu, 4 Jan 2024 12:36:02 -0500 Subject: [PATCH] feat: serialize synchronize and simulateTx calls by the pxe via SerialQueue (#3817) Serialize calls by the PXE to: - synchronize with the aztec node - simulate transactions by use of a SerialQueue Synchronization is greedy, meaning it will sync as much as possible while its job is processing. --- .../foundation/src/fifo/memory_fifo.ts | 4 +- .../pxe/src/pxe_service/pxe_service.ts | 63 +++++++++++------ .../pxe/src/synchronizer/synchronizer.test.ts | 7 +- .../pxe/src/synchronizer/synchronizer.ts | 68 +++++++++++++------ 4 files changed, 96 insertions(+), 46 deletions(-) diff --git a/yarn-project/foundation/src/fifo/memory_fifo.ts b/yarn-project/foundation/src/fifo/memory_fifo.ts index 95f1b95fa77..50af730cb05 100644 --- a/yarn-project/foundation/src/fifo/memory_fifo.ts +++ b/yarn-project/foundation/src/fifo/memory_fifo.ts @@ -91,9 +91,9 @@ export class MemoryFifo { /** * Process items from the queue using a provided handler function. - * The function iterates over items in the queue, invoking the handler for each item until the queue is empty or flushing. + * The function iterates over items in the queue, invoking the handler for each item until the queue is empty and flushing. * If the handler throws an error, it will be caught and logged as 'Queue handler exception:', but the iteration will continue. - * The process function returns a promise that resolves when there are no more items in the queue or the queue is flushing. + * The process function returns a promise that resolves when there are no more items in the queue and the queue is flushing. * * @param handler - A function that takes an item of type T and returns a Promise after processing the item. * @returns A Promise that resolves when the queue is finished processing. diff --git a/yarn-project/pxe/src/pxe_service/pxe_service.ts b/yarn-project/pxe/src/pxe_service/pxe_service.ts index e761155a41c..db3a905ee7b 100644 --- a/yarn-project/pxe/src/pxe_service/pxe_service.ts +++ b/yarn-project/pxe/src/pxe_service/pxe_service.ts @@ -18,9 +18,10 @@ import { PublicCallRequest, } from '@aztec/circuits.js'; import { computeCommitmentNonce, siloNullifier } from '@aztec/circuits.js/abis'; -import { encodeArguments } from '@aztec/foundation/abi'; +import { DecodedReturn, encodeArguments } from '@aztec/foundation/abi'; import { padArrayEnd } from '@aztec/foundation/collection'; import { Fr } from '@aztec/foundation/fields'; +import { SerialQueue } from '@aztec/foundation/fifo'; import { DebugLogger, createDebugLogger } from '@aztec/foundation/log'; import { NoirWasmVersion } from '@aztec/noir-compiler/versions'; import { @@ -70,6 +71,9 @@ export class PXEService implements PXE { private simulator: AcirSimulator; private log: DebugLogger; private sandboxVersion: string; + // serialize synchronizer and calls to simulateTx. + // ensures that state is not changed while simulating + private jobQueue = new SerialQueue(); constructor( private keyStore: KeyStore, @@ -79,7 +83,7 @@ export class PXEService implements PXE { logSuffix?: string, ) { this.log = createDebugLogger(logSuffix ? `aztec:pxe_service_${logSuffix}` : `aztec:pxe_service`); - this.synchronizer = new Synchronizer(node, db, logSuffix); + this.synchronizer = new Synchronizer(node, db, this.jobQueue, logSuffix); this.contractDataOracle = new ContractDataOracle(db, node); this.simulator = getAcirSimulator(db, node, keyStore, this.contractDataOracle); @@ -93,7 +97,11 @@ export class PXEService implements PXE { */ public async start() { const { l2BlockPollingIntervalMS } = this.config; - await this.synchronizer.start(1, l2BlockPollingIntervalMS); + this.synchronizer.start(1, l2BlockPollingIntervalMS); + this.jobQueue.start(); + this.log.info('Started Job Queue'); + await this.jobQueue.syncPoint(); + this.log.info('Synced Job Queue'); await this.restoreNoteProcessors(); const info = await this.getNodeInfo(); this.log.info(`Started PXE connected to chain ${info.chainId} version ${info.protocolVersion}`); @@ -121,8 +129,10 @@ export class PXEService implements PXE { * @returns A Promise resolving once the server has been stopped successfully. */ public async stop() { + await this.jobQueue.cancel(); + this.log.info('Cancelled Job Queue'); await this.synchronizer.stop(); - this.log.info('Stopped'); + this.log.info('Stopped Synchronizer'); } /** Returns an estimate of the db size in bytes. */ @@ -336,18 +346,21 @@ export class PXEService implements PXE { throw new Error(`Unspecified internal are not allowed`); } - // We get the contract address from origin, since contract deployments are signalled as origin from their own address - // TODO: Is this ok? Should it be changed to be from ZERO? - const deployedContractAddress = txRequest.txContext.isContractDeploymentTx ? txRequest.origin : undefined; - const newContract = deployedContractAddress ? await this.db.getContract(deployedContractAddress) : undefined; + // all simulations must be serialized w.r.t. the synchronizer + return await this.jobQueue.put(async () => { + // We get the contract address from origin, since contract deployments are signalled as origin from their own address + // TODO: Is this ok? Should it be changed to be from ZERO? + const deployedContractAddress = txRequest.txContext.isContractDeploymentTx ? txRequest.origin : undefined; + const newContract = deployedContractAddress ? await this.db.getContract(deployedContractAddress) : undefined; - const tx = await this.#simulateAndProve(txRequest, newContract); - if (simulatePublic) { - await this.#simulatePublicCalls(tx); - } - this.log.info(`Executed local simulation for ${await tx.getTxHash()}`); + const tx = await this.#simulateAndProve(txRequest, newContract); + if (simulatePublic) { + await this.#simulatePublicCalls(tx); + } + this.log.info(`Executed local simulation for ${await tx.getTxHash()}`); - return tx; + return tx; + }); } public async sendTx(tx: Tx): Promise { @@ -360,13 +373,21 @@ export class PXEService implements PXE { return txHash; } - public async viewTx(functionName: string, args: any[], to: AztecAddress, _from?: AztecAddress) { - // TODO - Should check if `from` has the permission to call the view function. - const functionCall = await this.#getFunctionCall(functionName, args, to); - const executionResult = await this.#simulateUnconstrained(functionCall); - - // TODO - Return typed result based on the function artifact. - return executionResult; + public async viewTx( + functionName: string, + args: any[], + to: AztecAddress, + _from?: AztecAddress, + ): Promise { + // all simulations must be serialized w.r.t. the synchronizer + return await this.jobQueue.put(async () => { + // TODO - Should check if `from` has the permission to call the view function. + const functionCall = await this.#getFunctionCall(functionName, args, to); + const executionResult = await this.#simulateUnconstrained(functionCall); + + // TODO - Return typed result based on the function artifact. + return executionResult; + }); } public async getTxReceipt(txHash: TxHash): Promise { diff --git a/yarn-project/pxe/src/synchronizer/synchronizer.test.ts b/yarn-project/pxe/src/synchronizer/synchronizer.test.ts index 36f9202588a..e28d78be85c 100644 --- a/yarn-project/pxe/src/synchronizer/synchronizer.test.ts +++ b/yarn-project/pxe/src/synchronizer/synchronizer.test.ts @@ -1,5 +1,6 @@ import { BlockHeader, CompleteAddress, EthAddress, Fr, GrumpkinScalar } from '@aztec/circuits.js'; import { Grumpkin } from '@aztec/circuits.js/barretenberg'; +import { SerialQueue } from '@aztec/foundation/fifo'; import { TestKeyStore } from '@aztec/key-store'; import { AztecLmdbStore } from '@aztec/kv-store'; import { AztecNode, INITIAL_L2_BLOCK_NUM, L2Block, MerkleTreeId } from '@aztec/types'; @@ -17,6 +18,7 @@ describe('Synchronizer', () => { let synchronizer: TestSynchronizer; let roots: Record; let blockHeader: BlockHeader; + let jobQueue: SerialQueue; beforeEach(async () => { blockHeader = BlockHeader.random(); @@ -31,7 +33,8 @@ describe('Synchronizer', () => { aztecNode = mock(); database = new KVPxeDatabase(await AztecLmdbStore.create(EthAddress.random())); - synchronizer = new TestSynchronizer(aztecNode, database); + jobQueue = new SerialQueue(); + synchronizer = new TestSynchronizer(aztecNode, database, jobQueue); }); it('sets tree roots from aztec node on initial sync', async () => { @@ -128,7 +131,7 @@ class TestSynchronizer extends Synchronizer { return super.initialSync(); } - public workNoteProcessorCatchUp(): Promise { + public workNoteProcessorCatchUp(): Promise { return super.workNoteProcessorCatchUp(); } } diff --git a/yarn-project/pxe/src/synchronizer/synchronizer.ts b/yarn-project/pxe/src/synchronizer/synchronizer.ts index 61257d01047..266a7c48eb5 100644 --- a/yarn-project/pxe/src/synchronizer/synchronizer.ts +++ b/yarn-project/pxe/src/synchronizer/synchronizer.ts @@ -1,5 +1,6 @@ import { AztecAddress, BlockHeader, Fr, PublicKey } from '@aztec/circuits.js'; import { computeGlobalsHash } from '@aztec/circuits.js/abis'; +import { SerialQueue } from '@aztec/foundation/fifo'; import { DebugLogger, createDebugLogger } from '@aztec/foundation/log'; import { InterruptibleSleep } from '@aztec/foundation/sleep'; import { AztecNode, INITIAL_L2_BLOCK_NUM, KeyStore, L2BlockContext, L2BlockL2Logs, LogType } from '@aztec/types'; @@ -24,7 +25,7 @@ export class Synchronizer { private log: DebugLogger; private noteProcessorsToCatchUp: NoteProcessor[] = []; - constructor(private node: AztecNode, private db: PxeDatabase, logSuffix = '') { + constructor(private node: AztecNode, private db: PxeDatabase, private jobQueue: SerialQueue, logSuffix = '') { this.log = createDebugLogger(logSuffix ? `aztec:pxe_synchronizer_${logSuffix}` : 'aztec:pxe_synchronizer'); } @@ -36,23 +37,35 @@ export class Synchronizer { * @param limit - The maximum number of encrypted, unencrypted logs and blocks to fetch in each iteration. * @param retryInterval - The time interval (in ms) to wait before retrying if no data is available. */ - public async start(limit = 1, retryInterval = 1000) { + public start(limit = 1, retryInterval = 1000) { if (this.running) { return; } this.running = true; - await this.initialSync(); + this.jobQueue + .put(() => this.initialSync()) + .catch(err => { + this.log.error(`Error in synchronizer initial sync`, err); + this.running = false; + throw err; + }); const run = async () => { while (this.running) { - if (this.noteProcessorsToCatchUp.length > 0) { - // There is a note processor that needs to catch up. We hijack the main loop to catch up the note processor. - await this.workNoteProcessorCatchUp(limit, retryInterval); - } else { - // No note processor needs to catch up. We continue with the normal flow. - await this.work(limit, retryInterval); - } + await this.jobQueue.put(async () => { + let moreWork = true; + while (moreWork && this.running) { + if (this.noteProcessorsToCatchUp.length > 0) { + // There is a note processor that needs to catch up. We hijack the main loop to catch up the note processor. + moreWork = await this.workNoteProcessorCatchUp(limit); + } else { + // No note processor needs to catch up. We continue with the normal flow. + moreWork = await this.work(limit); + } + } + }); + await this.interruptibleSleep.sleep(retryInterval); } }; @@ -70,26 +83,29 @@ export class Synchronizer { await this.db.setBlockData(latestBlockNumber, latestBlockHeader); } - protected async work(limit = 1, retryInterval = 1000): Promise { + /** + * Fetches encrypted logs and blocks from the Aztec node and processes them for all note processors. + * + * @param limit - The maximum number of encrypted, unencrypted logs and blocks to fetch in each iteration. + * @returns true if there could be more work, false if we're caught up or there was an error. + */ + protected async work(limit = 1): Promise { const from = this.getSynchedBlockNumber() + 1; try { let encryptedLogs = await this.node.getLogs(from, limit, LogType.ENCRYPTED); if (!encryptedLogs.length) { - await this.interruptibleSleep.sleep(retryInterval); - return; + return false; } let unencryptedLogs = await this.node.getLogs(from, limit, LogType.UNENCRYPTED); if (!unencryptedLogs.length) { - await this.interruptibleSleep.sleep(retryInterval); - return; + return false; } // Note: If less than `limit` encrypted logs is returned, then we fetch only that number of blocks. const blocks = await this.node.getBlocks(from, encryptedLogs.length); if (!blocks.length) { - await this.interruptibleSleep.sleep(retryInterval); - return; + return false; } if (blocks.length !== encryptedLogs.length) { @@ -120,13 +136,21 @@ export class Synchronizer { for (const noteProcessor of this.noteProcessors) { await noteProcessor.process(blockContexts, encryptedLogs); } + return true; } catch (err) { this.log.error(`Error in synchronizer work`, err); - await this.interruptibleSleep.sleep(retryInterval); + return false; } } - protected async workNoteProcessorCatchUp(limit = 1, retryInterval = 1000): Promise { + /** + * Catch up a note processor that is lagging behind the main sync, + * e.g. because we just added a new account. + * + * @param limit - the maximum number of encrypted, unencrypted logs and blocks to fetch in each iteration. + * @returns true if there could be more work, false if we're caught up or there was an error. + */ + protected async workNoteProcessorCatchUp(limit = 1): Promise { const noteProcessor = this.noteProcessorsToCatchUp[0]; const toBlockNumber = this.getSynchedBlockNumber(); @@ -134,7 +158,8 @@ export class Synchronizer { // Note processor already synched, nothing to do this.noteProcessorsToCatchUp.shift(); this.noteProcessors.push(noteProcessor); - return; + // could be more work if there are more note processors to catch up + return true; } const from = noteProcessor.status.syncedToBlock + 1; @@ -184,9 +209,10 @@ export class Synchronizer { this.noteProcessorsToCatchUp.shift(); this.noteProcessors.push(noteProcessor); } + return true; } catch (err) { this.log.error(`Error in synchronizer workNoteProcessorCatchUp`, err); - await this.interruptibleSleep.sleep(retryInterval); + return false; } }