Skip to content

Commit

Permalink
switch to Prometheus
Browse files Browse the repository at this point in the history
  • Loading branch information
marten-seemann committed Jan 7, 2023
1 parent f53f7e6 commit f6b2ad9
Showing 1 changed file with 69 additions and 104 deletions.
173 changes: 69 additions & 104 deletions p2p/net/swarm/swarm_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,98 +9,67 @@ import (
"time"

"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/network"

ma "github.com/multiformats/go-multiaddr"

"github.com/libp2p/go-libp2p/core/network"
"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
"github.com/prometheus/client_golang/prometheus"
)

const metricNamespace = "swarm/"
const metricNamespace = "libp2p_swarm_"

var (
connsOpened = stats.Int64(metricNamespace+"connections_opened", "Connections Opened", stats.UnitDimensionless)
connsClosed = stats.Int64(metricNamespace+"connections_closed", "Connections Closed", stats.UnitDimensionless)
keyType = stats.Int64(metricNamespace+"key_type", "libp2p key type", stats.UnitDimensionless)
dialError = stats.Int64(metricNamespace+"dial_error", "Dial Error", stats.UnitDimensionless)
connDuration = stats.Float64(metricNamespace+"connection_duration", "Duration of a Connection", stats.UnitSeconds)
connHandshakeLatency = stats.Float64(metricNamespace+"handshake_latency", "Duration of the libp2p handshake", stats.UnitSeconds)
// connsOpened counts opened connections. It is incremented once per call to
// recordConnectionOpened. Label values are supplied positionally, so the order
// here (dir, transport, security, muxer) has to match the order in which that
// function and appendConnectionState build their value slice.
connsOpened = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: metricNamespace + "connections_opened_total",
Help: "Connections Opened",
},
[]string{"dir", "transport", "security", "muxer"},
)
// keyTypes counts opened connections by direction and by the string form of
// the type of the public key passed to recordConnectionOpened
// (p.Type().String()).
keyTypes = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: metricNamespace + "key_types_total",
Help: "key type",
},
[]string{"dir", "key_type"},
)
// connsClosed counts closed connections. It is incremented once per call to
// recordConnectionClosed and uses the same four labels, in the same order, as
// connsOpened.
connsClosed = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: metricNamespace + "connections_closed_total",
Help: "Connections Closed",
},
[]string{"dir", "transport", "security", "muxer"},
)
// dialError counts dial failures recorded by recordDialFailed. The only label
// is the error category string; the transport of the failed dial is not
// attached as a label on this counter.
dialError = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: metricNamespace + "dial_errors_total",
Help: "Dial Error",
},
[]string{"error"},
)
// connDuration is a histogram of connection lifetimes in seconds, observed by
// recordConnectionDuration via t.Seconds().
connDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: metricNamespace + "connection_duration_seconds",
Help: "Duration of a Connection",
Buckets: prometheus.ExponentialBuckets(1.0/16, 2, 25), // 25 doubling buckets starting at 1/16 s; the largest finite bound is 2^20 s, i.e. about 12 days (a count of 26 would be needed to reach ~24 days)
},
[]string{"dir", "transport", "security", "muxer"},
)
// connHandshakeLatency is a histogram of handshake durations in seconds,
// observed by recordHandshakeLatency. Unlike the connection metrics it has no
// "dir" label: only transport, security and muxer.
connHandshakeLatency = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: metricNamespace + "handshake_latency_seconds",
Help: "Duration of the libp2p Handshake",
// 35 buckets growing by a factor of 1.3 from 1 ms; the largest finite
// bound is 0.001 * 1.3^34, roughly 7.5 s.
Buckets: prometheus.ExponentialBuckets(0.001, 1.3, 35),
},
[]string{"transport", "security", "muxer"},
)
)

var (
directionTag, _ = tag.NewKey("dir")
transportTag, _ = tag.NewKey("transport")
securityTag, _ = tag.NewKey("security")
muxerTag, _ = tag.NewKey("muxer")
dialErrorTag, _ = tag.NewKey("dial_error")
keyTypeTag, _ = tag.NewKey("key_type")
)

var transports = [...]int{ma.P_CIRCUIT, ma.P_WEBRTC, ma.P_WEBTRANSPORT, ma.P_QUIC, ma.P_QUIC_V1, ma.P_WSS, ma.P_WS, ma.P_TCP}

func exponentialDistribution(min, max float64) []float64 {
var v []float64
for d := min; d < 2*max; d *= 2 {
v = append(v, d)
}
return v
// init registers all six swarm collectors through prometheus.MustRegister when
// the package is loaded.
// NOTE(review): per the client_golang documentation MustRegister uses the
// default registerer and panics if a registration fails (for example on a
// duplicate metric name) — confirm this is acceptable for importers.
func init() {
prometheus.MustRegister(connsOpened, keyTypes, connsClosed, dialError, connDuration, connHandshakeLatency)
}

func getHandshakeLatencyBuckes() []float64 {
var buckets []float64
for i := 0.01; i <= 1; i += 0.02 {
buckets = append(buckets, i)
}
for i := 1.1; i <= 5; i += 0.1 {
buckets = append(buckets, i)
}
for i := 5.25; i <= 10; i += 0.25 {
buckets = append(buckets, i)
}
return buckets
}

var (
handshakeLatencySeconds = getHandshakeLatencyBuckes()
connDurationSeconds = exponentialDistribution(250, (7 * 24 * time.Hour).Seconds())
)

var (
connOpenView = &view.View{
Measure: connsOpened,
Aggregation: view.Sum(),
TagKeys: []tag.Key{directionTag, transportTag, securityTag, muxerTag},
}
connClosedView = &view.View{
Measure: connsClosed,
Aggregation: view.Sum(),
TagKeys: []tag.Key{directionTag, transportTag, securityTag, muxerTag},
}
keyTypeView = &view.View{
Measure: keyType,
Aggregation: view.Sum(),
TagKeys: []tag.Key{keyTypeTag},
}
dialErrorView = &view.View{
Measure: dialError,
Aggregation: view.Sum(),
TagKeys: []tag.Key{transportTag, dialErrorTag},
}
connDurationView = &view.View{
Measure: connDuration,
Aggregation: view.Distribution(connDurationSeconds...),
TagKeys: []tag.Key{directionTag, transportTag, securityTag, muxerTag},
}
connHandshakeLatencyView = &view.View{
Measure: connHandshakeLatency,
Aggregation: view.Distribution(handshakeLatencySeconds...),
TagKeys: []tag.Key{transportTag, securityTag, muxerTag},
}
)

var DefaultViews = []*view.View{connOpenView, connClosedView, keyTypeView, dialErrorView, connDurationView, connHandshakeLatencyView}
var transports = [...]int{ma.P_CIRCUIT, ma.P_WEBRTC, ma.P_WEBTRANSPORT, ma.P_QUIC, ma.P_QUIC_V1, ma.P_WSS, ma.P_WS, ma.P_TCP}

func getDirection(dir network.Direction) string {
switch dir {
Expand All @@ -113,50 +82,46 @@ func getDirection(dir network.Direction) string {
}
}

// appendConnectionState appends the connection-state values of cs to tags and
// returns the extended slice. The values are appended in the order transport,
// security, stream multiplexer. An empty cs.Transport is reported as
// "unknown".
func appendConnectionState(tags []tag.Mutator, cs network.ConnectionState) []tag.Mutator {
func appendConnectionState(tags []string, cs network.ConnectionState) []string {
if cs.Transport == "" {
// This shouldn't happen, unless the transport doesn't properly set the Transport field in the ConnectionState.
tags = append(tags, tag.Upsert(transportTag, "unknown"))
tags = append(tags, "unknown")
} else {
tags = append(tags, tag.Upsert(transportTag, cs.Transport))
tags = append(tags, cs.Transport)
}
// Only set the security and muxer tag for transports that actually use that field.
// These might be empty, depending on the transport.
// For example, QUIC doesn't set security nor muxer.
if cs.Security != "" {
tags = append(tags, tag.Upsert(securityTag, cs.Security))
}
if cs.StreamMultiplexer != "" {
tags = append(tags, tag.Upsert(muxerTag, cs.StreamMultiplexer))
}
// In the []string form both values are appended unconditionally, possibly as
// empty strings, so the slice always carries exactly one value for each of
// the transport, security and muxer labels that the callers pass
// positionally to WithLabelValues.
tags = append(tags, cs.Security)
tags = append(tags, cs.StreamMultiplexer)
return tags
}

// recordConnectionOpened increments connsOpened for the given direction and
// connection state, and increments keyTypes for the given direction and the
// type of the public key p.
func recordConnectionOpened(dir network.Direction, p crypto.PubKey, cs network.ConnectionState) {
tags := make([]tag.Mutator, 0, 4)
tags = append(tags, tag.Upsert(directionTag, getDirection(dir)))
// Capacity 4: one direction value plus the three values added by
// appendConnectionState.
tags := make([]string, 0, 4)
tags = append(tags, getDirection(dir))
tags = appendConnectionState(tags, cs)
stats.RecordWithTags(context.Background(), tags, connsOpened.M(1))
stats.RecordWithTags(context.Background(), []tag.Mutator{tag.Upsert(keyTypeTag, p.Type().String())}, keyType.M(1))
connsOpened.WithLabelValues(tags...).Inc()
keyTypes.WithLabelValues(getDirection(dir), p.Type().String()).Inc()
}

// recordConnectionClosed increments connsClosed for the given direction and
// connection state.
func recordConnectionClosed(dir network.Direction, cs network.ConnectionState) {
tags := make([]tag.Mutator, 0, 4)
tags = append(tags, tag.Upsert(directionTag, getDirection(dir)))
// Capacity 4: one direction value plus the three values added by
// appendConnectionState.
tags := make([]string, 0, 4)
tags = append(tags, getDirection(dir))
tags = appendConnectionState(tags, cs)
stats.RecordWithTags(context.Background(), tags, connsClosed.M(1))
connsClosed.WithLabelValues(tags...).Inc()
}

// recordConnectionDuration observes t, converted to seconds, on the
// connDuration histogram for the given direction and connection state.
func recordConnectionDuration(dir network.Direction, t time.Duration, cs network.ConnectionState) {
tags := make([]tag.Mutator, 0, 4)
tags = append(tags, tag.Upsert(directionTag, getDirection(dir)))
// Capacity 4: one direction value plus the three values added by
// appendConnectionState.
tags := make([]string, 0, 4)
tags = append(tags, getDirection(dir))
tags = appendConnectionState(tags, cs)
stats.RecordWithTags(context.Background(), tags, connDuration.M(t.Seconds()))
connDuration.WithLabelValues(tags...).Observe(t.Seconds())
}

// recordHandshakeLatency observes t, converted to seconds, on the
// connHandshakeLatency histogram for the given connection state. No direction
// value is added, matching that histogram's three labels.
func recordHandshakeLatency(t time.Duration, cs network.ConnectionState) {
tags := make([]tag.Mutator, 0, 3)
// Capacity 3: only the three values added by appendConnectionState.
tags := make([]string, 0, 3)
tags = appendConnectionState(tags, cs)
stats.RecordWithTags(context.Background(), tags, connHandshakeLatency.M(t.Seconds()))
connHandshakeLatency.WithLabelValues(tags...).Observe(t.Seconds())
}

func recordDialFailed(addr ma.Multiaddr, err error) {
Expand All @@ -183,5 +148,5 @@ func recordDialFailed(addr ma.Multiaddr, err error) {
if e == "other" {
fmt.Printf("transport: %s, category: %s (orig: %s)\n", transport, e, err)
}
stats.RecordWithTags(context.Background(), []tag.Mutator{tag.Upsert(transportTag, transport), tag.Upsert(dialErrorTag, e)}, dialError.M(1))
dialError.WithLabelValues(e).Inc()
}

0 comments on commit f6b2ad9

Please sign in to comment.