Skip to content

Commit

Permalink
feat(p2p): send goodbye messages on disconnecting to peers (#10920)
Browse files Browse the repository at this point in the history
  • Loading branch information
Maddiaa0 authored Jan 20, 2025
1 parent 1075113 commit 046968f
Show file tree
Hide file tree
Showing 20 changed files with 566 additions and 193 deletions.
15 changes: 8 additions & 7 deletions yarn-project/p2p/src/mocks/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,15 @@ import { type BootnodeConfig, type P2PConfig } from '../config.js';
import { type MemPools } from '../mem_pools/interface.js';
import { DiscV5Service } from '../services/discv5/discV5_service.js';
import { LibP2PService } from '../services/libp2p/libp2p_service.js';
import { type PeerManager } from '../services/peer_manager.js';
import { type PeerScoring } from '../services/peer-manager/peer_scoring.js';
import { type P2PReqRespConfig } from '../services/reqresp/config.js';
import {
ReqRespSubProtocol,
type ReqRespSubProtocolHandlers,
type ReqRespSubProtocolValidators,
noopValidator,
} from '../services/reqresp/interface.js';
import { pingHandler } from '../services/reqresp/protocols/ping.js';
import { statusHandler } from '../services/reqresp/protocols/status.js';
import { pingHandler, statusHandler } from '../services/reqresp/protocols/index.js';
import { ReqResp } from '../services/reqresp/reqresp.js';
import { type PubSubLibp2p } from '../util.js';

Expand Down Expand Up @@ -153,6 +152,7 @@ export const MOCK_SUB_PROTOCOL_HANDLERS: ReqRespSubProtocolHandlers = {
[ReqRespSubProtocol.PING]: pingHandler,
[ReqRespSubProtocol.STATUS]: statusHandler,
[ReqRespSubProtocol.TX]: (_msg: any) => Promise.resolve(Buffer.from('tx')),
[ReqRespSubProtocol.GOODBYE]: (_msg: any) => Promise.resolve(Buffer.from('goodbye')),
};

// By default, all requests are valid
Expand All @@ -161,14 +161,15 @@ export const MOCK_SUB_PROTOCOL_VALIDATORS: ReqRespSubProtocolValidators = {
[ReqRespSubProtocol.PING]: noopValidator,
[ReqRespSubProtocol.STATUS]: noopValidator,
[ReqRespSubProtocol.TX]: noopValidator,
[ReqRespSubProtocol.GOODBYE]: noopValidator,
};

/**
* @param numberOfNodes - the number of nodes to create
* @returns An array of the created nodes
*/
export const createNodes = async (peerManager: PeerManager, numberOfNodes: number): Promise<ReqRespNode[]> => {
return await Promise.all(Array.from({ length: numberOfNodes }, () => createReqResp(peerManager)));
export const createNodes = async (peerScoring: PeerScoring, numberOfNodes: number): Promise<ReqRespNode[]> => {
return await Promise.all(Array.from({ length: numberOfNodes }, () => createReqResp(peerScoring)));
};

export const startNodes = async (
Expand All @@ -191,13 +192,13 @@ export const stopNodes = async (nodes: ReqRespNode[]): Promise<void> => {
};

// Create a req resp node, exposing the underlying p2p node
export const createReqResp = async (peerManager: PeerManager): Promise<ReqRespNode> => {
export const createReqResp = async (peerScoring: PeerScoring): Promise<ReqRespNode> => {
const p2p = await createLibp2pNode();
const config: P2PReqRespConfig = {
overallRequestTimeoutMs: 4000,
individualRequestTimeoutMs: 2000,
};
const req = new ReqResp(config, p2p, peerManager);
const req = new ReqResp(config, p2p, peerScoring);
return {
p2p,
req,
Expand Down
232 changes: 120 additions & 112 deletions yarn-project/p2p/src/services/libp2p/libp2p_service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,10 @@ import {
import { type PubSubLibp2p, convertToMultiaddr } from '../../util.js';
import { AztecDatastore } from '../data_store.js';
import { SnappyTransform, fastMsgIdFn, getMsgIdFn, msgIdToStrFn } from '../encoding.js';
import { PeerManager } from '../peer_manager.js';
import {
DEFAULT_SUB_PROTOCOL_HANDLERS,
DEFAULT_SUB_PROTOCOL_VALIDATORS,
ReqRespSubProtocol,
type ReqRespSubProtocolHandlers,
type SubProtocolMap,
} from '../reqresp/interface.js';
import { PeerManager } from '../peer-manager/peer_manager.js';
import { PeerScoring } from '../peer-manager/peer_scoring.js';
import { DEFAULT_SUB_PROTOCOL_VALIDATORS, ReqRespSubProtocol, type SubProtocolMap } from '../reqresp/interface.js';
import { reqGoodbyeHandler } from '../reqresp/protocols/goodbye.js';
import { pingHandler, statusHandler } from '../reqresp/protocols/index.js';
import { reqRespTxHandler } from '../reqresp/protocols/tx.js';
import { ReqResp } from '../reqresp/reqresp.js';
Expand Down Expand Up @@ -115,21 +111,32 @@ export class LibP2PService<T extends P2PClientType> extends WithTracer implement
private peerDiscoveryService: PeerDiscoveryService,
private mempools: MemPools<T>,
private l2BlockSource: L2BlockSource,
private epochCache: EpochCache,
epochCache: EpochCache,
private proofVerifier: ClientProtocolCircuitVerifier,
private worldStateSynchronizer: WorldStateSynchronizer,
private telemetry: TelemetryClient,
private requestResponseHandlers: ReqRespSubProtocolHandlers = DEFAULT_SUB_PROTOCOL_HANDLERS,
telemetry: TelemetryClient,
private logger = createLogger('p2p:libp2p_service'),
) {
super(telemetry, 'LibP2PService');

this.peerManager = new PeerManager(node, peerDiscoveryService, config, telemetry, logger);
const peerScoring = new PeerScoring(config);
this.reqresp = new ReqResp(config, node, peerScoring);

this.peerManager = new PeerManager(
node,
peerDiscoveryService,
config,
telemetry,
logger,
peerScoring,
this.reqresp,
);

// Update gossipsub score params
this.node.services.pubsub.score.params.appSpecificScore = (peerId: string) => {
return this.peerManager.getPeerScore(peerId);
};
this.node.services.pubsub.score.params.appSpecificWeight = 10;
this.reqresp = new ReqResp(config, node, this.peerManager);

this.attestationValidator = new AttestationValidator(epochCache);
this.blockProposalValidator = new BlockProposalValidator(epochCache);
Expand All @@ -143,95 +150,6 @@ export class LibP2PService<T extends P2PClientType> extends WithTracer implement
};
}

/**
* Starts the LibP2P service.
* @returns An empty promise.
*/
public async start() {
// Check if service is already started
if (this.node.status === 'started') {
throw new Error('P2P service already started');
}

// Get listen & announce addresses for logging
const { tcpListenAddress, tcpAnnounceAddress } = this.config;
if (!tcpAnnounceAddress) {
throw new Error('Announce address not provided.');
}
const announceTcpMultiaddr = convertToMultiaddr(tcpAnnounceAddress, 'tcp');

// Start job queue, peer discovery service and libp2p node
this.jobQueue.start();
await this.peerDiscoveryService.start();
await this.node.start();

// Subscribe to standard GossipSub topics by default
for (const topic of getTopicTypeForClientType(this.clientType)) {
this.subscribeToTopic(TopicTypeMap[topic].p2pTopic);
}

// Add p2p topic validators
// As they are stored within a kv pair, there is no need to register them conditionally
// based on the client type
const topicValidators = {
[Tx.p2pTopic]: this.validatePropagatedTxFromMessage.bind(this),
[BlockAttestation.p2pTopic]: this.validatePropagatedAttestationFromMessage.bind(this),
[BlockProposal.p2pTopic]: this.validatePropagatedBlockFromMessage.bind(this),
[EpochProofQuote.p2pTopic]: this.validatePropagatedEpochProofQuoteFromMessage.bind(this),
};
for (const [topic, validator] of Object.entries(topicValidators)) {
this.node.services.pubsub.topicValidators.set(topic, validator);
}

// add GossipSub listener
this.node.services.pubsub.addEventListener(GossipSubEvent.MESSAGE, this.handleGossipSubEvent.bind(this));

// Start running promise for peer discovery
this.discoveryRunningPromise = new RunningPromise(
() => this.peerManager.heartbeat(),
this.logger,
this.config.peerCheckIntervalMS,
);
this.discoveryRunningPromise.start();

// Define the sub protocol validators - This is done within this start() method to gain a callback to the existing validateTx function
const reqrespSubProtocolValidators = {
...DEFAULT_SUB_PROTOCOL_VALIDATORS,
[ReqRespSubProtocol.TX]: this.validateRequestedTx.bind(this),
};
await this.reqresp.start(this.requestResponseHandlers, reqrespSubProtocolValidators);
this.logger.info(`Started P2P service`, {
listen: tcpListenAddress,
announce: announceTcpMultiaddr,
peerId: this.node.peerId.toString(),
});
}

/**
* Stops the LibP2P service.
* @returns An empty promise.
*/
public async stop() {
// Remove gossip sub listener
this.node.services.pubsub.removeEventListener(GossipSubEvent.MESSAGE, this.handleGossipSubEvent.bind(this));

// Stop peer manager
this.logger.debug('Stopping peer manager...');
this.peerManager.stop();

this.logger.debug('Stopping job queue...');
await this.jobQueue.end();
this.logger.debug('Stopping running promise...');
await this.discoveryRunningPromise?.stop();
this.logger.debug('Stopping peer discovery service...');
await this.peerDiscoveryService.stop();
this.logger.debug('Request response service stopped...');
await this.reqresp.stop();
this.logger.debug('Stopping LibP2P...');
await this.stopLibP2P();
this.logger.info('LibP2P service stopped');
}

/**
* Creates an instance of the LibP2P service.
* @param config - The configuration to use when creating the service.
Expand Down Expand Up @@ -334,15 +252,6 @@ export class LibP2PService<T extends P2PClientType> extends WithTracer implement
},
});

// Create request response protocol handlers
const txHandler = reqRespTxHandler(mempools);

const requestResponseHandlers = {
[ReqRespSubProtocol.PING]: pingHandler,
[ReqRespSubProtocol.STATUS]: statusHandler,
[ReqRespSubProtocol.TX]: txHandler,
};

return new LibP2PService(
clientType,
config,
Expand All @@ -354,10 +263,109 @@ export class LibP2PService<T extends P2PClientType> extends WithTracer implement
proofVerifier,
worldStateSynchronizer,
telemetry,
requestResponseHandlers,
);
}

/**
* Starts the LibP2P service.
* @returns An empty promise.
*/
public async start() {
// Check if service is already started
if (this.node.status === 'started') {
throw new Error('P2P service already started');
}

// Get listen & announce addresses for logging
const { tcpListenAddress, tcpAnnounceAddress } = this.config;
if (!tcpAnnounceAddress) {
throw new Error('Announce address not provided.');
}
const announceTcpMultiaddr = convertToMultiaddr(tcpAnnounceAddress, 'tcp');

// Start job queue, peer discovery service and libp2p node
this.jobQueue.start();
await this.peerDiscoveryService.start();
await this.node.start();

// Subscribe to standard GossipSub topics by default
for (const topic of getTopicTypeForClientType(this.clientType)) {
this.subscribeToTopic(TopicTypeMap[topic].p2pTopic);
}

// Create request response protocol handlers
const txHandler = reqRespTxHandler(this.mempools);
const goodbyeHandler = reqGoodbyeHandler(this.peerManager);

const requestResponseHandlers = {
[ReqRespSubProtocol.PING]: pingHandler,
[ReqRespSubProtocol.STATUS]: statusHandler,
[ReqRespSubProtocol.TX]: txHandler.bind(this),
[ReqRespSubProtocol.GOODBYE]: goodbyeHandler.bind(this),
};

// Add p2p topic validators
// As they are stored within a kv pair, there is no need to register them conditionally
// based on the client type
const topicValidators = {
[Tx.p2pTopic]: this.validatePropagatedTxFromMessage.bind(this),
[BlockAttestation.p2pTopic]: this.validatePropagatedAttestationFromMessage.bind(this),
[BlockProposal.p2pTopic]: this.validatePropagatedBlockFromMessage.bind(this),
[EpochProofQuote.p2pTopic]: this.validatePropagatedEpochProofQuoteFromMessage.bind(this),
};
for (const [topic, validator] of Object.entries(topicValidators)) {
this.node.services.pubsub.topicValidators.set(topic, validator);
}

// add GossipSub listener
this.node.services.pubsub.addEventListener(GossipSubEvent.MESSAGE, this.handleGossipSubEvent.bind(this));

// Start running promise for peer discovery
this.discoveryRunningPromise = new RunningPromise(
() => this.peerManager.heartbeat(),
this.logger,
this.config.peerCheckIntervalMS,
);
this.discoveryRunningPromise.start();

// Define the sub protocol validators - This is done within this start() method to gain a callback to the existing validateTx function
const reqrespSubProtocolValidators = {
...DEFAULT_SUB_PROTOCOL_VALIDATORS,
[ReqRespSubProtocol.TX]: this.validateRequestedTx.bind(this),
};
await this.reqresp.start(requestResponseHandlers, reqrespSubProtocolValidators);
this.logger.info(`Started P2P service`, {
listen: tcpListenAddress,
announce: announceTcpMultiaddr,
peerId: this.node.peerId.toString(),
});
}

/**
* Stops the LibP2P service.
* @returns An empty promise.
*/
public async stop() {
// Remove gossip sub listener
this.node.services.pubsub.removeEventListener(GossipSubEvent.MESSAGE, this.handleGossipSubEvent.bind(this));

// Stop peer manager
this.logger.debug('Stopping peer manager...');
await this.peerManager.stop();

this.logger.debug('Stopping job queue...');
await this.jobQueue.end();
this.logger.debug('Stopping running promise...');
await this.discoveryRunningPromise?.stop();
this.logger.debug('Stopping peer discovery service...');
await this.peerDiscoveryService.stop();
this.logger.debug('Request response service stopped...');
await this.reqresp.stop();
this.logger.debug('Stopping LibP2P...');
await this.stopLibP2P();
this.logger.info('LibP2P service stopped');
}

public getPeers(includePending?: boolean): PeerInfo[] {
return this.peerManager.getPeers(includePending);
}
Expand Down
41 changes: 41 additions & 0 deletions yarn-project/p2p/src/services/peer-manager/metrics.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import {
Attributes,
Metrics,
type TelemetryClient,
type Tracer,
type UpDownCounter,
ValueType,
} from '@aztec/telemetry-client';

import { type GoodByeReason, prettyGoodbyeReason } from '../reqresp/protocols/index.js';

export class PeerManagerMetrics {
private sentGoodbyes: UpDownCounter;
private receivedGoodbyes: UpDownCounter;

public readonly tracer: Tracer;

constructor(public readonly telemetryClient: TelemetryClient, name = 'PeerManager') {
this.tracer = telemetryClient.getTracer(name);

const meter = telemetryClient.getMeter(name);
this.sentGoodbyes = meter.createUpDownCounter(Metrics.PEER_MANAGER_GOODBYES_SENT, {
description: 'Number of goodbyes sent to peers',
unit: 'peers',
valueType: ValueType.INT,
});
this.receivedGoodbyes = meter.createUpDownCounter(Metrics.PEER_MANAGER_GOODBYES_RECEIVED, {
description: 'Number of goodbyes received from peers',
unit: 'peers',
valueType: ValueType.INT,
});
}

public recordGoodbyeSent(reason: GoodByeReason) {
this.sentGoodbyes.add(1, { [Attributes.P2P_GOODBYE_REASON]: prettyGoodbyeReason(reason) });
}

public recordGoodbyeReceived(reason: GoodByeReason) {
this.receivedGoodbyes.add(1, { [Attributes.P2P_GOODBYE_REASON]: prettyGoodbyeReason(reason) });
}
}
Loading

0 comments on commit 046968f

Please sign in to comment.