Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BasicHost changes for introspection #774

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/libp2p/go-libp2p-core/connmgr"
"github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/introspect"
"github.com/libp2p/go-libp2p-core/metrics"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
Expand Down Expand Up @@ -77,6 +78,8 @@ type Config struct {

EnableAutoRelay bool
StaticRelays []peer.AddrInfo

Introspector introspect.Introspector
}

// NewNode constructs a new libp2p Host from the Config.
Expand Down Expand Up @@ -115,7 +118,7 @@ func (cfg *Config) NewNode(ctx context.Context) (host.Host, error) {
}

// TODO: Make the swarm implementation configurable.
swrm := swarm.NewSwarm(ctx, pid, cfg.Peerstore, cfg.Reporter)
swrm := swarm.NewSwarm(ctx, pid, cfg.Peerstore, cfg.Reporter, cfg.Introspector)
if cfg.Filters != nil {
swrm.Filters = cfg.Filters
}
Expand All @@ -126,6 +129,7 @@ func (cfg *Config) NewNode(ctx context.Context) (host.Host, error) {
NATManager: cfg.NATManager,
EnablePing: !cfg.DisablePing,
UserAgent: cfg.UserAgent,
Introspector: cfg.Introspector,
})

if err != nil {
Expand Down Expand Up @@ -242,6 +246,7 @@ func (cfg *Config) NewNode(ctx context.Context) (host.Host, error) {
if router != nil {
return routed.Wrap(h, router), nil
}

return h, nil
}

Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ github.com/libp2p/go-libp2p-core v0.3.0/go.mod h1:ACp3DmS3/N64c2jDzcV429ukDpicbL
github.com/libp2p/go-libp2p-crypto v0.1.0/go.mod h1:sPUokVISZiy+nNuTTH/TY+leRSxnFj/2GLjtOTW90hI=
github.com/libp2p/go-libp2p-discovery v0.2.0 h1:1p3YSOq7VsgaL+xVHPi8XAmtGyas6D2J6rWBEfz/aiY=
github.com/libp2p/go-libp2p-discovery v0.2.0/go.mod h1:s4VGaxYMbw4+4+tsoQTqh7wfxg97AEdo4GYBt6BadWg=
github.com/libp2p/go-libp2p-introspection v0.0.0-20200123110920-ee44b5738886 h1:aCvSYZQ487Xt/ELd6QM4J/Qws5jGy0DmrHwJIUMGi40=
github.com/libp2p/go-libp2p-loggables v0.1.0 h1:h3w8QFfCt2UJl/0/NW4K829HX/0S4KD31PQ7m8UXXO8=
github.com/libp2p/go-libp2p-loggables v0.1.0/go.mod h1:EyumB2Y6PrYjr55Q3/tiJ/o3xoDasoRYM7nOzEpoa90=
github.com/libp2p/go-libp2p-mplex v0.2.0/go.mod h1:Ejl9IyjvXJ0T9iqUTE1jpYATQ9NM3g+OtR+EMMODbKo=
Expand Down
190 changes: 190 additions & 0 deletions introspect_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
package libp2p

import (
"context"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please group imports in groups, in this order:

  1. one group for stdlib
  2. one group for go-libp2p-core
  3. other go-libp2p imports
  4. 3rd party libs

"github.com/golang/protobuf/proto"
"github.com/gorilla/websocket"
"github.com/libp2p/go-libp2p-core/introspect"
"github.com/libp2p/go-libp2p-core/metrics"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/protocol"
"github.com/libp2p/go-libp2p-introspection/introspection"
"github.com/stretchr/testify/require"
"net/url"
"runtime"
"sync"
"testing"
"time"
)

func TestIntrospector(t *testing.T) {
msg1 := []byte("1")
msg2 := []byte("12")
msg3 := []byte("111")
msg4 := []byte("0000")

iaddr := "0.0.0.0:9999"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tests should not listen on 0.0.0.0 (that's insecure) nor on a fixed port (it might be occupied). Instead, we should listen on 127.0.0.1:0, and allow the listener to allocate a port. ListenAddrs() should then return the actual addresses, not the user-supplied ones.

ctx := context.Background()

// create host 1 with introspector
h1, err := New(ctx, Introspector(introspection.NewDefaultIntrospector(iaddr)), BandwidthReporter(metrics.NewBandwidthCounter()))
require.NoError(t, err)
defer h1.Close()

// create host 2
h2, err := New(ctx)
defer h2.Close()

// create host 3
h3, err := New(ctx)
defer h3.Close()

// host1 -> CONNECTS -> host2
require.NoError(t, h1.Connect(ctx, peer.AddrInfo{h2.ID(), h2.Addrs()}))

// host3 -> CONNECTS -> host1
require.NoError(t, h3.Connect(ctx, peer.AddrInfo{h1.ID(), h1.Addrs()}))

// host1 -> OPENS STREAM 1 -> host3, Writes a message & then reads the response
var wg sync.WaitGroup
p1 := protocol.ID("h1h3")
h3.SetStreamHandler(p1, func(s network.Stream) {
bz := make([]byte, len(msg1))
_, err := s.Read(bz)
require.NoError(t, err)
_, err = s.Write(msg2)
require.NoError(t, err)
wg.Done()
})
s1, err := h1.NewStream(ctx, h3.ID(), p1)
require.NoError(t, err)
wg.Add(1)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is racy. It's possible for wg.Done() to get called before the the wg.Add(1). Move the Add to before opening the stream to remove the race.

_, err = s1.Write(msg1)
require.NoError(t, err)
bz1 := make([]byte, len(msg2))
wg.Wait()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think you need this wait at all. You can use io.ReadFull instead.

_, err = s1.Read(bz1)
require.NoError(t, err)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can do require := require.New(t) at the top, and omit the first t parameter from all assertions.


// host2 -> OPENS Stream 2 -> host1 , writes a message & reads the response
p2 := protocol.ID("h2h1")
h1.SetStreamHandler(p2, func(s network.Stream) {
bz := make([]byte, len(msg3))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's use buf. It shadows previous usages, and it's idiomatic.

_, err := s.Read(bz)
require.NoError(t, err)
_, err = s.Write(msg4)
require.NoError(t, err)
wg.Done()
})

s2, err := h2.NewStream(ctx, h1.ID(), p2)
require.NoError(t, err)
wg.Add(1)
_, err = s2.Write(msg3)
require.NoError(t, err)
bz2 := make([]byte, len(msg4))
wg.Wait()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need for waitgroups. This code is racy, and we can use io.ReadFull instead.

_, err = s2.Read(bz2)
require.NoError(t, err)

// call introspection server & fetch state
u := url.URL{Scheme: "ws", Host: iaddr, Path: "/introspect"}

// wait till connection is established
i := 0
var connection *websocket.Conn
for {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this loop declaration can be made more elegant, by moving the init, test, increment up here.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

require.Less(t, i, 5, "failed to start server even after 5 attempts")
connection, _, err = websocket.DefaultDialer.Dial(u.String(), nil)
if err == nil {
break
}
i++
time.Sleep(500 * time.Millisecond)
}
defer connection.Close()

// fetch & unmarshal h1 state till ALL BANDWIDTH METERES kick in
var state *introspect.State
for {
require.NoError(t, connection.WriteMessage(websocket.TextMessage, []byte("trigger fetch")))
// read snapshot
_, msg, err := connection.ReadMessage()
require.NoError(t, err)
require.NotEmpty(t, msg)

state = &introspect.State{}
require.NoError(t, proto.Unmarshal(msg, state))
if state.Traffic.TrafficOut.CumBytes != 0 &&
state.Subsystems.Connections[0].Traffic.TrafficOut.CumBytes != 0 && state.Subsystems.Connections[1].Traffic.TrafficOut.CumBytes != 0 {
break
}
}

// Assert State

// Version
require.Equal(t, introspect.ProtoVersion, state.Version.Number)

// Runtime
require.Equal(t, h1.ID().String(), state.Runtime.PeerId)
require.Equal(t, runtime.GOOS, state.Runtime.Platform)
require.Equal(t, "go-libp2p", state.Runtime.Implementation)

// Overall Traffic
require.Greater(t, state.Traffic.TrafficIn.CumBytes, uint64(100))
require.Greater(t, state.Traffic.TrafficOut.CumBytes, uint64(100))

// Connections
conns := state.Subsystems.Connections
peerIdToConns := make(map[string]*introspect.Connection)
for _, c := range conns {
peerIdToConns[c.PeerId] = c
}
require.Len(t, peerIdToConns, 2)

pconn := make(map[string]network.Conn)
for _, c := range h1.Network().Conns() {
pconn[c.RemotePeer().String()] = c
}
require.Len(t, pconn, 2)

// host1 -> host2 connection
h2Conn := peerIdToConns[h2.ID().String()]
require.NotEmpty(t, h2Conn.Id)
require.Equal(t, introspect.Status_ACTIVE, h2Conn.Status)
require.Equal(t, pconn[h2.ID().String()].LocalMultiaddr().String(), h2Conn.Endpoints.SrcMultiaddr)
require.Equal(t, pconn[h2.ID().String()].RemoteMultiaddr().String(), h2Conn.Endpoints.DstMultiaddr)
require.Equal(t, introspect.Role_INITIATOR, h2Conn.Role)
require.Equal(t, uint64(len(msg3)), h2Conn.Traffic.TrafficIn.CumBytes)

// host3 -> host1 connection
h3Conn := peerIdToConns[h3.ID().String()]
require.NotEmpty(t, h3Conn.Id)
require.Equal(t, introspect.Status_ACTIVE, h3Conn.Status)
require.Equal(t, pconn[h3.ID().String()].LocalMultiaddr().String(), h3Conn.Endpoints.SrcMultiaddr)
require.Equal(t, pconn[h3.ID().String()].RemoteMultiaddr().String(), h3Conn.Endpoints.DstMultiaddr)
require.Equal(t, introspect.Role_RESPONDER, h3Conn.Role)
require.Equal(t, uint64(len(msg2)), h3Conn.Traffic.TrafficIn.CumBytes)
require.Equal(t, uint64(len(msg1)), h3Conn.Traffic.TrafficOut.CumBytes)

// stream1
require.Len(t, h3Conn.Streams.Streams, 1)
h3Stream := h3Conn.Streams.Streams[0]
require.NotEmpty(t, h3Stream.Id)
require.Equal(t, string(p1), h3Stream.Protocol)
require.Equal(t, introspect.Role_INITIATOR, h3Stream.Role)
require.Equal(t, introspect.Status_ACTIVE, h3Stream.Status)
require.True(t, len(msg1) == int(h3Stream.Traffic.TrafficOut.CumBytes))
require.True(t, len(msg2) == int(h3Stream.Traffic.TrafficIn.CumBytes))

// stream 2
require.Len(t, h2Conn.Streams.Streams, 1)
h1Stream := h2Conn.Streams.Streams[0]
require.NotEmpty(t, h1Stream.Id)
require.Equal(t, string(p2), h1Stream.Protocol)
require.Equal(t, introspect.Role_RESPONDER, h1Stream.Role)
require.Equal(t, introspect.Status_ACTIVE, h1Stream.Status)
require.True(t, len(msg3) == int(h1Stream.Traffic.TrafficIn.CumBytes))
}
12 changes: 12 additions & 0 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/libp2p/go-libp2p-core/connmgr"
"github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p-core/introspect"
"github.com/libp2p/go-libp2p-core/metrics"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/peerstore"
Expand Down Expand Up @@ -193,6 +194,17 @@ func ConnectionManager(connman connmgr.ConnManager) Option {
}
}

// Introspector configures the host to use the given introspector
func Introspector(i introspect.Introspector) Option {
return func(cfg *Config) error {
if cfg.Introspector != nil {
return fmt.Errorf("cannot specify multiple introspectors")
}
cfg.Introspector = i
return nil
}
}

// AddrsFactory configures libp2p to use the given address factory.
func AddrsFactory(factory config.AddrsFactory) Option {
return func(cfg *Config) error {
Expand Down
Loading