diff --git a/yarn-project/p2p/src/services/reqresp/connection-sampler/connection_sampler.ts b/yarn-project/p2p/src/services/reqresp/connection-sampler/connection_sampler.ts index 574e5be55cc2..a44164eed09f 100644 --- a/yarn-project/p2p/src/services/reqresp/connection-sampler/connection_sampler.ts +++ b/yarn-project/p2p/src/services/reqresp/connection-sampler/connection_sampler.ts @@ -1,4 +1,5 @@ import { createLogger } from '@aztec/foundation/log'; +import { RunningPromise } from '@aztec/foundation/running-promise'; import { type Libp2p, type PeerId, type Stream } from '@libp2p/interface'; @@ -24,9 +25,10 @@ export class RandomSampler { */ export class ConnectionSampler { private readonly logger = createLogger('p2p:reqresp:connection-sampler'); + private cleanupJob?: RunningPromise; + private readonly activeConnectionsCount: Map = new Map(); private readonly streams: Map = new Map(); - private cleanupInterval?: NodeJS.Timeout; constructor( private readonly libp2p: Libp2p, @@ -35,22 +37,15 @@ export class ConnectionSampler { // Random sampler provided so that it can be mocked private readonly sampler: RandomSampler = new RandomSampler(), ) { - this.startCleanupJob(); - } - - private startCleanupJob() { - this.cleanupInterval = setInterval(() => { - void this.cleanupStaleConnections(); - }, this.cleanupIntervalMs); + this.cleanupJob = new RunningPromise(() => this.cleanupStaleConnections(), this.logger, this.cleanupIntervalMs); + this.cleanupJob.start(); } /** * Stops the cleanup job and closes all active connections */ async stop() { - if (this.cleanupInterval) { - clearInterval(this.cleanupInterval); - } + await this.cleanupJob?.stop(); // Close all active streams const closePromises = Array.from(this.streams.keys()).map(streamId => this.close(streamId));