diff --git a/package.json b/package.json index c2e18a7..2a58770 100644 --- a/package.json +++ b/package.json @@ -174,6 +174,10 @@ "dependencies": { "@libp2p/components": "^2.0.0", "@libp2p/crypto": "^1.0.0", + "@libp2p/interface-connection": "^2.0.0", + "@libp2p/interface-peer-id": "^1.0.2", + "@libp2p/interface-pubsub": "^1.0.3", + "@libp2p/interface-registrar": "^2.0.0", "@libp2p/interfaces": "^3.0.2", "@libp2p/logger": "^2.0.0", "@libp2p/peer-collections": "^1.0.0", @@ -191,10 +195,6 @@ "uint8arrays": "^3.0.0" }, "devDependencies": { - "@libp2p/interface-connection": "^2.0.0", - "@libp2p/interface-peer-id": "^1.0.2", - "@libp2p/interface-pubsub": "^1.0.1", - "@libp2p/interface-registrar": "^2.0.0", "@libp2p/peer-id-factory": "^1.0.0", "aegir": "^37.2.0", "delay": "^5.0.0", diff --git a/src/index.ts b/src/index.ts index a8d0f52..5f481d1 100644 --- a/src/index.ts +++ b/src/index.ts @@ -65,6 +65,8 @@ export abstract class PubSubBaseProtocol extends EventEmi private _registrarTopologyIds: string[] | undefined protected enabled: boolean + private readonly maxInboundStreams: number + private readonly maxOutboundStreams: number constructor (props: PubSubInit) { super() @@ -74,7 +76,9 @@ export abstract class PubSubBaseProtocol extends EventEmi globalSignaturePolicy = 'StrictSign', canRelayMessage = false, emitSelf = false, - messageProcessingConcurrency = 10 + messageProcessingConcurrency = 10, + maxInboundStreams = 1, + maxOutboundStreams = 1 } = props this.multicodecs = ensureArray(multicodecs) @@ -88,6 +92,8 @@ export abstract class PubSubBaseProtocol extends EventEmi this.emitSelf = emitSelf this.topicValidators = new Map() this.queue = new Queue({ concurrency: messageProcessingConcurrency }) + this.maxInboundStreams = maxInboundStreams + this.maxOutboundStreams = maxOutboundStreams this._onIncomingStream = this._onIncomingStream.bind(this) this._onPeerConnected = this._onPeerConnected.bind(this) @@ -115,7 +121,10 @@ export abstract class PubSubBaseProtocol extends EventEmi const registrar = this.components.getRegistrar() // Incoming streams // Called after a peer dials us - await Promise.all(this.multicodecs.map(async multicodec => await registrar.handle(multicodec, this._onIncomingStream))) + await Promise.all(this.multicodecs.map(async multicodec => await registrar.handle(multicodec, this._onIncomingStream, { + maxInboundStreams: this.maxInboundStreams, + maxOutboundStreams: this.maxOutboundStreams + }))) // register protocol with topology // Topology callbacks called on connection manager changes