Skip to content

Commit

Permalink
fix: remove serial queue in broker facade (#11956)
Browse files Browse the repository at this point in the history
This PR removes the serial queue used to send proving jobs from the
orchestrator to the broker. This will allow requests to be run in
parallel and which will benefit from batching on the broker side.
  • Loading branch information
alexghr authored Feb 13, 2025
1 parent 6386f4e commit 3485b52
Show file tree
Hide file tree
Showing 3 changed files with 102 additions and 103 deletions.
8 changes: 7 additions & 1 deletion yarn-project/prover-client/src/mocks/test_context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import { AvmFinalizedCallResult } from '../../../simulator/src/avm/avm_contract_
import { type AvmPersistableStateManager } from '../../../simulator/src/avm/journal/journal.js';
import { buildBlock } from '../block_builder/light.js';
import { ProvingOrchestrator } from '../orchestrator/index.js';
import { BrokerCircuitProverFacade } from '../proving_broker/broker_prover_facade.js';
import { TestBroker } from '../test/mock_prover.js';
import { getEnvironmentConfig, getSimulationProvider, makeGlobals, updateExpectedTreesFromTxs } from './fixtures.js';

Expand All @@ -53,6 +54,7 @@ export class TestContext {
public globalVariables: GlobalVariables,
public prover: ServerCircuitProver,
public broker: TestBroker,
public brokerProverFacade: BrokerCircuitProverFacade,
public orchestrator: TestProvingOrchestrator,
public blockNumber: number,
public directoriesToCleanup: string[],
Expand Down Expand Up @@ -114,9 +116,11 @@ export class TestContext {
}

const broker = new TestBroker(proverCount, localProver);
const orchestrator = new TestProvingOrchestrator(ws, broker.facade, Fr.ZERO);
const facade = new BrokerCircuitProverFacade(broker);
const orchestrator = new TestProvingOrchestrator(ws, facade, Fr.ZERO);

await broker.start();
facade.start();

return new this(
publicTxSimulator,
Expand All @@ -126,6 +130,7 @@ export class TestContext {
globalVariables,
localProver,
broker,
facade,
orchestrator,
blockNumber,
directoriesToCleanup,
Expand All @@ -148,6 +153,7 @@ export class TestContext {
}

async cleanup() {
await this.brokerProverFacade.stop();
await this.broker.stop();
for (const dir of this.directoriesToCleanup.filter(x => x !== '')) {
await fs.rm(dir, { recursive: true, force: true });
Expand Down
192 changes: 95 additions & 97 deletions yarn-project/prover-client/src/proving_broker/broker_prover_facade.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ import {
import { sha256 } from '@aztec/foundation/crypto';
import { createLogger } from '@aztec/foundation/log';
import { RunningPromise, promiseWithResolvers } from '@aztec/foundation/promise';
import { SerialQueue } from '@aztec/foundation/queue';
import { truncate } from '@aztec/foundation/string';

import { InlineProofStore, type ProofStore } from './proof_store/index.js';
Expand All @@ -52,18 +51,17 @@ const SNAPSHOT_SYNC_CHECK_MAX_REQUEST_SIZE = 1000;

type ProvingJob = {
id: ProvingJobId;
inputsUri?: ProofUri;
type: ProvingRequestType;
inputsUri: ProofUri;
promise: PromiseWithResolvers<any>;
abortFn?: () => void;
deferred: PromiseWithResolvers<any>;
abortFn: () => void;
signal?: AbortSignal;
};

export class BrokerCircuitProverFacade implements ServerCircuitProver {
private jobs: Map<ProvingJobId, ProvingJob> = new Map();
private runningPromise?: RunningPromise;
private timeOfLastSnapshotSync = Date.now();
private queue: SerialQueue = new SerialQueue();
private jobsToRetrieve: Set<ProvingJobId> = new Set();

constructor(
Expand All @@ -74,45 +72,26 @@ export class BrokerCircuitProverFacade implements ServerCircuitProver {
private log = createLogger('prover-client:broker-circuit-prover-facade'),
) {}

private enqueueJob<T extends ProvingRequestType>(
/**
* This is a critical section. This function can not be async since it writes
* to the jobs map which acts as a mutex, ensuring a job is only ever created once.
*
* This could be called in a SerialQueue if it needs to become async.
*/
private getOrCreateProvingJob<T extends ProvingRequestType>(
id: ProvingJobId,
type: T,
inputs: ProvingJobInputsMap[T],
epochNumber = 0,
signal?: AbortSignal,
): Promise<ProvingJobResultsMap[T]> {
if (!this.queue) {
throw new Error('BrokerCircuitProverFacade not started');
}
return this.queue!.put(() => this._enqueueJob(id, type, inputs, epochNumber, signal)).then(
({ enqueuedPromise }) => enqueuedPromise,
);
}

private async _enqueueJob<T extends ProvingRequestType>(
id: ProvingJobId,
type: T,
inputs: ProvingJobInputsMap[T],
epochNumber = 0,
signal?: AbortSignal,
): Promise<{ enqueuedPromise: Promise<ProvingJobResultsMap[T]> }> {
): { job: ProvingJob; isEnqueued: boolean } {
// Check if there is already a promise for this job
const existingPromise = this.jobs.get(id);
if (existingPromise) {
const existingJob = this.jobs.get(id);
if (existingJob) {
this.log.verbose(`Job already found in facade id=${id} type=${ProvingRequestType[type]}`, {
provingJobId: id,
provingJobType: ProvingRequestType[type],
epochNumber,
});
return { enqueuedPromise: existingPromise.promise.promise as Promise<ProvingJobResultsMap[T]> };
return { job: existingJob, isEnqueued: true };
}
const inputsUri = await this.proofStore.saveProofInput(id, type, inputs);
const jobStatus = await this.broker.enqueueProvingJob({
id,
type,
inputsUri,
epochNumber,
});

// Create a promise for this job id, regardless of whether it was enqueued at the broker
// The running promise will monitor for the job to be completed and resolve it either way
Expand All @@ -124,62 +103,89 @@ export class BrokerCircuitProverFacade implements ServerCircuitProver {
const job: ProvingJob = {
id,
type,
inputsUri,
promise,
deferred: promise,
abortFn,
signal,
};

this.jobs.set(id, job);
return { job, isEnqueued: false };
}

private async enqueueJob<T extends ProvingRequestType>(
id: ProvingJobId,
type: T,
inputs: ProvingJobInputsMap[T],
epochNumber = 0,
signal?: AbortSignal,
): Promise<ProvingJobResultsMap[T]> {
const { job: job, isEnqueued } = this.getOrCreateProvingJob(id, type, signal);
if (isEnqueued) {
return job.deferred.promise;
}

// If we are here then the job was successfully accepted by the broker
// the returned status is for before any action was performed
if (jobStatus.status === 'not-found') {
// Job added for the first time
// notify the broker if job is aborted
signal?.addEventListener('abort', abortFn);
try {
const inputsUri = await this.proofStore.saveProofInput(id, type, inputs);
job.inputsUri = inputsUri;
const jobStatus = await this.broker.enqueueProvingJob({
id,
type,
inputsUri,
epochNumber,
});

this.log.verbose(
`Job enqueued with broker id=${id} type=${ProvingRequestType[type]} epochNumber=${epochNumber}`,
{
provingJobId: id,
provingJobType: ProvingRequestType[type],
epochNumber,
inputsUri: truncate(inputsUri),
numOutstandingJobs: this.jobs.size,
},
);
} else if (jobStatus.status === 'fulfilled' || jobStatus.status === 'rejected') {
// Job was already completed by the broker
// No need to notify the broker on aborted job
job.abortFn = undefined;
this.log.verbose(
`Job already completed when sent to broker id=${id} type=${ProvingRequestType[type]} epochNumber=${epochNumber}`,
{
provingJobId: id,
provingJobType: ProvingRequestType[type],
epochNumber,
inputsUri: truncate(inputsUri),
},
);
// If we are here then the job was successfully accepted by the broker
// the returned status is for before any action was performed
if (jobStatus.status === 'fulfilled' || jobStatus.status === 'rejected') {
// Job was already completed by the broker
// No need to notify the broker on aborted job
this.log.verbose(
`Job already completed when sent to broker id=${id} type=${ProvingRequestType[type]} epochNumber=${epochNumber}`,
{
provingJobId: id,
provingJobType: ProvingRequestType[type],
epochNumber,
inputsUri: truncate(inputsUri),
},
);

// Job was not enqueued. It must be completed already, add to our set of already completed jobs
this.jobsToRetrieve.add(id);
} else {
// Job was previously sent to the broker but is not completed
// notify the broker if job is aborted
signal?.addEventListener('abort', abortFn);
this.log.verbose(
`Job already in queue or in progress when sent to broker id=${id} type=${ProvingRequestType[type]} epochNumber=${epochNumber}`,
{
provingJobId: id,
provingJobType: ProvingRequestType[type],
epochNumber,
inputsUri: truncate(inputsUri),
},
);
// Job was not enqueued. It must be completed already, add to our set of already completed jobs
this.jobsToRetrieve.add(id);
} else {
// notify the broker if job is aborted
signal?.addEventListener('abort', job.abortFn);

// Job added for the first time
if (jobStatus.status === 'not-found') {
this.log.verbose(
`Job enqueued with broker id=${id} type=${ProvingRequestType[type]} epochNumber=${epochNumber}`,
{
provingJobId: id,
provingJobType: ProvingRequestType[type],
epochNumber,
inputsUri: truncate(inputsUri),
numOutstandingJobs: this.jobs.size,
},
);
} else {
// Job was previously sent to the broker but is not completed
this.log.verbose(
`Job already in queue or in progress when sent to broker id=${id} type=${ProvingRequestType[type]} epochNumber=${epochNumber}`,
{
provingJobId: id,
provingJobType: ProvingRequestType[type],
epochNumber,
inputsUri: truncate(inputsUri),
},
);
}
}
} catch (err) {
this.jobs.delete(job.id);
job.deferred.reject(err);
}
const typedPromise = promise.promise as Promise<ProvingJobResultsMap[T]>;
return { enqueuedPromise: typedPromise };

return job.deferred.promise as Promise<ProvingJobResultsMap[T]>;
}

public start() {
Expand All @@ -191,9 +197,6 @@ export class BrokerCircuitProverFacade implements ServerCircuitProver {

this.runningPromise = new RunningPromise(() => this.monitorForCompletedJobs(), this.log, this.pollIntervalMs);
this.runningPromise.start();

this.queue = new SerialQueue();
this.queue.start();
}

public async stop(): Promise<void> {
Expand All @@ -203,14 +206,9 @@ export class BrokerCircuitProverFacade implements ServerCircuitProver {
this.log.verbose('Stopping BrokerCircuitProverFacade');
await this.runningPromise.stop();

if (this.queue) {
await this.queue.cancel();
await this.queue.end();
}

// Reject any outstanding promises as stopped
for (const [_, v] of this.jobs) {
v.promise.reject(new Error('Broker facade stopped'));
v.deferred.reject(new Error('Broker facade stopped'));
}
this.jobs.clear();
}
Expand Down Expand Up @@ -246,15 +244,15 @@ export class BrokerCircuitProverFacade implements ServerCircuitProver {
return allCompleted;
};

const snapshotSyncIds = [];
const snapshotSyncIds: string[] = [];
const currentTime = Date.now();
const secondsSinceLastSnapshotSync = currentTime - this.timeOfLastSnapshotSync;
if (secondsSinceLastSnapshotSync > SNAPSHOT_SYNC_INTERVAL_MS) {
this.timeOfLastSnapshotSync = currentTime;
snapshotSyncIds.push(...this.jobs.keys());
this.log.trace(`Performing full snapshot sync of completed jobs with ${snapshotSyncIds.length} job(s)`);
} else {
this.log.trace(`Performing incremental sync of completed jobs`);
this.log.trace(`Performing incremental sync of completed jobs`, { snapshotSyncIds });
}

// Now request the notifications from the broker
Expand Down Expand Up @@ -329,7 +327,7 @@ export class BrokerCircuitProverFacade implements ServerCircuitProver {

if (result.success) {
this.log.verbose(`Resolved proving job id=${job.id} type=${ProvingRequestType[job.type]}`);
job.promise.resolve(result.result);
job.deferred.resolve(result.result);
} else {
this.log.error(
`Resolving proving job with error id=${job.id} type=${ProvingRequestType[job.type]}`,
Expand All @@ -338,7 +336,7 @@ export class BrokerCircuitProverFacade implements ServerCircuitProver {
if (result.reason !== 'Aborted') {
void this.backupFailedProofInputs(job);
}
job.promise.reject(new Error(result.reason));
job.deferred.reject(new Error(result.reason));
}

if (job.abortFn && job.signal) {
Expand Down Expand Up @@ -370,7 +368,7 @@ export class BrokerCircuitProverFacade implements ServerCircuitProver {

private async backupFailedProofInputs(job: ProvingJob) {
try {
if (!this.failedProofStore) {
if (!this.failedProofStore || !job.inputsUri) {
return;
}
const inputs = await this.proofStore.getProofInput(job.inputsUri);
Expand Down
5 changes: 0 additions & 5 deletions yarn-project/prover-client/src/test/mock_prover.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ import {
} from '@aztec/circuits.js/testing';
import { times } from '@aztec/foundation/collection';

import { BrokerCircuitProverFacade } from '../proving_broker/broker_prover_facade.js';
import { InlineProofStore, type ProofStore } from '../proving_broker/proof_store/index.js';
import { ProvingAgent } from '../proving_broker/proving_agent.js';
import { ProvingBroker } from '../proving_broker/proving_broker.js';
Expand All @@ -53,7 +52,6 @@ import { InMemoryBrokerDatabase } from '../proving_broker/proving_broker_databas
export class TestBroker implements ProvingJobProducer {
private broker: ProvingBroker;
private agents: ProvingAgent[];
public facade: BrokerCircuitProverFacade;

constructor(
agentCount: number,
Expand All @@ -66,19 +64,16 @@ export class TestBroker implements ProvingJobProducer {
agentCount,
() => new ProvingAgent(this.broker, proofStore, prover, undefined, agentPollInterval),
);
this.facade = new BrokerCircuitProverFacade(this.broker, proofStore);
}

public async start() {
await this.broker.start();
this.agents.forEach(agent => agent.start());
this.facade.start();
}

public async stop() {
await Promise.all(this.agents.map(agent => agent.stop()));
await this.broker.stop();
await this.facade.stop();
}

public getProofStore(): ProofStore {
Expand Down

0 comments on commit 3485b52

Please sign in to comment.