Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

holepunch: add metrics #2246

Merged
merged 8 commits into from
Jun 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,126 changes: 1,126 additions & 0 deletions dashboards/holepunch/holepunch.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ require (
github.com/multiformats/go-varint v0.0.7
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58
github.com/prometheus/client_golang v1.14.0
github.com/prometheus/client_model v0.3.0
github.com/quic-go/quic-go v0.33.0
github.com/quic-go/webtransport-go v0.5.2
github.com/raulk/go-watchdog v1.3.0
Expand Down Expand Up @@ -97,7 +98,6 @@ require (
github.com/opencontainers/runtime-spec v1.0.2 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_model v0.3.0 // indirect
github.com/prometheus/common v0.42.0 // indirect
github.com/prometheus/procfs v0.9.0 // indirect
github.com/quic-go/qpack v0.4.0 // indirect
Expand Down
6 changes: 6 additions & 0 deletions p2p/host/basic/basic_host.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,12 @@ func NewHost(n network.Network, opts *HostOpts) (*BasicHost, error) {
}

if opts.EnableHolePunching {
if opts.EnableMetrics {
hpOpts := []holepunch.Option{
holepunch.WithMetricsTracer(holepunch.NewMetricsTracer(holepunch.WithRegisterer(opts.PrometheusRegisterer)))}
opts.HolePunchingOptions = append(hpOpts, opts.HolePunchingOptions...)

}
h.hps, err = holepunch.NewService(h, h.ids, opts.HolePunchingOptions...)
if err != nil {
return nil, fmt.Errorf("failed to create hole punch service: %w", err)
Expand Down
29 changes: 29 additions & 0 deletions p2p/metricshelper/conn.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package metricshelper

import ma "github.com/multiformats/go-multiaddr"

// transports lists the multiaddr protocol codes that GetTransport probes for.
// GetTransport walks this array front to back and returns the name of the
// first code it finds, so the order of the entries decides which name is
// reported for an address that matches more than one of them.
var transports = [...]int{ma.P_CIRCUIT, ma.P_WEBRTC, ma.P_WEBTRANSPORT, ma.P_QUIC, ma.P_QUIC_V1, ma.P_WSS, ma.P_WS, ma.P_TCP}

// GetTransport returns the name of the first protocol in transports for which
// a.ValueForProtocol succeeds. It returns "other" when none of the listed
// protocols can be looked up in a.
func GetTransport(a ma.Multiaddr) string {
	for i := range transports {
		code := transports[i]
		_, err := a.ValueForProtocol(code)
		if err != nil {
			// Lookup failed for this code; try the next candidate.
			continue
		}
		return ma.ProtocolWithCode(code).Name
	}
	return "other"
}

// GetIPVersion classifies addr by the first IP component seen while walking
// it with ma.ForEach: "ip4" for a P_IP4 component, "ip6" for a P_IP6
// component. It returns "unknown" if the walk finishes without seeing either.
func GetIPVersion(addr ma.Multiaddr) string {
	result := "unknown"
	ma.ForEach(addr, func(comp ma.Component) bool {
		switch comp.Protocol().Code {
		case ma.P_IP4:
			result = "ip4"
		case ma.P_IP6:
			result = "ip6"
		default:
			// Not an IP component: ask ForEach to keep going.
			return true
		}
		// An IP component was found; returning false ends the walk.
		return false
	})
	return result
}
32 changes: 5 additions & 27 deletions p2p/net/swarm/swarm_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,28 +133,13 @@ func appendConnectionState(tags []string, cs network.ConnectionState) []string {
return tags
}

// getIPVersion classifies addr by the first IP component seen while walking
// it with ma.ForEach: "ip4" for a P_IP4 component, "ip6" for a P_IP6
// component. It returns "unknown" if the walk finishes without seeing either.
func getIPVersion(addr ma.Multiaddr) string {
	result := "unknown"
	ma.ForEach(addr, func(comp ma.Component) bool {
		switch comp.Protocol().Code {
		case ma.P_IP4:
			result = "ip4"
		case ma.P_IP6:
			result = "ip6"
		default:
			// Not an IP component: ask ForEach to keep going.
			return true
		}
		// An IP component was found; returning false ends the walk.
		return false
	})
	return result
}

func (m *metricsTracer) OpenedConnection(dir network.Direction, p crypto.PubKey, cs network.ConnectionState, laddr ma.Multiaddr) {
tags := metricshelper.GetStringSlice()
defer metricshelper.PutStringSlice(tags)

*tags = append(*tags, metricshelper.GetDirection(dir))
*tags = appendConnectionState(*tags, cs)
*tags = append(*tags, getIPVersion(laddr))
*tags = append(*tags, metricshelper.GetIPVersion(laddr))
connsOpened.WithLabelValues(*tags...).Inc()

*tags = (*tags)[:0]
Expand All @@ -169,7 +154,7 @@ func (m *metricsTracer) ClosedConnection(dir network.Direction, duration time.Du

*tags = append(*tags, metricshelper.GetDirection(dir))
*tags = appendConnectionState(*tags, cs)
*tags = append(*tags, getIPVersion(laddr))
*tags = append(*tags, metricshelper.GetIPVersion(laddr))
connsClosed.WithLabelValues(*tags...).Inc()
connDuration.WithLabelValues(*tags...).Observe(duration.Seconds())
}
Expand All @@ -179,19 +164,12 @@ func (m *metricsTracer) CompletedHandshake(t time.Duration, cs network.Connectio
defer metricshelper.PutStringSlice(tags)

*tags = appendConnectionState(*tags, cs)
*tags = append(*tags, getIPVersion(laddr))
*tags = append(*tags, metricshelper.GetIPVersion(laddr))
connHandshakeLatency.WithLabelValues(*tags...).Observe(t.Seconds())
}

var transports = [...]int{ma.P_CIRCUIT, ma.P_WEBRTC, ma.P_WEBTRANSPORT, ma.P_QUIC, ma.P_QUIC_V1, ma.P_WSS, ma.P_WS, ma.P_TCP}

func (m *metricsTracer) FailedDialing(addr ma.Multiaddr, err error) {
var transport string
for _, t := range transports {
if _, err := addr.ValueForProtocol(t); err == nil {
transport = ma.ProtocolWithCode(t).Name
}
}
transport := metricshelper.GetTransport(addr)
e := "other"
if errors.Is(err, context.Canceled) {
e = "canceled"
Expand All @@ -210,6 +188,6 @@ func (m *metricsTracer) FailedDialing(addr ma.Multiaddr, err error) {
defer metricshelper.PutStringSlice(tags)

*tags = append(*tags, transport, e)
*tags = append(*tags, getIPVersion(addr))
*tags = append(*tags, metricshelper.GetIPVersion(addr))
dialError.WithLabelValues(*tags...).Inc()
}
44 changes: 23 additions & 21 deletions p2p/protocol/holepunch/holepuncher.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,8 @@ func (hp *holePuncher) DirectConnect(p peer.ID) error {

func (hp *holePuncher) directConnect(rp peer.ID) error {
// short-circuit check to see if we already have a direct connection
for _, c := range hp.host.Network().ConnsToPeer(rp) {
if !isRelayAddress(c.RemoteMultiaddr()) {
return nil
}
if getDirectConnection(hp.host, rp) != nil {
return nil
}

// short-circuit hole punching if a direct dial works.
Expand Down Expand Up @@ -133,8 +131,8 @@ func (hp *holePuncher) directConnect(rp peer.ID) error {
log.Debugw("got inbound proxy conn", "peer", rp)

// hole punch
for i := 0; i < maxRetries; i++ {
addrs, rtt, err := hp.initiateHolePunch(rp)
for i := 1; i <= maxRetries; i++ {
addrs, obsAddrs, rtt, err := hp.initiateHolePunch(rp)
if err != nil {
log.Debugw("hole punching failed", "peer", rp, "error", err)
hp.tracer.ProtocolError(rp, err)
Expand All @@ -159,44 +157,48 @@ func (hp *holePuncher) directConnect(rp peer.ID) error {
hp.tracer.EndHolePunch(rp, dt, err)
if err == nil {
log.Debugw("hole punching with successful", "peer", rp, "time", dt)
hp.tracer.HolePunchFinished("initiator", i, addrs, obsAddrs, getDirectConnection(hp.host, rp))
return nil
}
case <-hp.ctx.Done():
timer.Stop()
return hp.ctx.Err()
}
if i == maxRetries {
hp.tracer.HolePunchFinished("initiator", maxRetries, addrs, obsAddrs, nil)
}
}
return fmt.Errorf("all retries for hole punch with peer %s failed", rp)
}

// initiateHolePunch opens a new hole punching coordination stream,
// exchanges the addresses and measures the RTT.
func (hp *holePuncher) initiateHolePunch(rp peer.ID) ([]ma.Multiaddr, time.Duration, error) {
func (hp *holePuncher) initiateHolePunch(rp peer.ID) ([]ma.Multiaddr, []ma.Multiaddr, time.Duration, error) {
hpCtx := network.WithUseTransient(hp.ctx, "hole-punch")
sCtx := network.WithNoDial(hpCtx, "hole-punch")

str, err := hp.host.NewStream(sCtx, rp, Protocol)
if err != nil {
return nil, 0, fmt.Errorf("failed to open hole-punching stream: %w", err)
return nil, nil, 0, fmt.Errorf("failed to open hole-punching stream: %w", err)
}
defer str.Close()

addr, rtt, err := hp.initiateHolePunchImpl(str)
addr, obsAddr, rtt, err := hp.initiateHolePunchImpl(str)
if err != nil {
log.Debugf("%s", err)
str.Reset()
return addr, rtt, err
return addr, obsAddr, rtt, err
}
return addr, rtt, err
return addr, obsAddr, rtt, err
}

func (hp *holePuncher) initiateHolePunchImpl(str network.Stream) ([]ma.Multiaddr, time.Duration, error) {
func (hp *holePuncher) initiateHolePunchImpl(str network.Stream) ([]ma.Multiaddr, []ma.Multiaddr, time.Duration, error) {
if err := str.Scope().SetService(ServiceName); err != nil {
return nil, 0, fmt.Errorf("error attaching stream to holepunch service: %s", err)
return nil, nil, 0, fmt.Errorf("error attaching stream to holepunch service: %s", err)
}

if err := str.Scope().ReserveMemory(maxMsgSize, network.ReservationPriorityAlways); err != nil {
return nil, 0, fmt.Errorf("error reserving memory for stream: %s", err)
return nil, nil, 0, fmt.Errorf("error reserving memory for stream: %s", err)
}
defer str.Scope().ReleaseMemory(maxMsgSize)

Expand All @@ -211,7 +213,7 @@ func (hp *holePuncher) initiateHolePunchImpl(str network.Stream) ([]ma.Multiaddr
obsAddrs = hp.filter.FilterLocal(str.Conn().RemotePeer(), obsAddrs)
}
if len(obsAddrs) == 0 {
return nil, 0, errors.New("aborting hole punch initiation as we have no public address")
return nil, nil, 0, errors.New("aborting hole punch initiation as we have no public address")
}

start := time.Now()
Expand All @@ -220,17 +222,17 @@ func (hp *holePuncher) initiateHolePunchImpl(str network.Stream) ([]ma.Multiaddr
ObsAddrs: addrsToBytes(obsAddrs),
}); err != nil {
str.Reset()
return nil, 0, err
return nil, nil, 0, err
}

// wait for a CONNECT message from the remote peer
var msg pb.HolePunch
if err := rd.ReadMsg(&msg); err != nil {
return nil, 0, fmt.Errorf("failed to read CONNECT message from remote peer: %w", err)
return nil, nil, 0, fmt.Errorf("failed to read CONNECT message from remote peer: %w", err)
}
rtt := time.Since(start)
if t := msg.GetType(); t != pb.HolePunch_CONNECT {
return nil, 0, fmt.Errorf("expect CONNECT message, got %s", t)
return nil, nil, 0, fmt.Errorf("expect CONNECT message, got %s", t)
}

addrs := removeRelayAddrs(addrsFromBytes(msg.ObsAddrs))
Expand All @@ -239,13 +241,13 @@ func (hp *holePuncher) initiateHolePunchImpl(str network.Stream) ([]ma.Multiaddr
}

if len(addrs) == 0 {
return nil, 0, errors.New("didn't receive any public addresses in CONNECT")
return nil, nil, 0, errors.New("didn't receive any public addresses in CONNECT")
}

if err := w.WriteMsg(&pb.HolePunch{Type: pb.HolePunch_SYNC.Enum()}); err != nil {
return nil, 0, fmt.Errorf("failed to send SYNC message for hole punching: %w", err)
return nil, nil, 0, fmt.Errorf("failed to send SYNC message for hole punching: %w", err)
}
return addrs, rtt, nil
return addrs, obsAddrs, rtt, nil
}

func (hp *holePuncher) Close() error {
Expand Down
Loading