From b212490adbaeead2d035b1533cddcd4ec59f81e5 Mon Sep 17 00:00:00 2001 From: Maddiaa <47148561+Maddiaa0@users.noreply.github.com> Date: Fri, 7 Feb 2025 01:22:04 +0800 Subject: [PATCH] feat(reqresp): send status messages along with reqresp responses (#11727) --- .../p2p/src/services/reqresp/interface.ts | 11 ++ .../reqresp/rate-limiter/rate_limiter.test.ts | 2 +- .../reqresp/rate-limiter/rate_limiter.ts | 4 +- .../reqresp/rate-limiter/rate_limits.ts | 4 +- .../p2p/src/services/reqresp/reqresp.test.ts | 24 +++- .../p2p/src/services/reqresp/reqresp.ts | 135 ++++++++++++++---- .../p2p/src/services/reqresp/status.ts | 59 ++++++++ 7 files changed, 209 insertions(+), 30 deletions(-) create mode 100644 yarn-project/p2p/src/services/reqresp/status.ts diff --git a/yarn-project/p2p/src/services/reqresp/interface.ts b/yarn-project/p2p/src/services/reqresp/interface.ts index bf3e2a67c12..75c35ac9ca1 100644 --- a/yarn-project/p2p/src/services/reqresp/interface.ts +++ b/yarn-project/p2p/src/services/reqresp/interface.ts @@ -3,6 +3,8 @@ import { Fr } from '@aztec/foundation/fields'; import { type PeerId } from '@libp2p/interface'; +import { type ReqRespStatus } from './status.js'; + /* * Request Response Sub Protocols */ @@ -31,6 +33,15 @@ export type ReqRespSubProtocolHandler = (peerId: PeerId, msg: Buffer) => Promise */ export type ReqRespSubProtocolRateLimits = Record; +/** + * The response from the ReqResp protocol + * Consists of a status (Error code) and data + */ +export interface ReqRespResponse { + status: ReqRespStatus; + data: Buffer; +} + /** * A rate limit quota */ diff --git a/yarn-project/p2p/src/services/reqresp/rate-limiter/rate_limiter.test.ts b/yarn-project/p2p/src/services/reqresp/rate-limiter/rate_limiter.test.ts index 68c6e044f3f..30ab47fcff6 100644 --- a/yarn-project/p2p/src/services/reqresp/rate-limiter/rate_limiter.test.ts +++ b/yarn-project/p2p/src/services/reqresp/rate-limiter/rate_limiter.test.ts @@ -77,7 +77,7 @@ describe('rate limiter', () => { expect(rateLimiter.allow(ReqRespSubProtocol.TX, peerId)).toBe(false); // Spy on the peer manager and check that penalizePeer is called - expect(peerScoring.penalizePeer).toHaveBeenCalledWith(peerId, PeerErrorSeverity.MidToleranceError); + expect(peerScoring.penalizePeer).toHaveBeenCalledWith(peerId, PeerErrorSeverity.HighToleranceError); }); it('Should allow requests within the global limit', () => { diff --git a/yarn-project/p2p/src/services/reqresp/rate-limiter/rate_limiter.ts b/yarn-project/p2p/src/services/reqresp/rate-limiter/rate_limiter.ts index 5477b65d295..058fe6e24a0 100644 --- a/yarn-project/p2p/src/services/reqresp/rate-limiter/rate_limiter.ts +++ b/yarn-project/p2p/src/services/reqresp/rate-limiter/rate_limiter.ts @@ -200,9 +200,11 @@ export class RequestResponseRateLimiter { switch (rateLimitStatus) { case RateLimitStatus.DeniedPeer: - this.peerScoring.penalizePeer(peerId, PeerErrorSeverity.MidToleranceError); + // Hitting a peer specific limit, we should lightly penalise the peer + this.peerScoring.penalizePeer(peerId, PeerErrorSeverity.HighToleranceError); return false; case RateLimitStatus.DeniedGlobal: + // Hitting a global limit, we should not penalise the peer return false; default: return true; diff --git a/yarn-project/p2p/src/services/reqresp/rate-limiter/rate_limits.ts b/yarn-project/p2p/src/services/reqresp/rate-limiter/rate_limits.ts index dde34d64c1e..db358530085 100644 --- a/yarn-project/p2p/src/services/reqresp/rate-limiter/rate_limits.ts +++ b/yarn-project/p2p/src/services/reqresp/rate-limiter/rate_limits.ts @@ -25,11 +25,11 @@ export const DEFAULT_RATE_LIMITS: ReqRespSubProtocolRateLimits = { [ReqRespSubProtocol.TX]: { peerLimit: { quotaTimeMs: 1000, - quotaCount: 5, + quotaCount: 10, }, globalLimit: { quotaTimeMs: 1000, - quotaCount: 10, + quotaCount: 20, }, }, [ReqRespSubProtocol.BLOCK]: { diff --git a/yarn-project/p2p/src/services/reqresp/reqresp.test.ts b/yarn-project/p2p/src/services/reqresp/reqresp.test.ts index c8e6313a670..e441bad2efb 100644 --- a/yarn-project/p2p/src/services/reqresp/reqresp.test.ts +++ b/yarn-project/p2p/src/services/reqresp/reqresp.test.ts @@ -22,6 +22,7 @@ import { type PeerScoring } from '../peer-manager/peer_scoring.js'; import { ReqRespSubProtocol, RequestableBuffer } from './interface.js'; import { reqRespBlockHandler } from './protocols/block.js'; import { GoodByeReason, reqGoodbyeHandler } from './protocols/goodbye.js'; +import { ReqRespStatus, prettyPrintReqRespStatus } from './status.js'; const PING_REQUEST = RequestableBuffer.fromBuffer(Buffer.from('ping')); @@ -126,10 +127,21 @@ describe('ReqResp', () => { await sleep(500); // Default rate is set at 1 every 200 ms; so this should fire a few times + const responses = []; for (let i = 0; i < 10; i++) { - await nodes[0].req.sendRequestToPeer(nodes[1].p2p.peerId, ReqRespSubProtocol.PING, Buffer.from('ping')); + // Response object contains the status (error flags) and data + const response = await nodes[0].req.sendRequestToPeer( + nodes[1].p2p.peerId, + ReqRespSubProtocol.PING, + Buffer.from('ping'), + ); + responses.push(response); } + // Check that one of the responses gets a rate limit response + const rateLimitResponse = responses.find(response => response?.status === ReqRespStatus.RATE_LIMIT_EXCEEDED); + expect(rateLimitResponse).toBeDefined(); + // Make sure the error message is logged const errorMessage = `Rate limit exceeded for ${ReqRespSubProtocol.PING} from ${nodes[0].p2p.peerId.toString()}`; expect(loggerSpy).toHaveBeenCalledWith(expect.stringContaining(errorMessage)); @@ -343,7 +355,8 @@ describe('ReqResp', () => { ); // Expect the response to be a buffer of length 1 - expect(response).toEqual(Buffer.from([0x0])); + expect(response?.status).toEqual(ReqRespStatus.SUCCESS); + expect(response?.data).toEqual(Buffer.from([0x0])); }); }); @@ -413,6 +426,8 @@ describe('ReqResp', () => { const batchSize = 12; nodes = await createNodes(peerScoring, 3); + const requesterLoggerSpy = jest.spyOn((nodes[0].req as any).logger, 'debug'); + await startNodes(nodes); await sleep(500); await connectToPeers(nodes); @@ -426,6 +441,11 @@ describe('ReqResp', () => { const res = await nodes[0].req.sendBatchRequest(ReqRespSubProtocol.PING, requests); expect(res).toEqual(expectResponses); + + // Check that we did detect hitting a rate limit + expect(requesterLoggerSpy).toHaveBeenCalledWith( + expect.stringContaining(`${prettyPrintReqRespStatus(ReqRespStatus.RATE_LIMIT_EXCEEDED)}`), + ); }); }); }); diff --git a/yarn-project/p2p/src/services/reqresp/reqresp.ts b/yarn-project/p2p/src/services/reqresp/reqresp.ts index b3ebc30f97a..ea82bdfa4ab 100644 --- a/yarn-project/p2p/src/services/reqresp/reqresp.ts +++ b/yarn-project/p2p/src/services/reqresp/reqresp.ts @@ -22,7 +22,8 @@ import { ConnectionSampler } from './connection-sampler/connection_sampler.js'; import { DEFAULT_SUB_PROTOCOL_HANDLERS, DEFAULT_SUB_PROTOCOL_VALIDATORS, - type ReqRespSubProtocol, + type ReqRespResponse, + ReqRespSubProtocol, type ReqRespSubProtocolHandlers, type ReqRespSubProtocolValidators, type SubProtocolMap, @@ -30,6 +31,7 @@ import { } from './interface.js'; import { ReqRespMetrics } from './metrics.js'; import { RequestResponseRateLimiter } from './rate-limiter/rate_limiter.js'; +import { ReqRespStatus, ReqRespStatusError, parseStatusChunk, prettyPrintReqRespStatus } from './status.js'; /** * The Request Response Service @@ -190,10 +192,17 @@ export class ReqResp { this.logger.trace(`Sending request to peer: ${peer.toString()}`); const response = await this.sendRequestToPeer(peer, subProtocol, requestBuffer); + if (response && response.status !== ReqRespStatus.SUCCESS) { + this.logger.debug( + `Request to peer ${peer.toString()} failed with status ${prettyPrintReqRespStatus(response.status)}`, + ); + continue; + } + // If we get a response, return it, otherwise we iterate onto the next peer // We do not consider it a success if we have an empty buffer - if (response && response.length > 0) { - const object = subProtocolMap[subProtocol].response.fromBuffer(response); + if (response && response.data.length > 0) { + const object = subProtocolMap[subProtocol].response.fromBuffer(response.data); // The response validator handles peer punishment within const isValid = await responseValidator(request, object, peer); if (!isValid) { @@ -311,8 +320,22 @@ export class ReqResp { for (const index of indices) { const response = await this.sendRequestToPeer(peer, subProtocol, requestBuffers[index]); - if (response && response.length > 0) { - const object = subProtocolMap[subProtocol].response.fromBuffer(response); + // Check the status of the response buffer + if (response && response.status !== ReqRespStatus.SUCCESS) { + this.logger.debug( + `Request to peer ${peer.toString()} failed with status ${prettyPrintReqRespStatus( + response.status, + )}`, + ); + + // If we hit a rate limit or some failure, we remove the peer and return the results, + // they will be split among remaining peers and the new sampled peer + batchSampler.removePeerAndReplace(peer); + return { peer, results: peerResults }; + } + + if (response && response.data.length > 0) { + const object = subProtocolMap[subProtocol].response.fromBuffer(response.data); const isValid = await responseValidator(requests[index], object, peer); if (isValid) { @@ -394,7 +417,7 @@ export class ReqResp { peerId: PeerId, subProtocol: ReqRespSubProtocol, payload: Buffer, - ): Promise { + ): Promise { let stream: Stream | undefined; try { this.metrics.recordRequestSent(subProtocol); @@ -402,8 +425,8 @@ export class ReqResp { stream = await this.connectionSampler.dialProtocol(peerId, subProtocol); // Open the stream with a timeout - const result = await executeTimeout( - (): Promise => pipe([payload], stream!, this.readMessage.bind(this)), + const result = await executeTimeout( + (): Promise => pipe([payload], stream!, this.readMessage.bind(this)), this.individualRequestTimeoutMs, () => new IndividualReqRespTimeoutError(), ); @@ -447,7 +470,15 @@ export class ReqResp { * Categorize the error and log it. */ private categorizeError(e: any, peerId: PeerId, subProtocol: ReqRespSubProtocol): PeerErrorSeverity | undefined { - // Non pubishable errors + // Non punishable errors - we do not expect a response for goodbye messages + if (subProtocol === ReqRespSubProtocol.GOODBYE) { + this.logger.debug('Error encountered on goodbye sub protocol, no penalty', { + peerId: peerId.toString(), + subProtocol, + }); + return undefined; + } + // We do not punish a collective timeout, as the node triggers this interupt, independent of the peer's behaviour const logTags = { peerId: peerId.toString(), @@ -492,14 +523,45 @@ export class ReqResp { /** * Read a message returned from a stream into a single buffer + * + * The message is split into two components + * - The first chunk should contain a control byte, indicating the status of the response see `ReqRespStatus` + * - The second chunk should contain the response data */ - private async readMessage(source: AsyncIterable): Promise { + private async readMessage(source: AsyncIterable): Promise { + let statusBuffer: ReqRespStatus | undefined; const chunks: Uint8Array[] = []; - for await (const chunk of source) { - chunks.push(chunk.subarray()); + + try { + for await (const chunk of source) { + if (statusBuffer === undefined) { + const firstChunkBuffer = chunk.subarray(); + statusBuffer = parseStatusChunk(firstChunkBuffer); + } else { + chunks.push(chunk.subarray()); + } + } + + const messageData = Buffer.concat(chunks); + const message: Buffer = this.snappyTransform.inboundTransformNoTopic(messageData); + + return { + status: statusBuffer ?? ReqRespStatus.UNKNOWN, + data: message, + }; + } catch (e: any) { + this.logger.debug(`Reading message failed: ${e.message}`); + + let status = ReqRespStatus.UNKNOWN; + if (e instanceof ReqRespStatusError) { + status = e.status; + } + + return { + status, + data: Buffer.from([]), + }; } - const messageData = Buffer.concat(chunks); - return this.snappyTransform.inboundTransformNoTopic(messageData); } /** @@ -525,25 +587,28 @@ export class ReqResp { private async streamHandler(protocol: ReqRespSubProtocol, { stream, connection }: IncomingStreamData) { this.metrics.recordRequestReceived(protocol); - // Store a reference to from this for the async generator - if (!this.rateLimiter.allow(protocol, connection.remotePeer)) { - this.logger.warn(`Rate limit exceeded for ${protocol} from ${connection.remotePeer}`); + try { + // Store a reference to from this for the async generator + if (!this.rateLimiter.allow(protocol, connection.remotePeer)) { + this.logger.warn(`Rate limit exceeded for ${protocol} from ${connection.remotePeer}`); - // TODO(#8483): handle changing peer scoring for failed rate limit, maybe differentiate between global and peer limits here when punishing - await stream.close(); - return; - } + throw new ReqRespStatusError(ReqRespStatus.RATE_LIMIT_EXCEEDED); + } - const handler = this.subProtocolHandlers[protocol]; - const transform = this.snappyTransform; + const handler = this.subProtocolHandlers[protocol]; + const transform = this.snappyTransform; - try { await pipe( stream, async function* (source: any) { for await (const chunkList of source) { const msg = Buffer.from(chunkList.subarray()); const response = await handler(connection.remotePeer, msg); + + // Send success code first, then the response + const successChunk = Buffer.from([ReqRespStatus.SUCCESS]); + yield new Uint8Array(successChunk); + yield new Uint8Array(transform.outboundTransformNoTopic(response)); } }, @@ -552,8 +617,30 @@ export class ReqResp { } catch (e: any) { this.logger.warn(e); this.metrics.recordResponseError(protocol); + + // If we receive a known error, we use the error status in the response chunk, otherwise we categorize as unknown + let errorStatus = ReqRespStatus.UNKNOWN; + if (e instanceof ReqRespStatusError) { + errorStatus = e.status; + } + + const sendErrorChunk = this.sendErrorChunk(errorStatus); + + // Return and yield the response chunk + await pipe( + stream, + async function* (_source: any) { + yield* sendErrorChunk; + }, + stream, + ); } finally { await stream.close(); } } + + private async *sendErrorChunk(error: ReqRespStatus): AsyncIterable { + const errorChunk = Buffer.from([error]); + yield new Uint8Array(errorChunk); + } } diff --git a/yarn-project/p2p/src/services/reqresp/status.ts b/yarn-project/p2p/src/services/reqresp/status.ts new file mode 100644 index 00000000000..b37d3ca5bc2 --- /dev/null +++ b/yarn-project/p2p/src/services/reqresp/status.ts @@ -0,0 +1,59 @@ +/** + * The error codes for the ReqResp protocol + */ +export enum ReqRespStatus { + SUCCESS = 0, + RATE_LIMIT_EXCEEDED = 1, + BADLY_FORMED_REQUEST = 2, + UNKNOWN = 127, +} + +export class ReqRespStatusError extends Error { + /** + * The status code + */ + status: ReqRespStatus; + + constructor(status: ReqRespStatus) { + super(`ReqResp Error: ${prettyPrintReqRespStatus(status)}`); + this.status = status; + } +} + +/** + * Parse the status chunk + * @param chunk + * @returns + * + * @throws ReqRespStatusError if the chunk is not valid + */ +export function parseStatusChunk(chunk: Uint8Array): ReqRespStatus { + if (chunk.length !== 1) { + throw new ReqRespStatusError(ReqRespStatus.UNKNOWN); + } + + const status = chunk[0]; + // Check if status is a valid ReqRespStatus value + if (!(status in ReqRespStatus)) { + throw new ReqRespStatusError(ReqRespStatus.UNKNOWN); + } + return status as ReqRespStatus; +} + +/** + * Pretty print the ReqResp status + * @param status + * @returns + */ +export function prettyPrintReqRespStatus(status: ReqRespStatus) { + switch (status) { + case ReqRespStatus.SUCCESS: + return 'SUCCESS'; + case ReqRespStatus.RATE_LIMIT_EXCEEDED: + return 'RATE_LIMIT_EXCEEDED'; + case ReqRespStatus.BADLY_FORMED_REQUEST: + return 'BADLY_FORMED_REQUEST'; + case ReqRespStatus.UNKNOWN: + return 'UNKNOWN'; + } +}