diff --git a/discovery_test.go b/discovery_test.go index 8d5d4bd4..2a38285b 100644 --- a/discovery_test.go +++ b/discovery_test.go @@ -244,7 +244,7 @@ func TestGossipSubDiscoveryAfterBootstrap(t *testing.T) { s = server2 } disc := &mockDiscoveryClient{h, s} - ps := getGossipsub(ctx, h, WithDiscovery(disc, WithDiscoveryOpts(discOpts...))) + ps := getGossipSub(ctx, h, WithDiscovery(disc, WithDiscoveryOpts(discOpts...))) psubs[i] = ps topicHandlers[i], _ = ps.Join(topic) } diff --git a/go.mod b/go.mod index fbcdac26..a0aa79cd 100644 --- a/go.mod +++ b/go.mod @@ -1,4 +1,4 @@ -module github.com/yhassanzadeh13/go-libp2p-pubsub +module github.com/libp2p/go-libp2p-pubsub go 1.17 diff --git a/gossip_tracer.go b/gossip_tracer.go index 7435cf74..10bf1f62 100644 --- a/gossip_tracer.go +++ b/gossip_tracer.go @@ -9,10 +9,10 @@ import ( "github.com/libp2p/go-libp2p/core/protocol" ) -// gossipTracer is an internal tracer that tracks IWANT requests in order to penalize +// GossipTracer is an internal tracer that tracks IWANT requests in order to penalize // peers who don't follow up on IWANT requests after an IHAVE advertisement. // The tracking of promises is probabilistic to avoid using too much memory. -type gossipTracer struct { +type GossipTracer struct { sync.Mutex idGen *msgIDGenerator @@ -27,15 +27,15 @@ type gossipTracer struct { peerPromises map[peer.ID]map[string]struct{} } -func newGossipTracer() *gossipTracer { - return &gossipTracer{ +func newGossipTracer() *GossipTracer { + return &GossipTracer{ idGen: newMsgIdGenerator(), promises: make(map[string]map[peer.ID]time.Time), peerPromises: make(map[peer.ID]map[string]struct{}), } } -func (gt *gossipTracer) Start(gs *GossipSubRouter) { +func (gt *GossipTracer) Start(gs *GossipSubRouter) { if gt == nil { return } @@ -45,7 +45,7 @@ func (gt *gossipTracer) Start(gs *GossipSubRouter) { } // track a promise to deliver a message from a list of msgIDs we are requesting -func (gt *gossipTracer) AddPromise(p peer.ID, msgIDs []string) { +func (gt *GossipTracer) AddPromise(p peer.ID, msgIDs []string) { if gt == nil { return } @@ -76,7 +76,7 @@ func (gt *gossipTracer) AddPromise(p peer.ID, msgIDs []string) { // returns the number of broken promises for each peer who didn't follow up // on an IWANT request. -func (gt *gossipTracer) GetBrokenPromises() map[peer.ID]int { +func (gt *GossipTracer) GetBrokenPromises() map[peer.ID]int { if gt == nil { return nil } @@ -114,9 +114,9 @@ func (gt *gossipTracer) GetBrokenPromises() map[peer.ID]int { return res } -var _ RawTracer = (*gossipTracer)(nil) +var _ RawTracer = (*GossipTracer)(nil) -func (gt *gossipTracer) fulfillPromise(msg *Message) { +func (gt *GossipTracer) fulfillPromise(msg *Message) { mid := gt.idGen.ID(msg) gt.Lock() @@ -140,12 +140,12 @@ func (gt *gossipTracer) fulfillPromise(msg *Message) { } } -func (gt *gossipTracer) DeliverMessage(msg *Message) { +func (gt *GossipTracer) DeliverMessage(msg *Message) { // someone delivered a message, fulfill promises for it gt.fulfillPromise(msg) } -func (gt *gossipTracer) RejectMessage(msg *Message, reason string) { +func (gt *GossipTracer) RejectMessage(msg *Message, reason string) { // A message got rejected, so we can fulfill promises and let the score penalty apply // from invalid message delivery. // We do take exception and apply promise penalty regardless in the following cases, where @@ -160,26 +160,26 @@ func (gt *gossipTracer) RejectMessage(msg *Message, reason string) { gt.fulfillPromise(msg) } -func (gt *gossipTracer) ValidateMessage(msg *Message) { +func (gt *GossipTracer) ValidateMessage(msg *Message) { // we consider the promise fulfilled as soon as the message begins validation // if it was a case of signature issue it would have been rejected immediately // without triggering the Validate trace gt.fulfillPromise(msg) } -func (gt *gossipTracer) AddPeer(p peer.ID, proto protocol.ID) {} -func (gt *gossipTracer) RemovePeer(p peer.ID) {} -func (gt *gossipTracer) Join(topic string) {} -func (gt *gossipTracer) Leave(topic string) {} -func (gt *gossipTracer) Graft(p peer.ID, topic string) {} -func (gt *gossipTracer) Prune(p peer.ID, topic string) {} -func (gt *gossipTracer) DuplicateMessage(msg *Message) {} -func (gt *gossipTracer) RecvRPC(rpc *RPC) {} -func (gt *gossipTracer) SendRPC(rpc *RPC, p peer.ID) {} -func (gt *gossipTracer) DropRPC(rpc *RPC, p peer.ID) {} -func (gt *gossipTracer) UndeliverableMessage(msg *Message) {} - -func (gt *gossipTracer) ThrottlePeer(p peer.ID) { +func (gt *GossipTracer) AddPeer(p peer.ID, proto protocol.ID) {} +func (gt *GossipTracer) RemovePeer(p peer.ID) {} +func (gt *GossipTracer) Join(topic string) {} +func (gt *GossipTracer) Leave(topic string) {} +func (gt *GossipTracer) Graft(p peer.ID, topic string) {} +func (gt *GossipTracer) Prune(p peer.ID, topic string) {} +func (gt *GossipTracer) DuplicateMessage(msg *Message) {} +func (gt *GossipTracer) RecvRPC(rpc *RPC) {} +func (gt *GossipTracer) SendRPC(rpc *RPC, p peer.ID) {} +func (gt *GossipTracer) DropRPC(rpc *RPC, p peer.ID) {} +func (gt *GossipTracer) UndeliverableMessage(msg *Message) {} + +func (gt *GossipTracer) ThrottlePeer(p peer.ID) { gt.Lock() defer gt.Unlock() diff --git a/gossipsub.go b/gossipsub.go index 02655b6b..61271845 100644 --- a/gossipsub.go +++ b/gossipsub.go @@ -206,7 +206,10 @@ type GossipSubParams struct { // NewGossipSub returns a new PubSub object using the default GossipSubRouter as the router. func NewGossipSub(ctx context.Context, h host.Host, opts ...Option) (*PubSub, error) { - rt := DefaultGossipSubRouter(h) + rt, err := DefaultGossipSubRouter(h) + if err != nil { + return nil, fmt.Errorf("failed to create default gossipsub router: %w", err) + } opts = append(opts, WithRawTracer(rt.tagTracer)) return NewGossipSubWithRouter(ctx, h, rt, opts...) } @@ -216,10 +219,12 @@ func NewGossipSubWithRouter(ctx context.Context, h host.Host, rt PubSubRouter, o return NewPubSub(ctx, h, rt, opts...) } +type GossipSubRouterOption func(*GossipSubRouter) error + // DefaultGossipSubRouter returns a new GossipSubRouter with default parameters. -func DefaultGossipSubRouter(h host.Host) *GossipSubRouter { +func DefaultGossipSubRouter(h host.Host, opts ...GossipSubRouterOption) (*GossipSubRouter, error) { params := DefaultGossipSubParams() - return &GossipSubRouter{ + rt := &GossipSubRouter{ peers: make(map[peer.ID]protocol.ID), mesh: make(map[string]map[peer.ID]struct{}), fanout: make(map[string]map[peer.ID]struct{}), @@ -237,6 +242,14 @@ func DefaultGossipSubRouter(h host.Host) *GossipSubRouter { tagTracer: newTagTracer(h.ConnManager()), params: params, } + + for _, opt := range opts { + if err := opt(rt); err != nil { + return nil, fmt.Errorf("failed to apply gossipsub router option: %w", err) + } + } + + return rt, nil } // DefaultGossipSubParams returns the default gossip sub parameters @@ -277,7 +290,7 @@ func DefaultGossipSubParams() GossipSubParams { // WithPeerScore is a gossipsub router option that enables peer scoring. func WithPeerScore(params *PeerScoreParams, thresholds *PeerScoreThresholds) Option { return func(ps *PubSub) error { - gs, ok := ps.rt.(*GossipSubRouter) + gs, ok := ps.rt.(GossipPubSubRouter) if !ok { return fmt.Errorf("pubsub router is not gossipsub") } @@ -294,21 +307,17 @@ func WithPeerScore(params *PeerScoreParams, thresholds *PeerScoreThresholds) Opt return err } - gs.score = newPeerScore(params) - gs.gossipThreshold = thresholds.GossipThreshold - gs.publishThreshold = thresholds.PublishThreshold - gs.graylistThreshold = thresholds.GraylistThreshold - gs.acceptPXThreshold = thresholds.AcceptPXThreshold - gs.opportunisticGraftThreshold = thresholds.OpportunisticGraftThreshold + gs.SetPeerScore(newPeerScore(params)) + gs.SetPeerScoreThresholds(thresholds) - gs.gossipTracer = newGossipTracer() + gs.SetGossipTracer(newGossipTracer()) // hook the tracer if ps.tracer != nil { - ps.tracer.raw = append(ps.tracer.raw, gs.score, gs.gossipTracer) + ps.tracer.raw = append(ps.tracer.raw, gs.GetPeerScore(), gs.GetGossipTracer()) } else { ps.tracer = &pubsubTracer{ - raw: []RawTracer{gs.score, gs.gossipTracer}, + raw: []RawTracer{gs.GetPeerScore(), gs.GetGossipTracer()}, pid: ps.host.ID(), idGen: ps.idGen, } @@ -321,15 +330,9 @@ func WithPeerScore(params *PeerScoreParams, thresholds *PeerScoreThresholds) Opt // WithFloodPublish is a gossipsub router option that enables flood publishing. // When this is enabled, published messages are forwarded to all peers with score >= // to publishThreshold -func WithFloodPublish(floodPublish bool) Option { - return func(ps *PubSub) error { - gs, ok := ps.rt.(*GossipSubRouter) - if !ok { - return fmt.Errorf("pubsub router is not gossipsub") - } - +func WithFloodPublish(floodPublish bool) GossipSubRouterOption { + return func(gs *GossipSubRouter) error { gs.floodPublish = floodPublish - return nil } } @@ -337,15 +340,9 @@ func WithFloodPublish(floodPublish bool) Option { // WithPeerExchange is a gossipsub router option that enables Peer eXchange on PRUNE. // This should generally be enabled in bootstrappers and well connected/trusted nodes // used for bootstrapping. -func WithPeerExchange(doPX bool) Option { - return func(ps *PubSub) error { - gs, ok := ps.rt.(*GossipSubRouter) - if !ok { - return fmt.Errorf("pubsub router is not gossipsub") - } - +func WithPeerExchange(doPX bool) GossipSubRouterOption { + return func(gs *GossipSubRouter) error { gs.doPX = doPX - return nil } } @@ -357,7 +354,7 @@ func WithPeerExchange(doPX bool) Option { // symmetrically configured at both ends. func WithDirectPeers(pis []peer.AddrInfo) Option { return func(ps *PubSub) error { - gs, ok := ps.rt.(*GossipSubRouter) + gs, ok := ps.rt.(GossipPubSubRouter) if !ok { return fmt.Errorf("pubsub router is not gossipsub") } @@ -368,10 +365,10 @@ func WithDirectPeers(pis []peer.AddrInfo) Option { ps.host.Peerstore().AddAddrs(pi.ID, pi.Addrs, peerstore.PermanentAddrTTL) } - gs.direct = direct + gs.SetDirectPeers(direct) - if gs.tagTracer != nil { - gs.tagTracer.direct = direct + if gs.GetTagTracer() != nil { + gs.GetTagTracer().direct = direct } return nil @@ -382,12 +379,8 @@ func WithDirectPeers(pis []peer.AddrInfo) Option { // heartbeat ticks between attempting to reconnect direct peers that are not // currently connected. A "tick" is based on the heartbeat interval, which is // 1s by default. The default value for direct connect ticks is 300. -func WithDirectConnectTicks(t uint64) Option { - return func(ps *PubSub) error { - gs, ok := ps.rt.(*GossipSubRouter) - if !ok { - return fmt.Errorf("pubsub router is not gossipsub") - } +func WithDirectConnectTicks(t uint64) GossipSubRouterOption { + return func(gs *GossipSubRouter) error { gs.params.DirectConnectTicks = t return nil } @@ -395,12 +388,8 @@ func WithDirectConnectTicks(t uint64) Option { // WithGossipSubParams is a gossip sub router option that allows a custom // config to be set when instantiating the gossipsub router. -func WithGossipSubParams(cfg GossipSubParams) Option { - return func(ps *PubSub) error { - gs, ok := ps.rt.(*GossipSubRouter) - if !ok { - return fmt.Errorf("pubsub router is not gossipsub") - } +func WithGossipSubParams(cfg GossipSubParams) GossipSubRouterOption { + return func(gs *GossipSubRouter) error { // Overwrite current config and associated variables in the router. gs.params = cfg gs.connect = make(chan connectInfo, cfg.MaxPendingConnections) @@ -410,6 +399,25 @@ func WithGossipSubParams(cfg GossipSubParams) Option { } } +type GossipPubSubRouter interface { + PubSubRouter + + SetPeerScore(*PeerScore) + GetPeerScore() *PeerScore + + SetPeerScoreThresholds(*PeerScoreThresholds) + + SetGossipTracer(*GossipTracer) + GetGossipTracer() *GossipTracer + + GetTagTracer() *TagTracer + + SetDirectPeers(map[peer.ID]struct{}) + + SetPeerGater(*PeerGater) + GetPeerGater() *PeerGater +} + // GossipSubRouter is a router that implements the gossipsub protocol. // For each topic we have joined, we maintain an overlay through which // messages flow; this is the mesh map. @@ -437,10 +445,10 @@ type GossipSubRouter struct { mcache *MessageCache tracer *pubsubTracer - score *peerScore - gossipTracer *gossipTracer - tagTracer *tagTracer - gate *peerGater + score *PeerScore + gossipTracer *GossipTracer + tagTracer *TagTracer + gate *PeerGater // config for gossipsub parameters params GossipSubParams @@ -476,6 +484,8 @@ type GossipSubRouter struct { heartbeatTicks uint64 } +var _ GossipPubSubRouter = (*GossipSubRouter)(nil) + type connectInfo struct { p peer.ID spr *record.Envelope @@ -1923,7 +1933,51 @@ func (gs *GossipSubRouter) getPeers(topic string, count int, filter func(peer.ID return peers } -func (gs *GossipSubRouter) WithTagTracerPubsubOption() Option { +func (gs *GossipSubRouter) SetPeerScore(score *PeerScore) { + gs.score = score +} + +func (gs *GossipSubRouter) GetPeerScore() *PeerScore { + return gs.score +} + +func (gs *GossipSubRouter) SetPeerScoreThresholds(thresholds *PeerScoreThresholds) { + gs.gossipThreshold = thresholds.GossipThreshold + gs.publishThreshold = thresholds.PublishThreshold + gs.graylistThreshold = thresholds.GraylistThreshold + gs.acceptPXThreshold = thresholds.AcceptPXThreshold + gs.opportunisticGraftThreshold = thresholds.OpportunisticGraftThreshold +} + +func (gs *GossipSubRouter) SetGossipTracer(tracer *GossipTracer) { + gs.gossipTracer = tracer +} + +func (gs *GossipSubRouter) GetGossipTracer() *GossipTracer { + return gs.gossipTracer +} + +func (gs *GossipSubRouter) GetTagTracer() *TagTracer { + return gs.tagTracer +} + +func (gs *GossipSubRouter) SetDirectPeers(direct map[peer.ID]struct{}) { + gs.direct = direct +} + +func (gs *GossipSubRouter) SetPeerGater(gater *PeerGater) { + gs.gate = gater +} + +func (gs *GossipSubRouter) GetPeerGater() *PeerGater { + return gs.gate +} + +// WithDefaultTagTracer returns the tag tracer of the GossipSubRouter as a PubSub option. +// This is useful for cases where the GossipSubRouter is instantiated externally, and is +// injected into the GossipSub constructor as a dependency. This allows the tag tracer to be +// also injected into the GossipSub constructor as a PubSub option dependency. +func (gs *GossipSubRouter) WithDefaultTagTracer() Option { return WithRawTracer(gs.tagTracer) } diff --git a/gossipsub_connmgr_test.go b/gossipsub_connmgr_test.go index 0a97312c..9502b941 100644 --- a/gossipsub_connmgr_test.go +++ b/gossipsub_connmgr_test.go @@ -79,8 +79,8 @@ func TestGossipsubConnTagMessageDeliveries(t *testing.T) { // use flood publishing, so non-mesh peers will still be delivering messages // to everyone - psubs := getGossipsubs(ctx, honestHosts, - WithFloodPublish(true)) + routers := getGossipSubRouters(honestHosts, WithFloodPublish(true)) + psubs := getGossipSubsWithRouters(ctx, honestHosts, routers) // sybil squatters to be connected later sybilHosts := getNetHosts(t, ctx, nSquatter) diff --git a/gossipsub_feat.go b/gossipsub_feat.go index d5750af3..83a3b9d9 100644 --- a/gossipsub_feat.go +++ b/gossipsub_feat.go @@ -1,8 +1,6 @@ package pubsub import ( - "fmt" - "github.com/libp2p/go-libp2p/core/protocol" ) @@ -37,16 +35,10 @@ func GossipSubDefaultFeatures(feat GossipSubFeature, proto protocol.ID) bool { // WithGossipSubProtocols is a gossipsub router option that configures a custom protocol list // and feature test function -func WithGossipSubProtocols(protos []protocol.ID, feature GossipSubFeatureTest) Option { - return func(ps *PubSub) error { - gs, ok := ps.rt.(*GossipSubRouter) - if !ok { - return fmt.Errorf("pubsub router is not gossipsub") - } - +func WithGossipSubProtocols(protos []protocol.ID, feature GossipSubFeatureTest) GossipSubRouterOption { + return func(gs *GossipSubRouter) error { gs.protos = protos gs.feature = feature - return nil } } diff --git a/gossipsub_feat_test.go b/gossipsub_feat_test.go index 712f16df..0643d9e9 100644 --- a/gossipsub_feat_test.go +++ b/gossipsub_feat_test.go @@ -43,8 +43,8 @@ func TestGossipSubCustomProtocols(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() hosts := getNetHosts(t, ctx, 3) - - gsubs := getGossipsubs(ctx, hosts[:2], WithGossipSubProtocols(protos, features)) + routers := getGossipSubRouters(hosts[:2], WithGossipSubProtocols(protos, features)) + gsubs := getGossipSubsWithRouters(ctx, hosts[:2], routers) fsub := getPubsub(ctx, hosts[2]) psubs := append(gsubs, fsub) diff --git a/gossipsub_matchfn_test.go b/gossipsub_matchfn_test.go index 516fdf5f..a5f5d8a8 100644 --- a/gossipsub_matchfn_test.go +++ b/gossipsub_matchfn_test.go @@ -19,10 +19,10 @@ func TestGossipSubMatchingFn(t *testing.T) { h := getNetHosts(t, ctx, 4) psubs := []*PubSub{ - getGossipsub(ctx, h[0], WithProtocolMatchFn(protocolNameMatch), WithGossipSubProtocols([]protocol.ID{customsubA100, GossipSubID_v11}, GossipSubDefaultFeatures)), - getGossipsub(ctx, h[1], WithProtocolMatchFn(protocolNameMatch), WithGossipSubProtocols([]protocol.ID{customsubA101Beta}, GossipSubDefaultFeatures)), - getGossipsub(ctx, h[2], WithProtocolMatchFn(protocolNameMatch), WithGossipSubProtocols([]protocol.ID{GossipSubID_v11}, GossipSubDefaultFeatures)), - getGossipsub(ctx, h[3], WithProtocolMatchFn(protocolNameMatch), WithGossipSubProtocols([]protocol.ID{customsubB100}, GossipSubDefaultFeatures)), + getGossipSubWithRouter(ctx, h[0], getGossipSubRouter(h[0], WithGossipSubProtocols([]protocol.ID{customsubA100, GossipSubID_v11}, GossipSubDefaultFeatures)), WithProtocolMatchFn(protocolNameMatch)), + getGossipSubWithRouter(ctx, h[1], getGossipSubRouter(h[1], WithGossipSubProtocols([]protocol.ID{customsubA101Beta}, GossipSubDefaultFeatures)), WithProtocolMatchFn(protocolNameMatch)), + getGossipSubWithRouter(ctx, h[2], getGossipSubRouter(h[2], WithGossipSubProtocols([]protocol.ID{GossipSubID_v11}, GossipSubDefaultFeatures)), WithProtocolMatchFn(protocolNameMatch)), + getGossipSubWithRouter(ctx, h[3], getGossipSubRouter(h[3], WithGossipSubProtocols([]protocol.ID{customsubB100}, GossipSubDefaultFeatures)), WithProtocolMatchFn(protocolNameMatch)), } connect(t, h[0], h[1]) diff --git a/gossipsub_test.go b/gossipsub_test.go index ed7859a1..d7544ca8 100644 --- a/gossipsub_test.go +++ b/gossipsub_test.go @@ -25,7 +25,7 @@ import ( "github.com/libp2p/go-msgio/protoio" ) -func getGossipsub(ctx context.Context, h host.Host, opts ...Option) *PubSub { +func getGossipSub(ctx context.Context, h host.Host, opts ...Option) *PubSub { ps, err := NewGossipSub(ctx, h, opts...) if err != nil { panic(err) @@ -33,20 +33,57 @@ func getGossipsub(ctx context.Context, h host.Host, opts ...Option) *PubSub { return ps } -func getGossipsubs(ctx context.Context, hs []host.Host, opts ...Option) []*PubSub { +func getGossipSubs(ctx context.Context, hs []host.Host, opts ...Option) []*PubSub { var psubs []*PubSub for _, h := range hs { - psubs = append(psubs, getGossipsub(ctx, h, opts...)) + psubs = append(psubs, getGossipSub(ctx, h, opts...)) } return psubs } +func getGossipSubsWithRouters(ctx context.Context, hs []host.Host, routers []GossipPubSubRouter, opts ...Option) []*PubSub { + var psubs []*PubSub + if len(hs) != len(routers) { + panic("hosts and routers must have the same length") + } + + for i, h := range hs { + psubs = append(psubs, getGossipSubWithRouter(ctx, h, routers[i], opts...)) + } + + return psubs +} + +func getGossipSubWithRouter(ctx context.Context, hs host.Host, router GossipPubSubRouter, opts ...Option) *PubSub { + ps, err := NewGossipSubWithRouter(ctx, hs, router, opts...) + if err != nil { + panic(err) + } + return ps +} + +func getGossipSubRouter(h host.Host, opts ...GossipSubRouterOption) *GossipSubRouter { + ps, err := DefaultGossipSubRouter(h, opts...) + if err != nil { + panic(err) + } + return ps +} + +func getGossipSubRouters(hs []host.Host, opts ...GossipSubRouterOption) []GossipPubSubRouter { + var routers []GossipPubSubRouter + for _, h := range hs { + routers = append(routers, getGossipSubRouter(h, opts...)) + } + return routers +} + func TestSparseGossipsub(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() hosts := getNetHosts(t, ctx, 20) - psubs := getGossipsubs(ctx, hosts) + psubs := getGossipSubs(ctx, hosts) var msgs []*Subscription for _, ps := range psubs { @@ -87,7 +124,7 @@ func TestDenseGossipsub(t *testing.T) { defer cancel() hosts := getNetHosts(t, ctx, 20) - psubs := getGossipsubs(ctx, hosts) + psubs := getGossipSubs(ctx, hosts) var msgs []*Subscription for _, ps := range psubs { @@ -128,7 +165,7 @@ func TestGossipsubFanout(t *testing.T) { defer cancel() hosts := getNetHosts(t, ctx, 20) - psubs := getGossipsubs(ctx, hosts) + psubs := getGossipSubs(ctx, hosts) var msgs []*Subscription for _, ps := range psubs[1:] { @@ -197,7 +234,7 @@ func TestGossipsubFanoutMaintenance(t *testing.T) { defer cancel() hosts := getNetHosts(t, ctx, 20) - psubs := getGossipsubs(ctx, hosts) + psubs := getGossipSubs(ctx, hosts) var msgs []*Subscription for _, ps := range psubs[1:] { @@ -282,7 +319,7 @@ func TestGossipsubFanoutExpiry(t *testing.T) { defer cancel() hosts := getNetHosts(t, ctx, 10) - psubs := getGossipsubs(ctx, hosts) + psubs := getGossipSubs(ctx, hosts) var msgs []*Subscription for _, ps := range psubs[1:] { @@ -341,7 +378,7 @@ func TestGossipsubGossip(t *testing.T) { defer cancel() hosts := getNetHosts(t, ctx, 20) - psubs := getGossipsubs(ctx, hosts) + psubs := getGossipSubs(ctx, hosts) var msgs []*Subscription for _, ps := range psubs { @@ -389,7 +426,7 @@ func TestGossipsubGossipPiggyback(t *testing.T) { defer cancel() hosts := getNetHosts(t, ctx, 20) - psubs := getGossipsubs(ctx, hosts) + psubs := getGossipSubs(ctx, hosts) var msgs []*Subscription for _, ps := range psubs { @@ -457,7 +494,7 @@ func TestGossipsubGossipPropagation(t *testing.T) { defer cancel() hosts := getNetHosts(t, ctx, 20) - psubs := getGossipsubs(ctx, hosts) + psubs := getGossipSubs(ctx, hosts) hosts1 := hosts[:GossipSubD+1] hosts2 := append(hosts[GossipSubD+1:], hosts[0]) @@ -538,7 +575,7 @@ func TestGossipsubPrune(t *testing.T) { defer cancel() hosts := getNetHosts(t, ctx, 20) - psubs := getGossipsubs(ctx, hosts) + psubs := getGossipSubs(ctx, hosts) var msgs []*Subscription for _, ps := range psubs { @@ -594,7 +631,9 @@ func TestGossipsubPruneBackoffTime(t *testing.T) { params.HeartbeatInitialDelay = time.Millisecond * 10 params.HeartbeatInterval = time.Millisecond * 100 - psubs := getGossipsubs(ctx, hosts, WithGossipSubParams(params), WithPeerScore( + routers := getGossipSubRouters(hosts, WithGossipSubParams(params)) + + psubs := getGossipSubsWithRouters(ctx, hosts, routers, WithPeerScore( &PeerScoreParams{ AppSpecificScore: func(p peer.ID) float64 { if p == hosts[0].ID() { @@ -685,7 +724,7 @@ func TestGossipsubGraft(t *testing.T) { defer cancel() hosts := getNetHosts(t, ctx, 20) - psubs := getGossipsubs(ctx, hosts) + psubs := getGossipSubs(ctx, hosts) sparseConnect(t, hosts) @@ -730,7 +769,7 @@ func TestGossipsubRemovePeer(t *testing.T) { defer cancel() hosts := getNetHosts(t, ctx, 20) - psubs := getGossipsubs(ctx, hosts) + psubs := getGossipSubs(ctx, hosts) var msgs []*Subscription for _, ps := range psubs { @@ -779,7 +818,7 @@ func TestGossipsubGraftPruneRetry(t *testing.T) { defer cancel() hosts := getNetHosts(t, ctx, 10) - psubs := getGossipsubs(ctx, hosts) + psubs := getGossipSubs(ctx, hosts) denseConnect(t, hosts) var topics []string @@ -829,7 +868,7 @@ func TestGossipsubControlPiggyback(t *testing.T) { defer cancel() hosts := getNetHosts(t, ctx, 10) - psubs := getGossipsubs(ctx, hosts) + psubs := getGossipSubs(ctx, hosts) denseConnect(t, hosts) for _, ps := range psubs { @@ -911,7 +950,7 @@ func TestMixedGossipsub(t *testing.T) { defer cancel() hosts := getNetHosts(t, ctx, 30) - gsubs := getGossipsubs(ctx, hosts[:20]) + gsubs := getGossipSubs(ctx, hosts[:20]) fsubs := getPubsubs(ctx, hosts[20:]) psubs := append(gsubs, fsubs...) @@ -955,7 +994,7 @@ func TestGossipsubMultihops(t *testing.T) { hosts := getNetHosts(t, ctx, 6) - psubs := getGossipsubs(ctx, hosts) + psubs := getGossipSubs(ctx, hosts) connect(t, hosts[0], hosts[1]) connect(t, hosts[1], hosts[2]) @@ -997,7 +1036,7 @@ func TestGossipsubTreeTopology(t *testing.T) { defer cancel() hosts := getNetHosts(t, ctx, 10) - psubs := getGossipsubs(ctx, hosts) + psubs := getGossipSubs(ctx, hosts) connect(t, hosts[0], hosts[1]) connect(t, hosts[1], hosts[2]) @@ -1061,7 +1100,8 @@ func TestGossipsubStarTopology(t *testing.T) { defer cancel() hosts := getNetHosts(t, ctx, 20) - psubs := getGossipsubs(ctx, hosts, WithPeerExchange(true), WithFloodPublish(true)) + routers := getGossipSubRouters(hosts, WithPeerExchange(true), WithFloodPublish(true)) + psubs := getGossipSubsWithRouters(ctx, hosts, routers) // configure the center of the star with a very low D psubs[0].eval <- func() { @@ -1145,7 +1185,8 @@ func TestGossipsubStarTopologyWithSignedPeerRecords(t *testing.T) { defer cancel() hosts := getNetHosts(t, ctx, 20) - psubs := getGossipsubs(ctx, hosts, WithPeerExchange(true), WithFloodPublish(true)) + routers := getGossipSubRouters(hosts, WithPeerExchange(true), WithFloodPublish(true)) + psubs := getGossipSubsWithRouters(ctx, hosts, routers) // configure the center of the star with a very low D psubs[0].eval <- func() { @@ -1224,9 +1265,9 @@ func TestGossipsubDirectPeers(t *testing.T) { h := getNetHosts(t, ctx, 3) psubs := []*PubSub{ - getGossipsub(ctx, h[0], WithDirectConnectTicks(2)), - getGossipsub(ctx, h[1], WithDirectPeers([]peer.AddrInfo{{ID: h[2].ID(), Addrs: h[2].Addrs()}}), WithDirectConnectTicks(2)), - getGossipsub(ctx, h[2], WithDirectPeers([]peer.AddrInfo{{ID: h[1].ID(), Addrs: h[1].Addrs()}}), WithDirectConnectTicks(2)), + getGossipSubWithRouter(ctx, h[0], getGossipSubRouter(h[0], WithDirectConnectTicks(2))), + getGossipSubWithRouter(ctx, h[1], getGossipSubRouter(h[1], WithDirectConnectTicks(2)), WithDirectPeers([]peer.AddrInfo{{ID: h[2].ID(), Addrs: h[2].Addrs()}})), + getGossipSubWithRouter(ctx, h[2], getGossipSubRouter(h[2], WithDirectConnectTicks(2)), WithDirectPeers([]peer.AddrInfo{{ID: h[1].ID(), Addrs: h[1].Addrs()}})), } connect(t, h[0], h[1]) @@ -1288,13 +1329,13 @@ func TestGossipSubPeerFilter(t *testing.T) { h := getNetHosts(t, ctx, 3) psubs := []*PubSub{ - getGossipsub(ctx, h[0], WithPeerFilter(func(pid peer.ID, topic string) bool { + getGossipSub(ctx, h[0], WithPeerFilter(func(pid peer.ID, topic string) bool { return pid == h[1].ID() })), - getGossipsub(ctx, h[1], WithPeerFilter(func(pid peer.ID, topic string) bool { + getGossipSub(ctx, h[1], WithPeerFilter(func(pid peer.ID, topic string) bool { return pid == h[0].ID() })), - getGossipsub(ctx, h[2]), + getGossipSub(ctx, h[2]), } connect(t, h[0], h[1]) @@ -1330,9 +1371,9 @@ func TestGossipsubDirectPeersFanout(t *testing.T) { h := getNetHosts(t, ctx, 3) psubs := []*PubSub{ - getGossipsub(ctx, h[0]), - getGossipsub(ctx, h[1], WithDirectPeers([]peer.AddrInfo{{ID: h[2].ID(), Addrs: h[2].Addrs()}})), - getGossipsub(ctx, h[2], WithDirectPeers([]peer.AddrInfo{{ID: h[1].ID(), Addrs: h[1].Addrs()}})), + getGossipSub(ctx, h[0]), + getGossipSub(ctx, h[1], WithDirectPeers([]peer.AddrInfo{{ID: h[2].ID(), Addrs: h[2].Addrs()}})), + getGossipSub(ctx, h[2], WithDirectPeers([]peer.AddrInfo{{ID: h[1].ID(), Addrs: h[1].Addrs()}})), } connect(t, h[0], h[1]) @@ -1416,7 +1457,8 @@ func TestGossipsubFloodPublish(t *testing.T) { defer cancel() hosts := getNetHosts(t, ctx, 20) - psubs := getGossipsubs(ctx, hosts, WithFloodPublish(true)) + routers := getGossipSubRouters(hosts, WithFloodPublish(true)) + psubs := getGossipSubsWithRouters(ctx, hosts, routers) // build the star for i := 1; i < 20; i++ { @@ -1451,7 +1493,7 @@ func TestGossipsubEnoughPeers(t *testing.T) { defer cancel() hosts := getNetHosts(t, ctx, 20) - psubs := getGossipsubs(ctx, hosts) + psubs := getGossipSubs(ctx, hosts) for _, ps := range psubs { _, err := ps.Subscribe("test") @@ -1500,8 +1542,8 @@ func TestGossipsubCustomParams(t *testing.T) { wantedMaxPendingConns := 23 params.MaxPendingConnections = wantedMaxPendingConns hosts := getNetHosts(t, ctx, 1) - psubs := getGossipsubs(ctx, hosts, - WithGossipSubParams(params)) + routers := getGossipSubRouters(hosts, WithGossipSubParams(params)) + psubs := getGossipSubsWithRouters(ctx, hosts, routers) if len(psubs) != 1 { t.Fatalf("incorrect number of pusbub objects received: wanted %d but got %d", 1, len(psubs)) @@ -1529,7 +1571,7 @@ func TestGossipsubNegativeScore(t *testing.T) { defer cancel() hosts := getNetHosts(t, ctx, 20) - psubs := getGossipsubs(ctx, hosts, + psubs := getGossipSubs(ctx, hosts, WithPeerScore( &PeerScoreParams{ AppSpecificScore: func(p peer.ID) float64 { @@ -1613,7 +1655,7 @@ func TestGossipsubScoreValidatorEx(t *testing.T) { defer cancel() hosts := getNetHosts(t, ctx, 3) - psubs := getGossipsubs(ctx, hosts, + psubs := getGossipSubs(ctx, hosts, WithPeerScore( &PeerScoreParams{ AppSpecificScore: func(p peer.ID) float64 { return 0 }, @@ -1702,7 +1744,7 @@ func TestGossipsubPiggybackControl(t *testing.T) { h := bhost.NewBlankHost(swarmt.GenSwarm(t)) defer h.Close() - ps := getGossipsub(ctx, h) + ps := getGossipSub(ctx, h) blah := peer.ID("bogotr0n") @@ -1750,7 +1792,7 @@ func TestGossipsubMultipleGraftTopics(t *testing.T) { defer cancel() hosts := getNetHosts(t, ctx, 2) - psubs := getGossipsubs(ctx, hosts) + psubs := getGossipSubs(ctx, hosts) sparseConnect(t, hosts) time.Sleep(time.Second * 1) @@ -1819,8 +1861,8 @@ func TestGossipsubOpportunisticGrafting(t *testing.T) { hosts := getNetHosts(t, ctx, 50) // pubsubs for the first 10 hosts - psubs := getGossipsubs(ctx, hosts[:10], - WithFloodPublish(true), + routers := getGossipSubRouters(hosts[:10], WithFloodPublish(true)) + psubs := getGossipSubsWithRouters(ctx, hosts[:10], routers, WithPeerScore( &PeerScoreParams{ AppSpecificScore: func(peer.ID) float64 { return 0 }, @@ -1920,8 +1962,8 @@ func TestGossipSubLeaveTopic(t *testing.T) { h := getNetHosts(t, ctx, 2) psubs := []*PubSub{ - getGossipsub(ctx, h[0]), - getGossipsub(ctx, h[1]), + getGossipSub(ctx, h[0]), + getGossipSub(ctx, h[1]), } connect(t, h[0], h[1]) @@ -1991,9 +2033,9 @@ func TestGossipSubJoinTopic(t *testing.T) { h := getNetHosts(t, ctx, 3) psubs := []*PubSub{ - getGossipsub(ctx, h[0]), - getGossipsub(ctx, h[1]), - getGossipsub(ctx, h[2]), + getGossipSub(ctx, h[0]), + getGossipSub(ctx, h[1]), + getGossipSub(ctx, h[2]), } connect(t, h[0], h[1]) @@ -2072,9 +2114,8 @@ func TestGossipsubPeerScoreInspect(t *testing.T) { defer cancel() hosts := getNetHosts(t, ctx, 2) - inspector := &mockPeerScoreInspector{} - psub1 := getGossipsub(ctx, hosts[0], + psub1 := getGossipSub(ctx, hosts[0], WithPeerScore( &PeerScoreParams{ Topics: map[string]*TopicScoreParams{ @@ -2097,8 +2138,9 @@ func TestGossipsubPeerScoreInspect(t *testing.T) { PublishThreshold: -10, GraylistThreshold: -1000, }), - WithPeerScoreInspect(inspector.inspect, time.Second)) - psub2 := getGossipsub(ctx, hosts[1]) + WithPeerScoreInspect(inspector.inspect, time.Second), + ) + psub2 := getGossipSub(ctx, hosts[1]) psubs := []*PubSub{psub1, psub2} connect(t, hosts[0], hosts[1]) @@ -2133,7 +2175,7 @@ func TestGossipsubPeerScoreResetTopicParams(t *testing.T) { hosts := getNetHosts(t, ctx, 1) - ps := getGossipsub(ctx, hosts[0], + ps := getGossipSub(ctx, hosts[0], WithPeerScore( &PeerScoreParams{ Topics: map[string]*TopicScoreParams{ @@ -2199,7 +2241,7 @@ func TestGossipsubRPCFragmentation(t *testing.T) { defer cancel() hosts := getNetHosts(t, ctx, 2) - ps := getGossipsub(ctx, hosts[0]) + ps := getGossipSub(ctx, hosts[0]) // make a fake peer that requests everything through IWANT gossip iwe := iwantEverything{h: hosts[1]} diff --git a/peer_gater.go b/peer_gater.go index 4e26bc26..af778539 100644 --- a/peer_gater.go +++ b/peer_gater.go @@ -116,7 +116,7 @@ func DefaultPeerGaterParams() *PeerGaterParams { } // the gater object. -type peerGater struct { +type PeerGater struct { sync.Mutex host host.Host @@ -163,7 +163,7 @@ type peerGaterStats struct { // interval. func WithPeerGater(params *PeerGaterParams) Option { return func(ps *PubSub) error { - gs, ok := ps.rt.(*GossipSubRouter) + gs, ok := ps.rt.(GossipPubSubRouter) if !ok { return fmt.Errorf("pubsub router is not gossipsub") } @@ -173,14 +173,14 @@ func WithPeerGater(params *PeerGaterParams) Option { return err } - gs.gate = newPeerGater(ps.ctx, ps.host, params) + gs.SetPeerGater(newPeerGater(ps.ctx, ps.host, params)) // hook the tracer if ps.tracer != nil { - ps.tracer.raw = append(ps.tracer.raw, gs.gate) + ps.tracer.raw = append(ps.tracer.raw, gs.GetPeerGater()) } else { ps.tracer = &pubsubTracer{ - raw: []RawTracer{gs.gate}, + raw: []RawTracer{gs.GetPeerGater()}, pid: ps.host.ID(), idGen: ps.idGen, } @@ -190,8 +190,8 @@ func WithPeerGater(params *PeerGaterParams) Option { } } -func newPeerGater(ctx context.Context, host host.Host, params *PeerGaterParams) *peerGater { - pg := &peerGater{ +func newPeerGater(ctx context.Context, host host.Host, params *PeerGaterParams) *PeerGater { + pg := &PeerGater{ params: params, peerStats: make(map[peer.ID]*peerGaterStats), ipStats: make(map[string]*peerGaterStats), @@ -201,7 +201,7 @@ func newPeerGater(ctx context.Context, host host.Host, params *PeerGaterParams) return pg } -func (pg *peerGater) background(ctx context.Context) { +func (pg *PeerGater) background(ctx context.Context) { tick := time.NewTicker(pg.params.DecayInterval) defer tick.Stop() @@ -216,7 +216,7 @@ func (pg *peerGater) background(ctx context.Context) { } } -func (pg *peerGater) decayStats() { +func (pg *PeerGater) decayStats() { pg.Lock() defer pg.Unlock() @@ -258,7 +258,7 @@ func (pg *peerGater) decayStats() { } } -func (pg *peerGater) getPeerStats(p peer.ID) *peerGaterStats { +func (pg *PeerGater) getPeerStats(p peer.ID) *peerGaterStats { st, ok := pg.peerStats[p] if !ok { st = pg.getIPStats(p) @@ -267,7 +267,7 @@ func (pg *peerGater) getPeerStats(p peer.ID) *peerGaterStats { return st } -func (pg *peerGater) getIPStats(p peer.ID) *peerGaterStats { +func (pg *PeerGater) getIPStats(p peer.ID) *peerGaterStats { ip := pg.getPeerIP(p) st, ok := pg.ipStats[ip] if !ok { @@ -277,7 +277,7 @@ func (pg *peerGater) getIPStats(p peer.ID) *peerGaterStats { return st } -func (pg *peerGater) getPeerIP(p peer.ID) string { +func (pg *PeerGater) getPeerIP(p peer.ID) string { if pg.getIP != nil { return pg.getIP(p) } @@ -317,7 +317,7 @@ func (pg *peerGater) getPeerIP(p peer.ID) string { } // router interface -func (pg *peerGater) AcceptFrom(p peer.ID) AcceptStatus { +func (pg *PeerGater) AcceptFrom(p peer.ID) AcceptStatus { if pg == nil { return AcceptAll } @@ -363,10 +363,10 @@ func (pg *peerGater) AcceptFrom(p peer.ID) AcceptStatus { } // -- RawTracer interface methods -var _ RawTracer = (*peerGater)(nil) +var _ RawTracer = (*PeerGater)(nil) // tracer interface -func (pg *peerGater) AddPeer(p peer.ID, proto protocol.ID) { +func (pg *PeerGater) AddPeer(p peer.ID, proto protocol.ID) { pg.Lock() defer pg.Unlock() @@ -374,7 +374,7 @@ func (pg *peerGater) AddPeer(p peer.ID, proto protocol.ID) { st.connected++ } -func (pg *peerGater) RemovePeer(p peer.ID) { +func (pg *PeerGater) RemovePeer(p peer.ID) { pg.Lock() defer pg.Unlock() @@ -385,19 +385,19 @@ func (pg *peerGater) RemovePeer(p peer.ID) { delete(pg.peerStats, p) } -func (pg *peerGater) Join(topic string) {} -func (pg *peerGater) Leave(topic string) {} -func (pg *peerGater) Graft(p peer.ID, topic string) {} -func (pg *peerGater) Prune(p peer.ID, topic string) {} +func (pg *PeerGater) Join(topic string) {} +func (pg *PeerGater) Leave(topic string) {} +func (pg *PeerGater) Graft(p peer.ID, topic string) {} +func (pg *PeerGater) Prune(p peer.ID, topic string) {} -func (pg *peerGater) ValidateMessage(msg *Message) { +func (pg *PeerGater) ValidateMessage(msg *Message) { pg.Lock() defer pg.Unlock() pg.validate++ } -func (pg *peerGater) DeliverMessage(msg *Message) { +func (pg *PeerGater) DeliverMessage(msg *Message) { pg.Lock() defer pg.Unlock() @@ -413,7 +413,7 @@ func (pg *peerGater) DeliverMessage(msg *Message) { st.deliver += weight } -func (pg *peerGater) RejectMessage(msg *Message, reason string) { +func (pg *PeerGater) RejectMessage(msg *Message, reason string) { pg.Lock() defer pg.Unlock() @@ -434,7 +434,7 @@ func (pg *peerGater) RejectMessage(msg *Message, reason string) { } } -func (pg *peerGater) DuplicateMessage(msg *Message) { +func (pg *PeerGater) DuplicateMessage(msg *Message) { pg.Lock() defer pg.Unlock() @@ -442,12 +442,12 @@ func (pg *peerGater) DuplicateMessage(msg *Message) { st.duplicate++ } -func (pg *peerGater) ThrottlePeer(p peer.ID) {} +func (pg *PeerGater) ThrottlePeer(p peer.ID) {} -func (pg *peerGater) RecvRPC(rpc *RPC) {} +func (pg *PeerGater) RecvRPC(rpc *RPC) {} -func (pg *peerGater) SendRPC(rpc *RPC, p peer.ID) {} +func (pg *PeerGater) SendRPC(rpc *RPC, p peer.ID) {} -func (pg *peerGater) DropRPC(rpc *RPC, p peer.ID) {} +func (pg *PeerGater) DropRPC(rpc *RPC, p peer.ID) {} -func (pg *peerGater) UndeliverableMessage(msg *Message) {} +func (pg *PeerGater) UndeliverableMessage(msg *Message) {} diff --git a/score.go b/score.go index ec6c8213..eb84e262 100644 --- a/score.go +++ b/score.go @@ -61,7 +61,7 @@ type topicStats struct { invalidMessageDeliveries float64 } -type peerScore struct { +type PeerScore struct { sync.Mutex // the score parameters @@ -85,7 +85,7 @@ type peerScore struct { inspectPeriod time.Duration } -var _ RawTracer = (*peerScore)(nil) +var _ RawTracer = (*PeerScore)(nil) type messageDeliveries struct { seenMsgTTL time.Duration @@ -151,41 +151,41 @@ type TopicScoreSnapshot struct { // This option must be passed _after_ the WithPeerScore option. func WithPeerScoreInspect(inspect interface{}, period time.Duration) Option { return func(ps *PubSub) error { - gs, ok := ps.rt.(*GossipSubRouter) + gs, ok := ps.rt.(GossipPubSubRouter) if !ok { return fmt.Errorf("pubsub router is not gossipsub") } - if gs.score == nil { + if gs.GetPeerScore() == nil { return fmt.Errorf("peer scoring is not enabled") } - if gs.score.inspect != nil || gs.score.inspectEx != nil { + if gs.GetPeerScore().inspect != nil || gs.GetPeerScore().inspectEx != nil { return fmt.Errorf("duplicate peer score inspector") } switch i := inspect.(type) { case PeerScoreInspectFn: - gs.score.inspect = i + gs.GetPeerScore().inspect = i case ExtendedPeerScoreInspectFn: - gs.score.inspectEx = i + gs.GetPeerScore().inspectEx = i default: return fmt.Errorf("unknown peer score insector type: %v", inspect) } - gs.score.inspectPeriod = period + gs.GetPeerScore().inspectPeriod = period return nil } } // implementation -func newPeerScore(params *PeerScoreParams) *peerScore { +func newPeerScore(params *PeerScoreParams) *PeerScore { seenMsgTTL := params.SeenMsgTTL if seenMsgTTL == 0 { seenMsgTTL = TimeCacheDuration } - return &peerScore{ + return &PeerScore{ params: params, peerStats: make(map[peer.ID]*peerStats), peerIPs: make(map[string]map[peer.ID]struct{}), @@ -198,7 +198,7 @@ func newPeerScore(params *PeerScoreParams) *peerScore { // If the topic previously had parameters and the parameters are lowering delivery caps, // then the score counters are recapped appropriately. // Note: assumes that the topic score parameters have already been validated -func (ps *peerScore) SetTopicScoreParams(topic string, p *TopicScoreParams) error { +func (ps *PeerScore) SetTopicScoreParams(topic string, p *TopicScoreParams) error { ps.Lock() defer ps.Unlock() @@ -241,7 +241,7 @@ func (ps *peerScore) SetTopicScoreParams(topic string, p *TopicScoreParams) erro } // router interface -func (ps *peerScore) Start(gs *GossipSubRouter) { +func (ps *PeerScore) Start(gs *GossipSubRouter) { if ps == nil { return } @@ -251,7 +251,7 @@ func (ps *peerScore) Start(gs *GossipSubRouter) { go ps.background(gs.p.ctx) } -func (ps *peerScore) Score(p peer.ID) float64 { +func (ps *PeerScore) Score(p peer.ID) float64 { if ps == nil { return 0 } @@ -262,7 +262,7 @@ func (ps *peerScore) Score(p peer.ID) float64 { return ps.score(p) } -func (ps *peerScore) score(p peer.ID) float64 { +func (ps *PeerScore) score(p peer.ID) float64 { pstats, ok := ps.peerStats[p] if !ok { return 0 @@ -341,7 +341,7 @@ func (ps *peerScore) score(p peer.ID) float64 { return score } -func (ps *peerScore) ipColocationFactor(p peer.ID) float64 { +func (ps *PeerScore) ipColocationFactor(p peer.ID) float64 { pstats, ok := ps.peerStats[p] if !ok { return 0 @@ -388,7 +388,7 @@ loop: } // behavioural pattern penalties -func (ps *peerScore) AddPenalty(p peer.ID, count int) { +func (ps *PeerScore) AddPenalty(p peer.ID, count int) { if ps == nil { return } @@ -405,7 +405,7 @@ func (ps *peerScore) AddPenalty(p peer.ID, count int) { } // periodic maintenance -func (ps *peerScore) background(ctx context.Context) { +func (ps *PeerScore) background(ctx context.Context) { refreshScores := time.NewTicker(ps.params.DecayInterval) defer refreshScores.Stop() @@ -445,7 +445,7 @@ func (ps *peerScore) background(ctx context.Context) { } // inspectScores dumps all tracked scores into the inspect function. -func (ps *peerScore) inspectScores() { +func (ps *PeerScore) inspectScores() { if ps.inspect != nil { ps.inspectScoresSimple() } @@ -454,7 +454,7 @@ func (ps *peerScore) inspectScores() { } } -func (ps *peerScore) inspectScoresSimple() { +func (ps *PeerScore) inspectScoresSimple() { ps.Lock() scores := make(map[peer.ID]float64, len(ps.peerStats)) for p := range ps.peerStats { @@ -469,7 +469,7 @@ func (ps *peerScore) inspectScoresSimple() { go ps.inspect(scores) } -func (ps *peerScore) inspectScoresExtended() { +func (ps *PeerScore) inspectScoresExtended() { ps.Lock() scores := make(map[peer.ID]*PeerScoreSnapshot, len(ps.peerStats)) for p, pstats := range ps.peerStats { @@ -501,7 +501,7 @@ func (ps *peerScore) inspectScoresExtended() { // refreshScores decays scores, and purges score records for disconnected peers, // once their expiry has elapsed. -func (ps *peerScore) refreshScores() { +func (ps *PeerScore) refreshScores() { ps.Lock() defer ps.Unlock() @@ -565,7 +565,7 @@ func (ps *peerScore) refreshScores() { } // refreshIPs refreshes IPs we know of peers we're tracking. -func (ps *peerScore) refreshIPs() { +func (ps *PeerScore) refreshIPs() { ps.Lock() defer ps.Unlock() @@ -584,7 +584,7 @@ func (ps *peerScore) refreshIPs() { } } -func (ps *peerScore) gcDeliveryRecords() { +func (ps *PeerScore) gcDeliveryRecords() { ps.Lock() defer ps.Unlock() @@ -592,7 +592,7 @@ func (ps *peerScore) gcDeliveryRecords() { } // tracer interface -func (ps *peerScore) AddPeer(p peer.ID, proto protocol.ID) { +func (ps *PeerScore) AddPeer(p peer.ID, proto protocol.ID) { ps.Lock() defer ps.Unlock() @@ -608,7 +608,7 @@ func (ps *peerScore) AddPeer(p peer.ID, proto protocol.ID) { pstats.ips = ips } -func (ps *peerScore) RemovePeer(p peer.ID) { +func (ps *PeerScore) RemovePeer(p peer.ID) { ps.Lock() defer ps.Unlock() @@ -643,10 +643,10 @@ func (ps *peerScore) RemovePeer(p peer.ID) { pstats.expire = time.Now().Add(ps.params.RetainScore) } -func (ps *peerScore) Join(topic string) {} -func (ps *peerScore) Leave(topic string) {} +func (ps *PeerScore) Join(topic string) {} +func (ps *PeerScore) Leave(topic string) {} -func (ps *peerScore) Graft(p peer.ID, topic string) { +func (ps *PeerScore) Graft(p peer.ID, topic string) { ps.Lock() defer ps.Unlock() @@ -666,7 +666,7 @@ func (ps *peerScore) Graft(p peer.ID, topic string) { tstats.meshMessageDeliveriesActive = false } -func (ps *peerScore) Prune(p peer.ID, topic string) { +func (ps *PeerScore) Prune(p peer.ID, topic string) { ps.Lock() defer ps.Unlock() @@ -690,7 +690,7 @@ func (ps *peerScore) Prune(p peer.ID, topic string) { tstats.inMesh = false } -func (ps *peerScore) ValidateMessage(msg *Message) { +func (ps *PeerScore) ValidateMessage(msg *Message) { ps.Lock() defer ps.Unlock() @@ -699,7 +699,7 @@ func (ps *peerScore) ValidateMessage(msg *Message) { _ = ps.deliveries.getRecord(ps.idGen.ID(msg)) } -func (ps *peerScore) DeliverMessage(msg *Message) { +func (ps *PeerScore) DeliverMessage(msg *Message) { ps.Lock() defer ps.Unlock() @@ -725,7 +725,7 @@ func (ps *peerScore) DeliverMessage(msg *Message) { } } -func (ps *peerScore) RejectMessage(msg *Message, reason string) { +func (ps *PeerScore) RejectMessage(msg *Message, reason string) { ps.Lock() defer ps.Unlock() @@ -792,7 +792,7 @@ func (ps *peerScore) RejectMessage(msg *Message, reason string) { drec.peers = nil } -func (ps *peerScore) DuplicateMessage(msg *Message) { +func (ps *PeerScore) DuplicateMessage(msg *Message) { ps.Lock() defer ps.Unlock() @@ -826,15 +826,15 @@ func (ps *peerScore) DuplicateMessage(msg *Message) { } } -func (ps *peerScore) ThrottlePeer(p peer.ID) {} +func (ps *PeerScore) ThrottlePeer(p peer.ID) {} -func (ps *peerScore) RecvRPC(rpc *RPC) {} +func (ps *PeerScore) RecvRPC(rpc *RPC) {} -func (ps *peerScore) SendRPC(rpc *RPC, p peer.ID) {} +func (ps *PeerScore) SendRPC(rpc *RPC, p peer.ID) {} -func (ps *peerScore) DropRPC(rpc *RPC, p peer.ID) {} +func (ps *PeerScore) DropRPC(rpc *RPC, p peer.ID) {} -func (ps *peerScore) UndeliverableMessage(msg *Message) {} +func (ps *PeerScore) UndeliverableMessage(msg *Message) {} // message delivery records func (d *messageDeliveries) getRecord(id string) *deliveryRecord { @@ -898,7 +898,7 @@ func (pstats *peerStats) getTopicStats(topic string, params *PeerScoreParams) (* // markInvalidMessageDelivery increments the "invalid message deliveries" // counter for all scored topics the message is published in. -func (ps *peerScore) markInvalidMessageDelivery(p peer.ID, msg *Message) { +func (ps *PeerScore) markInvalidMessageDelivery(p peer.ID, msg *Message) { pstats, ok := ps.peerStats[p] if !ok { return @@ -916,7 +916,7 @@ func (ps *peerScore) markInvalidMessageDelivery(p peer.ID, msg *Message) { // markFirstMessageDelivery increments the "first message deliveries" counter // for all scored topics the message is published in, as well as the "mesh // message deliveries" counter, if the peer is in the mesh for the topic. -func (ps *peerScore) markFirstMessageDelivery(p peer.ID, msg *Message) { +func (ps *PeerScore) markFirstMessageDelivery(p peer.ID, msg *Message) { pstats, ok := ps.peerStats[p] if !ok { return @@ -948,7 +948,7 @@ func (ps *peerScore) markFirstMessageDelivery(p peer.ID, msg *Message) { // markDuplicateMessageDelivery increments the "mesh message deliveries" counter // for messages we've seen before, as long the message was received within the // P3 window. -func (ps *peerScore) markDuplicateMessageDelivery(p peer.ID, msg *Message, validated time.Time) { +func (ps *PeerScore) markDuplicateMessageDelivery(p peer.ID, msg *Message, validated time.Time) { pstats, ok := ps.peerStats[p] if !ok { return @@ -981,7 +981,7 @@ func (ps *peerScore) markDuplicateMessageDelivery(p peer.ID, msg *Message, valid } // getIPs gets the current IPs for a peer. -func (ps *peerScore) getIPs(p peer.ID) []string { +func (ps *PeerScore) getIPs(p peer.ID) []string { // in unit tests this can be nil if ps.host == nil { return nil @@ -1025,7 +1025,7 @@ func (ps *peerScore) getIPs(p peer.ID) []string { // setIPs adds tracking for the new IPs in the list, and removes tracking from // the obsolete IPs. -func (ps *peerScore) setIPs(p peer.ID, newips, oldips []string) { +func (ps *PeerScore) setIPs(p peer.ID, newips, oldips []string) { addNewIPs: // add the new IPs to the tracking for _, ip := range newips { @@ -1066,7 +1066,7 @@ removeOldIPs: } // removeIPs removes an IP list from the tracking list for a peer. -func (ps *peerScore) removeIPs(p peer.ID, ips []string) { +func (ps *PeerScore) removeIPs(p peer.ID, ips []string) { for _, ip := range ips { peers, ok := ps.peerIPs[ip] if !ok { diff --git a/score_test.go b/score_test.go index 6a6c6890..85979c02 100644 --- a/score_test.go +++ b/score_test.go @@ -811,7 +811,7 @@ func TestScoreBehaviourPenalty(t *testing.T) { peerA := peer.ID("A") - var ps *peerScore + var ps *PeerScore // first check AddPenalty on a nil peerScore ps.AddPenalty(peerA, 1) @@ -1069,7 +1069,7 @@ func withinVariance(score float64, expected float64, variance float64) bool { } // hack to set IPs for a peer without having to spin up real hosts with shared IPs -func setIPsForPeer(t *testing.T, ps *peerScore, p peer.ID, ips ...string) { +func setIPsForPeer(t *testing.T, ps *PeerScore, p peer.ID, ips ...string) { t.Helper() ps.setIPs(p, ips, []string{}) pstats, ok := ps.peerStats[p] diff --git a/tag_tracer.go b/tag_tracer.go index d813c188..305b6637 100644 --- a/tag_tracer.go +++ b/tag_tracer.go @@ -30,7 +30,7 @@ var ( GossipSubConnTagMessageDeliveryCap = 15 ) -// tagTracer is an internal tracer that applies connection manager tags to peer +// TagTracer is an internal tracer that applies connection manager tags to peer // connections based on their behavior. // // We tag a peer's connections for the following reasons: @@ -41,7 +41,7 @@ var ( // first. // The delivery tags have a maximum value, GossipSubConnTagMessageDeliveryCap, and they decay at // a rate of GossipSubConnTagDecayAmount / GossipSubConnTagDecayInterval. -type tagTracer struct { +type TagTracer struct { sync.RWMutex cmgr connmgr.ConnManager @@ -55,12 +55,12 @@ type tagTracer struct { nearFirst map[string]map[peer.ID]struct{} } -func newTagTracer(cmgr connmgr.ConnManager) *tagTracer { +func newTagTracer(cmgr connmgr.ConnManager) *TagTracer { decayer, ok := connmgr.SupportsDecay(cmgr) if !ok { log.Debugf("connection manager does not support decaying tags, delivery tags will not be applied") } - return &tagTracer{ + return &TagTracer{ cmgr: cmgr, idGen: newMsgIdGenerator(), decayer: decayer, @@ -69,7 +69,7 @@ func newTagTracer(cmgr connmgr.ConnManager) *tagTracer { } } -func (t *tagTracer) Start(gs *GossipSubRouter) { +func (t *TagTracer) Start(gs *GossipSubRouter) { if t == nil { return } @@ -78,7 +78,7 @@ func (t *tagTracer) Start(gs *GossipSubRouter) { t.direct = gs.direct } -func (t *tagTracer) tagPeerIfDirect(p peer.ID) { +func (t *TagTracer) tagPeerIfDirect(p peer.ID) { if t.direct == nil { return } @@ -90,12 +90,12 @@ func (t *tagTracer) tagPeerIfDirect(p peer.ID) { } } -func (t *tagTracer) tagMeshPeer(p peer.ID, topic string) { +func (t *TagTracer) tagMeshPeer(p peer.ID, topic string) { tag := topicTag(topic) t.cmgr.Protect(p, tag) } -func (t *tagTracer) untagMeshPeer(p peer.ID, topic string) { +func (t *TagTracer) untagMeshPeer(p peer.ID, topic string) { tag := topicTag(topic) t.cmgr.Unprotect(p, tag) } @@ -104,7 +104,7 @@ func topicTag(topic string) string { return fmt.Sprintf("pubsub:%s", topic) } -func (t *tagTracer) addDeliveryTag(topic string) { +func (t *TagTracer) addDeliveryTag(topic string) { if t.decayer == nil { return } @@ -125,7 +125,7 @@ func (t *tagTracer) addDeliveryTag(topic string) { t.decaying[topic] = tag } -func (t *tagTracer) removeDeliveryTag(topic string) { +func (t *TagTracer) removeDeliveryTag(topic string) { t.Lock() defer t.Unlock() tag, ok := t.decaying[topic] @@ -139,7 +139,7 @@ func (t *tagTracer) removeDeliveryTag(topic string) { delete(t.decaying, topic) } -func (t *tagTracer) bumpDeliveryTag(p peer.ID, topic string) error { +func (t *TagTracer) bumpDeliveryTag(p peer.ID, topic string) error { t.RLock() defer t.RUnlock() @@ -150,7 +150,7 @@ func (t *tagTracer) bumpDeliveryTag(p peer.ID, topic string) error { return tag.Bump(p, GossipSubConnTagBumpMessageDelivery) } -func (t *tagTracer) bumpTagsForMessage(p peer.ID, msg *Message) { +func (t *TagTracer) bumpTagsForMessage(p peer.ID, msg *Message) { topic := msg.GetTopic() err := t.bumpDeliveryTag(p, topic) if err != nil { @@ -159,7 +159,7 @@ func (t *tagTracer) bumpTagsForMessage(p peer.ID, msg *Message) { } // nearFirstPeers returns the peers who delivered the message while it was still validating -func (t *tagTracer) nearFirstPeers(msg *Message) []peer.ID { +func (t *TagTracer) nearFirstPeers(msg *Message) []peer.ID { t.Lock() defer t.Unlock() peersMap, ok := t.nearFirst[t.idGen.ID(msg)] @@ -174,17 +174,17 @@ func (t *tagTracer) nearFirstPeers(msg *Message) []peer.ID { } // -- RawTracer interface methods -var _ RawTracer = (*tagTracer)(nil) +var _ RawTracer = (*TagTracer)(nil) -func (t *tagTracer) AddPeer(p peer.ID, proto protocol.ID) { +func (t *TagTracer) AddPeer(p peer.ID, proto protocol.ID) { t.tagPeerIfDirect(p) } -func (t *tagTracer) Join(topic string) { +func (t *TagTracer) Join(topic string) { t.addDeliveryTag(topic) } -func (t *tagTracer) DeliverMessage(msg *Message) { +func (t *TagTracer) DeliverMessage(msg *Message) { nearFirst := t.nearFirstPeers(msg) t.bumpTagsForMessage(msg.ReceivedFrom, msg) @@ -198,19 +198,19 @@ func (t *tagTracer) DeliverMessage(msg *Message) { t.Unlock() } -func (t *tagTracer) Leave(topic string) { +func (t *TagTracer) Leave(topic string) { t.removeDeliveryTag(topic) } -func (t *tagTracer) Graft(p peer.ID, topic string) { +func (t *TagTracer) Graft(p peer.ID, topic string) { t.tagMeshPeer(p, topic) } -func (t *tagTracer) Prune(p peer.ID, topic string) { +func (t *TagTracer) Prune(p peer.ID, topic string) { t.untagMeshPeer(p, topic) } -func (t *tagTracer) ValidateMessage(msg *Message) { +func (t *TagTracer) ValidateMessage(msg *Message) { t.Lock() defer t.Unlock() @@ -222,7 +222,7 @@ func (t *tagTracer) ValidateMessage(msg *Message) { t.nearFirst[id] = make(map[peer.ID]struct{}) } -func (t *tagTracer) DuplicateMessage(msg *Message) { +func (t *TagTracer) DuplicateMessage(msg *Message) { t.Lock() defer t.Unlock() @@ -234,7 +234,7 @@ func (t *tagTracer) DuplicateMessage(msg *Message) { peers[msg.ReceivedFrom] = struct{}{} } -func (t *tagTracer) RejectMessage(msg *Message, reason string) { +func (t *TagTracer) RejectMessage(msg *Message, reason string) { t.Lock() defer t.Unlock() @@ -251,9 +251,9 @@ func (t *tagTracer) RejectMessage(msg *Message, reason string) { } } -func (t *tagTracer) RemovePeer(peer.ID) {} -func (t *tagTracer) ThrottlePeer(p peer.ID) {} -func (t *tagTracer) RecvRPC(rpc *RPC) {} -func (t *tagTracer) SendRPC(rpc *RPC, p peer.ID) {} -func (t *tagTracer) DropRPC(rpc *RPC, p peer.ID) {} -func (t *tagTracer) UndeliverableMessage(msg *Message) {} +func (t *TagTracer) RemovePeer(peer.ID) {} +func (t *TagTracer) ThrottlePeer(p peer.ID) {} +func (t *TagTracer) RecvRPC(rpc *RPC) {} +func (t *TagTracer) SendRPC(rpc *RPC, p peer.ID) {} +func (t *TagTracer) DropRPC(rpc *RPC, p peer.ID) {} +func (t *TagTracer) UndeliverableMessage(msg *Message) {} diff --git a/topic.go b/topic.go index c08b081b..de866383 100644 --- a/topic.go +++ b/topic.go @@ -56,18 +56,18 @@ func (t *Topic) SetScoreParams(p *TopicScoreParams) error { result := make(chan error, 1) update := func() { - gs, ok := t.p.rt.(*GossipSubRouter) + gs, ok := t.p.rt.(GossipPubSubRouter) if !ok { result <- fmt.Errorf("pubsub router is not gossipsub") return } - if gs.score == nil { + if gs.GetPeerScore() == nil { result <- fmt.Errorf("peer scoring is not enabled in router") return } - err := gs.score.SetTopicScoreParams(t.topic, p) + err := gs.GetPeerScore().SetTopicScoreParams(t.topic, p) result <- err } diff --git a/trace_test.go b/trace_test.go index fb8cb56d..5a16d55f 100644 --- a/trace_test.go +++ b/trace_test.go @@ -28,10 +28,10 @@ func testWithTracer(t *testing.T, tracer EventTracer) { defer cancel() hosts := getNetHosts(t, ctx, 20) - psubs := getGossipsubs(ctx, hosts, + routers := getGossipSubRouters(hosts, WithPeerExchange(true)) + psubs := getGossipSubsWithRouters(ctx, hosts, routers, WithEventTracer(tracer), // to bootstrap from star topology - WithPeerExchange(true), // to exercise the score paths in the tracer WithPeerScore( &PeerScoreParams{