Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tracing support #227

Merged
merged 25 commits into from
Nov 19, 2019
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
67275a6
tracing scaffolding
vyzo Nov 4, 2019
89c7ed4
trace publish
vyzo Nov 4, 2019
fd73973
add tracing to floodsub/randomsub
vyzo Nov 4, 2019
958e09a
remove useless nil check when initializing subsystem tracers
vyzo Nov 4, 2019
fb11aa9
initialize tracer with peer ID, trace RPC from join/leave announcements
vyzo Nov 11, 2019
0a25f24
trace event protobuf
vyzo Nov 11, 2019
040cabe
some minor fixes in trace pb
vyzo Nov 11, 2019
151ec25
implement tracing details
vyzo Nov 11, 2019
ae0fcc6
add traces for send/drop rpc
vyzo Nov 11, 2019
3f30acd
track topics in message tracing
vyzo Nov 11, 2019
3545acf
json tracer
vyzo Nov 12, 2019
8ff321c
move tracer implementation to its own file
vyzo Nov 12, 2019
f134d65
add protobuf file tracer
vyzo Nov 12, 2019
0aa629c
use *pb.TraceEvent as argument for Trace in the EventTracer interface
vyzo Nov 13, 2019
57ea27e
remote tracer
vyzo Nov 14, 2019
2fc5518
remote tracer: wait a second to accumulate batches
vyzo Nov 14, 2019
db8e219
remove CompressedTraceEventBatch from trace protobuf
vyzo Nov 18, 2019
abe4763
compress entire stream in remote tracer
vyzo Nov 18, 2019
cce30a4
reset remote tracer stream on errors
vyzo Nov 18, 2019
91527e2
lossy tracing for remote tracers
vyzo Nov 18, 2019
24a1181
move niling of trace buffer to the end
vyzo Nov 18, 2019
7a5aaa8
don't blanket wait for 1s to accumulate a batch.
vyzo Nov 18, 2019
40e5a49
store the remote trace peer address in the peerstore
vyzo Nov 18, 2019
cd7f42e
make tracer.Close safer
vyzo Nov 18, 2019
7065297
nits and beauty
vyzo Nov 19, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 16 additions & 4 deletions floodsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ func NewFloodSub(ctx context.Context, h host.Host, opts ...Option) (*PubSub, err
type FloodSubRouter struct {
p *PubSub
protocols []protocol.ID
tracer *pubsubTracer
}

func (fs *FloodSubRouter) Protocols() []protocol.ID {
Expand All @@ -39,11 +40,16 @@ func (fs *FloodSubRouter) Protocols() []protocol.ID {

func (fs *FloodSubRouter) Attach(p *PubSub) {
fs.p = p
fs.tracer = p.tracer
}

func (fs *FloodSubRouter) AddPeer(peer.ID, protocol.ID) {}
func (fs *FloodSubRouter) AddPeer(p peer.ID, proto protocol.ID) {
fs.tracer.AddPeer(p, proto)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be in the router or in the part that calls AddPeer?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could have it in the control substrate, it just felt more natural to do it in the router.
Other than that there is no particular reason for the choice.

}

func (fs *FloodSubRouter) RemovePeer(peer.ID) {}
func (fs *FloodSubRouter) RemovePeer(p peer.ID) {
fs.tracer.RemovePeer(p)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same, it could be in either place, just felt more natural to do it in the router.

}

func (fs *FloodSubRouter) EnoughPeers(topic string, suggested int) bool {
// check all peers in the topic
Expand Down Expand Up @@ -91,13 +97,19 @@ func (fs *FloodSubRouter) Publish(from peer.ID, msg *pb.Message) {

select {
case mch <- out:
fs.tracer.SendRPC(out, pid)
default:
log.Infof("dropping message to peer %s: queue full", pid)
fs.tracer.DropRPC(out, pid)
// Drop it. The peer is too slow.
}
}
}

func (fs *FloodSubRouter) Join(topic string) {}
func (fs *FloodSubRouter) Join(topic string) {
fs.tracer.Join(topic)
}

func (fs *FloodSubRouter) Leave(topic string) {}
func (fs *FloodSubRouter) Leave(topic string) {
fs.tracer.Join(topic)
}
14 changes: 14 additions & 0 deletions gossipsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ type GossipSubRouter struct {
gossip map[peer.ID][]*pb.ControlIHave // pending gossip
control map[peer.ID]*pb.ControlMessage // pending control messages
mcache *MessageCache
tracer *pubsubTracer
}

func (gs *GossipSubRouter) Protocols() []protocol.ID {
Expand All @@ -73,16 +74,19 @@ func (gs *GossipSubRouter) Protocols() []protocol.ID {

func (gs *GossipSubRouter) Attach(p *PubSub) {
gs.p = p
gs.tracer = p.tracer
go gs.heartbeatTimer()
}

func (gs *GossipSubRouter) AddPeer(p peer.ID, proto protocol.ID) {
log.Debugf("PEERUP: Add new peer %s using %s", p, proto)
gs.tracer.AddPeer(p, proto)
gs.peers[p] = proto
}

func (gs *GossipSubRouter) RemovePeer(p peer.ID) {
log.Debugf("PEERDOWN: Remove disconnected peer %s", p)
gs.tracer.RemovePeer(p)
delete(gs.peers, p)
for _, peers := range gs.mesh {
delete(peers, p)
Expand Down Expand Up @@ -208,6 +212,7 @@ func (gs *GossipSubRouter) handleGraft(p peer.ID, ctl *pb.ControlMessage) []*pb.
prune = append(prune, topic)
} else {
log.Debugf("GRAFT: Add mesh link from %s in %s", p, topic)
gs.tracer.Graft(p, topic)
peers[p] = struct{}{}
gs.tagPeer(p, topic)
}
Expand All @@ -231,6 +236,7 @@ func (gs *GossipSubRouter) handlePrune(p peer.ID, ctl *pb.ControlMessage) {
peers, ok := gs.mesh[topic]
if ok {
log.Debugf("PRUNE: Remove mesh link to %s in %s", p, topic)
gs.tracer.Prune(p, topic)
delete(peers, p)
gs.untagPeer(p, topic)
}
Expand Down Expand Up @@ -294,6 +300,7 @@ func (gs *GossipSubRouter) Join(topic string) {
}

log.Debugf("JOIN %s", topic)
gs.tracer.Join(topic)

gmap, ok = gs.fanout[topic]
if ok {
Expand All @@ -319,6 +326,7 @@ func (gs *GossipSubRouter) Join(topic string) {

for p := range gmap {
log.Debugf("JOIN: Add mesh link to %s in %s", p, topic)
gs.tracer.Graft(p, topic)
gs.sendGraft(p, topic)
gs.tagPeer(p, topic)
}
Expand All @@ -331,11 +339,13 @@ func (gs *GossipSubRouter) Leave(topic string) {
}

log.Debugf("LEAVE %s", topic)
gs.tracer.Leave(topic)

delete(gs.mesh, topic)

for p := range gmap {
log.Debugf("LEAVE: Remove mesh link to %s in %s", p, topic)
gs.tracer.Prune(p, topic)
gs.sendPrune(p, topic)
gs.untagPeer(p, topic)
}
Expand Down Expand Up @@ -384,8 +394,10 @@ func (gs *GossipSubRouter) sendRPC(p peer.ID, out *RPC) {

select {
case mch <- out:
gs.tracer.SendRPC(out, p)
default:
log.Infof("dropping message to peer %s: queue full", p)
gs.tracer.DropRPC(out, p)
// push control messages that need to be retried
ctl := out.GetControl()
if ctl != nil {
Expand Down Expand Up @@ -443,6 +455,7 @@ func (gs *GossipSubRouter) heartbeat() {

for _, p := range plst {
log.Debugf("HEARTBEAT: Add mesh link to %s in %s", p, topic)
gs.tracer.Graft(p, topic)
peers[p] = struct{}{}
gs.tagPeer(p, topic)
topics := tograft[p]
Expand All @@ -458,6 +471,7 @@ func (gs *GossipSubRouter) heartbeat() {

for _, p := range plst[:idontneed] {
log.Debugf("HEARTBEAT: Remove mesh link to %s in %s", p, topic)
gs.tracer.Prune(p, topic)
delete(peers, p)
gs.untagPeer(p, topic)
topics := toprune[p]
Expand Down
Loading