Skip to content

Commit

Permalink
feat(p2p): request response node sampling (#11330)
Browse files Browse the repository at this point in the history
## Overview

Rather than serially sending request-response messages through a list of
peers, we instead sample from the peer set,
attempting to connect to a node that we do not already have a connection
open with.

If we are unable to get a free node after a number of attempts, we
simply bite the bullet and dial a peer we are
already connected to.

The aim is for this to decrease the rate limiting we experience from other
peers.

part of #8458
  • Loading branch information
Maddiaa0 authored Jan 20, 2025
1 parent 85ccc15 commit 6426d90
Show file tree
Hide file tree
Showing 6 changed files with 357 additions and 34 deletions.
4 changes: 4 additions & 0 deletions yarn-project/p2p/src/services/libp2p/libp2p_service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import { noise } from '@chainsafe/libp2p-noise';
import { yamux } from '@chainsafe/libp2p-yamux';
import { identify } from '@libp2p/identify';
import { type Message, type PeerId, TopicValidatorResult } from '@libp2p/interface';
import { type ConnectionManager } from '@libp2p/interface-internal';
import '@libp2p/kad-dht';
import { mplex } from '@libp2p/mplex';
import { tcp } from '@libp2p/tcp';
Expand Down Expand Up @@ -249,6 +250,9 @@ export class LibP2PService<T extends P2PClientType> extends WithTracer implement
},
}),
}) as (components: GossipSubComponents) => GossipSub,
components: (components: { connectionManager: ConnectionManager }) => ({
connectionManager: components.connectionManager,
}),
},
});

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
import { sleep } from '@aztec/foundation/sleep';

import { beforeEach, describe, expect, it, jest } from '@jest/globals';
import { type PeerId, type Stream } from '@libp2p/interface';
import { createSecp256k1PeerId } from '@libp2p/peer-id-factory';
import { type MockProxy, mock } from 'jest-mock-extended';

import { ConnectionSampler, type RandomSampler } from './connection_sampler.js';

// Unit tests for ConnectionSampler: peer sampling, per-peer stream accounting and cleanup.
// libp2p is replaced by a hand-rolled mock and the random source by a jest mock so sampling is deterministic.
describe('ConnectionSampler', () => {
let sampler: ConnectionSampler;
let mockLibp2p: any;
let peers: PeerId[];
let mockRandomSampler: MockProxy<RandomSampler>;

beforeEach(async () => {
// Create some test peer IDs
peers = [await createSecp256k1PeerId(), await createSecp256k1PeerId(), await createSecp256k1PeerId()];

// Mock libp2p
mockLibp2p = {
getPeers: jest.fn().mockReturnValue(peers),
dialProtocol: jest.fn(),
};

// By default the random source always yields index 0, i.e. peers[0]
mockRandomSampler = mock<RandomSampler>();
mockRandomSampler.random.mockReturnValue(0);

// Second argument is the cleanup interval in ms (500), kept short so the cleanup test can wait for it
sampler = new ConnectionSampler(mockLibp2p, 500, mockRandomSampler);
});

// The constructor starts the cleanup job, so every test must stop the sampler
afterEach(async () => {
await sampler.stop();
});

describe('getPeer', () => {
it('returns a random peer from the list', () => {
const peer = sampler.getPeer();
expect(peers).toContain(peer);
});

it('attempts to find peer with no active connections', async () => {
// Setup: Create active connection to first two peers
const mockStream1: Partial<Stream> = { id: '1', close: jest.fn() } as Partial<Stream>;
const mockStream2: Partial<Stream> = { id: '2', close: jest.fn() } as Partial<Stream>;

mockLibp2p.dialProtocol.mockResolvedValueOnce(mockStream1).mockResolvedValueOnce(mockStream2);

await sampler.dialProtocol(peers[0], 'test');
await sampler.dialProtocol(peers[1], 'test');

// Make the mocked sampler yield index 0, then 1 (both peers already have a stream open), then 2
mockRandomSampler.random.mockReturnValueOnce(0).mockReturnValueOnce(1).mockReturnValueOnce(2);

const selectedPeer = sampler.getPeer();
// Should select peers[2] as it has no active connections
expect(selectedPeer).toBe(peers[2]);
});
});

describe('connection management', () => {
it('correctly tracks active connections', async () => {
const mockStream: Partial<Stream> = {
id: '1',
close: jest.fn().mockImplementation(() => Promise.resolve()),
} as Partial<Stream>;

mockLibp2p.dialProtocol.mockResolvedValue(mockStream);

// Open connection
const stream = await sampler.dialProtocol(peers[0], 'test');
expect(stream).toBe(mockStream);

// Verify internal state (private fields are reached through an `any` cast)
expect((sampler as any).activeConnectionsCount.get(peers[0])).toBe(1);
expect((sampler as any).streams.has('1')).toBe(true);

// Close connection
await sampler.close('1');

// Verify cleanup
expect((sampler as any).activeConnectionsCount.get(peers[0])).toBe(0);
expect((sampler as any).streams.has('1')).toBe(false);
expect(mockStream.close).toHaveBeenCalled();
});

it('handles multiple connections to same peer', async () => {
const mockStream1: Partial<Stream> = {
id: '1',
close: jest.fn(),
} as Partial<Stream>;
const mockStream2: Partial<Stream> = {
id: '2',
close: jest.fn(),
} as Partial<Stream>;

mockLibp2p.dialProtocol.mockResolvedValueOnce(mockStream1).mockResolvedValueOnce(mockStream2);

// Two streams to the same peer must be counted separately
await sampler.dialProtocol(peers[0], 'test');
await sampler.dialProtocol(peers[0], 'test');

expect((sampler as any).activeConnectionsCount.get(peers[0])).toBe(2);

// Each close decrements the peer's count by exactly one
await sampler.close('1');
expect((sampler as any).activeConnectionsCount.get(peers[0])).toBe(1);

await sampler.close('2');
expect((sampler as any).activeConnectionsCount.get(peers[0])).toBe(0);
});

it('handles errors during connection close', async () => {
// The underlying stream rejects on close; the sampler is expected to swallow the error
const mockStream: Partial<Stream> = {
id: '1',
close: jest.fn().mockImplementation(() => Promise.reject(new Error('Failed to close'))),
} as Partial<Stream>;

mockLibp2p.dialProtocol.mockResolvedValue(mockStream);

await sampler.dialProtocol(peers[0], 'test');
await sampler.close('1');

// Should still clean up internal state even if close fails
expect((sampler as any).activeConnectionsCount.get(peers[0])).toBe(0);
expect((sampler as any).streams.has('1')).toBe(false);
});
});

describe('cleanup', () => {
it('cleans up stale connections', async () => {
const mockStream: Partial<Stream> = {
id: '1',
close: jest.fn(),
} as Partial<Stream>;

mockLibp2p.dialProtocol.mockResolvedValue(mockStream);
await sampler.dialProtocol(peers[0], 'test');

// Manually set activeConnectionsCount to 0 to simulate lost accounting
(sampler as any).activeConnectionsCount.set(peers[0], 0);

// Trigger cleanup: wait longer than the 500 ms interval configured in beforeEach
// so the periodic cleanup job should have run at least once
await sleep(600);

expect(mockStream.close).toHaveBeenCalled();
expect((sampler as any).streams.has('1')).toBe(false);
});

it('properly cleans up on stop', async () => {
const mockStream1: Partial<Stream> = {
id: '1',
close: jest.fn(),
} as Partial<Stream>;
const mockStream2: Partial<Stream> = {
id: '2',
close: jest.fn(),
} as Partial<Stream>;

mockLibp2p.dialProtocol.mockResolvedValueOnce(mockStream1).mockResolvedValueOnce(mockStream2);

await sampler.dialProtocol(peers[0], 'test');
await sampler.dialProtocol(peers[1], 'test');

// stop() must close every tracked stream; afterEach will call stop() a second time
await sampler.stop();

expect(mockStream1.close).toHaveBeenCalled();
expect(mockStream2.close).toHaveBeenCalled();
expect((sampler as any).streams.size).toBe(0);
});
});
});
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
import { createLogger } from '@aztec/foundation/log';
import { RunningPromise } from '@aztec/foundation/running-promise';

import { type Libp2p, type PeerId, type Stream } from '@libp2p/interface';

// Maximum number of times getPeer re-samples when the sampled peer already has an open stream;
// once exhausted, the last sampled peer is returned even if it is already in use.
const MAX_SAMPLE_ATTEMPTS = 4;

/** An open stream together with the peer it was dialed to; stored per stream id by ConnectionSampler. */
interface StreamAndPeerId {
// The stream returned by libp2p.dialProtocol
stream: Stream;
// The peer the stream was opened with, used to update that peer's active connection count on close
peerId: PeerId;
}

/**
 * Source of random indices, wrapped in a class so that it can be swapped for a mock.
 */
export class RandomSampler {
  /**
   * Draws a pseudo-random integer by scaling Math.random() by `max` and rounding down.
   *
   * @param max - Exclusive upper bound of the draw
   * @returns An integer in [0, max) for positive `max`; 0 when `max` is 0
   */
  random(max: number): number {
    const scaled = Math.random() * max;
    return Math.floor(scaled);
  }
}

/**
 * Samples peers from the libp2p node, preferring a peer that we do not already have a stream open to.
 * If the sampled peer already has an open stream we sample again, up to MAX_SAMPLE_ATTEMPTS times;
 * if no free peer turns up we simply return the last sampled peer.
 *
 * Streams opened through dialProtocol are tracked by stream id, and the number of open streams is
 * counted per peer. A periodic job closes streams whose peer count has dropped to zero.
 *
 * @dev Close must always be called on connections, else memory leak
 */
export class ConnectionSampler {
  private readonly logger = createLogger('p2p:reqresp:connection-sampler');
  private cleanupJob?: RunningPromise;

  // Number of tracked open streams per peer.
  // NOTE(review): a Map keyed by PeerId objects compares keys by identity, so this relies on callers
  // passing the same PeerId instances that getPeers() returns - confirm against libp2p behaviour.
  private readonly activeConnectionsCount: Map<PeerId, number> = new Map();
  // Every stream opened via dialProtocol that has not been closed yet, keyed by stream id
  private readonly streams: Map<string, StreamAndPeerId> = new Map();

  /**
   * Creates the sampler and immediately starts the periodic stale-connection cleanup job.
   *
   * @param libp2p - The libp2p node used to list peers and dial protocols
   * @param cleanupIntervalMs - Interval between cleanup runs, in milliseconds
   * @param sampler - Source of random indices; injectable so that it can be mocked
   */
  constructor(
    private readonly libp2p: Libp2p,
    private readonly cleanupIntervalMs: number = 60000, // Default to 1 minute

    // Random sampler provided so that it can be mocked
    private readonly sampler: RandomSampler = new RandomSampler(),
  ) {
    this.cleanupJob = new RunningPromise(() => this.cleanupStaleConnections(), this.logger, this.cleanupIntervalMs);
    this.cleanupJob.start();
  }

  /**
   * Stops the cleanup job and closes all active connections
   */
  async stop() {
    await this.cleanupJob?.stop();

    // Snapshot the ids first: close() removes entries from the map while we iterate
    const closePromises = Array.from(this.streams.keys()).map(streamId => this.close(streamId));

    await Promise.all(closePromises);
  }

  /**
   * Samples a peer from the currently connected peers, preferring one without an open stream.
   *
   * @returns The sampled peer. NOTE: when libp2p reports no peers the indexed lookup yields
   * undefined at runtime despite the declared return type, so callers should guard for that.
   */
  getPeer(): PeerId {
    const peers = this.libp2p.getPeers();

    let randomIndex = this.sampler.random(peers.length);
    let attempts = 0;
    // If the active connections count is greater than 0, then we already have a connection open
    // So we try to sample a different peer, but only MAX_SAMPLE_ATTEMPTS times
    while ((this.activeConnectionsCount.get(peers[randomIndex]) ?? 0) > 0 && attempts < MAX_SAMPLE_ATTEMPTS) {
      randomIndex = this.sampler.random(peers.length);
      attempts++;
    }

    this.logger.trace(`Sampled peer in ${attempts} attempts`, {
      attempts,
      peer: peers[randomIndex]?.toString(),
    });
    return peers[randomIndex];
  }

  // Set of passthrough functions to keep track of active connections

  /**
   * Dials a protocol and returns the stream
   *
   * The stream is recorded under its id and the peer's active connection count is incremented.
   * If the underlying dial rejects, nothing is recorded and the rejection propagates to the caller.
   *
   * @param peerId - The peer id
   * @param protocol - The protocol
   * @returns The stream
   */
  async dialProtocol(peerId: PeerId, protocol: string): Promise<Stream> {
    const stream = await this.libp2p.dialProtocol(peerId, protocol);
    this.streams.set(stream.id, { stream, peerId });

    const updatedActiveConnectionsCount = (this.activeConnectionsCount.get(peerId) ?? 0) + 1;
    this.activeConnectionsCount.set(peerId, updatedActiveConnectionsCount);

    this.logger.trace(`Dialed protocol ${protocol} with peer ${peerId.toString()}`, {
      streamId: stream.id,
      peerId: peerId.toString(),
      activeConnectionsCount: updatedActiveConnectionsCount,
    });
    return stream;
  }

  /**
   * Closes a stream and updates the active connections count
   *
   * Closing an id that is not tracked (already closed, or never opened through this sampler) is a
   * no-op. Errors raised by the underlying stream close are logged and not rethrown; the internal
   * accounting is updated regardless.
   *
   * @param streamId - The stream id
   */
  async close(streamId: string): Promise<void> {
    const tracked = this.streams.get(streamId);
    if (tracked === undefined) {
      this.logger.debug(`Stream ${streamId} is not tracked, nothing to close`);
      return;
    }

    // Untrack before awaiting, so a concurrent close() of the same id cannot decrement the count twice
    this.streams.delete(streamId);
    const { stream, peerId } = tracked;

    // Clamp at zero: a negative count would make a later dial land on 0 while a stream is live,
    // which getPeer would read as "no connection" and the cleanup job would treat as stale.
    const updatedActiveConnectionsCount = Math.max(0, (this.activeConnectionsCount.get(peerId) ?? 1) - 1);
    this.activeConnectionsCount.set(peerId, updatedActiveConnectionsCount);

    this.logger.trace(`Closing connection to peer ${peerId.toString()}`, {
      streamId,
      peerId: peerId.toString(),
      protocol: stream.protocol,
      activeConnectionsCount: updatedActiveConnectionsCount,
    });

    try {
      await stream.close();
    } catch (error) {
      this.logger.error(`Failed to close connection to peer ${streamId}`, { error });
    }
  }

  /**
   * Cleans up stale connections that we have lost accounting for
   *
   * A stream is considered stale when it is still tracked but its peer's active connection count is 0.
   */
  private async cleanupStaleConnections() {
    // Look for streams without anything in the activeConnectionsCount
    // If we find anything, close the stream
    for (const [streamId, { peerId }] of this.streams.entries()) {
      try {
        // Check if we have lost track of accounting
        if (this.activeConnectionsCount.get(peerId) === 0) {
          await this.close(streamId);
          this.logger.debug(`Cleaned up stale connection ${streamId} to peer ${peerId.toString()}`);
        }
      } catch (error) {
        this.logger.error(`Error cleaning up stale connection ${streamId}`, { error });
      }
    }
  }
}
Loading

0 comments on commit 6426d90

Please sign in to comment.