diff --git a/yarn-project/aztec-node/src/aztec-node/server.ts b/yarn-project/aztec-node/src/aztec-node/server.ts index 5ccbf6266c7..da82aa420e7 100644 --- a/yarn-project/aztec-node/src/aztec-node/server.ts +++ b/yarn-project/aztec-node/src/aztec-node/server.ts @@ -122,7 +122,11 @@ export class AztecNodeService implements AztecNode { * @param config - The configuration to be used by the aztec node. * @returns - A fully synced Aztec Node for use in development/testing. */ - public static async createAndSync(config: AztecNodeConfig) { + public static async createAndSync( + config: AztecNodeConfig, + log = createDebugLogger('aztec:node'), + storeLog = createDebugLogger('aztec:node:lmdb'), + ) { const ethereumChain = createEthereumChain(config.rpcUrl, config.apiKey); //validate that the actual chain id matches that specified in configuration if (config.chainId !== ethereumChain.chainInfo.id) { @@ -131,8 +135,6 @@ export class AztecNodeService implements AztecNode { ); } - const log = createDebugLogger('aztec:node'); - const storeLog = createDebugLogger('aztec:node:lmdb'); const store = await initStoreForRollup( AztecLmdbStore.open(config.dataDirectory, false, storeLog), config.l1Contracts.rollupAddress, diff --git a/yarn-project/end-to-end/Earthfile b/yarn-project/end-to-end/Earthfile index 10db3c25b4a..2b3eda22113 100644 --- a/yarn-project/end-to-end/Earthfile +++ b/yarn-project/end-to-end/Earthfile @@ -46,6 +46,9 @@ E2E_TEST: # Run our docker compose, ending whenever sandbox ends, filtering out noisy eth_getLogs RUN docker run -e HARDWARE_CONCURRENCY=$hardware_concurrency --rm aztecprotocol/end-to-end:$AZTEC_DOCKER_TAG $test || $allow_fail +e2e-p2p: + DO +E2E_TEST --test=./src/e2e_p2p_network.test.ts + e2e-2-pxes: DO +E2E_TEST --test=./src/e2e_2_pxes.test.ts diff --git a/yarn-project/end-to-end/src/flakey_e2e_p2p_network.test.ts b/yarn-project/end-to-end/src/e2e_p2p_network.test.ts similarity index 52% rename from yarn-project/end-to-end/src/flakey_e2e_p2p_network.test.ts rename to yarn-project/end-to-end/src/e2e_p2p_network.test.ts index 84173febc0d..42a5f26cb8b 100644 --- a/yarn-project/end-to-end/src/flakey_e2e_p2p_network.test.ts +++ b/yarn-project/end-to-end/src/e2e_p2p_network.test.ts @@ -8,10 +8,13 @@ import { GrumpkinScalar, type SentTx, TxStatus, + createDebugLogger, + sleep, } from '@aztec/aztec.js'; import { type BootNodeConfig, BootstrapNode, createLibP2PPeerId } from '@aztec/p2p'; import { type PXEService, createPXEService, getPXEServiceConfig as getRpcConfig } from '@aztec/pxe'; +import fs from 'fs'; import { mnemonicToAccount } from 'viem/accounts'; import { MNEMONIC } from './fixtures/fixtures.js'; @@ -30,21 +33,36 @@ interface NodeContext { account: AztecAddress; } +const PEER_ID_PRIVATE_KEYS = [ + '0802122002f651fd8653925529e3baccb8489b3af4d7d9db440cbf5df4a63ff04ea69683', + '08021220c3bd886df5fe5b33376096ad0dab3d2dc86ed2a361d5fde70f24d979dc73da41', + '080212206b6567ac759db5434e79495ec7458e5e93fe479a5b80713446e0bce5439a5655', + '08021220366453668099bdacdf08fab476ee1fced6bf00ddc1223d6c2ee626e7236fb526', +]; + describe('e2e_p2p_network', () => { let config: AztecNodeConfig; let logger: DebugLogger; let teardown: () => Promise; + let bootstrapNode: BootstrapNode; + let bootstrapNodeEnr: string; beforeEach(async () => { - ({ teardown, config, logger } = await setup(1)); + ({ teardown, config, logger } = await setup(0)); + bootstrapNode = await createBootstrapNode(); + bootstrapNodeEnr = bootstrapNode.getENR().encodeTxt(); }); afterEach(() => teardown()); + afterAll(() => { + for (let i = 0; i < NUM_NODES; i++) { + fs.rmSync(`./data-${i}`, { recursive: true, force: true }); + } + }); + it('should rollup txs from all peers', async () => { // create the bootstrap node for the network - const bootstrapNode = await createBootstrapNode(); - const bootstrapNodeEnr = bootstrapNode.getENR(); if (!bootstrapNodeEnr) { throw new Error('Bootstrap node ENR is not available'); } @@ -53,14 +71,29 @@ describe('e2e_p2p_network', () => { // should be set so that the only way for rollups to be built // is if the txs are successfully gossiped around the nodes. const contexts: NodeContext[] = []; + const nodes: AztecNodeService[] = []; for (let i = 0; i < NUM_NODES; i++) { - const node = await createNode(i + 1 + BOOT_NODE_UDP_PORT, bootstrapNodeEnr?.encodeTxt(), i); + const node = await createNode(i + 1 + BOOT_NODE_UDP_PORT, bootstrapNodeEnr, i); + nodes.push(node); + } + + // wait a bit for peers to discover each other + await sleep(2000); + + for (const node of nodes) { const context = await createPXEServiceAndSubmitTransactions(node, NUM_TXS_PER_NODE); contexts.push(context); } // now ensure that all txs were successfully mined - await Promise.all(contexts.flatMap(context => context.txs.map(tx => tx.wait()))); + await Promise.all( + contexts.flatMap((context, i) => + context.txs.map(async (tx, j) => { + logger.info(`Waiting for tx ${i}-${j}: ${await tx.getTxHash()} to be mined`); + return tx.wait(); + }), + ), + ); // shutdown all nodes. for (const context of contexts) { @@ -70,6 +103,61 @@ describe('e2e_p2p_network', () => { await bootstrapNode.stop(); }); + it('should re-discover stored peers without bootstrap node', async () => { + const contexts: NodeContext[] = []; + const nodes: AztecNodeService[] = []; + for (let i = 0; i < NUM_NODES; i++) { + const node = await createNode(i + 1 + BOOT_NODE_UDP_PORT, bootstrapNodeEnr, i, `./data-${i}`); + nodes.push(node); + } + // wait a bit for peers to discover each other + await sleep(3000); + + // stop bootstrap node + await bootstrapNode.stop(); + + // create new nodes from datadir + const newNodes: AztecNodeService[] = []; + + // stop all nodes + for (let i = 0; i < NUM_NODES; i++) { + const node = nodes[i]; + await node.stop(); + logger.info(`Node ${i} stopped`); + await sleep(1200); + const newNode = await createNode(i + 1 + BOOT_NODE_UDP_PORT, undefined, i, `./data-${i}`); + logger.info(`Node ${i} restarted`); + newNodes.push(newNode); + // const context = await createPXEServiceAndSubmitTransactions(node, NUM_TXS_PER_NODE); + // contexts.push(context); + } + + // wait a bit for peers to discover each other + await sleep(2000); + + for (const node of newNodes) { + const context = await createPXEServiceAndSubmitTransactions(node, NUM_TXS_PER_NODE); + contexts.push(context); + } + + // now ensure that all txs were successfully mined + await Promise.all( + contexts.flatMap((context, i) => + context.txs.map(async (tx, j) => { + logger.info(`Waiting for tx ${i}-${j}: ${await tx.getTxHash()} to be mined`); + return tx.wait(); + }), + ), + ); + + // shutdown all nodes. + // for (const context of contexts) { + for (const context of contexts) { + await context.node.stop(); + await context.pxeService.stop(); + } + }); + const createBootstrapNode = async () => { const peerId = await createLibP2PPeerId(); const bootstrapNode = new BootstrapNode(); @@ -87,7 +175,12 @@ describe('e2e_p2p_network', () => { }; // creates a P2P enabled instance of Aztec Node Service - const createNode = async (tcpListenPort: number, bootstrapNode: string, publisherAddressIndex: number) => { + const createNode = async ( + tcpListenPort: number, + bootstrapNode: string | undefined, + publisherAddressIndex: number, + dataDirectory?: string, + ) => { // We use different L1 publisher accounts in order to avoid duplicate tx nonces. We start from // publisherAddressIndex + 1 because index 0 was already used during test environment setup. const hdAccount = mnemonicToAccount(MNEMONIC, { addressIndex: publisherAddressIndex + 1 }); @@ -96,38 +189,21 @@ describe('e2e_p2p_network', () => { const newConfig: AztecNodeConfig = { ...config, + peerIdPrivateKey: PEER_ID_PRIVATE_KEYS[publisherAddressIndex], udpListenAddress: `0.0.0.0:${tcpListenPort}`, tcpListenAddress: `0.0.0.0:${tcpListenPort}`, tcpAnnounceAddress: `127.0.0.1:${tcpListenPort}`, udpAnnounceAddress: `127.0.0.1:${tcpListenPort}`, - bootstrapNodes: [bootstrapNode], minTxsPerBlock: NUM_TXS_PER_BLOCK, maxTxsPerBlock: NUM_TXS_PER_BLOCK, p2pEnabled: true, p2pBlockCheckIntervalMS: 1000, p2pL2QueueSize: 1, transactionProtocol: '', + dataDirectory, + bootstrapNodes: bootstrapNode ? [bootstrapNode] : [], }; - return await AztecNodeService.createAndSync(newConfig); - }; - - // submits a set of transactions to the provided Private eXecution Environment (PXE) - const submitTxsTo = async (pxe: PXEService, account: AztecAddress, numTxs: number) => { - const txs: SentTx[] = []; - for (let i = 0; i < numTxs; i++) { - const tx = getSchnorrAccount(pxe, Fr.random(), GrumpkinScalar.random(), Fr.random()).deploy(); - logger.info(`Tx sent with hash ${await tx.getTxHash()}`); - const receipt = await tx.getReceipt(); - expect(receipt).toEqual( - expect.objectContaining({ - status: TxStatus.PENDING, - error: '', - }), - ); - logger.info(`Receipt received for ${await tx.getTxHash()}`); - txs.push(tx); - } - return txs; + return await AztecNodeService.createAndSync(newConfig, createDebugLogger(`aztec:node-${tcpListenPort}`)); }; // creates an instance of the PXE and submit a given number of transactions to it. @@ -142,7 +218,7 @@ describe('e2e_p2p_network', () => { const completeAddress = CompleteAddress.fromSecretKeyAndPartialAddress(secretKey, Fr.random()); await pxeService.registerAccount(secretKey, completeAddress.partialAddress); - const txs = await submitTxsTo(pxeService, completeAddress.address, numTxs); + const txs = await submitTxsTo(pxeService, numTxs); return { txs, account: completeAddress.address, @@ -150,4 +226,36 @@ describe('e2e_p2p_network', () => { node, }; }; + + // submits a set of transactions to the provided Private eXecution Environment (PXE) + const submitTxsTo = async (pxe: PXEService, numTxs: number) => { + const txs: SentTx[] = []; + for (let i = 0; i < numTxs; i++) { + // const tx = getSchnorrAccount(pxe, Fr.random(), GrumpkinScalar.random(), Fr.random()).deploy(); + const accountManager = getSchnorrAccount(pxe, Fr.random(), GrumpkinScalar.random(), Fr.random()); + const deployMethod = await accountManager.getDeployMethod(); + await deployMethod.create({ + contractAddressSalt: accountManager.salt, + skipClassRegistration: true, + skipPublicDeployment: true, + universalDeploy: true, + }); + await deployMethod.prove({}); + const tx = deployMethod.send(); + + const txHash = await tx.getTxHash(); + + logger.info(`Tx sent with hash ${txHash}`); + const receipt = await tx.getReceipt(); + expect(receipt).toEqual( + expect.objectContaining({ + status: TxStatus.PENDING, + error: '', + }), + ); + logger.info(`Receipt received for ${txHash}`); + txs.push(tx); + } + return txs; + }; }); diff --git a/yarn-project/p2p/src/client/index.ts b/yarn-project/p2p/src/client/index.ts index 8f1de9d8017..6a499693f64 100644 --- a/yarn-project/p2p/src/client/index.ts +++ b/yarn-project/p2p/src/client/index.ts @@ -4,7 +4,7 @@ import { type AztecKVStore } from '@aztec/kv-store'; import { P2PClient } from '../client/p2p_client.js'; import { type P2PConfig } from '../config.js'; import { DiscV5Service } from '../service/discV5_service.js'; -import { DummyP2PService, DummyPeerDiscoveryService } from '../service/dummy_service.js'; +import { DummyP2PService } from '../service/dummy_service.js'; import { LibP2PService, createLibP2PPeerId } from '../service/index.js'; import { type TxPool } from '../tx_pool/index.js'; import { getPublicIp, splitAddressPort } from '../util.js'; @@ -17,7 +17,6 @@ export const createP2PClient = async ( txPool: TxPool, l2BlockSource: L2BlockSource, ) => { - let discv5Service; let p2pService; if (config.p2pEnabled) { @@ -40,7 +39,7 @@ export const createP2PClient = async ( config.tcpAnnounceAddress = tcpAnnounceAddress; } else { throw new Error( - `Invalid announceTcpAddress provided: ${splitTcpAnnounceAddress}. Expected format: :`, + `Invalid announceTcpAddress provided: ${configTcpAnnounceAddress}. Expected format: :`, ); } } @@ -59,11 +58,10 @@ export const createP2PClient = async ( // Create peer discovery service const peerId = await createLibP2PPeerId(config.peerIdPrivateKey); - discv5Service = new DiscV5Service(peerId, config); - p2pService = await LibP2PService.new(config, discv5Service, peerId, txPool); + const discoveryService = new DiscV5Service(peerId, config); + p2pService = await LibP2PService.new(config, discoveryService, peerId, txPool, store); } else { p2pService = new DummyP2PService(); - discv5Service = new DummyPeerDiscoveryService(); } return new P2PClient(store, l2BlockSource, txPool, p2pService); }; diff --git a/yarn-project/p2p/src/client/p2p_client.test.ts b/yarn-project/p2p/src/client/p2p_client.test.ts index 91c0a5561f0..6726df9aeaa 100644 --- a/yarn-project/p2p/src/client/p2p_client.test.ts +++ b/yarn-project/p2p/src/client/p2p_client.test.ts @@ -37,7 +37,6 @@ describe('In-Memory P2P Client', () => { start: jest.fn(), stop: jest.fn(), propagateTx: jest.fn(), - settledTxs: jest.fn(), }; blockSource = new MockBlockSource(); diff --git a/yarn-project/p2p/src/client/p2p_client.ts b/yarn-project/p2p/src/client/p2p_client.ts index fe3c58db602..96401b35685 100644 --- a/yarn-project/p2p/src/client/p2p_client.ts +++ b/yarn-project/p2p/src/client/p2p_client.ts @@ -194,7 +194,7 @@ export class P2PClient implements P2P { this.log.debug('Stopped block downloader'); await this.runningPromise; this.setCurrentState(P2PClientState.STOPPED); - this.log.info('P2P client stopped...'); + this.log.info('P2P client stopped.'); } /** @@ -278,7 +278,6 @@ export class P2PClient implements P2P { for (const block of blocks) { const txHashes = block.body.txEffects.map(txEffect => txEffect.txHash); await this.txPool.deleteTxs(txHashes); - this.p2pService.settledTxs(txHashes); } } diff --git a/yarn-project/p2p/src/service/discV5_service.ts b/yarn-project/p2p/src/service/discV5_service.ts index 8838c180b2f..557a431e19b 100644 --- a/yarn-project/p2p/src/service/discV5_service.ts +++ b/yarn-project/p2p/src/service/discV5_service.ts @@ -1,9 +1,8 @@ import { createDebugLogger } from '@aztec/foundation/log'; -import { RunningPromise } from '@aztec/foundation/running-promise'; import { sleep } from '@aztec/foundation/sleep'; import { Discv5, type Discv5EventEmitter } from '@chainsafe/discv5'; -import { type ENR, SignableENR } from '@chainsafe/enr'; +import { ENR, SignableENR } from '@chainsafe/enr'; import type { PeerId } from '@libp2p/interface'; import { multiaddr } from '@multiformats/multiaddr'; import EventEmitter from 'events'; @@ -14,6 +13,8 @@ import { type PeerDiscoveryService, PeerDiscoveryState } from './service.js'; export const AZTEC_ENR_KEY = 'aztec_network'; +const delayBeforeStart = 2000; // 2sec + export enum AztecENR { devnet = 0x01, testnet = 0x02, @@ -33,11 +34,12 @@ export class DiscV5Service extends EventEmitter implements PeerDiscoveryService /** This instance's ENR */ private enr: SignableENR; - private runningPromise: RunningPromise; - private currentState = PeerDiscoveryState.STOPPED; private bootstrapNodes: string[]; + private bootstrapNodePeerIds: PeerId[] = []; + + private startTime = 0; constructor(private peerId: PeerId, config: P2PConfig, private logger = createDebugLogger('aztec:discv5_service')) { super(); @@ -83,18 +85,17 @@ export class DiscV5Service extends EventEmitter implements PeerDiscoveryService const multiAddrUdp = await enr.getFullMultiaddr('udp'); this.logger.debug(`ENR multiaddr: ${multiAddrTcp?.toString()}, ${multiAddrUdp?.toString()}`); }); - - this.runningPromise = new RunningPromise(async () => { - await this.discv5.findRandomNode(); - }, config.p2pPeerCheckIntervalMS); } public async start(): Promise { + // Do this conversion once since it involves an async function call + this.bootstrapNodePeerIds = await Promise.all(this.bootstrapNodes.map(enr => ENR.decodeTxt(enr).peerId())); if (this.currentState === PeerDiscoveryState.RUNNING) { throw new Error('DiscV5Service already started'); } this.logger.info('Starting DiscV5'); await this.discv5.start(); + this.startTime = Date.now(); this.logger.info('DiscV5 started'); this.currentState = PeerDiscoveryState.RUNNING; @@ -110,12 +111,25 @@ export class DiscV5Service extends EventEmitter implements PeerDiscoveryService this.logger.error(`Error adding bootnode ENRs: ${e}`); } } + } + + public async runRandomNodesQuery(): Promise { + if (this.currentState !== PeerDiscoveryState.RUNNING) { + throw new Error('DiscV5Service not running'); + } // First, wait some time before starting the peer discovery // reference: https://github.com/ChainSafe/lodestar/issues/3423 - await sleep(2000); + const msSinceStart = Date.now() - this.startTime; + if (Date.now() - this.startTime <= delayBeforeStart) { + await sleep(delayBeforeStart - msSinceStart); + } - this.runningPromise.start(); + try { + await this.discv5.findRandomNode(); + } catch (err) { + this.logger.error(`Error running discV5 random node query: ${err}`); + } } public getAllPeers(): ENR[] { @@ -134,8 +148,11 @@ export class DiscV5Service extends EventEmitter implements PeerDiscoveryService return this.currentState; } + public isBootstrapPeer(peerId: PeerId): boolean { + return this.bootstrapNodePeerIds.some(node => node.equals(peerId)); + } + public async stop(): Promise { - await this.runningPromise.stop(); await this.discv5.stop(); this.currentState = PeerDiscoveryState.STOPPED; } diff --git a/yarn-project/p2p/src/service/discv5_service.test.ts b/yarn-project/p2p/src/service/discv5_service.test.ts index dd5a58b9aae..67442f0a87e 100644 --- a/yarn-project/p2p/src/service/discv5_service.test.ts +++ b/yarn-project/p2p/src/service/discv5_service.test.ts @@ -1,3 +1,5 @@ +import { sleep } from '@aztec/foundation/sleep'; + import { jest } from '@jest/globals'; import type { PeerId } from '@libp2p/interface'; import { SemVer } from 'semver'; @@ -8,7 +10,7 @@ import { createLibP2PPeerId } from './libp2p_service.js'; import { PeerDiscoveryState } from './service.js'; const waitForPeers = (node: DiscV5Service, expectedCount: number): Promise => { - const timeout = 5_000; + const timeout = 7_000; return new Promise((resolve, reject) => { const timeoutId = setTimeout(() => { reject(new Error(`Timeout: Failed to connect to ${expectedCount} peers within ${timeout} ms`)); @@ -67,7 +69,17 @@ describe('Discv5Service', () => { const node2 = await createNode(basePort); await node1.start(); await node2.start(); - await waitForPeers(node2, 2); + await Promise.all([ + waitForPeers(node2, 2), + (async () => { + await sleep(2000); // wait for peer discovery to be able to start + for (let i = 0; i < 5; i++) { + await node1.runRandomNodesQuery(); + await node2.runRandomNodesQuery(); + await sleep(100); + } + })(), + ]); const node1Peers = await Promise.all(node1.getAllPeers().map(async peer => (await peer.peerId()).toString())); const node2Peers = await Promise.all(node2.getAllPeers().map(async peer => (await peer.peerId()).toString())); diff --git a/yarn-project/p2p/src/service/dummy_service.ts b/yarn-project/p2p/src/service/dummy_service.ts index cd1ed8d0d41..aeeedb1f03d 100644 --- a/yarn-project/p2p/src/service/dummy_service.ts +++ b/yarn-project/p2p/src/service/dummy_service.ts @@ -1,5 +1,6 @@ -import { type Tx, type TxHash } from '@aztec/circuit-types'; +import type { Tx, TxHash } from '@aztec/circuit-types'; +import type { PeerId } from '@libp2p/interface'; import EventEmitter from 'events'; import { type P2PService, type PeerDiscoveryService, PeerDiscoveryState } from './service.js'; @@ -66,6 +67,14 @@ export class DummyPeerDiscoveryService extends EventEmitter implements PeerDisco return []; } + public runRandomNodesQuery(): Promise { + return Promise.resolve(); + } + + public isBootstrapPeer(_: PeerId): boolean { + return false; + } + public getStatus(): PeerDiscoveryState { return this.currentState; } diff --git a/yarn-project/p2p/src/service/known_txs.test.ts b/yarn-project/p2p/src/service/known_txs.test.ts deleted file mode 100644 index 7c93b085320..00000000000 --- a/yarn-project/p2p/src/service/known_txs.test.ts +++ /dev/null @@ -1,42 +0,0 @@ -import { randomTxHash } from '@aztec/circuit-types'; - -import { expect } from '@jest/globals'; -import type { Ed25519PeerId, PeerId } from '@libp2p/interface'; -import { mock } from 'jest-mock-extended'; - -import { KnownTxLookup } from './known_txs.js'; - -const createMockPeerId = (peerId: string): PeerId => { - return mock({ - toString: () => peerId, - }); -}; - -describe('Known Txs', () => { - it('Returns false when a peer has not seen a tx', () => { - const knownTxs = new KnownTxLookup(); - - const peer = createMockPeerId('Peer 1'); - const txHash = randomTxHash(); - - expect(knownTxs.hasPeerSeenTx(peer, txHash.toString())).toEqual(false); - }); - - it('Returns true when a peer has seen a tx', () => { - const knownTxs = new KnownTxLookup(); - - const peer = createMockPeerId('Peer 1'); - const peer2 = createMockPeerId('Peer 2'); - const txHash = randomTxHash(); - - knownTxs.addPeerForTx(peer, txHash.toString()); - - expect(knownTxs.hasPeerSeenTx(peer, txHash.toString())).toEqual(true); - expect(knownTxs.hasPeerSeenTx(peer2, txHash.toString())).toEqual(false); - - knownTxs.addPeerForTx(peer2, txHash.toString()); - - expect(knownTxs.hasPeerSeenTx(peer, txHash.toString())).toEqual(true); - expect(knownTxs.hasPeerSeenTx(peer2, txHash.toString())).toEqual(true); - }); -}); diff --git a/yarn-project/p2p/src/service/known_txs.ts b/yarn-project/p2p/src/service/known_txs.ts deleted file mode 100644 index d25c866aebe..00000000000 --- a/yarn-project/p2p/src/service/known_txs.ts +++ /dev/null @@ -1,56 +0,0 @@ -import { type PeerId } from '@libp2p/interface'; - -/** - * Keeps a record of which Peers have 'seen' which transactions. - */ -export class KnownTxLookup { - private lookup: { [key: string]: { [key: string]: boolean } } = {}; - - constructor() {} - - /** - * Inform this lookup that a peer has 'seen' a transaction. - * @param peerId - The peerId of the peer that has 'seen' the transaction. - * @param txHash - The thHash of the 'seen' transaction. - */ - public addPeerForTx(peerId: PeerId, txHash: string) { - const peerIdAsString = peerId.toString(); - const existingLookup = this.lookup[txHash]; - if (existingLookup === undefined) { - const newLookup: { [key: string]: boolean } = {}; - newLookup[peerIdAsString] = true; - this.lookup[txHash] = newLookup; - return; - } - existingLookup[peerIdAsString] = true; - } - - /** - * Determine if a peer has 'seen' a transaction. - * @param peerId - The peerId of the peer. - * @param txHash - The thHash of the transaction. - * @returns A boolean indicating if the transaction has been 'seen' by the peer. - */ - public hasPeerSeenTx(peerId: PeerId, txHash: string) { - const existingLookup = this.lookup[txHash]; - if (existingLookup === undefined) { - return false; - } - const peerIdAsString = peerId.toString(); - return !!existingLookup[peerIdAsString]; - } - - /** - * Updates the lookup from the result of settled txs - * These txs will be cleared out of the lookup. - * It is possible that some txs could still be gossiped for a - * short period of time meaning they come back into this lookup - * but this should be infrequent and cause no undesirable effects - * @param txHashes - The hashes of the newly settled transactions - */ - public handleSettledTxs(txHashes: string[]) { - for (const txHash of txHashes) { - delete this.lookup[txHash]; - } - } -} diff --git a/yarn-project/p2p/src/service/libp2p_service.ts b/yarn-project/p2p/src/service/libp2p_service.ts index 6ae680020d0..5164ebfcd64 100644 --- a/yarn-project/p2p/src/service/libp2p_service.ts +++ b/yarn-project/p2p/src/service/libp2p_service.ts @@ -1,17 +1,16 @@ -import { type Tx, type TxHash } from '@aztec/circuit-types'; +import { type Tx } from '@aztec/circuit-types'; import { SerialQueue } from '@aztec/foundation/fifo'; import { createDebugLogger } from '@aztec/foundation/log'; -import { AztecLmdbStore } from '@aztec/kv-store/lmdb'; +import { RunningPromise } from '@aztec/foundation/running-promise'; +import type { AztecKVStore } from '@aztec/kv-store'; -import { ENR } from '@chainsafe/enr'; import { type GossipsubEvents, gossipsub } from '@chainsafe/libp2p-gossipsub'; import { noise } from '@chainsafe/libp2p-noise'; import { yamux } from '@chainsafe/libp2p-yamux'; import { identify } from '@libp2p/identify'; -import type { PeerId, PubSub, Stream } from '@libp2p/interface'; +import type { PeerId, PubSub } from '@libp2p/interface'; import '@libp2p/kad-dht'; import { mplex } from '@libp2p/mplex'; -import { peerIdFromString } from '@libp2p/peer-id'; import { createFromJSON, createSecp256k1PeerId } from '@libp2p/peer-id-factory'; import { tcp } from '@libp2p/tcp'; import { type Libp2p, createLibp2p } from 'libp2p'; @@ -20,7 +19,6 @@ import { type P2PConfig } from '../config.js'; import { type TxPool } from '../tx_pool/index.js'; import { convertToMultiaddr } from '../util.js'; import { AztecDatastore } from './data_store.js'; -import { KnownTxLookup } from './known_txs.js'; import { PeerManager } from './peer_manager.js'; import type { P2PService, PeerDiscoveryService } from './service.js'; import { AztecTxMessageCreator, fromTxMessage } from './tx_messages.js'; @@ -30,7 +28,6 @@ export interface PubSubLibp2p extends Libp2p { pubsub: PubSub; }; } - /** * Create a libp2p peer ID from the private key if provided, otherwise creates a new random ID. * @param privateKey - Optional peer ID private key as hex string @@ -52,16 +49,14 @@ export async function createLibP2PPeerId(privateKey?: string): Promise { */ export class LibP2PService implements P2PService { private jobQueue: SerialQueue = new SerialQueue(); - private knownTxLookup: KnownTxLookup = new KnownTxLookup(); private messageCreator: AztecTxMessageCreator; private peerManager: PeerManager; + private discoveryRunningPromise?: RunningPromise; constructor( private config: P2PConfig, private node: PubSubLibp2p, private peerDiscoveryService: PeerDiscoveryService, - private protocolId: string, private txPool: TxPool, - private bootstrapPeerIds: PeerId[] = [], private logger = createDebugLogger('aztec:libp2p_service'), ) { this.messageCreator = new AztecTxMessageCreator(config.txGossipVersion); @@ -73,54 +68,42 @@ export class LibP2PService implements P2PService { * @returns An empty promise. */ public async start() { + // Check if service is already started if (this.node.status === 'started') { throw new Error('P2P service already started'); } + + // Log listen & announce addresses const { tcpListenAddress, tcpAnnounceAddress } = this.config; this.logger.info(`Starting P2P node on ${tcpListenAddress}`); - if (!tcpAnnounceAddress) { throw new Error('Announce address not provided.'); } - const announceTcpMultiaddr = convertToMultiaddr(tcpAnnounceAddress, 'tcp'); - this.logger.info(`Announcing at ${announceTcpMultiaddr}`); - // handle discovered peers from external discovery service - this.peerDiscoveryService.on('peer:discovered', async (enr: ENR) => { - await this.addPeer(enr); - }); - - this.node.addEventListener('peer:connect', async evt => { - const peerId = evt.detail; - await this.handleNewConnection(peerId as PeerId); - }); - - this.node.addEventListener('peer:disconnect', async evt => { - const peerId = evt.detail; - if (this.isBootstrapPeer(peerId)) { - this.logger.info(`Disconnect from bootstrap peer ${peerId.toString()}`); - } else { - this.logger.info(`Disconnected from transaction peer ${peerId.toString()}`); - await this.peerManager.updateDiscoveryService(); - } - }); + // Start job queue, peer discovery service and libp2p node this.jobQueue.start(); await this.peerDiscoveryService.start(); await this.node.start(); this.logger.info(`Started P2P client with Peer ID ${this.node.peerId.toString()}`); - // Subscribe to standard topics by default + // Subscribe to standard GossipSub topics by default this.subscribeToTopic(this.messageCreator.getTopic()); - // add gossipsub listener + // add GossipSub listener this.node.services.pubsub.addEventListener('gossipsub:message', async e => { const { msg } = e.detail; this.logger.debug(`Received PUBSUB message.`); await this.jobQueue.put(() => this.handleNewGossipMessage(msg.topic, msg.data)); }); + + // Start running promise for peer discovery + this.discoveryRunningPromise = new RunningPromise(() => { + this.peerManager.discover(); + }, this.config.p2pPeerCheckIntervalMS); + this.discoveryRunningPromise.start(); } /** @@ -130,8 +113,12 @@ export class LibP2PService implements P2PService { public async stop() { this.logger.debug('Stopping job queue...'); await this.jobQueue.end(); + this.logger.debug('Stopping running promise...'); + await this.discoveryRunningPromise?.stop(); + this.logger.debug('Stopping peer discovery service...'); + await this.peerDiscoveryService.stop(); this.logger.debug('Stopping LibP2P...'); - await this.node.stop(); + await this.stopLibP2P(); this.logger.info('LibP2P service stopped'); } @@ -146,11 +133,14 @@ export class LibP2PService implements P2PService { peerDiscoveryService: PeerDiscoveryService, peerId: PeerId, txPool: TxPool, + store: AztecKVStore, ) { - const { tcpListenAddress, minPeerCount, maxPeerCount, transactionProtocol: protocolId } = config; + const { tcpListenAddress, tcpAnnounceAddress, minPeerCount, maxPeerCount } = config; const bindAddrTcp = convertToMultiaddr(tcpListenAddress, 'tcp'); + // We know tcpAnnounceAddress cannot be null here because we set it or throw when setting up the service. + const announceAddrTcp = convertToMultiaddr(tcpAnnounceAddress!, 'tcp'); - const datastore = new AztecDatastore(AztecLmdbStore.open()); + const datastore = new AztecDatastore(store); // The autonat service seems quite problematic in that using it seems to cause a lot of attempts // to dial ephemeral ports. I suspect that it works better if you can get the uPNPnat service to @@ -171,10 +161,19 @@ export class LibP2PService implements P2PService { peerId, addresses: { listen: [bindAddrTcp], + announce: [announceAddrTcp], }, transports: [ tcp({ maxConnections: config.maxPeerCount, + // socket option: the maximum length of the queue of pending connections + // https://nodejs.org/dist/latest-v18.x/docs/api/net.html#serverlisten + // it's not safe if we increase this number + backlog: 5, + closeServerOnMaxConnections: { + closeAbove: maxPeerCount ?? Infinity, + listenBelow: maxPeerCount ?? Infinity, + }, }), ], datastore, @@ -200,15 +199,7 @@ export class LibP2PService implements P2PService { }, }); - // extract bootstrap node peer IDs - let bootstrapPeerIds: PeerId[] = []; - if (config.bootstrapNodes.length) { - bootstrapPeerIds = await Promise.all( - config.bootstrapNodes.map(bootnodeEnr => ENR.decodeTxt(bootnodeEnr).peerId()), - ); - } - - return new LibP2PService(config, node, peerDiscoveryService, protocolId, txPool, bootstrapPeerIds); + return new LibP2PService(config, node, peerDiscoveryService, txPool); } /** @@ -260,71 +251,31 @@ export class LibP2PService implements P2PService { void this.jobQueue.put(() => Promise.resolve(this.sendTxToPeers(tx))); } - /** - * Handles the settling of a new batch of transactions. - * @param txHashes - The hashes of the newly settled transactions. - */ - public settledTxs(txHashes: TxHash[]): void { - this.knownTxLookup.handleSettledTxs(txHashes.map(x => x.toString())); - } - - private async addPeer(enr: ENR) { - const peerMultiAddr = await enr.getFullMultiaddr('tcp'); - if (!peerMultiAddr) { - // No TCP address, can't connect - return; - } - const peerIdStr = peerMultiAddr.getPeerId(); - - if (!peerIdStr) { - this.logger.debug(`Peer ID not found in discovered node's multiaddr: ${peerMultiAddr}`); - return; - } - - // check if peer is already known - const peerId = peerIdFromString(peerIdStr); - const hasPeer = await this.node.peerStore.has(peerId); - - // add to peer store if not already known - if (!hasPeer) { - this.logger.info(`Discovered peer ${peerIdStr}. Adding to libp2p peer list`); - let stream: Stream | undefined; - try { - stream = await this.node.dialProtocol(peerMultiAddr, this.protocolId); - } catch (err) { - this.logger.debug(`Failed to dial peer ${peerIdStr}: ${err}`); - } finally { - if (stream) { - await stream.close(); - } - } - } - } - - private async handleNewConnection(peerId: PeerId) { - if (this.isBootstrapPeer(peerId)) { - this.logger.info(`Connected to bootstrap peer ${peerId.toString()}`); - } else { - this.logger.info(`Connected to transaction peer ${peerId.toString()}`); - await this.peerManager.updateDiscoveryService(); - } - } - private async processTxFromPeer(tx: Tx): Promise { const txHash = tx.getTxHash(); const txHashString = txHash.toString(); - this.logger.debug(`Received tx ${txHashString} from external peer.`); + this.logger.verbose(`Received tx ${txHashString} from external peer.`); await this.txPool.addTxs([tx]); } private async sendTxToPeers(tx: Tx) { const { data: txData } = this.messageCreator.createTxMessage(tx); - this.logger.debug(`Sending tx ${tx.getTxHash().toString()} to peers`); + this.logger.verbose(`Sending tx ${tx.getTxHash().toString()} to peers`); const recipientsNum = await this.publishToTopic(this.messageCreator.getTopic(), txData); - this.logger.debug(`Sent tx ${tx.getTxHash().toString()} to ${recipientsNum} peers`); + this.logger.verbose(`Sent tx ${tx.getTxHash().toString()} to ${recipientsNum} peers`); } - private isBootstrapPeer(peer: PeerId) { - return this.bootstrapPeerIds.some(bootstrapPeer => bootstrapPeer.equals(peer)); + // Libp2p seems to hang sometimes if new peers are initiating connections. + private async stopLibP2P() { + const TIMEOUT_MS = 5000; // 5 seconds timeout + const timeout = new Promise((resolve, reject) => { + setTimeout(() => reject(new Error('Timeout during libp2p.stop()')), TIMEOUT_MS); + }); + try { + await Promise.race([this.node.stop(), timeout]); + this.logger.debug('Libp2p stopped'); + } catch (error) { + this.logger.error('Error during stop or timeout:', error); + } } } diff --git a/yarn-project/p2p/src/service/peer_manager.ts b/yarn-project/p2p/src/service/peer_manager.ts index 9e2993103d9..c81dab40124 100644 --- a/yarn-project/p2p/src/service/peer_manager.ts +++ b/yarn-project/p2p/src/service/peer_manager.ts @@ -1,26 +1,201 @@ import { createDebugLogger } from '@aztec/foundation/log'; +import { type ENR } from '@chainsafe/enr'; +import { type PeerId } from '@libp2p/interface'; +import { type Multiaddr } from '@multiformats/multiaddr'; import { type Libp2p } from 'libp2p'; import { type P2PConfig } from '../config.js'; -import { type PeerDiscoveryService, PeerDiscoveryState } from './service.js'; +import { type PeerDiscoveryService } from './service.js'; + +const MAX_DIAL_ATTEMPTS = 3; +const MAX_CACHED_PEERS = 100; + +type CachedPeer = { + peerId: PeerId; + enr: ENR; + multiaddrTcp: Multiaddr; + dialAttempts: number; +}; export class PeerManager { + private cachedPeers: Map = new Map(); constructor( private libP2PNode: Libp2p, - private discV5Node: PeerDiscoveryService, + private peerDiscoveryService: PeerDiscoveryService, private config: P2PConfig, private logger = createDebugLogger('aztec:p2p:peer_manager'), - ) {} - - async updateDiscoveryService() { - const peerCount = this.libP2PNode.getPeers().length; - if (peerCount >= this.config.maxPeerCount && this.discV5Node.getStatus() === PeerDiscoveryState.RUNNING) { - this.logger.debug('Max peer count reached, stopping discovery service'); - await this.discV5Node.stop(); - } else if (peerCount <= this.config.minPeerCount && this.discV5Node.getStatus() === PeerDiscoveryState.STOPPED) { - this.logger.debug('Min peer count reached, starting discovery service'); - await this.discV5Node.start(); + ) { + // Handle new established connections + this.libP2PNode.addEventListener('peer:connect', evt => { + const peerId = evt.detail; + if (this.peerDiscoveryService.isBootstrapPeer(peerId)) { + this.logger.debug(`Connected to bootstrap peer ${peerId.toString()}`); + } else { + this.logger.debug(`Connected to transaction peer ${peerId.toString()}`); + } + }); + + // Handle lost connections + this.libP2PNode.addEventListener('peer:disconnect', evt => { + const peerId = evt.detail; + if (this.peerDiscoveryService.isBootstrapPeer(peerId)) { + this.logger.debug(`Disconnected from bootstrap peer ${peerId.toString()}`); + } else { + this.logger.debug(`Disconnected from transaction peer ${peerId.toString()}`); + } + }); + + // Handle Discovered peers + this.peerDiscoveryService.on('peer:discovered', async (enr: ENR) => { + await this.handleDiscoveredPeer(enr); + }); + } + + /** + * Discovers peers. + */ + public discover() { + // Get current connections + const connections = this.libP2PNode.getConnections(); + + // Calculate how many connections we're looking to make + const peersToConnect = this.config.maxPeerCount - connections.length; + + this.logger.debug( + `Connections: ${connections.length}, Peers to connect: ${peersToConnect}, maxPeerCount: ${this.config.maxPeerCount}, cachedPeers: ${this.cachedPeers.size}`, + ); + + // Exit if no peers to connect + if (peersToConnect <= 0) { + return; + } + + const cachedPeersToDial: CachedPeer[] = []; + + const pendingDials = new Set( + this.libP2PNode + .getDialQueue() + .map(pendingDial => pendingDial.peerId?.toString()) + .filter(Boolean) as string[], + ); + + for (const [id, peerData] of this.cachedPeers.entries()) { + // if already dialling or connected to, remove from cache + if (pendingDials.has(id) || connections.some(conn => conn.remotePeer.equals(peerData.peerId))) { + this.cachedPeers.delete(id); + } else { + // cachedPeersToDial.set(id, enr); + cachedPeersToDial.push(peerData); + } + } + + // reverse to dial older entries first + cachedPeersToDial.reverse(); + + for (const peer of cachedPeersToDial) { + this.cachedPeers.delete(peer.peerId.toString()); + void this.dialPeer(peer); + } + + // if we need more peers, start randomNodesQuery + if (peersToConnect > 0) { + this.logger.debug('Running random nodes query'); + void this.peerDiscoveryService.runRandomNodesQuery(); + } + } + + /** + * Handles a discovered peer. + * @param enr - The discovered peer's ENR. + */ + private async handleDiscoveredPeer(enr: ENR) { + // TODO: Will be handling peer scoring here + + // check if peer is already connected + const [peerId, multiaddrTcp] = await Promise.all([enr.peerId(), enr.getFullMultiaddr('tcp')]); + + this.logger.debug(`Handling discovered peer ${peerId.toString()}, ${multiaddrTcp?.toString()}`); + + // throw if no tcp addr in multiaddr + if (!multiaddrTcp) { + this.logger.debug(`No TCP address in discovered node's multiaddr: ${enr.toString()}`); + return; + } + const connections = this.libP2PNode.getConnections(); + if (connections.some(conn => conn.remotePeer.equals(peerId))) { + this.logger.debug(`Already connected to peer ${peerId.toString()}`); + return; + } + + // check if peer is already in cache + const id = peerId.toString(); + if (this.cachedPeers.has(id)) { + this.logger.debug(`Already in cache ${id}`); + return; + } + + // create cached peer object + const cachedPeer: CachedPeer = { + peerId, + enr, + multiaddrTcp, + dialAttempts: 0, + }; + + // Determine if we should dial immediately or not + if (this.shouldDialPeer()) { + this.logger.debug(`Dialing peer ${id}`); + void this.dialPeer(cachedPeer); + } else { + this.logger.debug(`Caching peer ${id}`); + this.cachedPeers.set(id, cachedPeer); + // Prune set of cached peers + this.pruneCachedPeers(); + } + } + + async dialPeer(peer: CachedPeer) { + const id = peer.peerId.toString(); + await this.libP2PNode.peerStore.merge(peer.peerId, { multiaddrs: [peer.multiaddrTcp] }); + + this.logger.debug(`Dialing peer ${id}`); + try { + await this.libP2PNode.dial(peer.multiaddrTcp); + } catch { + this.logger.debug(`Failed to dial peer ${id}`); + peer.dialAttempts++; + if (peer.dialAttempts < MAX_DIAL_ATTEMPTS) { + this.cachedPeers.set(id, peer); + } else { + this.cachedPeers.delete(id); + } + } + } + + private shouldDialPeer(): boolean { + const connections = this.libP2PNode.getConnections().length; + this.logger.debug(`Connections: ${connections}, maxPeerCount: ${this.config.maxPeerCount}`); + if (connections >= this.config.maxPeerCount) { + this.logger.debug('Not dialing peer, maxPeerCount reached'); + return false; + } + return true; + } + + private pruneCachedPeers() { + let peersToDelete = this.cachedPeers.size - MAX_CACHED_PEERS; + if (peersToDelete <= 0) { + return; + } + + // Remove the oldest peers + for (const key of this.cachedPeers.keys()) { + this.cachedPeers.delete(key); + peersToDelete--; + if (peersToDelete <= 0) { + break; + } } } } diff --git a/yarn-project/p2p/src/service/service.ts b/yarn-project/p2p/src/service/service.ts index 5d3389af54d..f9933dd3b34 100644 --- a/yarn-project/p2p/src/service/service.ts +++ b/yarn-project/p2p/src/service/service.ts @@ -1,6 +1,7 @@ -import type { Tx, TxHash } from '@aztec/circuit-types'; +import type { Tx } from '@aztec/circuit-types'; import type { ENR } from '@chainsafe/enr'; +import type { PeerId } from '@libp2p/interface'; import type EventEmitter from 'events'; export enum PeerDiscoveryState { @@ -29,12 +30,6 @@ export interface P2PService { * @param tx - The transaction to be propagated. */ propagateTx(tx: Tx): void; - - /** - * Called upon receipt of settled transactions. - * @param txHashes - The hashes of the settled transactions. - */ - settledTxs(txHashes: TxHash[]): void; } /** @@ -57,6 +52,18 @@ export interface PeerDiscoveryService extends EventEmitter { */ getAllPeers(): ENR[]; + /** + * Runs findRandomNode query. + */ + runRandomNodesQuery(): Promise; + + /** + * Checks if the given peer is a bootstrap peer. + * @param peerId - The peer ID to check. + * @returns True if the peer is a bootstrap peer. + */ + isBootstrapPeer(peerId: PeerId): boolean; + /** * Event emitted when a new peer is discovered. */