Skip to content

Commit

Permalink
[Networking] Handling iHave overpromising part-1 (#4556)
Browse files Browse the repository at this point in the history
* adds scoring parameters for mesh message delivery

* refactors config-based approach to override-based approach

* generates mocks

* adds a documentation

* extends godoc

* adds test skeleton

* fixes test

* implements under-performing test

* adds test for under-delivery in two topics

* refactors test fixture

* wip

* err fix

* applies refactoring

* fixes test

* parallelize the fixture

* adds logs for overriding parameters

* adds logging for network type to builder

* fmt

* renames a fixture function

* adds warn logging for overriding score parameters

* revises godocs

* adds readme

* fixes scoring test

* fixing lint

* lint fix

* improves test quality

* Update network/p2p/builder.go

Co-authored-by: Tarak Ben Youssef <50252200+tarakby@users.noreply.github.com>

---------

Co-authored-by: Tarak Ben Youssef <50252200+tarakby@users.noreply.github.com>
  • Loading branch information
yhassanzadeh13 and tarakby authored Jul 18, 2023
1 parent e3411ef commit e7dc893
Show file tree
Hide file tree
Showing 21 changed files with 857 additions and 186 deletions.
6 changes: 3 additions & 3 deletions insecure/corruptlibp2p/spam_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,9 @@ func TestSpam_IHave(t *testing.T) {
// this is vital as the spammer will circumvent the normal pubsub subscription mechanism and send iHAVE messages directly to the victim.
// without a prior connection established, directly spamming pubsub messages may cause a race condition in the pubsub implementation.
p2ptest.TryConnectionAndEnsureConnected(t, ctx, nodes)
p2ptest.EnsurePubsubMessageExchange(t, ctx, nodes, func() (interface{}, channels.Topic) {
blockTopic := channels.TopicFromChannel(channels.PushBlocks, sporkId)
return unittest.ProposalFixture(), blockTopic
blockTopic := channels.TopicFromChannel(channels.PushBlocks, sporkId)
p2ptest.EnsurePubsubMessageExchange(t, ctx, nodes, blockTopic, 1, func() interface{} {
return unittest.ProposalFixture()
})

// prepare to spam - generate iHAVE control messages
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ func startNodesAndEnsureConnected(t *testing.T, ctx irrecoverable.SignalerContex
// this is vital as the spammer will circumvent the normal pubsub subscription mechanism and send iHAVE messages directly to the victim.
// without a prior connection established, directly spamming pubsub messages may cause a race condition in the pubsub implementation.
p2ptest.TryConnectionAndEnsureConnected(t, ctx, nodes)
p2ptest.EnsurePubsubMessageExchange(t, ctx, nodes, func() (interface{}, channels.Topic) {
blockTopic := channels.TopicFromChannel(channels.PushBlocks, sporkID)
return unittest.ProposalFixture(), blockTopic
blockTopic := channels.TopicFromChannel(channels.PushBlocks, sporkID)
p2ptest.EnsurePubsubMessageExchange(t, ctx, nodes, blockTopic, 1, func() interface{} {
return unittest.ProposalFixture()
})
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1145,7 +1145,7 @@ func TestGossipSubSpamMitigationIntegration(t *testing.T) {
t.Name(),
idProvider,
p2ptest.WithRole(flow.RoleConsensus),
p2ptest.WithPeerScoringEnabled(idProvider),
p2ptest.EnablePeerScoringWithOverride(p2p.PeerScoringConfigNoOverride),
)

ids := flow.IdentityList{&victimId, &spammer.SpammerId}
Expand Down Expand Up @@ -1196,9 +1196,9 @@ func TestGossipSubSpamMitigationIntegration(t *testing.T) {
p2ptest.LetNodesDiscoverEachOther(t, ctx, nodes, ids)

// as nodes started fresh and no spamming has happened yet, the nodes should be able to exchange messages on the topic.
p2ptest.EnsurePubsubMessageExchange(t, ctx, nodes, func() (interface{}, channels.Topic) {
blockTopic := channels.TopicFromChannel(channels.PushBlocks, sporkID)
return unittest.ProposalFixture(), blockTopic
blockTopic := channels.TopicFromChannel(channels.PushBlocks, sporkID)
p2ptest.EnsurePubsubMessageExchange(t, ctx, nodes, blockTopic, 1, func() interface{} {
return unittest.ProposalFixture()
})

// prepares spam graft and prune messages with different strategies.
Expand Down Expand Up @@ -1228,9 +1228,8 @@ func TestGossipSubSpamMitigationIntegration(t *testing.T) {

// now we expect the detection and mitigation to kick in and the victim node to disconnect from the spammer node.
// so the spammer and victim nodes should not be able to exchange messages on the topic.
p2ptest.EnsureNoPubsubExchangeBetweenGroups(t, ctx, []p2p.LibP2PNode{victimNode}, []p2p.LibP2PNode{spammer.SpammerNode}, func() (interface{}, channels.Topic) {
blockTopic := channels.TopicFromChannel(channels.PushBlocks, sporkID)
return unittest.ProposalFixture(), blockTopic
p2ptest.EnsureNoPubsubExchangeBetweenGroups(t, ctx, []p2p.LibP2PNode{victimNode}, []p2p.LibP2PNode{spammer.SpammerNode}, blockTopic, 1, func() interface{} {
return unittest.ProposalFixture()
})
}

Expand Down
375 changes: 370 additions & 5 deletions insecure/integration/functional/test/gossipsub/scoring/scoring_test.go

Large diffs are not rendered by default.

11 changes: 11 additions & 0 deletions network/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,17 @@ import (
// and private (i.e., staked) networks.
type NetworkingType uint8

func (t NetworkingType) String() string {
switch t {
case PrivateNetwork:
return "private"
case PublicNetwork:
return "public"
default:
return "unknown"
}
}

const (
// PrivateNetwork indicates that the staked private-side of the Flow blockchain that nodes can only join and leave
// with a staking requirement.
Expand Down
64 changes: 43 additions & 21 deletions network/p2p/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ type GossipSubAdapterConfigFunc func(*BasePubSubAdapterConfig) PubSubAdapterConf

// GossipSubBuilder provides a builder pattern for creating a GossipSub pubsub system.
type GossipSubBuilder interface {
PeerScoringBuilder
// SetHost sets the host of the builder.
// If the host has already been set, a fatal error is logged.
SetHost(host.Host)
Expand All @@ -45,9 +44,16 @@ type GossipSubBuilder interface {
// We expect the node to initialize with a default gossipsub config. Hence, this function overrides the default config.
SetGossipSubConfigFunc(GossipSubAdapterConfigFunc)

// SetGossipSubPeerScoring sets the gossipsub peer scoring of the builder.
// If the gossipsub peer scoring flag has already been set, a fatal error is logged.
SetGossipSubPeerScoring(bool)
// EnableGossipSubScoringWithOverride enables peer scoring for the GossipSub pubsub system with the given override.
// Any existing peer scoring config attribute that is set in the override will override the default peer scoring config.
// Anything that is left to nil or zero value in the override will be ignored and the default value will be used.
// Note: it is not recommended to override the default peer scoring config in production unless you know what you are doing.
// Production Tip: use PeerScoringConfigNoOverride as the argument to this function to enable peer scoring without any override.
// Args:
// - PeerScoringConfigOverride: override for the peer scoring config- Recommended to use PeerScoringConfigNoOverride for production.
// Returns:
// none
EnableGossipSubScoringWithOverride(*PeerScoringConfigOverride)

// SetGossipSubScoreTracerInterval sets the gossipsub score tracer interval of the builder.
// If the gossipsub score tracer interval has already been set, a fatal error is logged.
Expand Down Expand Up @@ -81,16 +87,6 @@ type GossipSubBuilder interface {
Build(irrecoverable.SignalerContext) (PubSubAdapter, error)
}

type PeerScoringBuilder interface {
// SetTopicScoreParams sets the topic score parameters for the given topic.
// If the topic score parameters have already been set for the given topic, it is overwritten.
SetTopicScoreParams(topic channels.Topic, topicScoreParams *pubsub.TopicScoreParams)

// SetAppSpecificScoreParams sets the application specific score parameters for the given topic.
// If the application specific score parameters have already been set for the given topic, it is overwritten.
SetAppSpecificScoreParams(func(peer.ID) float64)
}

// GossipSubRpcInspectorSuiteFactoryFunc is a function that creates a new RPC inspector suite. It is used to create
// RPC inspectors for the gossipsub protocol. The RPC inspectors are used to inspect and validate
// incoming RPC messages before they are processed by the gossipsub protocol.
Expand Down Expand Up @@ -123,11 +119,16 @@ type NodeBuilder interface {
SetConnectionGater(ConnectionGater) NodeBuilder
SetRoutingSystem(func(context.Context, host.Host) (routing.Routing, error)) NodeBuilder

// EnableGossipSubPeerScoring enables peer scoring for the GossipSub pubsub system.
// Arguments:
// - module.IdentityProvider: the identity provider for the node (must be set before calling this method).
// - *PeerScoringConfig: the peer scoring configuration for the GossipSub pubsub system. If nil, the default configuration is used.
EnableGossipSubPeerScoring(*PeerScoringConfig) NodeBuilder
// EnableGossipSubScoringWithOverride enables peer scoring for the GossipSub pubsub system with the given override.
// Any existing peer scoring config attribute that is set in the override will override the default peer scoring config.
// Anything that is left to nil or zero value in the override will be ignored and the default value will be used.
// Note: it is not recommended to override the default peer scoring config in production unless you know what you are doing.
// Production Tip: use PeerScoringConfigNoOverride as the argument to this function to enable peer scoring without any override.
// Args:
// - PeerScoringConfigOverride: override for the peer scoring config- Recommended to use PeerScoringConfigNoOverride for production.
// Returns:
// none
EnableGossipSubScoringWithOverride(*PeerScoringConfigOverride) NodeBuilder
SetCreateNode(CreateNodeFunc) NodeBuilder
SetGossipSubFactory(GossipSubFactoryFunc, GossipSubAdapterConfigFunc) NodeBuilder
SetStreamCreationRetryInterval(time.Duration) NodeBuilder
Expand All @@ -138,10 +139,31 @@ type NodeBuilder interface {
Build() (LibP2PNode, error)
}

// PeerScoringConfig is a configuration for peer scoring parameters for a GossipSub pubsub system.
type PeerScoringConfig struct {
// PeerScoringConfigOverride is a structure that is used to carry over the override values for peer scoring configuration.
// Any attribute that is set in the override will override the default peer scoring config.
// Typically, we are not recommending to override the default peer scoring config in production unless you know what you are doing.
type PeerScoringConfigOverride struct {
// TopicScoreParams is a map of topic score parameters for each topic.
// Override criteria: any topic (i.e., key in the map) will override the default topic score parameters for that topic and
// the corresponding value in the map will be used instead of the default value.
// If you don't want to override topic score params for a given topic, simply don't include that topic in the map.
// If the map is nil, the default topic score parameters are used for all topics.
TopicScoreParams map[channels.Topic]*pubsub.TopicScoreParams

// AppSpecificScoreParams is a function that returns the application specific score parameters for a given peer.
// Override criteria: if the function is not nil, it will override the default application specific score parameters.
// If the function is nil, the default application specific score parameters are used.
AppSpecificScoreParams func(peer.ID) float64

// DecayInterval is the interval over which we decay the effect of past behavior, so that
// a good or bad behavior will not have a permanent effect on the penalty. It is also the interval
// that GossipSub uses to refresh the scores of all peers.
// Override criteria: if the value is not zero, it will override the default decay interval.
// If the value is zero, the default decay interval is used.
DecayInterval time.Duration
}

// PeerScoringConfigNoOverride is a default peer scoring configuration for a GossipSub pubsub system.
// It is set to nil, which means that no override is done to the default peer scoring configuration.
// It is the recommended way to use the default peer scoring configuration.
var PeerScoringConfigNoOverride = (*PeerScoringConfigOverride)(nil)
13 changes: 7 additions & 6 deletions network/p2p/connection/connection_gater_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,19 +396,20 @@ func TestConnectionGater_Disallow_Integration(t *testing.T) {
func ensureCommunicationSilenceAmongGroups(t *testing.T, ctx context.Context, sporkId flow.Identifier, groupA []p2p.LibP2PNode, groupB []p2p.LibP2PNode) {
// ensures no connection, unicast, or pubsub going to the disallow-listed nodes
p2ptest.EnsureNotConnectedBetweenGroups(t, ctx, groupA, groupB)
p2ptest.EnsureNoPubsubExchangeBetweenGroups(t, ctx, groupA, groupB, func() (interface{}, channels.Topic) {
blockTopic := channels.TopicFromChannel(channels.PushBlocks, sporkId)
return unittest.ProposalFixture(), blockTopic

blockTopic := channels.TopicFromChannel(channels.PushBlocks, sporkId)
p2ptest.EnsureNoPubsubExchangeBetweenGroups(t, ctx, groupA, groupB, blockTopic, 1, func() interface{} {
return unittest.ProposalFixture()
})
p2pfixtures.EnsureNoStreamCreationBetweenGroups(t, ctx, groupA, groupB)
}

// ensureCommunicationOverAllProtocols ensures that all nodes are connected to each other, and they can exchange messages over the pubsub and unicast.
func ensureCommunicationOverAllProtocols(t *testing.T, ctx context.Context, sporkId flow.Identifier, nodes []p2p.LibP2PNode, inbounds []chan string) {
blockTopic := channels.TopicFromChannel(channels.PushBlocks, sporkId)
p2ptest.TryConnectionAndEnsureConnected(t, ctx, nodes)
p2ptest.EnsurePubsubMessageExchange(t, ctx, nodes, func() (interface{}, channels.Topic) {
blockTopic := channels.TopicFromChannel(channels.PushBlocks, sporkId)
return unittest.ProposalFixture(), blockTopic
p2ptest.EnsurePubsubMessageExchange(t, ctx, nodes, blockTopic, 1, func() interface{} {
return unittest.ProposalFixture()
})
p2pfixtures.EnsureMessageExchangeOverUnicast(t, ctx, nodes, inbounds, p2pfixtures.LongStringMessageFactoryFixture(t))
}
2 changes: 1 addition & 1 deletion network/p2p/connection/peerManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type PeerManager struct {
// and it uses the connector to actually connect or disconnect from peers.
func NewPeerManager(logger zerolog.Logger, updateInterval time.Duration, connector p2p.PeerUpdater) *PeerManager {
pm := &PeerManager{
logger: logger,
logger: logger.With().Str("component", "peer-manager").Logger(),
connector: connector,
peerRequestQ: make(chan struct{}, 1),
peerUpdateInterval: updateInterval,
Expand Down
22 changes: 4 additions & 18 deletions network/p2p/mock/gossip_sub_builder.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions network/p2p/mock/node_builder.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit e7dc893

Please sign in to comment.