Skip to content

Commit

Permalink
Merge #4511
Browse files Browse the repository at this point in the history
4511: [Networking] Sets GossipSub RPC Inspector mandatory default r=yhassanzadeh13 a=yhassanzadeh13

This PR improves how the libp2p node, a component involved in network communication, is structured. Before the update, the RPC inspector suite, a vital tool for logging and monitoring network communication, was an optional add-on to the libp2p node. Now, it's a mandatory part of the libp2p node.

What does this mean? The RPC inspector suite helps keep an eye on the network communications, ensuring they're healthy and secure. It's particularly useful for logging data and investigating suspicious activities. With the changes made, instead of setting a specific RPC inspector suite, you can replace its underlying mechanism (a factory). If you don’t provide one, don't worry – the system will use a default one.

Another key change is organization and simplification. Before, the libp2p node and its components were intertwined complicatedly. The RPC inspector suite was closely connected to many other parts, which made the system harder to work with. This update neatly packs the RPC inspector suite inside the gossipsub builder. This, in turn, is neatly packed inside the libp2p node. This new arrangement makes the code cleaner and easier to understand and maintain.

Additionally, this update lays the foundation that the libp2p node itself listens for important protocol-level events (i.e., cluster change) and passes the information to the necessary components. This makes things more efficient and takes some burden off the Flow node builder.

Co-authored-by: Yahya Hassanzadeh <yhassanzadeh13@ku.edu.tr>
Co-authored-by: Yahya Hassanzadeh <yhassanzadeh@ieee.org>
  • Loading branch information
3 people authored Jun 29, 2023
2 parents f5d3aec + a20627f commit 487981d
Show file tree
Hide file tree
Showing 53 changed files with 813 additions and 651 deletions.
22 changes: 7 additions & 15 deletions cmd/access/node_builder/access_node_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ import (
"github.com/onflow/flow-go/network/p2p/middleware"
"github.com/onflow/flow-go/network/p2p/p2pbuilder"
p2pconfig "github.com/onflow/flow-go/network/p2p/p2pbuilder/config"
"github.com/onflow/flow-go/network/p2p/p2pbuilder/inspector"
"github.com/onflow/flow-go/network/p2p/subscription"
"github.com/onflow/flow-go/network/p2p/tracer"
"github.com/onflow/flow-go/network/p2p/translator"
Expand Down Expand Up @@ -1199,25 +1198,19 @@ func (builder *FlowAccessNodeBuilder) initPublicLibp2pNode(networkKey crypto.Pri
builder.IdentityProvider,
builder.FlowConfig.NetworkConfig.GossipSubConfig.LocalMeshLogInterval)

// setup RPC inspectors
rpcInspectorBuilder := inspector.NewGossipSubInspectorBuilder(builder.Logger, builder.SporkID, &builder.FlowConfig.NetworkConfig.GossipSubConfig.GossipSubRPCInspectorsConfig, builder.IdentityProvider, builder.Metrics.Network)
rpcInspectorSuite, err := rpcInspectorBuilder.
SetNetworkType(network.PublicNetwork).
SetMetrics(&p2pconfig.MetricsConfig{
HeroCacheFactory: builder.HeroCacheMetricsFactory(),
Metrics: builder.Metrics.Network,
}).Build()
if err != nil {
return nil, fmt.Errorf("failed to create gossipsub rpc inspectors for access node: %w", err)
}

libp2pNode, err := p2pbuilder.NewNodeBuilder(
builder.Logger,
networkMetrics,
&p2pconfig.MetricsConfig{
HeroCacheFactory: builder.HeroCacheMetricsFactory(),
Metrics: networkMetrics,
},
network.PublicNetwork,
bindAddress,
networkKey,
builder.SporkID,
builder.IdentityProvider,
&builder.FlowConfig.NetworkConfig.ResourceManagerConfig,
&builder.FlowConfig.NetworkConfig.GossipSubConfig.GossipSubRPCInspectorsConfig,
&p2p.DisallowListCacheConfig{
MaxSize: builder.FlowConfig.NetworkConfig.DisallowListNotificationCacheSize,
Metrics: metrics.DisallowListCacheMetricsFactory(builder.HeroCacheMetricsFactory(), network.PublicNetwork),
Expand All @@ -1244,7 +1237,6 @@ func (builder *FlowAccessNodeBuilder) initPublicLibp2pNode(networkKey crypto.Pri
SetStreamCreationRetryInterval(builder.FlowConfig.NetworkConfig.UnicastCreateStreamRetryDelay).
SetGossipSubTracer(meshTracer).
SetGossipSubScoreTracerInterval(builder.FlowConfig.NetworkConfig.GossipSubConfig.ScoreTracerInterval).
SetGossipSubRpcInspectorSuite(rpcInspectorSuite).
Build()

if err != nil {
Expand Down
9 changes: 1 addition & 8 deletions cmd/collection/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ import (
"github.com/onflow/flow-go/module/mempool/queue"
"github.com/onflow/flow-go/module/metrics"
"github.com/onflow/flow-go/network/channels"
"github.com/onflow/flow-go/network/p2p"
"github.com/onflow/flow-go/state/protocol"
badgerState "github.com/onflow/flow-go/state/protocol/badger"
"github.com/onflow/flow-go/state/protocol/blocktimer"
Expand Down Expand Up @@ -593,13 +592,7 @@ func main() {

// register the manager for protocol events
node.ProtocolEvents.AddConsumer(manager)

for _, rpcInspector := range node.GossipSubRpcInspectorSuite.Inspectors() {
if r, ok := rpcInspector.(p2p.GossipSubMsgValidationRpcInspector); ok {
clusterEvents.AddConsumer(r)
}
}

clusterEvents.AddConsumer(node.LibP2PNode)
return manager, err
})

Expand Down
3 changes: 0 additions & 3 deletions cmd/node_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,9 +223,6 @@ type NodeConfig struct {

// UnicastRateLimiterDistributor notifies consumers when a peer's unicast message is rate limited.
UnicastRateLimiterDistributor p2p.UnicastRateLimiterDistributor

// GossipSubRpcInspectorSuite rpc inspector suite.
GossipSubRpcInspectorSuite p2p.GossipSubInspectorSuite
}

// StateExcerptAtBoot stores information about the root snapshot and latest finalized block for use in bootstrapping.
Expand Down
20 changes: 7 additions & 13 deletions cmd/observer/node_builder/observer_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ import (
"github.com/onflow/flow-go/network/p2p/middleware"
"github.com/onflow/flow-go/network/p2p/p2pbuilder"
p2pconfig "github.com/onflow/flow-go/network/p2p/p2pbuilder/config"
"github.com/onflow/flow-go/network/p2p/p2pbuilder/inspector"
"github.com/onflow/flow-go/network/p2p/subscription"
"github.com/onflow/flow-go/network/p2p/tracer"
"github.com/onflow/flow-go/network/p2p/translator"
Expand Down Expand Up @@ -710,23 +709,19 @@ func (builder *ObserverServiceBuilder) initPublicLibp2pNode(networkKey crypto.Pr
builder.IdentityProvider,
builder.FlowConfig.NetworkConfig.GossipSubConfig.LocalMeshLogInterval)

rpcInspectorSuite, err := inspector.NewGossipSubInspectorBuilder(builder.Logger, builder.SporkID, &builder.FlowConfig.NetworkConfig.GossipSubConfig.GossipSubRPCInspectorsConfig, builder.IdentityProvider, builder.Metrics.Network).
SetNetworkType(network.PublicNetwork).
SetMetrics(&p2pconfig.MetricsConfig{
HeroCacheFactory: builder.HeroCacheMetricsFactory(),
Metrics: builder.Metrics.Network,
}).Build()
if err != nil {
return nil, fmt.Errorf("could not initialize gossipsub inspectors for observer node: %w", err)
}

node, err := p2pbuilder.NewNodeBuilder(
builder.Logger,
builder.Metrics.Network,
&p2pconfig.MetricsConfig{
HeroCacheFactory: builder.HeroCacheMetricsFactory(),
Metrics: builder.Metrics.Network,
},
network.PublicNetwork,
builder.BaseConfig.BindAddr,
networkKey,
builder.SporkID,
builder.IdentityProvider,
&builder.FlowConfig.NetworkConfig.ResourceManagerConfig,
&builder.FlowConfig.NetworkConfig.GossipSubConfig.GossipSubRPCInspectorsConfig,
&p2p.DisallowListCacheConfig{
MaxSize: builder.FlowConfig.NetworkConfig.DisallowListNotificationCacheSize,
Metrics: metrics.DisallowListCacheMetricsFactory(builder.HeroCacheMetricsFactory(), network.PublicNetwork),
Expand All @@ -747,7 +742,6 @@ func (builder *ObserverServiceBuilder) initPublicLibp2pNode(networkKey crypto.Pr
SetStreamCreationRetryInterval(builder.FlowConfig.NetworkConfig.UnicastCreateStreamRetryDelay).
SetGossipSubTracer(meshTracer).
SetGossipSubScoreTracerInterval(builder.FlowConfig.NetworkConfig.GossipSubConfig.ScoreTracerInterval).
SetGossipSubRpcInspectorSuite(rpcInspectorSuite).
Build()

if err != nil {
Expand Down
24 changes: 6 additions & 18 deletions cmd/scaffold.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ import (
"github.com/onflow/flow-go/network/p2p/middleware"
"github.com/onflow/flow-go/network/p2p/p2pbuilder"
p2pconfig "github.com/onflow/flow-go/network/p2p/p2pbuilder/config"
"github.com/onflow/flow-go/network/p2p/p2pbuilder/inspector"
"github.com/onflow/flow-go/network/p2p/ping"
"github.com/onflow/flow-go/network/p2p/subscription"
"github.com/onflow/flow-go/network/p2p/unicast/protocols"
Expand Down Expand Up @@ -344,34 +343,23 @@ func (fnb *FlowNodeBuilder) EnqueueNetworkInit() {
myAddr = fnb.BaseConfig.BindAddr
}

metricsCfg := &p2pconfig.MetricsConfig{
Metrics: fnb.Metrics.Network,
HeroCacheFactory: fnb.HeroCacheMetricsFactory(),
}

rpcInspectorSuite, err := inspector.NewGossipSubInspectorBuilder(fnb.Logger, fnb.SporkID, &fnb.FlowConfig.NetworkConfig.GossipSubConfig.GossipSubRPCInspectorsConfig, fnb.IdentityProvider, fnb.Metrics.Network).
SetNetworkType(network.PrivateNetwork).
SetMetrics(metricsCfg).
Build()
if err != nil {
return nil, fmt.Errorf("failed to create gossipsub rpc inspectors for default libp2p node: %w", err)
}

fnb.GossipSubRpcInspectorSuite = rpcInspectorSuite

builder, err := p2pbuilder.DefaultNodeBuilder(
fnb.Logger,
myAddr,
network.PrivateNetwork,
fnb.NetworkKey,
fnb.SporkID,
fnb.IdentityProvider,
metricsCfg,
&p2pconfig.MetricsConfig{
Metrics: fnb.Metrics.Network,
HeroCacheFactory: fnb.HeroCacheMetricsFactory(),
},
fnb.Resolver,
fnb.BaseConfig.NodeRole,
connGaterCfg,
peerManagerCfg,
&fnb.FlowConfig.NetworkConfig.GossipSubConfig,
fnb.GossipSubRpcInspectorSuite,
&fnb.FlowConfig.NetworkConfig.GossipSubRPCInspectorsConfig,
&fnb.FlowConfig.NetworkConfig.ResourceManagerConfig,
uniCfg,
&fnb.FlowConfig.NetworkConfig.ConnectionManagerConfig,
Expand Down
2 changes: 1 addition & 1 deletion config/base_flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package config
import (
"github.com/spf13/pflag"

netconf "github.com/onflow/flow-go/config/network"
"github.com/onflow/flow-go/network/netconf"
)

const (
Expand Down
39 changes: 36 additions & 3 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
"github.com/spf13/pflag"
"github.com/spf13/viper"

"github.com/onflow/flow-go/config/network"
"github.com/onflow/flow-go/network/netconf"
)

var (
Expand All @@ -28,7 +28,7 @@ var (
type FlowConfig struct {
// ConfigFile used to set a path to a config.yml file used to override the default-config.yml file.
ConfigFile string `validate:"filepath" mapstructure:"config-file"`
NetworkConfig *network.Config `mapstructure:"network-config"`
NetworkConfig *netconf.Config `mapstructure:"network-config"`
}

// Validate checks validity of the Flow config. Errors indicate that either the configuration is broken,
Expand Down Expand Up @@ -184,12 +184,45 @@ func LogConfig(logger *zerolog.Event, flags *pflag.FlagSet) map[string]struct{}
// keys do not match the CLI flags 1:1. ie: networking-connection-pruning -> network-config.networking-connection-pruning. After aliases
// are set the conf store will override values with any CLI flag values that are set as expected.
func setAliases() {
err := network.SetAliases(conf)
err := SetAliases(conf)
if err != nil {
panic(fmt.Errorf("failed to set network aliases: %w", err))
}
}

// SetAliases this func sets an aliases for each CLI flag defined for network config overrides to it's corresponding
// full key in the viper config store. This is required because in our config.yml file all configuration values for the
// Flow network are stored one level down on the network-config property. When the default config is bootstrapped viper will
// store these values with the "network-config." prefix on the config key, because we do not want to use CLI flags like --network-config.networking-connection-pruning
// to override default values we instead use cleans flags like --networking-connection-pruning and create an alias from networking-connection-pruning -> network-config.networking-connection-pruning
// to ensure overrides happen as expected.
// Args:
// *viper.Viper: instance of the viper store to register network config aliases on.
// Returns:
// error: if a flag does not have a corresponding key in the viper store.
func SetAliases(conf *viper.Viper) error {
m := make(map[string]string)
// create map of key -> full pathkey
// ie: "networking-connection-pruning" -> "network-config.networking-connection-pruning"
for _, key := range conf.AllKeys() {
s := strings.Split(key, ".")
// check len of s, we expect all network keys to have a single prefix "network-config"
// s should always contain only 2 elements
if len(s) == 2 {
m[s[1]] = key
}
}
// each flag name should correspond to exactly one key in our config store after it is loaded with the default config
for _, flagName := range netconf.AllFlagNames() {
fullKey, ok := m[flagName]
if !ok {
return fmt.Errorf("invalid network configuration missing configuration key flag name %s check config file and cli flags", flagName)
}
conf.RegisterAlias(fullKey, flagName)
}
return nil
}

// overrideConfigFile overrides the default config file by reading in the config file at the path set
// by the --config-file flag in our viper config store.
//
Expand Down
20 changes: 7 additions & 13 deletions follower/follower_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ import (
"github.com/onflow/flow-go/network/p2p/middleware"
"github.com/onflow/flow-go/network/p2p/p2pbuilder"
p2pconfig "github.com/onflow/flow-go/network/p2p/p2pbuilder/config"
"github.com/onflow/flow-go/network/p2p/p2pbuilder/inspector"
"github.com/onflow/flow-go/network/p2p/subscription"
"github.com/onflow/flow-go/network/p2p/tracer"
"github.com/onflow/flow-go/network/p2p/translator"
Expand Down Expand Up @@ -611,23 +610,20 @@ func (builder *FollowerServiceBuilder) initPublicLibp2pNode(networkKey crypto.Pr
builder.Metrics.Network,
builder.IdentityProvider,
builder.FlowConfig.NetworkConfig.GossipSubConfig.LocalMeshLogInterval)
rpcInspectorSuite, err := inspector.NewGossipSubInspectorBuilder(builder.Logger, builder.SporkID, &builder.FlowConfig.NetworkConfig.GossipSubConfig.GossipSubRPCInspectorsConfig, builder.IdentityProvider, builder.Metrics.Network).
SetNetworkType(network.PublicNetwork).
SetMetrics(&p2pconfig.MetricsConfig{
HeroCacheFactory: builder.HeroCacheMetricsFactory(),
Metrics: builder.Metrics.Network,
}).Build()
if err != nil {
return nil, fmt.Errorf("failed to create gossipsub rpc inspectors for public libp2p node: %w", err)
}

node, err := p2pbuilder.NewNodeBuilder(
builder.Logger,
builder.Metrics.Network,
&p2pconfig.MetricsConfig{
HeroCacheFactory: builder.HeroCacheMetricsFactory(),
Metrics: builder.Metrics.Network,
},
network.PublicNetwork,
builder.BaseConfig.BindAddr,
networkKey,
builder.SporkID,
builder.IdentityProvider,
&builder.FlowConfig.NetworkConfig.ResourceManagerConfig,
&builder.FlowConfig.NetworkConfig.GossipSubConfig.GossipSubRPCInspectorsConfig,
&p2p.DisallowListCacheConfig{
MaxSize: builder.FlowConfig.NetworkConfig.DisallowListNotificationCacheSize,
Metrics: metrics.DisallowListCacheMetricsFactory(builder.HeroCacheMetricsFactory(), network.PublicNetwork),
Expand All @@ -648,9 +644,7 @@ func (builder *FollowerServiceBuilder) initPublicLibp2pNode(networkKey crypto.Pr
SetStreamCreationRetryInterval(builder.FlowConfig.NetworkConfig.UnicastCreateStreamRetryDelay).
SetGossipSubTracer(meshTracer).
SetGossipSubScoreTracerInterval(builder.FlowConfig.NetworkConfig.GossipSubConfig.ScoreTracerInterval).
SetGossipSubRpcInspectorSuite(rpcInspectorSuite).
Build()

if err != nil {
return nil, fmt.Errorf("could not build public libp2p node: %w", err)
}
Expand Down
18 changes: 5 additions & 13 deletions insecure/corruptlibp2p/libp2p_node_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,15 @@ import (
"github.com/rs/zerolog"
corrupt "github.com/yhassanzadeh13/go-libp2p-pubsub"

netconf "github.com/onflow/flow-go/config/network"
fcrypto "github.com/onflow/flow-go/crypto"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module"
"github.com/onflow/flow-go/module/metrics"
"github.com/onflow/flow-go/network"
"github.com/onflow/flow-go/network/netconf"
"github.com/onflow/flow-go/network/p2p"
"github.com/onflow/flow-go/network/p2p/p2pbuilder"
p2pconfig "github.com/onflow/flow-go/network/p2p/p2pbuilder/config"
"github.com/onflow/flow-go/network/p2p/p2pbuilder/inspector"
)

// InitCorruptLibp2pNode initializes and returns a corrupt libp2p node that should only be used for BFT testing in
Expand Down Expand Up @@ -74,17 +73,10 @@ func InitCorruptLibp2pNode(
Metrics: metricsCfg,
}

rpcInspectorSuite, err := inspector.NewGossipSubInspectorBuilder(log, sporkId, &netConfig.GossipSubConfig.GossipSubRPCInspectorsConfig, idProvider, metricsCfg).
SetNetworkType(network.PrivateNetwork).
SetMetrics(metCfg).
Build()
if err != nil {
return nil, fmt.Errorf("failed to create gossipsub rpc inspectors for default libp2p node: %w", err)
}

builder, err := p2pbuilder.DefaultNodeBuilder(
log,
address,
network.PrivateNetwork,
flowKey,
sporkId,
idProvider,
Expand All @@ -94,7 +86,7 @@ func InitCorruptLibp2pNode(
connGaterCfg,
peerManagerCfg,
&netConfig.GossipSubConfig,
rpcInspectorSuite,
&netConfig.GossipSubRPCInspectorsConfig,
&netConfig.ResourceManagerConfig,
uniCfg,
&netConfig.ConnectionManagerConfig,
Expand All @@ -114,8 +106,8 @@ func InitCorruptLibp2pNode(
// CorruptGossipSubFactory returns a factory function that creates a new instance of the forked gossipsub module from
// github.com/yhassanzadeh13/go-libp2p-pubsub for the purpose of BFT testing and attack vector implementation.
func CorruptGossipSubFactory(routerOpts ...func(*corrupt.GossipSubRouter)) p2p.GossipSubFactoryFunc {
factory := func(ctx context.Context, logger zerolog.Logger, host host.Host, cfg p2p.PubSubAdapterConfig) (p2p.PubSubAdapter, error) {
adapter, router, err := NewCorruptGossipSubAdapter(ctx, logger, host, cfg)
factory := func(ctx context.Context, logger zerolog.Logger, host host.Host, cfg p2p.PubSubAdapterConfig, clusterChangeConsumer p2p.CollectionClusterChangesConsumer) (p2p.PubSubAdapter, error) {
adapter, router, err := NewCorruptGossipSubAdapter(ctx, logger, host, cfg, clusterChangeConsumer)
for _, opt := range routerOpts {
opt(router)
}
Expand Down
Loading

0 comments on commit 487981d

Please sign in to comment.