diff --git a/yarn-project/prover-client/package.json b/yarn-project/prover-client/package.json index 06190f73f53..3caaee4489a 100644 --- a/yarn-project/prover-client/package.json +++ b/yarn-project/prover-client/package.json @@ -6,7 +6,6 @@ ".": "./dest/index.js", "./block-builder": "./dest/block_builder/index.js", "./broker": "./dest/proving_broker/index.js", - "./prover-agent": "./dest/prover-agent/index.js", "./orchestrator": "./dest/orchestrator/index.js", "./helpers": "./dest/orchestrator/block-building-helpers.js", "./config": "./dest/config.js" diff --git a/yarn-project/prover-client/src/mocks/test_context.ts b/yarn-project/prover-client/src/mocks/test_context.ts index b635187f08f..8bc3c05980d 100644 --- a/yarn-project/prover-client/src/mocks/test_context.ts +++ b/yarn-project/prover-client/src/mocks/test_context.ts @@ -27,7 +27,6 @@ import { WASMSimulatorWithBlobs, type WorldStateDB, } from '@aztec/simulator/server'; -import { getTelemetryClient } from '@aztec/telemetry-client'; import { type MerkleTreeAdminDatabase } from '@aztec/world-state'; import { NativeWorldStateService } from '@aztec/world-state/native'; @@ -40,8 +39,7 @@ import { AvmFinalizedCallResult } from '../../../simulator/src/avm/avm_contract_ import { type AvmPersistableStateManager } from '../../../simulator/src/avm/journal/journal.js'; import { buildBlock } from '../block_builder/light.js'; import { ProvingOrchestrator } from '../orchestrator/index.js'; -import { MemoryProvingQueue } from '../prover-agent/memory-proving-queue.js'; -import { ProverAgent } from '../prover-agent/prover-agent.js'; +import { TestBroker } from '../test/mock_prover.js'; import { getEnvironmentConfig, getSimulationProvider, makeGlobals, updateExpectedTreesFromTxs } from './fixtures.js'; export class TestContext { @@ -54,7 +52,7 @@ export class TestContext { public simulationProvider: SimulationProvider, public globalVariables: GlobalVariables, public prover: ServerCircuitProver, - public proverAgent: ProverAgent, + public broker: TestBroker, public orchestrator: TestProvingOrchestrator, public blockNumber: number, public directoriesToCleanup: string[], @@ -115,12 +113,10 @@ export class TestContext { directoriesToCleanup.push(config.directoryToCleanup); } - const queue = new MemoryProvingQueue(getTelemetryClient()); - const orchestrator = new TestProvingOrchestrator(ws, queue, Fr.ZERO); - const agent = new ProverAgent(localProver, proverCount, undefined); + const broker = new TestBroker(proverCount, localProver); + const orchestrator = new TestProvingOrchestrator(ws, broker.facade, Fr.ZERO); - queue.start(); - agent.start(queue); + await broker.start(); return new this( publicTxSimulator, @@ -129,7 +125,7 @@ export class TestContext { simulationProvider, globalVariables, localProver, - agent, + broker, orchestrator, blockNumber, directoriesToCleanup, @@ -152,7 +148,7 @@ export class TestContext { } async cleanup() { - await this.proverAgent.stop(); + await this.broker.stop(); for (const dir of this.directoriesToCleanup.filter(x => x !== '')) { await fs.rm(dir, { recursive: true, force: true }); } diff --git a/yarn-project/prover-client/src/prover-agent/agent-queue-integration.test.ts b/yarn-project/prover-client/src/prover-agent/agent-queue-integration.test.ts deleted file mode 100644 index a6fd9009c8c..00000000000 --- a/yarn-project/prover-client/src/prover-agent/agent-queue-integration.test.ts +++ /dev/null @@ -1,132 +0,0 @@ -import { - type PublicInputsAndRecursiveProof, - type ServerCircuitProver, - makePublicInputsAndRecursiveProof, -} from '@aztec/circuit-types'; -import { - type ParityPublicInputs, - RECURSIVE_PROOF_LENGTH, - VerificationKeyData, - makeRecursiveProof, -} from '@aztec/circuits.js'; -import { makeBaseParityInputs, makeParityPublicInputs } from '@aztec/circuits.js/testing'; -import { AbortError } from '@aztec/foundation/error'; -import { promiseWithResolvers } from '@aztec/foundation/promise'; -import { sleep } from '@aztec/foundation/sleep'; -import { getTelemetryClient } from '@aztec/telemetry-client'; - -import { type MockProxy, mock } from 'jest-mock-extended'; - -import { MemoryProvingQueue } from './memory-proving-queue.js'; -import { ProverAgent } from './prover-agent.js'; - -describe('Prover agent <-> queue integration', () => { - let queue: MemoryProvingQueue; - let agent: ProverAgent; - let prover: MockProxy; - let agentPollInterval: number; - let queuePollInterval: number; - let queueJobTimeout: number; - - beforeEach(() => { - prover = mock(); - - queueJobTimeout = 100; - queuePollInterval = 10; - queue = new MemoryProvingQueue(getTelemetryClient(), queueJobTimeout, queuePollInterval); - - agentPollInterval = 10; - agent = new ProverAgent(prover, 1, agentPollInterval); - - queue.start(); - agent.start(queue); - }); - - afterEach(async () => { - await agent.stop(); - await queue.stop(); - }); - - it('picks up jobs from the queue', async () => { - const { promise, resolve } = - promiseWithResolvers>(); - const output = makePublicInputsAndRecursiveProof( - makeParityPublicInputs(1), - makeRecursiveProof(RECURSIVE_PROOF_LENGTH), - VerificationKeyData.makeFakeHonk(), - ); - prover.getBaseParityProof.mockResolvedValueOnce(promise); - const proofPromise = queue.getBaseParityProof(makeBaseParityInputs()); - - await sleep(agentPollInterval); - resolve(output); - await expect(proofPromise).resolves.toEqual(output); - }); - - it('keeps job alive', async () => { - const { promise, resolve } = - promiseWithResolvers>(); - const output = makePublicInputsAndRecursiveProof( - makeParityPublicInputs(1), - makeRecursiveProof(RECURSIVE_PROOF_LENGTH), - VerificationKeyData.makeFakeHonk(), - ); - prover.getBaseParityProof.mockResolvedValueOnce(promise); - const proofPromise = queue.getBaseParityProof(makeBaseParityInputs()); - - await sleep(2 * queueJobTimeout); - resolve(output); - await expect(proofPromise).resolves.toEqual(output); - }); - - it('reports cancellations', async () => { - const { promise, resolve } = - promiseWithResolvers>(); - const output = makePublicInputsAndRecursiveProof( - makeParityPublicInputs(1), - makeRecursiveProof(RECURSIVE_PROOF_LENGTH), - VerificationKeyData.makeFakeHonk(), - ); - prover.getBaseParityProof.mockResolvedValueOnce(promise); - const controller = new AbortController(); - const proofPromise = queue.getBaseParityProof(makeBaseParityInputs(), controller.signal); - await sleep(agentPollInterval); - controller.abort(); - resolve(output); - await expect(proofPromise).rejects.toThrow(AbortError); - }); - - it('re-queues timed out jobs', async () => { - const firstRun = - promiseWithResolvers>(); - - const output = makePublicInputsAndRecursiveProof( - makeParityPublicInputs(1), - makeRecursiveProof(RECURSIVE_PROOF_LENGTH), - VerificationKeyData.makeFakeHonk(), - ); - prover.getBaseParityProof.mockResolvedValueOnce(firstRun.promise); - const proofPromise = queue.getBaseParityProof(makeBaseParityInputs()); - - // stop the agent to simulate a machine going down - await agent.stop(); - - // give the queue a chance to figure out the node is timed out and re-queue the job - await sleep(queueJobTimeout); - // reset the mock - const secondRun = - promiseWithResolvers>(); - - prover.getBaseParityProof.mockResolvedValueOnce(secondRun.promise); - const newAgent = new ProverAgent(prover, 1, agentPollInterval); - newAgent.start(queue); - // test that the job is re-queued and kept alive by the new agent - await sleep(queueJobTimeout * 2); - secondRun.resolve(output); - await expect(proofPromise).resolves.toEqual(output); - - firstRun.reject(new Error('stop this promise otherwise it hangs jest')); - - await newAgent.stop(); - }); -}); diff --git a/yarn-project/prover-client/src/prover-agent/agent-queue-rpc-integration.test.ts b/yarn-project/prover-client/src/prover-agent/agent-queue-rpc-integration.test.ts deleted file mode 100644 index 2169adb4f02..00000000000 --- a/yarn-project/prover-client/src/prover-agent/agent-queue-rpc-integration.test.ts +++ /dev/null @@ -1,78 +0,0 @@ -import { ProvingJobSourceSchema, type ServerCircuitProver } from '@aztec/circuit-types'; -import { ClientIvcProof } from '@aztec/circuits.js'; -import { TubeInputs } from '@aztec/circuits.js/rollup'; -import { - makeAvmCircuitInputs, - makeBaseParityInputs, - makeBlockMergeRollupInputs, - makeBlockRootRollupInputs, - makeEmptyBlockRootRollupInputs, - makeMergeRollupInputs, - makePrivateBaseRollupInputs, - makePublicBaseRollupInputs, - makeRootParityInputs, - makeRootRollupInputs, - makeSingleTxBlockRootRollupInputs, -} from '@aztec/circuits.js/testing'; -import { createSafeJsonRpcClient } from '@aztec/foundation/json-rpc/client'; -import { type SafeJsonRpcServer } from '@aztec/foundation/json-rpc/server'; -import { getTelemetryClient } from '@aztec/telemetry-client'; - -import getPort from 'get-port'; - -import { MockProver } from '../test/mock_prover.js'; -import { MemoryProvingQueue } from './memory-proving-queue.js'; -import { ProverAgent } from './prover-agent.js'; -import { createProvingJobSourceServer } from './rpc.js'; - -describe('Prover agent <-> queue integration', () => { - let queue: MemoryProvingQueue; - let queueRpcServer: SafeJsonRpcServer; - let agent: ProverAgent; - let prover: ServerCircuitProver; - - type MakeInputs = { - [K in keyof ServerCircuitProver]: () => Promise[0]>; - }; - - const makeInputs: MakeInputs = { - getAvmProof: makeAvmCircuitInputs, - getBaseParityProof: (...args) => Promise.resolve(makeBaseParityInputs(...args)), - getPrivateBaseRollupProof: (...args) => Promise.resolve(makePrivateBaseRollupInputs(...args)), - getPublicBaseRollupProof: (...args) => Promise.resolve(makePublicBaseRollupInputs(...args)), - getRootParityProof: (...args) => Promise.resolve(makeRootParityInputs(...args)), - getBlockMergeRollupProof: (...args) => Promise.resolve(makeBlockMergeRollupInputs(...args)), - getEmptyBlockRootRollupProof: (...args) => Promise.resolve(makeEmptyBlockRootRollupInputs(...args)), - getBlockRootRollupProof: (...args) => Promise.resolve(makeBlockRootRollupInputs(...args)), - getSingleTxBlockRootRollupProof: (...args) => Promise.resolve(makeSingleTxBlockRootRollupInputs(...args)), - getMergeRollupProof: (...args) => Promise.resolve(makeMergeRollupInputs(...args)), - getRootRollupProof: (...args) => Promise.resolve(makeRootRollupInputs(...args)), - getTubeProof: () => Promise.resolve(new TubeInputs(ClientIvcProof.empty())), - }; - - beforeEach(async () => { - prover = new MockProver(); - - queue = new MemoryProvingQueue(getTelemetryClient(), 100, 10); - queue.start(); - const port = await getPort(); - queueRpcServer = createProvingJobSourceServer(queue); - queueRpcServer.start(port); - - agent = new ProverAgent(prover, 1, 10); - const queueRpcClient = createSafeJsonRpcClient(`http://127.0.0.1:${port}`, ProvingJobSourceSchema); - agent.start(queueRpcClient); - }); - - afterEach(async () => { - await agent.stop(); - await queueRpcServer.stop(); - await queue.stop(); - }); - - // TODO: This test hangs instead of failing when the Inputs are not registered on the RPC wrapper - it.each(Object.entries(makeInputs))('can call %s over JSON-RPC', async (fnName, makeInputs) => { - const resp = await queue[fnName as keyof ServerCircuitProver]((await makeInputs()) as any); - expect(resp).toBeDefined(); - }); -}); diff --git a/yarn-project/prover-client/src/prover-agent/index.ts b/yarn-project/prover-client/src/prover-agent/index.ts deleted file mode 100644 index f9aafea995b..00000000000 --- a/yarn-project/prover-client/src/prover-agent/index.ts +++ /dev/null @@ -1,3 +0,0 @@ -export * from './prover-agent.js'; -export * from './memory-proving-queue.js'; -export * from './rpc.js'; diff --git a/yarn-project/prover-client/src/prover-agent/memory-proving-queue.test.ts b/yarn-project/prover-client/src/prover-agent/memory-proving-queue.test.ts deleted file mode 100644 index 465ddb60e12..00000000000 --- a/yarn-project/prover-client/src/prover-agent/memory-proving-queue.test.ts +++ /dev/null @@ -1,161 +0,0 @@ -import { ProvingRequestType, makePublicInputsAndRecursiveProof } from '@aztec/circuit-types'; -import { RECURSIVE_PROOF_LENGTH, VerificationKeyData, makeRecursiveProof } from '@aztec/circuits.js'; -import { - makeBaseParityInputs, - makeParityPublicInputs, - makePrivateBaseRollupInputs, - makePublicBaseRollupInputs, - makeRootRollupInputs, -} from '@aztec/circuits.js/testing'; -import { AbortError } from '@aztec/foundation/error'; -import { sleep } from '@aztec/foundation/sleep'; -import { getTelemetryClient } from '@aztec/telemetry-client'; - -import { InlineProofStore, type ProofStore } from '../proving_broker/proof_store/index.js'; -import { MemoryProvingQueue } from './memory-proving-queue.js'; - -describe('MemoryProvingQueue', () => { - let queue: MemoryProvingQueue; - let jobTimeoutMs: number; - let pollingIntervalMs: number; - let proofStore: ProofStore; - - beforeEach(() => { - jobTimeoutMs = 100; - pollingIntervalMs = 10; - proofStore = new InlineProofStore(); - queue = new MemoryProvingQueue( - getTelemetryClient(), - jobTimeoutMs, - pollingIntervalMs, - undefined, - undefined, - proofStore, - ); - queue.start(); - }); - - afterEach(async () => { - await queue.stop(); - }); - - it('returns jobs in order', async () => { - void queue.getBaseParityProof(makeBaseParityInputs()); - void queue.getPrivateBaseRollupProof(makePrivateBaseRollupInputs()); - - const job1 = await queue.getProvingJob(); - expect(job1?.type).toEqual(ProvingRequestType.BASE_PARITY); - - const job2 = await queue.getProvingJob(); - expect(job2?.type).toEqual(ProvingRequestType.PRIVATE_BASE_ROLLUP); - }); - - it('returns jobs ordered by priority', async () => { - // We push base rollup proof requests for a first block - void queue.getPrivateBaseRollupProof(makePrivateBaseRollupInputs(), undefined, 1); - void queue.getPublicBaseRollupProof(makePublicBaseRollupInputs(), undefined, 1); - - // The agent consumes one of them - expect((await queue.getProvingJob())!.type).toEqual(ProvingRequestType.PRIVATE_BASE_ROLLUP); - - // A new block comes along with its base rollups, and the orchestrator then pushes a root request for the first one - void queue.getPublicBaseRollupProof(makePublicBaseRollupInputs(), undefined, 2); - void queue.getPrivateBaseRollupProof(makePrivateBaseRollupInputs(), undefined, 2); - void queue.getPrivateBaseRollupProof(makePrivateBaseRollupInputs(), undefined, 2); - void queue.getPublicBaseRollupProof(makePublicBaseRollupInputs(), undefined, 2); - void queue.getRootRollupProof(makeRootRollupInputs(), undefined, 1); - - // The next jobs for the agent should be the ones from block 1, skipping the ones for block 2 - expect((await queue.getProvingJob())!.type).toEqual(ProvingRequestType.PUBLIC_BASE_ROLLUP); - expect((await queue.getProvingJob())!.type).toEqual(ProvingRequestType.ROOT_ROLLUP); - - // And the base rollups for block 2 should go next - expect((await queue.getProvingJob())!.type).toEqual(ProvingRequestType.PUBLIC_BASE_ROLLUP); - expect((await queue.getProvingJob())!.type).toEqual(ProvingRequestType.PRIVATE_BASE_ROLLUP); - expect((await queue.getProvingJob())!.type).toEqual(ProvingRequestType.PRIVATE_BASE_ROLLUP); - expect((await queue.getProvingJob())!.type).toEqual(ProvingRequestType.PUBLIC_BASE_ROLLUP); - }); - - it('returns undefined when no jobs are available', async () => { - await expect(queue.getProvingJob({ timeoutSec: 0 })).resolves.toBeUndefined(); - }); - - it('notifies of completion', async () => { - const inputs = makeBaseParityInputs(); - const promise = queue.getBaseParityProof(inputs); - - const job = await queue.getProvingJob(); - const jobInputs = await proofStore.getProofInput(job!.inputsUri); - expect(jobInputs.inputs).toEqual(inputs); - - const publicInputs = makeParityPublicInputs(); - const proof = makeRecursiveProof(RECURSIVE_PROOF_LENGTH); - const vk = VerificationKeyData.makeFakeHonk(); - const result = makePublicInputsAndRecursiveProof(publicInputs, proof, vk); - await queue.resolveProvingJob(job!.id, { - type: ProvingRequestType.BASE_PARITY, - result, - }); - await expect(promise).resolves.toEqual(result); - }); - - it('retries failed jobs', async () => { - const inputs = makeBaseParityInputs(); - void queue.getBaseParityProof(inputs); - - const job = await queue.getProvingJob(); - const proofInput = await proofStore.getProofInput(job!.inputsUri); - expect(proofInput.inputs).toEqual(inputs); - - const error = new Error('test error'); - - await queue.rejectProvingJob(job!.id, error.message); - await expect(queue.getProvingJob()).resolves.toEqual(job); - }); - - it('notifies errors', async () => { - const promise = queue.getBaseParityProof(makeBaseParityInputs()); - - const error = new Error('test error'); - await queue.rejectProvingJob((await queue.getProvingJob())!.id, error.message); - await queue.rejectProvingJob((await queue.getProvingJob())!.id, error.message); - await queue.rejectProvingJob((await queue.getProvingJob())!.id, error.message); - - await expect(promise).rejects.toEqual(error); - }); - - it('reaps timed out jobs', async () => { - const controller = new AbortController(); - const promise = queue.getBaseParityProof(makeBaseParityInputs(), controller.signal); - const job = await queue.getProvingJob(); - - expect(queue.isJobRunning(job!.id)).toBe(true); - await sleep(jobTimeoutMs + 2 * pollingIntervalMs); - expect(queue.isJobRunning(job!.id)).toBe(false); - - controller.abort(); - await expect(promise).rejects.toThrow(AbortError); - }); - - it('keeps jobs running while heartbeat is called', async () => { - const promise = queue.getBaseParityProof(makeBaseParityInputs()); - const job = await queue.getProvingJob(); - - expect(queue.isJobRunning(job!.id)).toBe(true); - await sleep(pollingIntervalMs); - expect(queue.isJobRunning(job!.id)).toBe(true); - - await queue.heartbeat(job!.id); - expect(queue.isJobRunning(job!.id)).toBe(true); - await sleep(pollingIntervalMs); - expect(queue.isJobRunning(job!.id)).toBe(true); - - const output = makePublicInputsAndRecursiveProof( - makeParityPublicInputs(), - makeRecursiveProof(RECURSIVE_PROOF_LENGTH), - VerificationKeyData.makeFakeHonk(), - ); - await queue.resolveProvingJob(job!.id, { type: ProvingRequestType.BASE_PARITY, result: output }); - await expect(promise).resolves.toEqual(output); - }); -}); diff --git a/yarn-project/prover-client/src/prover-agent/memory-proving-queue.ts b/yarn-project/prover-client/src/prover-agent/memory-proving-queue.ts deleted file mode 100644 index 182d7733f58..00000000000 --- a/yarn-project/prover-client/src/prover-agent/memory-proving-queue.ts +++ /dev/null @@ -1,416 +0,0 @@ -import { - type ProofAndVerificationKey, - type ProvingJob, - type ProvingJobInputsMap, - type ProvingJobSource, - type ProvingRequestResultFor, - ProvingRequestType, - type PublicInputsAndRecursiveProof, - type ServerCircuitProver, -} from '@aztec/circuit-types'; -import type { - AVM_PROOF_LENGTH_IN_FIELDS, - AvmCircuitInputs, - BaseParityInputs, - NESTED_RECURSIVE_PROOF_LENGTH, - NESTED_RECURSIVE_ROLLUP_HONK_PROOF_LENGTH, - ParityPublicInputs, - RECURSIVE_PROOF_LENGTH, - RootParityInputs, - TUBE_PROOF_LENGTH, -} from '@aztec/circuits.js'; -import { - type BaseOrMergeRollupPublicInputs, - type BlockMergeRollupInputs, - type BlockRootOrBlockMergePublicInputs, - type BlockRootRollupInputs, - type EmptyBlockRootRollupInputs, - type MergeRollupInputs, - type PrivateBaseRollupInputs, - type PublicBaseRollupInputs, - type RootRollupInputs, - type RootRollupPublicInputs, - type SingleTxBlockRootRollupInputs, - type TubeInputs, -} from '@aztec/circuits.js/rollup'; -import { randomBytes } from '@aztec/foundation/crypto'; -import { AbortError, TimeoutError } from '@aztec/foundation/error'; -import { createLogger } from '@aztec/foundation/log'; -import { type PromiseWithResolvers, RunningPromise, promiseWithResolvers } from '@aztec/foundation/promise'; -import { PriorityMemoryQueue } from '@aztec/foundation/queue'; -import { type TelemetryClient, type Tracer, trackSpan } from '@aztec/telemetry-client'; - -import { InlineProofStore, type ProofStore } from '../proving_broker/proof_store/index.js'; -import { ProvingQueueMetrics } from './queue_metrics.js'; - -type ProvingJobWithResolvers = ProvingJob & - PromiseWithResolvers> & { - signal?: AbortSignal; - epochNumber?: number; - attempts: number; - heartbeat: number; - }; - -const MAX_RETRIES = 3; - -const defaultIdGenerator = () => randomBytes(4).toString('hex'); -const defaultTimeSource = () => Date.now(); -/** - * A helper class that sits in between services that need proofs created and agents that can create them. - * The queue accumulates jobs and provides them to agents prioritized by block number. - */ -export class MemoryProvingQueue implements ServerCircuitProver, ProvingJobSource { - private log = createLogger('prover-client:prover-pool:queue'); - private queue = new PriorityMemoryQueue( - (a, b) => (a.epochNumber ?? 0) - (b.epochNumber ?? 0), - ); - private jobsInProgress = new Map(); - private runningPromise: RunningPromise; - private metrics: ProvingQueueMetrics; - - public readonly tracer: Tracer; - - constructor( - client: TelemetryClient, - /** Timeout the job if an agent doesn't report back in this time */ - private jobTimeoutMs = 60 * 1000, - /** How often to check for timed out jobs */ - pollingIntervalMs = 1000, - private generateId = defaultIdGenerator, - private timeSource = defaultTimeSource, - private proofStore: ProofStore = new InlineProofStore(), - ) { - this.tracer = client.getTracer('MemoryProvingQueue'); - this.metrics = new ProvingQueueMetrics(client, 'MemoryProvingQueue'); - this.runningPromise = new RunningPromise(this.poll.bind(this), this.log, pollingIntervalMs); - } - - public start() { - if (this.runningPromise.isRunning()) { - this.log.warn('Proving queue is already running'); - return; - } - - this.runningPromise.start(); - this.log.info('Proving queue started'); - } - - public async stop() { - if (!this.runningPromise.isRunning()) { - this.log.warn('Proving queue is already stopped'); - return; - } - - await this.runningPromise.stop(); - this.log.info('Proving queue stopped'); - } - - public async getProvingJob({ timeoutSec = 1 } = {}): Promise { - if (!this.runningPromise.isRunning()) { - throw new Error('Proving queue is not running. Start the queue before getting jobs.'); - } - - try { - const job = await this.queue.get(timeoutSec); - if (!job) { - return undefined; - } - - if (job.signal?.aborted) { - return undefined; - } - - job.heartbeat = this.timeSource(); - this.jobsInProgress.set(job.id, job); - return { - id: job.id, - type: job.type, - inputsUri: job.inputsUri, - epochNumber: job.epochNumber, - }; - } catch (err) { - if (err instanceof TimeoutError) { - return undefined; - } - - throw err; - } - } - - resolveProvingJob(jobId: string, result: ProvingRequestResultFor): Promise { - if (!this.runningPromise.isRunning()) { - throw new Error('Proving queue is not running.'); - } - - const job = this.jobsInProgress.get(jobId); - if (!job) { - this.log.warn(`Job id=${jobId} not found. Can't resolve`); - return Promise.resolve(); - } - - this.jobsInProgress.delete(jobId); - if (!job.signal?.aborted) { - job.resolve(result); - } - - return Promise.resolve(); - } - - rejectProvingJob(jobId: string, reason: string): Promise { - if (!this.runningPromise.isRunning()) { - throw new Error('Proving queue is not running.'); - } - - const job = this.jobsInProgress.get(jobId); - if (!job) { - this.log.warn(`Job id=${jobId} not found. Can't reject`); - return Promise.resolve(); - } - - this.jobsInProgress.delete(jobId); - - if (job.signal?.aborted) { - return Promise.resolve(); - } - - // every job should be retried with the exception of the public VM since its in development and can fail - if (job.attempts < MAX_RETRIES && job.type !== ProvingRequestType.PUBLIC_VM) { - job.attempts++; - this.log.warn( - `Job id=${job.id} type=${ProvingRequestType[job.type]} failed with error: ${reason}. Retry ${ - job.attempts - }/${MAX_RETRIES}`, - ); - this.queue.put(job); - } else { - const logFn = - job.type === ProvingRequestType.PUBLIC_VM && !process.env.AVM_PROVING_STRICT ? this.log.warn : this.log.error; - logFn(`Job id=${job.id} type=${ProvingRequestType[job.type]} failed with error: ${reason}`); - job.reject(new Error(reason)); - } - return Promise.resolve(); - } - - public heartbeat(jobId: string): Promise { - if (!this.runningPromise.isRunning()) { - throw new Error('Proving queue is not running.'); - } - - const job = this.jobsInProgress.get(jobId); - if (job) { - job.heartbeat = this.timeSource(); - } - - return Promise.resolve(); - } - - public isJobRunning(jobId: string): boolean { - return this.jobsInProgress.has(jobId); - } - - @trackSpan('MemoryProvingQueue.poll') - private poll() { - const now = this.timeSource(); - this.metrics.recordQueueSize(this.queue.length()); - - for (const job of this.jobsInProgress.values()) { - if (job.signal?.aborted) { - this.jobsInProgress.delete(job.id); - continue; - } - - if (job.heartbeat + this.jobTimeoutMs < now) { - this.log.warn(`Job ${job.id} type=${ProvingRequestType[job.type]} has timed out`); - - this.jobsInProgress.delete(job.id); - job.heartbeat = 0; - this.queue.put(job); - } - } - } - - private async enqueue( - type: T, - inputs: ProvingJobInputsMap[T], - signal?: AbortSignal, - epochNumber?: number, - ): Promise['result']> { - if (!this.runningPromise.isRunning()) { - return Promise.reject(new Error('Proving queue is not running.')); - } - - const { promise, resolve, reject } = promiseWithResolvers>(); - const id = this.generateId(); - const inputsUri = await this.proofStore.saveProofInput(id, type, inputs); - const item: ProvingJobWithResolvers = { - id, - type, - inputsUri, - signal, - promise, - resolve, - reject, - attempts: 1, - heartbeat: 0, - epochNumber: epochNumber ?? 0, - }; - - if (signal) { - signal.addEventListener('abort', () => reject(new AbortError('Operation has been aborted'))); - } - - this.log.debug( - `Adding id=${item.id} type=${ProvingRequestType[type]} proving job to queue depth=${this.queue.length()}`, - ); - - if (!this.queue.put(item as ProvingJobWithResolvers)) { - throw new Error(); - } - - return promise.then(({ result }) => result); - } - - getTubeProof( - inputs: TubeInputs, - signal?: AbortSignal, - epochNumber?: number, - ): Promise> { - return this.enqueue(ProvingRequestType.TUBE_PROOF, inputs, signal, epochNumber); - } - - /** - * Creates a proof for the given input. - * @param input - Input to the circuit. - */ - getBaseParityProof( - inputs: BaseParityInputs, - signal?: AbortSignal, - epochNumber?: number, - ): Promise> { - return this.enqueue(ProvingRequestType.BASE_PARITY, inputs, signal, epochNumber); - } - - /** - * Creates a proof for the given input. - * @param input - Input to the circuit. - */ - getRootParityProof( - inputs: RootParityInputs, - signal?: AbortSignal, - epochNumber?: number, - ): Promise> { - return this.enqueue(ProvingRequestType.ROOT_PARITY, inputs, signal, epochNumber); - } - - getPrivateBaseRollupProof( - inputs: PrivateBaseRollupInputs, - signal?: AbortSignal, - epochNumber?: number, - ): Promise< - PublicInputsAndRecursiveProof - > { - return this.enqueue(ProvingRequestType.PRIVATE_BASE_ROLLUP, inputs, signal, epochNumber); - } - - getPublicBaseRollupProof( - inputs: PublicBaseRollupInputs, - signal?: AbortSignal, - epochNumber?: number, - ): Promise< - PublicInputsAndRecursiveProof - > { - return this.enqueue(ProvingRequestType.PUBLIC_BASE_ROLLUP, inputs, signal, epochNumber); - } - - /** - * Creates a proof for the given input. - * @param input - Input to the circuit. - */ - getMergeRollupProof( - inputs: MergeRollupInputs, - signal?: AbortSignal, - epochNumber?: number, - ): Promise< - PublicInputsAndRecursiveProof - > { - return this.enqueue(ProvingRequestType.MERGE_ROLLUP, inputs, signal, epochNumber); - } - - /** - * Creates a proof for the given input. - * @param input - Input to the circuit. - */ - getBlockRootRollupProof( - inputs: BlockRootRollupInputs, - signal?: AbortSignal, - epochNumber?: number, - ): Promise< - PublicInputsAndRecursiveProof - > { - return this.enqueue(ProvingRequestType.BLOCK_ROOT_ROLLUP, inputs, signal, epochNumber); - } - - getSingleTxBlockRootRollupProof( - inputs: SingleTxBlockRootRollupInputs, - signal?: AbortSignal, - epochNumber?: number, - ): Promise< - PublicInputsAndRecursiveProof - > { - return this.enqueue(ProvingRequestType.SINGLE_TX_BLOCK_ROOT_ROLLUP, inputs, signal, epochNumber); - } - - getEmptyBlockRootRollupProof( - inputs: EmptyBlockRootRollupInputs, - signal?: AbortSignal, - epochNumber?: number, - ): Promise< - PublicInputsAndRecursiveProof - > { - return this.enqueue(ProvingRequestType.EMPTY_BLOCK_ROOT_ROLLUP, inputs, signal, epochNumber); - } - - /** - * Creates a proof for the given input. - * @param input - Input to the circuit. - */ - getBlockMergeRollupProof( - inputs: BlockMergeRollupInputs, - signal?: AbortSignal, - epochNumber?: number, - ): Promise< - PublicInputsAndRecursiveProof - > { - return this.enqueue(ProvingRequestType.BLOCK_MERGE_ROLLUP, inputs, signal, epochNumber); - } - - /** - * Creates a proof for the given input. - * @param input - Input to the circuit. - */ - getRootRollupProof( - inputs: RootRollupInputs, - signal?: AbortSignal, - epochNumber?: number, - ): Promise> { - return this.enqueue(ProvingRequestType.ROOT_ROLLUP, inputs, signal, epochNumber); - } - - /** - * Creates an AVM proof. - */ - getAvmProof( - inputs: AvmCircuitInputs, - signal?: AbortSignal, - epochNumber?: number, - ): Promise> { - return this.enqueue(ProvingRequestType.PUBLIC_VM, inputs, signal, epochNumber); - } - - /** - * Verifies a circuit proof - */ - verifyProof(): Promise { - return Promise.reject('not implemented'); - } -} diff --git a/yarn-project/prover-client/src/prover-agent/prover-agent.test.ts b/yarn-project/prover-client/src/prover-agent/prover-agent.test.ts deleted file mode 100644 index 8af88de41cf..00000000000 --- a/yarn-project/prover-client/src/prover-agent/prover-agent.test.ts +++ /dev/null @@ -1,77 +0,0 @@ -import { type ServerCircuitProver, makePublicInputsAndRecursiveProof } from '@aztec/circuit-types'; -import { RECURSIVE_PROOF_LENGTH, VerificationKeyData, makeRecursiveProof } from '@aztec/circuits.js'; -import { makeBaseParityInputs, makeParityPublicInputs } from '@aztec/circuits.js/testing'; -import { getTelemetryClient } from '@aztec/telemetry-client'; - -import { type MockProxy, mock } from 'jest-mock-extended'; - -import { MemoryProvingQueue } from './memory-proving-queue.js'; -import { ProverAgent } from './prover-agent.js'; - -describe('ProverAgent', () => { - let queue: MemoryProvingQueue; - let agent: ProverAgent; - let prover: MockProxy; - - beforeEach(() => { - prover = mock(); - queue = new MemoryProvingQueue(getTelemetryClient()); - agent = new ProverAgent(prover); - }); - - beforeEach(() => { - queue.start(); - agent.start(queue); - }); - - afterEach(async () => { - await agent.stop(); - await queue.stop(); - }); - - it('takes jobs from the queue', async () => { - const publicInputs = makeParityPublicInputs(); - const proof = makeRecursiveProof(RECURSIVE_PROOF_LENGTH); - const vk = VerificationKeyData.makeFakeHonk(); - prover.getBaseParityProof.mockResolvedValue(makePublicInputsAndRecursiveProof(publicInputs, proof, vk)); - - const inputs = makeBaseParityInputs(); - - const promise = queue.getBaseParityProof(inputs); - await expect(promise).resolves.toEqual(makePublicInputsAndRecursiveProof(publicInputs, proof, vk)); - - expect(prover.getBaseParityProof).toHaveBeenCalledWith(inputs); - }); - - it('reports errors', async () => { - const error = new Error('test error'); - prover.getBaseParityProof.mockRejectedValue(error); - - const inputs = makeBaseParityInputs(); - const promise = queue.getBaseParityProof(inputs); - - await expect(promise).rejects.toEqual(error); - expect(prover.getBaseParityProof).toHaveBeenCalledWith(inputs); - }); - - it('continues to process jobs', async () => { - const publicInputs = makeParityPublicInputs(); - const proof = makeRecursiveProof(RECURSIVE_PROOF_LENGTH); - const vk = VerificationKeyData.makeFakeHonk(); - prover.getBaseParityProof.mockResolvedValue(makePublicInputsAndRecursiveProof(publicInputs, proof, vk)); - - const inputs = makeBaseParityInputs(); - const promise1 = queue.getBaseParityProof(inputs); - - await expect(promise1).resolves.toEqual(makePublicInputsAndRecursiveProof(publicInputs, proof, vk)); - - const inputs2 = makeBaseParityInputs(); - const promise2 = queue.getBaseParityProof(inputs2); - - await expect(promise2).resolves.toEqual(makePublicInputsAndRecursiveProof(publicInputs, proof, vk)); - - expect(prover.getBaseParityProof).toHaveBeenCalledTimes(2); - expect(prover.getBaseParityProof).toHaveBeenCalledWith(inputs); - expect(prover.getBaseParityProof).toHaveBeenCalledWith(inputs2); - }); -}); diff --git a/yarn-project/prover-client/src/prover-agent/prover-agent.ts b/yarn-project/prover-client/src/prover-agent/prover-agent.ts deleted file mode 100644 index 16eeb991fa5..00000000000 --- a/yarn-project/prover-client/src/prover-agent/prover-agent.ts +++ /dev/null @@ -1,248 +0,0 @@ -import { - type ProverAgentApi, - type ProvingJob, - type ProvingJobInputs, - type ProvingJobResultsMap, - type ProvingJobSource, - ProvingRequestType, - type ServerCircuitProver, - makeProvingRequestResult, -} from '@aztec/circuit-types'; -import { createLogger } from '@aztec/foundation/log'; -import { RunningPromise } from '@aztec/foundation/running-promise'; -import { elapsed } from '@aztec/foundation/timer'; -import { - Attributes, - type TelemetryClient, - type Traceable, - type Tracer, - getTelemetryClient, - trackSpan, -} from '@aztec/telemetry-client'; - -import { InlineProofStore } from '../proving_broker/proof_store/index.js'; - -const PRINT_THRESHOLD_NS = 6e10; // 60 seconds - -type InFlightPromise = { - id: string; - type: ProvingRequestType; - promise: Promise; -}; - -/** - * A helper class that encapsulates a circuit prover and connects it to a job source. - */ -export class ProverAgent implements ProverAgentApi, Traceable { - private inFlightPromises = new Map(); - private runningPromise?: RunningPromise; - private proofInputsDatabase = new InlineProofStore(); - - public readonly tracer: Tracer; - - constructor( - /** The prover implementation to defer jobs to */ - private circuitProver: ServerCircuitProver, - /** How many proving jobs this agent can handle in parallel */ - private maxConcurrency = 1, - /** How long to wait between jobs */ - private pollIntervalMs = 100, - /** Telemetry client */ - telemetry: TelemetryClient = getTelemetryClient(), - /** Logger */ - private log = createLogger('prover-client:prover-agent'), - ) { - this.tracer = telemetry.getTracer('ProverAgent'); - } - - setMaxConcurrency(maxConcurrency: number): Promise { - if (maxConcurrency < 1) { - throw new Error('Concurrency must be at least 1'); - } - this.maxConcurrency = maxConcurrency; - return Promise.resolve(); - } - - setCircuitProver(circuitProver: ServerCircuitProver): void { - this.circuitProver = circuitProver; - } - - isRunning() { - return Promise.resolve(this.#isRunning()); - } - - #isRunning() { - return this.runningPromise?.isRunning() ?? false; - } - - getCurrentJobs(): Promise<{ id: string; type: string }[]> { - return Promise.resolve( - Array.from(this.inFlightPromises.values()).map(({ id, type }) => ({ id, type: ProvingRequestType[type] })), - ); - } - - start(jobSource: ProvingJobSource): void { - if (this.runningPromise) { - throw new Error('Agent is already running'); - } - - let lastPrint = process.hrtime.bigint(); - - this.runningPromise = new RunningPromise( - async () => { - for (const jobId of this.inFlightPromises.keys()) { - await jobSource.heartbeat(jobId); - } - - const now = process.hrtime.bigint(); - - if (now - lastPrint >= PRINT_THRESHOLD_NS) { - // only log if we're actually doing work - if (this.inFlightPromises.size > 0) { - const jobs = Array.from(this.inFlightPromises.values()) - .map(job => `id=${job.id},type=${ProvingRequestType[job.type]}`) - .join(' '); - this.log.info(`Agent is running with ${this.inFlightPromises.size} in-flight jobs: ${jobs}`); - } - lastPrint = now; - } - - while (this.inFlightPromises.size < this.maxConcurrency) { - try { - const job = await jobSource.getProvingJob(); - if (!job) { - // job source is fully drained, sleep for a bit and try again - return; - } - - try { - const promise = this.work(jobSource, job).finally(() => this.inFlightPromises.delete(job.id)); - this.inFlightPromises.set(job.id, { - id: job.id, - type: job.type, - promise, - }); - } catch (err) { - this.log.warn( - `Error processing job! type=${ProvingRequestType[job.type]}: ${err}. ${(err as Error).stack}`, - ); - } - } catch (err) { - this.log.error(`Error fetching job`, err); - } - } - }, - this.log, - this.pollIntervalMs, - ); - - this.runningPromise.start(); - this.log.info(`Agent started with concurrency=${this.maxConcurrency}`); - } - - async stop(): Promise { - if (!this.runningPromise?.isRunning()) { - return; - } - - await this.runningPromise.stop(); - this.runningPromise = undefined; - - this.log.info('Agent stopped'); - } - - @trackSpan('ProverAgent.work', (_jobSoure, job) => ({ - [Attributes.PROVING_JOB_ID]: job.id, - [Attributes.PROVING_JOB_TYPE]: ProvingRequestType[job.type], - })) - private async work(jobSource: ProvingJobSource, job: ProvingJob): Promise { - try { - this.log.debug(`Picked up proving job ${job.id} ${ProvingRequestType[job.type]}`, { - jobId: job.id, - jobType: ProvingRequestType[job.type], - }); - const type = job.type; - const inputs = await this.proofInputsDatabase.getProofInput(job.inputsUri); - const [time, result] = await elapsed(this.getProof(inputs)); - if (this.#isRunning()) { - this.log.verbose(`Processed proving job id=${job.id} type=${ProvingRequestType[type]} duration=${time}ms`); - await jobSource.resolveProvingJob(job.id, makeProvingRequestResult(type, result)); - } else { - this.log.verbose( - `Dropping proving job id=${job.id} type=${ProvingRequestType[job.type]} duration=${time}ms: agent stopped`, - ); - } - } catch (err) { - const type = ProvingRequestType[job.type]; - if (this.#isRunning()) { - if (job.type === ProvingRequestType.PUBLIC_VM && !process.env.AVM_PROVING_STRICT) { - this.log.warn(`Expected error processing VM proving job id=${job.id} type=${type}: ${err}`); - } else { - this.log.error(`Error processing proving job id=${job.id} type=${type}: ${err}`, err); - } - const reason = (err as any)?.message ?? String(err); - await jobSource.rejectProvingJob(job.id, reason); - } else { - this.log.verbose(`Dropping proving job id=${job.id} type=${type}: agent stopped: ${(err as any).stack || err}`); - } - } - } - - private getProof(request: ProvingJobInputs): Promise { - const { type, inputs } = request; - switch (type) { - case ProvingRequestType.PUBLIC_VM: { - return this.circuitProver.getAvmProof(inputs); - } - - case ProvingRequestType.PRIVATE_BASE_ROLLUP: { - return this.circuitProver.getPrivateBaseRollupProof(inputs); - } - - case ProvingRequestType.PUBLIC_BASE_ROLLUP: { - return this.circuitProver.getPublicBaseRollupProof(inputs); - } - - case ProvingRequestType.MERGE_ROLLUP: { - return this.circuitProver.getMergeRollupProof(inputs); - } - - case ProvingRequestType.EMPTY_BLOCK_ROOT_ROLLUP: { - return this.circuitProver.getEmptyBlockRootRollupProof(inputs); - } - - case ProvingRequestType.BLOCK_ROOT_ROLLUP: { - return this.circuitProver.getBlockRootRollupProof(inputs); - } - - case ProvingRequestType.SINGLE_TX_BLOCK_ROOT_ROLLUP: { - return this.circuitProver.getSingleTxBlockRootRollupProof(inputs); - } - - case ProvingRequestType.BLOCK_MERGE_ROLLUP: { - return this.circuitProver.getBlockMergeRollupProof(inputs); - } - - case ProvingRequestType.ROOT_ROLLUP: { - return this.circuitProver.getRootRollupProof(inputs); - } - - case ProvingRequestType.BASE_PARITY: { - return this.circuitProver.getBaseParityProof(inputs); - } - - case ProvingRequestType.ROOT_PARITY: { - return this.circuitProver.getRootParityProof(inputs); - } - - case ProvingRequestType.TUBE_PROOF: { - return this.circuitProver.getTubeProof(inputs); - } - - default: { - const _exhaustive: never = type; - return Promise.reject(new Error(`Invalid proof request type: ${type}`)); - } - } - } -} diff --git a/yarn-project/prover-client/src/prover-agent/proving-error.ts b/yarn-project/prover-client/src/prover-agent/proving-error.ts deleted file mode 100644 index 982ec0cdc0e..00000000000 --- a/yarn-project/prover-client/src/prover-agent/proving-error.ts +++ /dev/null @@ -1,9 +0,0 @@ -export class ProvingError extends Error { - override toString() { - return this.message; - } - - static fromString(message: string) { - return new ProvingError(message); - } -} diff --git a/yarn-project/prover-client/src/prover-agent/queue_metrics.ts b/yarn-project/prover-client/src/prover-agent/queue_metrics.ts deleted file mode 100644 index 542ea20b321..00000000000 --- a/yarn-project/prover-client/src/prover-agent/queue_metrics.ts +++ /dev/null @@ -1,29 +0,0 @@ -import { ProvingRequestType } from '@aztec/circuit-types'; -import { Attributes, type Gauge, type Histogram, Metrics, type TelemetryClient } from '@aztec/telemetry-client'; - -export class ProvingQueueMetrics { - private jobSize: Histogram; - private queueSize: Gauge; - - constructor(client: TelemetryClient, name = 'ProvingQueueMetrics') { - const meter = client.getMeter(name); - this.jobSize = meter.createHistogram(Metrics.PROVING_QUEUE_JOB_SIZE, { - description: 'Size of proving job', - unit: 'by', - }); - - this.queueSize = meter.createGauge(Metrics.PROVING_QUEUE_SIZE, { - description: 'Size of proving queue', - }); - } - - recordNewJob(type: ProvingRequestType, size: number) { - this.jobSize.record(size, { - [Attributes.PROVING_JOB_TYPE]: ProvingRequestType[type], - }); - } - - recordQueueSize(size: number) { - this.queueSize.record(size); - } -} diff --git a/yarn-project/prover-client/src/prover-agent/rpc.ts b/yarn-project/prover-client/src/prover-agent/rpc.ts deleted file mode 100644 index f3ccf529687..00000000000 --- a/yarn-project/prover-client/src/prover-agent/rpc.ts +++ /dev/null @@ -1,36 +0,0 @@ -import { - type ComponentsVersions, - ProverAgentApiSchema, - type ProvingJobSource, - ProvingJobSourceSchema, - getVersioningResponseHandler, -} from '@aztec/circuit-types'; -import { createSafeJsonRpcClient } from '@aztec/foundation/json-rpc/client'; -import { createTracedJsonRpcServer, makeTracedFetch } from '@aztec/telemetry-client'; - -import { type ProverAgent } from './prover-agent.js'; - -export function createProvingJobSourceServer(queue: ProvingJobSource) { - return createTracedJsonRpcServer(queue, ProvingJobSourceSchema); -} - -export function createProvingJobSourceClient( - url: string, - versions: Partial, - fetch = makeTracedFetch([1, 2, 3], false), -): ProvingJobSource { - return createSafeJsonRpcClient(url, ProvingJobSourceSchema, { - namespaceMethods: 'provingJobSource', - fetch, - onResponse: getVersioningResponseHandler(versions), - }); -} - -/** - * Wrap a ProverAgent instance with a JSON RPC HTTP server. - * @param agent - The Prover Agent - * @returns An JSON-RPC HTTP server - */ -export function createProverAgentRpcServer(agent: ProverAgent) { - return createTracedJsonRpcServer(agent, ProverAgentApiSchema); -} diff --git a/yarn-project/prover-client/src/test/mock_prover.ts b/yarn-project/prover-client/src/test/mock_prover.ts index 1f9c5d6cab9..6d3e24dbc0f 100644 --- a/yarn-project/prover-client/src/test/mock_prover.ts +++ b/yarn-project/prover-client/src/test/mock_prover.ts @@ -44,14 +44,16 @@ import { } from '@aztec/circuits.js/testing'; import { times } from '@aztec/foundation/collection'; +import { BrokerCircuitProverFacade } from '../proving_broker/broker_prover_facade.js'; import { InlineProofStore, type ProofStore } from '../proving_broker/proof_store/index.js'; import { ProvingAgent } from '../proving_broker/proving_agent.js'; import { ProvingBroker } from '../proving_broker/proving_broker.js'; import { InMemoryBrokerDatabase } from '../proving_broker/proving_broker_database/memory.js'; export class TestBroker implements ProvingJobProducer { - private broker = new ProvingBroker(new InMemoryBrokerDatabase()); + private broker: ProvingBroker; private agents: ProvingAgent[]; + public facade: BrokerCircuitProverFacade; constructor( agentCount: number, @@ -63,16 +65,20 @@ export class TestBroker implements ProvingJobProducer { agentCount, () => new ProvingAgent(this.broker, proofStore, prover, undefined, agentPollInterval), ); + this.broker = new ProvingBroker(new InMemoryBrokerDatabase()); + this.facade = new BrokerCircuitProverFacade(this.broker, proofStore); } public async start() { await this.broker.start(); this.agents.forEach(agent => agent.start()); + this.facade.start(); } public async stop() { await Promise.all(this.agents.map(agent => agent.stop())); await this.broker.stop(); + await this.facade.stop(); } public getProofStore(): ProofStore {