Skip to content

Commit

Permalink
feat: add pending connections count to metrics (#2713)
Browse files Browse the repository at this point in the history
Allows graphing pending (e.g. pre-upgrade) connections alongside fully open connections.

---------

Co-authored-by: Daniel N <2color@users.noreply.github.com>
Co-authored-by: achingbrain <alex@achingbrain.net>
  • Loading branch information
3 people authored Sep 24, 2024
1 parent 859f535 commit b3272cf
Showing 1 changed file with 42 additions and 32 deletions.
74 changes: 42 additions & 32 deletions packages/libp2p/src/connection-manager/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ export class DefaultConnectionManager implements ConnectionManager, Startable {
private readonly deny: Multiaddr[]
private readonly maxIncomingPendingConnections: number
private incomingPendingConnections: number
private outboundPendingConnections: number
private readonly maxConnections: number

public readonly dialQueue: DialQueue
Expand Down Expand Up @@ -200,6 +201,7 @@ export class DefaultConnectionManager implements ConnectionManager, Startable {
this.allow = (init.allow ?? []).map(ma => multiaddr(ma))
this.deny = (init.deny ?? []).map(ma => multiaddr(ma))

this.outboundPendingConnections = 0
this.incomingPendingConnections = 0
this.maxIncomingPendingConnections = init.maxIncomingPendingConnections ?? defaultOptions.maxIncomingPendingConnections

Expand Down Expand Up @@ -261,7 +263,9 @@ export class DefaultConnectionManager implements ConnectionManager, Startable {
calculate: () => {
const metric = {
inbound: 0,
outbound: 0
'inbound pending': this.incomingPendingConnections,
outbound: 0,
'outbound pending': this.outboundPendingConnections
}

for (const conns of this.connections.values()) {
Expand Down Expand Up @@ -462,48 +466,54 @@ export class DefaultConnectionManager implements ConnectionManager, Startable {

options.signal?.throwIfAborted()

const { peerId } = getPeerAddress(peerIdOrMultiaddr)
try {
this.outboundPendingConnections++

if (peerId != null && options.force !== true) {
this.log('dial %p', peerId)
const existingConnection = this.getConnections(peerId)
.find(conn => conn.limits == null)
const { peerId } = getPeerAddress(peerIdOrMultiaddr)

if (existingConnection != null) {
this.log('had an existing non-limited connection to %p', peerId)
if (peerId != null && options.force !== true) {
this.log('dial %p', peerId)
const existingConnection = this.getConnections(peerId)
.find(conn => conn.limits == null)

options.onProgress?.(new CustomProgressEvent('dial-queue:already-connected'))
return existingConnection
if (existingConnection != null) {
this.log('had an existing non-limited connection to %p', peerId)

options.onProgress?.(new CustomProgressEvent('dial-queue:already-connected'))
return existingConnection
}
}
}

const connection = await this.dialQueue.dial(peerIdOrMultiaddr, {
...options,
priority: options.priority ?? DEFAULT_DIAL_PRIORITY
})
let peerConnections = this.connections.get(connection.remotePeer)
const connection = await this.dialQueue.dial(peerIdOrMultiaddr, {
...options,
priority: options.priority ?? DEFAULT_DIAL_PRIORITY
})
let peerConnections = this.connections.get(connection.remotePeer)

if (peerConnections == null) {
peerConnections = []
this.connections.set(connection.remotePeer, peerConnections)
}
if (peerConnections == null) {
peerConnections = []
this.connections.set(connection.remotePeer, peerConnections)
}

// we get notified of connections via the Upgrader emitting "connection"
// events, double check we aren't already tracking this connection before
// storing it
let trackedConnection = false
// we get notified of connections via the Upgrader emitting "connection"
// events, double check we aren't already tracking this connection before
// storing it
let trackedConnection = false

for (const conn of peerConnections) {
if (conn.id === connection.id) {
trackedConnection = true
for (const conn of peerConnections) {
if (conn.id === connection.id) {
trackedConnection = true
}
}
}

if (!trackedConnection) {
peerConnections.push(connection)
}
if (!trackedConnection) {
peerConnections.push(connection)
}

return connection
return connection
} finally {
this.outboundPendingConnections--
}
}

async closeConnections (peerId: PeerId, options: AbortOptions = {}): Promise<void> {
Expand Down

0 comments on commit b3272cf

Please sign in to comment.