Skip to content

Commit

Permalink
swarm: introduce a MetricsTracer interface
Browse files Browse the repository at this point in the history
  • Loading branch information
marten-seemann committed Jan 7, 2023
1 parent 45eb893 commit 28cb275
Show file tree
Hide file tree
Showing 7 changed files with 58 additions and 34 deletions.
11 changes: 7 additions & 4 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ type Config struct {
HolePunchingOptions []holepunch.Option
}

func (cfg *Config) makeSwarm() (*swarm.Swarm, error) {
func (cfg *Config) makeSwarm(enableMetrics bool) (*swarm.Swarm, error) {
if cfg.Peerstore == nil {
return nil, fmt.Errorf("no peerstore specified")
}
Expand Down Expand Up @@ -151,7 +151,7 @@ func (cfg *Config) makeSwarm() (*swarm.Swarm, error) {
return nil, err
}

opts := make([]swarm.Option, 0, 3)
opts := make([]swarm.Option, 0, 6)
if cfg.Reporter != nil {
opts = append(opts, swarm.WithMetrics(cfg.Reporter))
}
Expand All @@ -167,6 +167,9 @@ func (cfg *Config) makeSwarm() (*swarm.Swarm, error) {
if cfg.MultiaddrResolver != nil {
opts = append(opts, swarm.WithMultiaddrResolver(cfg.MultiaddrResolver))
}
if enableMetrics {
opts = append(opts, swarm.WithMetricsTracer(swarm.NewMetricsTracer()))
}
// TODO: Make the swarm implementation configurable.
return swarm.NewSwarm(pid, cfg.Peerstore, opts...)
}
Expand Down Expand Up @@ -276,7 +279,7 @@ func (cfg *Config) addTransports(h host.Host) error {
//
// This function consumes the config. Do not reuse it (really!).
func (cfg *Config) NewNode() (host.Host, error) {
swrm, err := cfg.makeSwarm()
swrm, err := cfg.makeSwarm(true)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -382,7 +385,7 @@ func (cfg *Config) NewNode() (host.Host, error) {
Peerstore: ps,
}

dialer, err := autoNatCfg.makeSwarm()
dialer, err := autoNatCfg.makeSwarm(false)
if err != nil {
h.Close()
return nil, err
Expand Down
10 changes: 9 additions & 1 deletion p2p/net/swarm/swarm.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,13 @@ func WithMetrics(reporter metrics.Reporter) Option {
}
}

// WithMetricsTracer returns an Option that stores t as the swarm's
// MetricsTracer. The swarm checks the tracer for nil before every call,
// so leaving it unset disables these metrics.
func WithMetricsTracer(t MetricsTracer) Option {
	install := func(sw *Swarm) error {
		sw.metricsTracer = t
		return nil
	}
	return install
}

func WithDialTimeout(t time.Duration) Option {
return func(s *Swarm) error {
s.dialTimeout = t
Expand Down Expand Up @@ -151,7 +158,8 @@ type Swarm struct {
ctx context.Context // is canceled when Close is called
ctxCancel context.CancelFunc

bwc metrics.Reporter
bwc metrics.Reporter
metricsTracer MetricsTracer
}

// NewSwarm constructs a Swarm.
Expand Down
5 changes: 3 additions & 2 deletions p2p/net/swarm/swarm_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,9 @@ func (c *Conn) Close() error {
}

func (c *Conn) doClose() {
recordConnectionClosed(c.stat.Direction, c.ConnState())
recordConnectionDuration(c.stat.Direction, time.Since(c.stat.Stats.Opened), c.ConnState())
if c.swarm.metricsTracer != nil {
c.swarm.metricsTracer.ClosedConnection(c.stat.Direction, time.Since(c.stat.Stats.Opened), c.ConnState())
}

c.swarm.removeConn(c)

Expand Down
11 changes: 8 additions & 3 deletions p2p/net/swarm/swarm_dial.go
Original file line number Diff line number Diff line change
Expand Up @@ -493,12 +493,17 @@ func (s *Swarm) dialAddr(ctx context.Context, p peer.ID, addr ma.Multiaddr) (tra
start := time.Now()
connC, err := tpt.Dial(ctx, addr, p)
if err != nil {
recordDialFailed(addr, err)
if s.metricsTracer != nil {
s.metricsTracer.FailedDialing(addr, err)
}
return nil, err
}
canonicallog.LogPeerStatus(100, connC.RemotePeer(), connC.RemoteMultiaddr(), "connection_status", "established", "dir", "outbound")
recordConnectionOpened(network.DirOutbound, connC.RemotePublicKey(), connC.ConnState())
recordHandshakeLatency(time.Since(start), connC.ConnState())
if s.metricsTracer != nil {
connState := connC.ConnState()
s.metricsTracer.OpenedConnection(network.DirOutbound, connC.RemotePublicKey(), connState)
s.metricsTracer.CompletedHandshake(time.Since(start), connState)
}

// Trust the transport? Yeah... right.
if connC.RemotePeer() != p {
Expand Down
4 changes: 3 additions & 1 deletion p2p/net/swarm/swarm_listen.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,9 @@ func (s *Swarm) AddListenAddr(a ma.Multiaddr) error {
return
}
canonicallog.LogPeerStatus(100, c.RemotePeer(), c.RemoteMultiaddr(), "connection_status", "established", "dir", "inbound")
recordConnectionOpened(network.DirInbound, c.RemotePublicKey(), c.ConnState())
if s.metricsTracer != nil {
s.metricsTracer.OpenedConnection(network.DirInbound, c.RemotePublicKey(), c.ConnState())
}

log.Debugf("swarm listener accepted connection: %s <-> %s", c.LocalMultiaddr(), c.RemoteMultiaddr())
s.refs.Add(1)
Expand Down
46 changes: 25 additions & 21 deletions p2p/net/swarm/swarm_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package swarm
import (
"context"
"errors"
"fmt"
"net"
"strings"
"sync"
Expand Down Expand Up @@ -66,10 +65,28 @@ var (
)
)

func init() {
var initMetricsOnce sync.Once

// initMetrics registers every swarm collector with the default Prometheus
// registry. It is meant to be run through initMetricsOnce rather than
// called directly.
func initMetrics() {
	prometheus.MustRegister(
		connsOpened,
		keyTypes,
		connsClosed,
		dialError,
		connDuration,
		connHandshakeLatency,
	)
}

// MetricsTracer is notified by the swarm about connection-level events so
// that an implementation can record them as metrics.
type MetricsTracer interface {
	// OpenedConnection is called after a connection has been established,
	// with its direction, the remote public key and the connection state.
	OpenedConnection(dir network.Direction, remoteKey crypto.PubKey, cs network.ConnectionState)
	// ClosedConnection is called when a connection is closed; duration is
	// the time elapsed since the connection was opened.
	ClosedConnection(dir network.Direction, duration time.Duration, cs network.ConnectionState)
	// CompletedHandshake is called after a successful dial; latency is the
	// time elapsed since the transport dial was started.
	CompletedHandshake(latency time.Duration, cs network.ConnectionState)
	// FailedDialing is called when dialing addr returned err.
	FailedDialing(addr ma.Multiaddr, err error)
}

// metricsTracer is the stateless MetricsTracer implementation that records
// events into the package-level Prometheus collectors.
type metricsTracer struct{}

// Compile-time check that *metricsTracer satisfies MetricsTracer.
var _ MetricsTracer = (*metricsTracer)(nil)

// NewMetricsTracer returns a tracer backed by the package's Prometheus
// collectors, registering them on the first call only.
func NewMetricsTracer() *metricsTracer {
	// Guarded by a sync.Once so that repeated construction does not try to
	// register the same collectors twice.
	initMetricsOnce.Do(initMetrics)
	return new(metricsTracer)
}

var stringPool = sync.Pool{New: func() any {
s := make([]string, 0, 8)
return &s
Expand Down Expand Up @@ -110,7 +127,7 @@ func appendConnectionState(tags []string, cs network.ConnectionState) []string {
return tags
}

func recordConnectionOpened(dir network.Direction, p crypto.PubKey, cs network.ConnectionState) {
func (m *metricsTracer) OpenedConnection(dir network.Direction, p crypto.PubKey, cs network.ConnectionState) {
tags := getStringSlice()
defer putStringSlice(tags)

Expand All @@ -124,37 +141,27 @@ func recordConnectionOpened(dir network.Direction, p crypto.PubKey, cs network.C
keyTypes.WithLabelValues(*tags...).Inc()
}

func recordConnectionClosed(dir network.Direction, cs network.ConnectionState) {
func (m *metricsTracer) ClosedConnection(dir network.Direction, duration time.Duration, cs network.ConnectionState) {
tags := getStringSlice()
defer putStringSlice(tags)
*tags = append(*tags, getDirection(dir))
*tags = appendConnectionState(*tags, cs)
connsClosed.WithLabelValues(*tags...).Inc()
}

func recordConnectionDuration(dir network.Direction, t time.Duration, cs network.ConnectionState) {
tags := getStringSlice()
defer putStringSlice(tags)
*tags = (*tags)[:0]
*tags = append(*tags, getDirection(dir))
*tags = appendConnectionState(*tags, cs)
connDuration.WithLabelValues(*tags...).Observe(t.Seconds())
connDuration.WithLabelValues(*tags...).Observe(duration.Seconds())
}

func recordHandshakeLatency(t time.Duration, cs network.ConnectionState) {
func (m *metricsTracer) CompletedHandshake(t time.Duration, cs network.ConnectionState) {
tags := getStringSlice()
defer putStringSlice(tags)
*tags = appendConnectionState(*tags, cs)
connHandshakeLatency.WithLabelValues(*tags...).Observe(t.Seconds())
}

func recordDialFailed(addr ma.Multiaddr, err error) {
var transport string
for _, p := range transports {
if _, err := addr.ValueForProtocol(p); err == nil {
transport = ma.ProtocolWithCode(p).Name
break
}
}
func (m *metricsTracer) FailedDialing(_ ma.Multiaddr, err error) {
e := "other"
if errors.Is(err, context.Canceled) {
e = "canceled"
Expand All @@ -168,8 +175,5 @@ func recordDialFailed(addr ma.Multiaddr, err error) {
e = "connection refused"
}
}
if e == "other" {
fmt.Printf("transport: %s, category: %s (orig: %s)\n", transport, e, err)
}
dialError.WithLabelValues(e).Inc()
}
5 changes: 3 additions & 2 deletions p2p/net/swarm/swarm_metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,13 @@ func BenchmarkMetricsConnOpen(b *testing.B) {
}
_, pub, err := crypto.GenerateEd25519Key(rand.Reader)
require.NoError(b, err)
tr := NewMetricsTracer()
for i := 0; i < b.N; i++ {
switch i % 2 {
case 0:
recordConnectionOpened(network.DirInbound, pub, quicConnState)
tr.OpenedConnection(network.DirInbound, pub, quicConnState)
case 1:
recordConnectionOpened(network.DirInbound, pub, tcpConnState)
tr.OpenedConnection(network.DirInbound, pub, tcpConnState)
}
}
}

0 comments on commit 28cb275

Please sign in to comment.