Skip to content

Commit

Permalink
holepunch: add metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
sukunrt committed Mar 24, 2023
1 parent 950151e commit 2b6edec
Show file tree
Hide file tree
Showing 12 changed files with 976 additions and 167 deletions.
475 changes: 475 additions & 0 deletions dashboards/holepunch/holepunch.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ require (
github.com/multiformats/go-varint v0.0.7
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58
github.com/prometheus/client_golang v1.14.0
github.com/prometheus/client_model v0.3.0
github.com/quic-go/quic-go v0.33.0
github.com/quic-go/webtransport-go v0.5.2
github.com/raulk/go-watchdog v1.3.0
Expand Down Expand Up @@ -95,7 +96,6 @@ require (
github.com/opencontainers/runtime-spec v1.0.2 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_model v0.3.0 // indirect
github.com/prometheus/common v0.37.0 // indirect
github.com/prometheus/procfs v0.8.0 // indirect
github.com/quic-go/qpack v0.4.0 // indirect
Expand Down
6 changes: 6 additions & 0 deletions p2p/host/basic/basic_host.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,12 @@ func NewHost(n network.Network, opts *HostOpts) (*BasicHost, error) {
}

if opts.EnableHolePunching {
if opts.EnableMetrics {
hpOpts := []holepunch.Option{
holepunch.WithMetricsTracer(holepunch.NewMetricsTracer(holepunch.WithRegisterer(opts.PrometheusRegisterer)))}
opts.HolePunchingOptions = append(hpOpts, opts.HolePunchingOptions...)

}
h.hps, err = holepunch.NewService(h, h.ids, opts.HolePunchingOptions...)
if err != nil {
return nil, fmt.Errorf("failed to create hole punch service: %w", err)
Expand Down
29 changes: 29 additions & 0 deletions p2p/metricshelper/conn.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package metricshelper

import ma "github.com/multiformats/go-multiaddr"

var transports = [...]int{ma.P_CIRCUIT, ma.P_WEBRTC, ma.P_WEBTRANSPORT, ma.P_QUIC, ma.P_QUIC_V1, ma.P_WSS, ma.P_WS, ma.P_TCP}

func GetTransport(a ma.Multiaddr) string {
for _, t := range transports {
if _, err := a.ValueForProtocol(t); err == nil {
return ma.ProtocolWithCode(t).Name
}
}
return "other"
}

func GetIPVersion(addr ma.Multiaddr) string {
version := "unknown"
ma.ForEach(addr, func(c ma.Component) bool {
if c.Protocol().Code == ma.P_IP4 {
version = "ip4"
return false
} else if c.Protocol().Code == ma.P_IP6 {
version = "ip6"
return false
}
return true
})
return version
}
32 changes: 5 additions & 27 deletions p2p/net/swarm/swarm_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,28 +133,13 @@ func appendConnectionState(tags []string, cs network.ConnectionState) []string {
return tags
}

func getIPVersion(addr ma.Multiaddr) string {
version := "unknown"
ma.ForEach(addr, func(c ma.Component) bool {
if c.Protocol().Code == ma.P_IP4 {
version = "ip4"
return false
} else if c.Protocol().Code == ma.P_IP6 {
version = "ip6"
return false
}
return true
})
return version
}

func (m *metricsTracer) OpenedConnection(dir network.Direction, p crypto.PubKey, cs network.ConnectionState, laddr ma.Multiaddr) {
tags := metricshelper.GetStringSlice()
defer metricshelper.PutStringSlice(tags)

*tags = append(*tags, metricshelper.GetDirection(dir))
*tags = appendConnectionState(*tags, cs)
*tags = append(*tags, getIPVersion(laddr))
*tags = append(*tags, metricshelper.GetIPVersion(laddr))
connsOpened.WithLabelValues(*tags...).Inc()

*tags = (*tags)[:0]
Expand All @@ -169,7 +154,7 @@ func (m *metricsTracer) ClosedConnection(dir network.Direction, duration time.Du

*tags = append(*tags, metricshelper.GetDirection(dir))
*tags = appendConnectionState(*tags, cs)
*tags = append(*tags, getIPVersion(laddr))
*tags = append(*tags, metricshelper.GetIPVersion(laddr))
connsClosed.WithLabelValues(*tags...).Inc()
connDuration.WithLabelValues(*tags...).Observe(duration.Seconds())
}
Expand All @@ -179,19 +164,12 @@ func (m *metricsTracer) CompletedHandshake(t time.Duration, cs network.Connectio
defer metricshelper.PutStringSlice(tags)

*tags = appendConnectionState(*tags, cs)
*tags = append(*tags, getIPVersion(laddr))
*tags = append(*tags, metricshelper.GetIPVersion(laddr))
connHandshakeLatency.WithLabelValues(*tags...).Observe(t.Seconds())
}

var transports = [...]int{ma.P_CIRCUIT, ma.P_WEBRTC, ma.P_WEBTRANSPORT, ma.P_QUIC, ma.P_QUIC_V1, ma.P_WSS, ma.P_WS, ma.P_TCP}

func (m *metricsTracer) FailedDialing(addr ma.Multiaddr, err error) {
var transport string
for _, t := range transports {
if _, err := addr.ValueForProtocol(t); err == nil {
transport = ma.ProtocolWithCode(t).Name
}
}
transport := metricshelper.GetTransport(addr)
e := "other"
if errors.Is(err, context.Canceled) {
e = "canceled"
Expand All @@ -210,6 +188,6 @@ func (m *metricsTracer) FailedDialing(addr ma.Multiaddr, err error) {
defer metricshelper.PutStringSlice(tags)

*tags = append(*tags, transport, e)
*tags = append(*tags, getIPVersion(addr))
*tags = append(*tags, metricshelper.GetIPVersion(addr))
dialError.WithLabelValues(*tags...).Inc()
}
45 changes: 24 additions & 21 deletions p2p/protocol/holepunch/holepuncher.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,8 @@ func (hp *holePuncher) DirectConnect(p peer.ID) error {

func (hp *holePuncher) directConnect(rp peer.ID) error {
// short-circuit check to see if we already have a direct connection
for _, c := range hp.host.Network().ConnsToPeer(rp) {
if !isRelayAddress(c.RemoteMultiaddr()) {
return nil
}
if getDirectConnection(hp.host, rp) != nil {
return nil
}

// short-circuit hole punching if a direct dial works.
Expand Down Expand Up @@ -133,8 +131,8 @@ func (hp *holePuncher) directConnect(rp peer.ID) error {
log.Debugw("got inbound proxy conn", "peer", rp)

// hole punch
for i := 0; i < maxRetries; i++ {
addrs, rtt, err := hp.initiateHolePunch(rp)
for i := 1; i <= maxRetries; i++ {
addrs, obsAddrs, rtt, err := hp.initiateHolePunch(rp)
if err != nil {
log.Debugw("hole punching failed", "peer", rp, "error", err)
hp.tracer.ProtocolError(rp, err)
Expand All @@ -159,44 +157,49 @@ func (hp *holePuncher) directConnect(rp peer.ID) error {
hp.tracer.EndHolePunch(rp, dt, err)
if err == nil {
log.Debugw("hole punching with successful", "peer", rp, "time", dt)
hp.tracer.HolePunchFinished("initiator", i, addrs, obsAddrs, getDirectConnection(hp.host, rp))
return nil
}
case <-hp.ctx.Done():
timer.Stop()
return hp.ctx.Err()
}
if i == maxRetries {
hp.tracer.HolePunchFinished("initiator", maxRetries, addrs, obsAddrs, nil)
}
}

return fmt.Errorf("all retries for hole punch with peer %s failed", rp)
}

// initiateHolePunch opens a new hole punching coordination stream,
// exchanges the addresses and measures the RTT.
func (hp *holePuncher) initiateHolePunch(rp peer.ID) ([]ma.Multiaddr, time.Duration, error) {
func (hp *holePuncher) initiateHolePunch(rp peer.ID) ([]ma.Multiaddr, []ma.Multiaddr, time.Duration, error) {
hpCtx := network.WithUseTransient(hp.ctx, "hole-punch")
sCtx := network.WithNoDial(hpCtx, "hole-punch")

str, err := hp.host.NewStream(sCtx, rp, Protocol)
if err != nil {
return nil, 0, fmt.Errorf("failed to open hole-punching stream: %w", err)
return nil, nil, 0, fmt.Errorf("failed to open hole-punching stream: %w", err)
}
defer str.Close()

addr, rtt, err := hp.initiateHolePunchImpl(str)
addr, obsAddr, rtt, err := hp.initiateHolePunchImpl(str)
if err != nil {
log.Debugf("%s", err)
str.Reset()
return addr, rtt, err
return addr, obsAddr, rtt, err
}
return addr, rtt, err
return addr, obsAddr, rtt, err
}

func (hp *holePuncher) initiateHolePunchImpl(str network.Stream) ([]ma.Multiaddr, time.Duration, error) {
func (hp *holePuncher) initiateHolePunchImpl(str network.Stream) ([]ma.Multiaddr, []ma.Multiaddr, time.Duration, error) {
if err := str.Scope().SetService(ServiceName); err != nil {
return nil, 0, fmt.Errorf("error attaching stream to holepunch service: %s", err)
return nil, nil, 0, fmt.Errorf("error attaching stream to holepunch service: %s", err)
}

if err := str.Scope().ReserveMemory(maxMsgSize, network.ReservationPriorityAlways); err != nil {
return nil, 0, fmt.Errorf("error reserving memory for stream: %s", err)
return nil, nil, 0, fmt.Errorf("error reserving memory for stream: %s", err)
}
defer str.Scope().ReleaseMemory(maxMsgSize)

Expand All @@ -211,7 +214,7 @@ func (hp *holePuncher) initiateHolePunchImpl(str network.Stream) ([]ma.Multiaddr
obsAddrs = hp.filter.FilterLocal(str.Conn().RemotePeer(), obsAddrs)
}
if len(obsAddrs) == 0 {
return nil, 0, errors.New("aborting hole punch initiation as we have no public address")
return nil, nil, 0, errors.New("aborting hole punch initiation as we have no public address")
}

start := time.Now()
Expand All @@ -220,17 +223,17 @@ func (hp *holePuncher) initiateHolePunchImpl(str network.Stream) ([]ma.Multiaddr
ObsAddrs: addrsToBytes(obsAddrs),
}); err != nil {
str.Reset()
return nil, 0, err
return nil, nil, 0, err
}

// wait for a CONNECT message from the remote peer
var msg pb.HolePunch
if err := rd.ReadMsg(&msg); err != nil {
return nil, 0, fmt.Errorf("failed to read CONNECT message from remote peer: %w", err)
return nil, nil, 0, fmt.Errorf("failed to read CONNECT message from remote peer: %w", err)
}
rtt := time.Since(start)
if t := msg.GetType(); t != pb.HolePunch_CONNECT {
return nil, 0, fmt.Errorf("expect CONNECT message, got %s", t)
return nil, nil, 0, fmt.Errorf("expect CONNECT message, got %s", t)
}

addrs := removeRelayAddrs(addrsFromBytes(msg.ObsAddrs))
Expand All @@ -239,13 +242,13 @@ func (hp *holePuncher) initiateHolePunchImpl(str network.Stream) ([]ma.Multiaddr
}

if len(addrs) == 0 {
return nil, 0, errors.New("didn't receive any public addresses in CONNECT")
return nil, nil, 0, errors.New("didn't receive any public addresses in CONNECT")
}

if err := w.WriteMsg(&pb.HolePunch{Type: pb.HolePunch_SYNC.Enum()}); err != nil {
return nil, 0, fmt.Errorf("failed to send SYNC message for hole punching: %w", err)
return nil, nil, 0, fmt.Errorf("failed to send SYNC message for hole punching: %w", err)
}
return addrs, rtt, nil
return addrs, obsAddrs, rtt, nil
}

func (hp *holePuncher) Close() error {
Expand Down
121 changes: 121 additions & 0 deletions p2p/protocol/holepunch/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
package holepunch

import (
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/p2p/metricshelper"
ma "github.com/multiformats/go-multiaddr"
"github.com/prometheus/client_golang/prometheus"
)

const metricNamespace = "libp2p_holepunch"

var (
directDialsTotal = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: metricNamespace,
Name: "direct_dials_total",
Help: "Direct Dials Total",
},
[]string{"outcome"},
)
holePunchOutcomesTotal = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: metricNamespace,
Name: "hole_punch_outcomes_total",
Help: "Hole_punch outcomes",
},
[]string{"side", "num_attempts", "ipv", "transport", "outcome"},
)
collectors = []prometheus.Collector{
directDialsTotal,
holePunchOutcomesTotal,
}
)

type MetricsTracer interface {
HolePunchFinished(side string, attemptNum int, theirAddrs []ma.Multiaddr, ourAddr []ma.Multiaddr, directConn network.ConnMultiaddrs)
DirectDialFinished(success bool)
}

type metricsTracer struct{}

var _ MetricsTracer = &metricsTracer{}

type metricsTracerSetting struct {
reg prometheus.Registerer
}

type MetricsTracerOption func(*metricsTracerSetting)

func WithRegisterer(reg prometheus.Registerer) MetricsTracerOption {
return func(s *metricsTracerSetting) {
if reg != nil {
s.reg = reg
}
}
}

func NewMetricsTracer(opts ...MetricsTracerOption) MetricsTracer {
setting := &metricsTracerSetting{reg: prometheus.DefaultRegisterer}
for _, opt := range opts {
opt(setting)
}
metricshelper.RegisterCollectors(setting.reg, collectors...)
return &metricsTracer{}
}

func (mt *metricsTracer) HolePunchFinished(side string, numAttempts int,
theirAddrs []ma.Multiaddr, ourAddrs []ma.Multiaddr, directConn network.ConnMultiaddrs) {
tags := metricshelper.GetStringSlice()
defer metricshelper.PutStringSlice(tags)

*tags = append(*tags, side, getNumAttemptString(numAttempts))
var dipv, dtransport string
if directConn != nil {
dipv = metricshelper.GetIPVersion(directConn.LocalMultiaddr())
dtransport = metricshelper.GetTransport(directConn.LocalMultiaddr())
}
for _, ta := range theirAddrs {
tipv := metricshelper.GetIPVersion(ta)
ttransport := metricshelper.GetTransport(ta)
for _, oa := range ourAddrs {
oipv := metricshelper.GetIPVersion(oa)
otransport := metricshelper.GetTransport(oa)
if tipv == oipv && ttransport == otransport {
*tags = append(*tags, tipv, ttransport)
if directConn != nil && dipv == tipv && dtransport == ttransport {
*tags = append(*tags, "success")
} else {
*tags = append(*tags, "failed")
}
holePunchOutcomesTotal.WithLabelValues(*tags...).Inc()
*tags = (*tags)[:2]
break
}
}
}
}

func getNumAttemptString(numAttempt int) string {
switch numAttempt {
case 1:
return "1"
case 2:
return "2"
case 3:
return "3"
default:
return ">=4"
}
}

func (mt *metricsTracer) DirectDialFinished(success bool) {
tags := metricshelper.GetStringSlice()
defer metricshelper.PutStringSlice(tags)
if success {
*tags = append(*tags, "success")
} else {
*tags = append(*tags, "failed")
}
directDialsTotal.WithLabelValues(*tags...).Inc()
}
Loading

0 comments on commit 2b6edec

Please sign in to comment.