Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial implementation of the libp2p introspection server #1

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
.DS_Store

# Binaries for programs and plugins
*.exe
*.exe~
*.dll
*.so
*.dylib

# Test binary, built with `go test -c`
*.test

# Output of the go coverage tool, specifically when used with LiteIDE
*.out

# Build output dir
build

# Dependency directories (remove the comment below to include it)
# vendor/
12 changes: 12 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
module github.com/libp2p/go-libp2p-introspection

go 1.13

require (
github.com/golang/protobuf v1.3.2
github.com/gorilla/mux v1.7.3
github.com/gorilla/websocket v1.4.1
github.com/imdario/mergo v0.3.8
github.com/ipfs/go-log v1.0.1
github.com/pkg/errors v0.9.1
)
35 changes: 35 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/gogo/protobuf v1.2.1 h1:/s5zKNz0uPFCZ5hddgPdo2TK2TVrUNMn0OOX8/aZMTE=
github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4=
github.com/golang/protobuf v1.3.2 h1:6nsPYzhq5kReh6QImI3k5qWzO4PEbvbIW2cwSfR/6xs=
github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/gorilla/mux v1.7.3 h1:gnP5JzjVOuiZD07fKKToCAOjS0yOpj/qPETTXCCS6hw=
github.com/gorilla/mux v1.7.3/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs=
github.com/gorilla/websocket v1.4.1 h1:q7AeDBpnBk8AogcD4DSag/Ukw/KV+YhzLj2bP5HvKCM=
github.com/gorilla/websocket v1.4.1/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/imdario/mergo v0.3.8 h1:CGgOkSJeqMRmt0D9XLWExdT4m4F1vd3FV3VPt+0VxkQ=
github.com/imdario/mergo v0.3.8/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA=
github.com/ipfs/go-log v1.0.1 h1:5lIEEOQTk/vd1WuPFBRqz2mcp+5G1fMVcW+Ib/H5Hfo=
github.com/ipfs/go-log v1.0.1/go.mod h1:HuWlQttfN6FWNHRhlY5yMk/lW7evQC0HHGOxEwMRR8I=
github.com/ipfs/go-log/v2 v2.0.1 h1:mnR9XFltezAtO8A6tj5U7nKkRzhEQNEw/wT11U2HhPM=
github.com/ipfs/go-log/v2 v2.0.1/go.mod h1:O7P1lJt27vWHhOwQmcFEvlmo49ry2VY2+JfBWFaa9+0=
github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/libp2p/go-libp2p-core v0.3.0 h1:F7PqduvrztDtFsAa/bcheQ3azmNo+Nq7m8hQY5GiUW8=
github.com/opentracing/opentracing-go v1.1.0 h1:pWlfV3Bxv7k65HYwkikxat0+s3pV4bsqf19k25Ur8rU=
github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
go.uber.org/atomic v1.4.0 h1:cxzIVoETapQEqDhQu3QfnvXAV4AlzcvUCxkVUFw3+EU=
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI=
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
go.uber.org/zap v1.10.0 h1:ORx85nbTijNz8ljznvCMR1ZBIPKFn3jQrag10X2AsuM=
go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
89 changes: 89 additions & 0 deletions introspection/default_introspector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package introspection

import (
"github.com/golang/protobuf/ptypes/timestamp"
"github.com/imdario/mergo"
coreit "github.com/libp2p/go-libp2p-core/introspection"
"github.com/pkg/errors"
"sync"
"time"
)

var _ coreit.Introspector = (*DefaultIntrospector)(nil)

// DefaultIntrospector is a registry of subsystem data/metrics providers and also allows
// clients to inspect the system state by calling all the providers registered with it
type DefaultIntrospector struct {
treeMu sync.RWMutex
tree *coreit.ProvidersTree
}

func NewDefaultIntrospector() *DefaultIntrospector {
return &DefaultIntrospector{tree: &coreit.ProvidersTree{}}
}

func (d *DefaultIntrospector) RegisterProviders(provs *coreit.ProvidersTree) error {
d.treeMu.Lock()
defer d.treeMu.Unlock()

if err := mergo.Merge(d.tree, provs); err != nil {
return err
}

return nil
}

func (d *DefaultIntrospector) FetchCurrentState() (*coreit.State, error) {
d.treeMu.RLock()
defer d.treeMu.RUnlock()

s := &coreit.State{}

// subsystems
s.Subsystems = &coreit.Subsystems{}

// version
s.Version = &coreit.Version{Number: coreit.ProtoVersion}

// runtime
// TODO Figure out how & where a runtime provider would be injected
if d.tree.Runtime != nil {
r, err := d.tree.Runtime.Get()
if err != nil {
return nil, errors.Wrap(err, "failed to fetch runtime info")
}
s.Runtime = r
}

// timestamps
s.InstantTs = &timestamp.Timestamp{Seconds: time.Now().Unix()}
// TODO Figure out the other two timestamp fields

// connections
if d.tree.Conn.List != nil {
conns, err := d.tree.Conn.List()
if err != nil {
return nil, errors.Wrap(err, "failed to fetch connection list")
}

s.Subsystems.Connections = conns

// streams
if d.tree.Stream.List != nil {
for _, c := range conns {
s, err := d.tree.Stream.List(coreit.StreamListQuery{
Type: coreit.StreamListQueryTypeConn,
ConnID: coreit.ConnID(c.Id),
})
if err != nil {
return nil, errors.Wrap(err, "failed to fetch stream list")
}

c.Streams = &coreit.StreamList{}
c.Streams.Streams = append(c.Streams.Streams, s...)
}
}
}

return nil, nil
}
102 changes: 102 additions & 0 deletions introspection/ws_server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package introspection

import (
"context"
"github.com/golang/protobuf/proto"
"github.com/gorilla/mux"
"github.com/gorilla/websocket"
logging "github.com/ipfs/go-log"
coreit "github.com/libp2p/go-libp2p-core/introspection"
"net/http"
"net/http/pprof"
"time"
)

var logger = logging.Logger("introspection-server")
var upgrader = websocket.Upgrader{}

// StartServer starts the ws introspection server with the given introspector
func StartServer(addr string, introspector coreit.Introspector) func() error {
// register handlers on a muxed router
r := mux.NewRouter()

// introspection handler
r.HandleFunc("/introspection", toHttpHandler(introspector))

// Register pprof handlers
r.HandleFunc("/debug/pprof/", pprof.Index)
r.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
r.HandleFunc("/debug/pprof/profile", pprof.Profile)
r.HandleFunc("/debug/pprof/symbol", pprof.Symbol)

r.Handle("/debug/pprof/goroutine", pprof.Handler("goroutine"))
r.Handle("/debug/pprof/heap", pprof.Handler("heap"))
r.Handle("/debug/pprof/threadcreate", pprof.Handler("threadcreate"))
r.Handle("/debug/pprof/block", pprof.Handler("block"))

// register router with the http handler
http.Handle("/", r)

// start server
serverInstance := http.Server{
Addr: addr,
}

// start server
go func() {
if err := serverInstance.ListenAndServe(); err != http.ErrServerClosed {
logger.Errorf("failed to start server, err=%s", err)
}
}()

logger.Infof("server starting, listening on %s", addr)

return func() error {
shutdownCtx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
return serverInstance.Shutdown(shutdownCtx)
}
}

func toHttpHandler(introspector coreit.Introspector) http.HandlerFunc {
return func(w http.ResponseWriter, rq *http.Request) {
upgrader.CheckOrigin = func(rq *http.Request) bool { return true }
wsConn, err := upgrader.Upgrade(w, rq, nil)
if err != nil {
logger.Errorf("upgrade to websocket failed, err=%s", err)
return
}
defer wsConn.Close()

for {
// TODO : Do we need a read timeout here ? -> probably not.
// wait for server to ask for the state
mt, message, err := wsConn.ReadMessage()
if err != nil {
logger.Errorf("failed to read message from ws connection, err=%s", err)
return
}
logger.Debugf("received message from ws connection, type: %d. recv: %s", mt, message)

// fetch the current state & marshal to bytes
state, err := introspector.FetchCurrentState()
if err != nil {
logger.Errorf("failed to fetch current state in introspector, err=%s", err)
return
}

bz, err := proto.Marshal(state)
if err != nil {
logger.Errorf("failed to marshal introspector state, err=%s", err)
return
}

// send the response
wsConn.SetWriteDeadline(time.Now().Add(5 * time.Second))
if err = wsConn.WriteMessage(websocket.BinaryMessage, bz); err != nil {
logger.Errorf("failed to write response to ws connection, err=%s", err)
return
}
}
}
}