Skip to content

Commit

Permalink
chore(p2p): reorganise reqresp handlers (#11327)
Browse files Browse the repository at this point in the history
  • Loading branch information
Maddiaa0 authored Jan 19, 2025
1 parent bfcd8a5 commit f048acd
Show file tree
Hide file tree
Showing 11 changed files with 134 additions and 113 deletions.
4 changes: 2 additions & 2 deletions yarn-project/p2p/src/client/p2p_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import { type AttestationPool } from '../mem_pools/attestation_pool/attestation_
import { type EpochProofQuotePool } from '../mem_pools/epoch_proof_quote_pool/epoch_proof_quote_pool.js';
import { type MemPools } from '../mem_pools/interface.js';
import { type TxPool } from '../mem_pools/tx_pool/index.js';
import { TX_REQ_PROTOCOL } from '../services/reqresp/interface.js';
import { ReqRespSubProtocol } from '../services/reqresp/interface.js';
import type { P2PService } from '../services/service.js';

/**
Expand Down Expand Up @@ -459,7 +459,7 @@ export class P2PClient<T extends P2PClientType = P2PClientType.Full>
* @returns A promise that resolves to a transaction or undefined.
*/
public async requestTxByHash(txHash: TxHash): Promise<Tx | undefined> {
const tx = await this.p2pService.sendRequest(TX_REQ_PROTOCOL, txHash);
const tx = await this.p2pService.sendRequest(ReqRespSubProtocol.TX, txHash);

if (tx) {
this.log.debug(`Received tx ${txHash.toString()} from peer`);
Expand Down
19 changes: 9 additions & 10 deletions yarn-project/p2p/src/mocks/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,14 @@ import { DiscV5Service } from '../services/discv5/discV5_service.js';
import { LibP2PService } from '../services/libp2p/libp2p_service.js';
import { type PeerManager } from '../services/peer_manager.js';
import { type P2PReqRespConfig } from '../services/reqresp/config.js';
import { pingHandler, statusHandler } from '../services/reqresp/handlers.js';
import {
PING_PROTOCOL,
ReqRespSubProtocol,
type ReqRespSubProtocolHandlers,
type ReqRespSubProtocolValidators,
STATUS_PROTOCOL,
TX_REQ_PROTOCOL,
noopValidator,
} from '../services/reqresp/interface.js';
import { pingHandler } from '../services/reqresp/protocols/ping.js';
import { statusHandler } from '../services/reqresp/protocols/status.js';
import { ReqResp } from '../services/reqresp/reqresp.js';
import { type PubSubLibp2p } from '../util.js';

Expand Down Expand Up @@ -151,17 +150,17 @@ export type ReqRespNode = {

// Mock sub protocol handlers
export const MOCK_SUB_PROTOCOL_HANDLERS: ReqRespSubProtocolHandlers = {
[PING_PROTOCOL]: pingHandler,
[STATUS_PROTOCOL]: statusHandler,
[TX_REQ_PROTOCOL]: (_msg: any) => Promise.resolve(Buffer.from('tx')),
[ReqRespSubProtocol.PING]: pingHandler,
[ReqRespSubProtocol.STATUS]: statusHandler,
[ReqRespSubProtocol.TX]: (_msg: any) => Promise.resolve(Buffer.from('tx')),
};

// By default, all requests are valid
// If you want to test an invalid response, you can override the validator
export const MOCK_SUB_PROTOCOL_VALIDATORS: ReqRespSubProtocolValidators = {
[PING_PROTOCOL]: noopValidator,
[STATUS_PROTOCOL]: noopValidator,
[TX_REQ_PROTOCOL]: noopValidator,
[ReqRespSubProtocol.PING]: noopValidator,
[ReqRespSubProtocol.STATUS]: noopValidator,
[ReqRespSubProtocol.TX]: noopValidator,
};

/**
Expand Down
32 changes: 10 additions & 22 deletions yarn-project/p2p/src/services/libp2p/libp2p_service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import {
type RawGossipMessage,
TopicTypeMap,
Tx,
TxHash,
type TxHash,
type TxValidationResult,
type WorldStateSynchronizer,
getTopicTypeForClientType,
Expand Down Expand Up @@ -57,17 +57,15 @@ import { type PubSubLibp2p, convertToMultiaddr } from '../../util.js';
import { AztecDatastore } from '../data_store.js';
import { SnappyTransform, fastMsgIdFn, getMsgIdFn, msgIdToStrFn } from '../encoding.js';
import { PeerManager } from '../peer_manager.js';
import { pingHandler, statusHandler } from '../reqresp/handlers.js';
import {
DEFAULT_SUB_PROTOCOL_HANDLERS,
DEFAULT_SUB_PROTOCOL_VALIDATORS,
PING_PROTOCOL,
type ReqRespSubProtocol,
ReqRespSubProtocol,
type ReqRespSubProtocolHandlers,
STATUS_PROTOCOL,
type SubProtocolMap,
TX_REQ_PROTOCOL,
} from '../reqresp/interface.js';
import { pingHandler, statusHandler } from '../reqresp/protocols/index.js';
import { reqRespTxHandler } from '../reqresp/protocols/tx.js';
import { ReqResp } from '../reqresp/reqresp.js';
import type { P2PService, PeerDiscoveryService } from '../service.js';
import { GossipSubEvent } from '../types.js';
Expand Down Expand Up @@ -199,7 +197,7 @@ export class LibP2PService<T extends P2PClientType> extends WithTracer implement
// Define the sub protocol validators - This is done within this start() method to gain a callback to the existing validateTx function
const reqrespSubProtocolValidators = {
...DEFAULT_SUB_PROTOCOL_VALIDATORS,
[TX_REQ_PROTOCOL]: this.validateRequestedTx.bind(this),
[ReqRespSubProtocol.TX]: this.validateRequestedTx.bind(this),
};
await this.reqresp.start(this.requestResponseHandlers, reqrespSubProtocolValidators);
this.logger.info(`Started P2P service`, {
Expand Down Expand Up @@ -337,22 +335,12 @@ export class LibP2PService<T extends P2PClientType> extends WithTracer implement
});

// Create request response protocol handlers
/**
* Handler for tx requests
* @param msg - the tx request message
* @returns the tx response message
*/
const txHandler = (msg: Buffer): Promise<Buffer> => {
const txHash = TxHash.fromBuffer(msg);
const foundTx = mempools.txPool.getTxByHash(txHash);
const buf = foundTx ? foundTx.toBuffer() : Buffer.alloc(0);
return Promise.resolve(buf);
};
const txHandler = reqRespTxHandler(mempools);

const requestResponseHandlers = {
[PING_PROTOCOL]: pingHandler,
[STATUS_PROTOCOL]: statusHandler,
[TX_REQ_PROTOCOL]: txHandler,
[ReqRespSubProtocol.PING]: pingHandler,
[ReqRespSubProtocol.STATUS]: statusHandler,
[ReqRespSubProtocol.TX]: txHandler,
};

return new LibP2PService(
Expand Down Expand Up @@ -548,7 +536,7 @@ export class LibP2PService<T extends P2PClientType> extends WithTracer implement
* In order to perform this check, the tx proof must be verified.
*
* Note: This function is called from within `ReqResp.sendRequest` as part of the
* TX_REQ_PROTOCOL subprotocol validation.
* ReqRespSubProtocol.TX subprotocol validation.
*
* @param requestedTxHash - The hash of the tx that was requested.
* @param responseTx - The tx that was received as a response to the request.
Expand Down
33 changes: 18 additions & 15 deletions yarn-project/p2p/src/services/reqresp/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,15 @@ import { type PeerId } from '@libp2p/interface';
/*
* Request Response Sub Protocols
*/
export const PING_PROTOCOL = '/aztec/req/ping/0.1.0';
export const STATUS_PROTOCOL = '/aztec/req/status/0.1.0';
export const TX_REQ_PROTOCOL = '/aztec/req/tx/0.1.0';

// Sum type for sub protocols
export type ReqRespSubProtocol = typeof PING_PROTOCOL | typeof STATUS_PROTOCOL | typeof TX_REQ_PROTOCOL;
const PING_PROTOCOL = '/aztec/req/ping/0.1.0';
const STATUS_PROTOCOL = '/aztec/req/status/0.1.0';
const TX_PROTOCOL = '/aztec/req/tx/0.1.0';

export enum ReqRespSubProtocol {
PING = PING_PROTOCOL,
STATUS = STATUS_PROTOCOL,
TX = TX_PROTOCOL,
}

/**
* A handler for a sub protocol
Expand Down Expand Up @@ -66,9 +69,9 @@ export type ReqRespSubProtocolValidators = {
};

export const DEFAULT_SUB_PROTOCOL_VALIDATORS: ReqRespSubProtocolValidators = {
[PING_PROTOCOL]: noopValidator,
[STATUS_PROTOCOL]: noopValidator,
[TX_REQ_PROTOCOL]: noopValidator,
[ReqRespSubProtocol.PING]: noopValidator,
[ReqRespSubProtocol.STATUS]: noopValidator,
[ReqRespSubProtocol.TX]: noopValidator,
};

/**
Expand All @@ -91,9 +94,9 @@ const defaultHandler = (_msg: any): Promise<Buffer> => {
* Default sub protocol handlers - this SHOULD be overwritten by the service,
*/
export const DEFAULT_SUB_PROTOCOL_HANDLERS: ReqRespSubProtocolHandlers = {
[PING_PROTOCOL]: defaultHandler,
[STATUS_PROTOCOL]: defaultHandler,
[TX_REQ_PROTOCOL]: defaultHandler,
[ReqRespSubProtocol.PING]: defaultHandler,
[ReqRespSubProtocol.STATUS]: defaultHandler,
[ReqRespSubProtocol.TX]: defaultHandler,
};

/**
Expand Down Expand Up @@ -135,15 +138,15 @@ export class RequestableBuffer {
* as a type rather than an object
*/
export const subProtocolMap: SubProtocolMap = {
[PING_PROTOCOL]: {
[ReqRespSubProtocol.PING]: {
request: RequestableBuffer,
response: RequestableBuffer,
},
[STATUS_PROTOCOL]: {
[ReqRespSubProtocol.STATUS]: {
request: RequestableBuffer,
response: RequestableBuffer,
},
[TX_REQ_PROTOCOL]: {
[ReqRespSubProtocol.TX]: {
request: TxHash,
response: Tx,
},
Expand Down
6 changes: 6 additions & 0 deletions yarn-project/p2p/src/services/reqresp/protocols/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
/**
* Request Response protocol handlers
*/
export * from './ping.js';
export * from './status.js';
export * from './tx.js';
8 changes: 8 additions & 0 deletions yarn-project/p2p/src/services/reqresp/protocols/ping.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
/**
* Handles the ping request.
* @param _msg - The ping request message.
* @returns A resolved promise with the pong response.
*/
export function pingHandler(_msg: any): Promise<Buffer> {
return Promise.resolve(Buffer.from('pong'));
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,3 @@
/**
* Handles the ping request.
* @param _msg - The ping request message.
* @returns A resolved promise with the pong response.
*/
export function pingHandler(_msg: any): Promise<Buffer> {
return Promise.resolve(Buffer.from('pong'));
}

/**
* Handles the status request.
* @param _msg - The status request message.
Expand Down
26 changes: 26 additions & 0 deletions yarn-project/p2p/src/services/reqresp/protocols/tx.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import { type P2PClientType } from '@aztec/circuit-types';
import { TxHash } from '@aztec/circuit-types/tx_hash';

import { type MemPools } from '../../../mem_pools/interface.js';

/**
* We want to keep the logic of the req resp handler in this file, but we do not have a reference to the mempools here
* so we need to pass it in as a parameter.
*
* Handler for tx requests
* @param mempools - the mempools
* @returns the tx response message
*/
export function reqRespTxHandler<T extends P2PClientType>(mempools: MemPools<T>): (msg: Buffer) => Promise<Buffer> {
/**
* Handler for tx requests
* @param msg - the tx request message
* @returns the tx response message
*/
return (msg: Buffer) => {
const txHash = TxHash.fromBuffer(msg);
const foundTx = mempools.txPool.getTxByHash(txHash);
const buf = foundTx ? foundTx.toBuffer() : Buffer.alloc(0);
return Promise.resolve(buf);
};
}
Loading

0 comments on commit f048acd

Please sign in to comment.