Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: reenable reqresp offline peers test #11384

Merged
merged 5 commits into from
Jan 21, 2025
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 1 addition & 5 deletions yarn-project/p2p/src/mocks/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -185,11 +185,7 @@ export const startNodes = async (
};

export const stopNodes = async (nodes: ReqRespNode[]): Promise<void> => {
const stopPromises = [];
for (const node of nodes) {
stopPromises.push(node.req.stop());
stopPromises.push(node.p2p.stop());
}
const stopPromises = nodes.flatMap(node => [node.req.stop(), node.p2p.stop()]);
await Promise.all(stopPromises);
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ export class BatchConnectionSampler {
return;
}

const excluding = new Map([[peerId, true]]);
const excluding = new Map([[peerId.toString(), true]]);
const newPeer = this.connectionSampler.getPeer(excluding);

if (newPeer) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ describe('ConnectionSampler', () => {
let sampler: ConnectionSampler;
let mockLibp2p: any;
let peers: PeerId[];
let excluding: Map<PeerId, boolean>;
let excluding: Map<string, boolean>;
let mockRandomSampler: MockProxy<RandomSampler>;

beforeEach(async () => {
Expand Down Expand Up @@ -73,7 +73,7 @@ describe('ConnectionSampler', () => {
.mockReturnValueOnce(0)
.mockReturnValueOnce(1);

excluding.set(peers[0], true);
excluding.set(peers[0].toString(), true);
const selectedPeer = sampler.getPeer(excluding);
expect(selectedPeer).toBe(peers[1]);
});
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import { createLogger } from '@aztec/foundation/log';
import { SerialQueue } from '@aztec/foundation/queue';
import { RunningPromise } from '@aztec/foundation/running-promise';

import { type Libp2p, type PeerId, type Stream } from '@libp2p/interface';

Expand All @@ -26,7 +25,8 @@ export class RandomSampler {
*/
export class ConnectionSampler {
private readonly logger = createLogger('p2p:reqresp:connection-sampler');
private cleanupJob: RunningPromise;
private cleanupInterval: NodeJS.Timeout;
private abortController: AbortController = new AbortController();

private readonly activeConnectionsCount: Map<PeerId, number> = new Map();
private readonly streams: Map<string, StreamAndPeerId> = new Map();
Expand All @@ -39,8 +39,7 @@ export class ConnectionSampler {
private readonly cleanupIntervalMs: number = 60000, // Default to 1 minute
private readonly sampler: RandomSampler = new RandomSampler(), // Allow randomness to be mocked for testing
) {
this.cleanupJob = new RunningPromise(() => this.cleanupStaleConnections(), this.logger, this.cleanupIntervalMs);
this.cleanupJob.start();
this.cleanupInterval = setInterval(() => void this.cleanupStaleConnections(), this.cleanupIntervalMs);

this.dialQueue.start();
}
Expand All @@ -50,7 +49,9 @@ export class ConnectionSampler {
*/
async stop() {
this.logger.info('Stopping connection sampler');
await this.cleanupJob.stop();
clearInterval(this.cleanupInterval);

this.abortController.abort();
await this.dialQueue.end();

// Close all active streams
Expand All @@ -65,7 +66,7 @@ export class ConnectionSampler {
* This is to prevent sampling with replacement
* @returns
*/
getPeer(excluding?: Map<PeerId, boolean>): PeerId | undefined {
getPeer(excluding?: Map<string, boolean>): PeerId | undefined {
const peers = this.libp2p.getPeers();

if (peers.length === 0) {
Expand All @@ -80,7 +81,8 @@ export class ConnectionSampler {
// - either the peer has active connections OR is in the exclusion list
while (
attempts < MAX_SAMPLE_ATTEMPTS &&
((this.activeConnectionsCount.get(peers[randomIndex]) ?? 0) > 0 || (excluding?.get(peers[randomIndex]) ?? false))
((this.activeConnectionsCount.get(peers[randomIndex]) ?? 0) > 0 ||
(excluding?.get(peers[randomIndex]?.toString()) ?? false))
) {
randomIndex = this.sampler.random(peers.length);
attempts++;
Expand Down Expand Up @@ -143,7 +145,9 @@ export class ConnectionSampler {
async dialProtocol(peerId: PeerId, protocol: string): Promise<Stream> {
// Dialling at the same time can cause race conditions where two different streams
// end up with the same id, hence a serial queue
const stream = await this.dialQueue.put(() => this.libp2p.dialProtocol(peerId, protocol));
const stream = await this.dialQueue.put(() =>
this.libp2p.dialProtocol(peerId, protocol, { signal: this.abortController.signal }),
);

this.streams.set(stream.id, { stream, peerId });
const updatedActiveConnectionsCount = (this.activeConnectionsCount.get(peerId) ?? 0) + 1;
Expand Down
16 changes: 10 additions & 6 deletions yarn-project/p2p/src/services/reqresp/reqresp.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ describe('ReqResp', () => {

afterEach(async () => {
if (nodes) {
await stopNodes(nodes as ReqRespNode[]);
await stopNodes(nodes);
}
});

Expand Down Expand Up @@ -74,15 +74,17 @@ describe('ReqResp', () => {
await connectToPeers(nodes);
await sleep(500);

void ponger.stop();
const stopPonger = ponger.stop();

// It should return undefined if it cannot dial the peer
const res = await pinger.sendRequest(ReqRespSubProtocol.PING, PING_REQUEST);

expect(res).toBeUndefined();

await stopPonger;
});

it.skip('should request from a later peer if other peers are offline', async () => {
it('should request from a later peer if other peers are offline', async () => {
nodes = await createNodes(peerScoring, 4);

await startNodes(nodes);
Expand All @@ -91,22 +93,24 @@ describe('ReqResp', () => {
await sleep(500);

// Stop the second middle two nodes
void nodes[1].req.stop();
void nodes[2].req.stop();
const stopNode1 = nodes[1].req.stop();
const stopNode2 = nodes[2].req.stop();

// send from the first node
let res = await nodes[0].req.sendRequest(ReqRespSubProtocol.PING, PING_REQUEST);

if (!res) {
// The peer chosen is randomly selected, and the node above wont respond, so if
// we wait and try again, there will only be one node to chose from
logger.debug('No response from node, retrying');
logger.debug('\n\n\n\n\nNo response from node, retrying\n\n\n\n\n');
await sleep(500);
res = await nodes[0].req.sendRequest(ReqRespSubProtocol.PING, PING_REQUEST);
}

// It will randomly try to connect, then hit the correct node
expect(res?.toBuffer().toString('utf-8')).toEqual('pong');

await Promise.all([stopNode1, stopNode2]);
});

it('should hit a rate limit if too many requests are made in quick succession', async () => {
Expand Down
15 changes: 8 additions & 7 deletions yarn-project/p2p/src/services/reqresp/reqresp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -104,15 +104,15 @@ export class ReqResp {
* Stop the reqresp service
*/
async stop() {
// Unregister all handlers
for (const protocol of Object.keys(this.subProtocolHandlers)) {
await this.libp2p.unhandle(protocol);
}
// Unregister handlers in parallel
const unregisterPromises = Object.keys(this.subProtocolHandlers).map(protocol => this.libp2p.unhandle(protocol));
await Promise.all(unregisterPromises);

// Close all active connections
// Close connection sampler
await this.connectionSampler.stop();
this.logger.debug('ReqResp: Connection sampler stopped');

// Close streams in parallel
const closeStreamPromises = this.libp2p.getConnections().map(connection => connection.close());
await Promise.all(closeStreamPromises);
this.logger.debug('ReqResp: All active streams closed');
Expand Down Expand Up @@ -169,16 +169,17 @@ export class ReqResp {
return undefined;
}

const attemptedPeers: Map<PeerId, boolean> = new Map();
const attemptedPeers: Map<string, boolean> = new Map();
for (let i = 0; i < numberOfPeers; i++) {
// Sample a peer to make a request to
const peer = this.connectionSampler.getPeer(attemptedPeers);
this.logger.trace(`Attempting to send request to peer: ${peer?.toString()}`);
if (!peer) {
this.logger.debug('No peers available to send requests to');
return undefined;
}

attemptedPeers.set(peer, true);
attemptedPeers.set(peer.toString(), true);

this.logger.trace(`Sending request to peer: ${peer.toString()}`);
const response = await this.sendRequestToPeer(peer, subProtocol, requestBuffer);
Expand Down
Loading