Skip to content

Commit

Permalink
Merge branch 'master' into petera/refactor-grpc-converters
Browse files Browse the repository at this point in the history
  • Loading branch information
peterargue authored Jul 4, 2023
2 parents 6191498 + 6c51ddf commit 9dbabf7
Show file tree
Hide file tree
Showing 111 changed files with 2,722 additions and 1,508 deletions.
30 changes: 14 additions & 16 deletions cmd/access/node_builder/access_node_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ import (
"github.com/onflow/flow-go/network/p2p/middleware"
"github.com/onflow/flow-go/network/p2p/p2pbuilder"
p2pconfig "github.com/onflow/flow-go/network/p2p/p2pbuilder/config"
"github.com/onflow/flow-go/network/p2p/p2pbuilder/inspector"
"github.com/onflow/flow-go/network/p2p/subscription"
"github.com/onflow/flow-go/network/p2p/tracer"
"github.com/onflow/flow-go/network/p2p/translator"
Expand Down Expand Up @@ -1199,25 +1198,26 @@ func (builder *FlowAccessNodeBuilder) initPublicLibp2pNode(networkKey crypto.Pri
builder.IdentityProvider,
builder.FlowConfig.NetworkConfig.GossipSubConfig.LocalMeshLogInterval)

// setup RPC inspectors
rpcInspectorBuilder := inspector.NewGossipSubInspectorBuilder(builder.Logger, builder.SporkID, &builder.FlowConfig.NetworkConfig.GossipSubConfig.GossipSubRPCInspectorsConfig, builder.IdentityProvider, builder.Metrics.Network)
rpcInspectorSuite, err := rpcInspectorBuilder.
SetNetworkType(network.PublicNetwork).
SetMetrics(&p2pconfig.MetricsConfig{
HeroCacheFactory: builder.HeroCacheMetricsFactory(),
Metrics: builder.Metrics.Network,
}).Build()
if err != nil {
return nil, fmt.Errorf("failed to create gossipsub rpc inspectors for access node: %w", err)
}

libp2pNode, err := p2pbuilder.NewNodeBuilder(
builder.Logger,
networkMetrics,
&p2pconfig.MetricsConfig{
HeroCacheFactory: builder.HeroCacheMetricsFactory(),
Metrics: networkMetrics,
},
network.PublicNetwork,
bindAddress,
networkKey,
builder.SporkID,
builder.IdentityProvider,
&builder.FlowConfig.NetworkConfig.ResourceManagerConfig,
&builder.FlowConfig.NetworkConfig.GossipSubConfig.GossipSubRPCInspectorsConfig,
&p2pconfig.PeerManagerConfig{
// TODO: eventually, we need pruning enabled even on public network. However, it needs a modified version of
// the peer manager that also operate on the public identities.
ConnectionPruning: connection.PruningDisabled,
UpdateInterval: builder.FlowConfig.NetworkConfig.PeerUpdateInterval,
ConnectorFactory: connection.DefaultLibp2pBackoffConnectorFactory(),
},
&p2p.DisallowListCacheConfig{
MaxSize: builder.FlowConfig.NetworkConfig.DisallowListNotificationCacheSize,
Metrics: metrics.DisallowListCacheMetricsFactory(builder.HeroCacheMetricsFactory(), network.PublicNetwork),
Expand All @@ -1240,11 +1240,9 @@ func (builder *FlowAccessNodeBuilder) initPublicLibp2pNode(networkKey crypto.Pri
)
}).
// disable connection pruning for the access node which supports the observer
SetPeerManagerOptions(connection.PruningDisabled, builder.FlowConfig.NetworkConfig.PeerUpdateInterval).
SetStreamCreationRetryInterval(builder.FlowConfig.NetworkConfig.UnicastCreateStreamRetryDelay).
SetGossipSubTracer(meshTracer).
SetGossipSubScoreTracerInterval(builder.FlowConfig.NetworkConfig.GossipSubConfig.ScoreTracerInterval).
SetGossipSubRpcInspectorSuite(rpcInspectorSuite).
Build()

if err != nil {
Expand Down
9 changes: 1 addition & 8 deletions cmd/collection/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ import (
"github.com/onflow/flow-go/module/mempool/queue"
"github.com/onflow/flow-go/module/metrics"
"github.com/onflow/flow-go/network/channels"
"github.com/onflow/flow-go/network/p2p"
"github.com/onflow/flow-go/state/protocol"
badgerState "github.com/onflow/flow-go/state/protocol/badger"
"github.com/onflow/flow-go/state/protocol/blocktimer"
Expand Down Expand Up @@ -593,13 +592,7 @@ func main() {

// register the manager for protocol events
node.ProtocolEvents.AddConsumer(manager)

for _, rpcInspector := range node.GossipSubRpcInspectorSuite.Inspectors() {
if r, ok := rpcInspector.(p2p.GossipSubMsgValidationRpcInspector); ok {
clusterEvents.AddConsumer(r)
}
}

clusterEvents.AddConsumer(node.LibP2PNode)
return manager, err
})

Expand Down
3 changes: 0 additions & 3 deletions cmd/node_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,9 +223,6 @@ type NodeConfig struct {

// UnicastRateLimiterDistributor notifies consumers when a peer's unicast message is rate limited.
UnicastRateLimiterDistributor p2p.UnicastRateLimiterDistributor

// GossipSubRpcInspectorSuite rpc inspector suite.
GossipSubRpcInspectorSuite p2p.GossipSubInspectorSuite
}

// StateExcerptAtBoot stores information about the root snapshot and latest finalized block for use in bootstrapping.
Expand Down
21 changes: 8 additions & 13 deletions cmd/observer/node_builder/observer_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ import (
"github.com/onflow/flow-go/network/p2p/middleware"
"github.com/onflow/flow-go/network/p2p/p2pbuilder"
p2pconfig "github.com/onflow/flow-go/network/p2p/p2pbuilder/config"
"github.com/onflow/flow-go/network/p2p/p2pbuilder/inspector"
"github.com/onflow/flow-go/network/p2p/subscription"
"github.com/onflow/flow-go/network/p2p/tracer"
"github.com/onflow/flow-go/network/p2p/translator"
Expand Down Expand Up @@ -710,23 +709,20 @@ func (builder *ObserverServiceBuilder) initPublicLibp2pNode(networkKey crypto.Pr
builder.IdentityProvider,
builder.FlowConfig.NetworkConfig.GossipSubConfig.LocalMeshLogInterval)

rpcInspectorSuite, err := inspector.NewGossipSubInspectorBuilder(builder.Logger, builder.SporkID, &builder.FlowConfig.NetworkConfig.GossipSubConfig.GossipSubRPCInspectorsConfig, builder.IdentityProvider, builder.Metrics.Network).
SetNetworkType(network.PublicNetwork).
SetMetrics(&p2pconfig.MetricsConfig{
HeroCacheFactory: builder.HeroCacheMetricsFactory(),
Metrics: builder.Metrics.Network,
}).Build()
if err != nil {
return nil, fmt.Errorf("could not initialize gossipsub inspectors for observer node: %w", err)
}

node, err := p2pbuilder.NewNodeBuilder(
builder.Logger,
builder.Metrics.Network,
&p2pconfig.MetricsConfig{
HeroCacheFactory: builder.HeroCacheMetricsFactory(),
Metrics: builder.Metrics.Network,
},
network.PublicNetwork,
builder.BaseConfig.BindAddr,
networkKey,
builder.SporkID,
builder.IdentityProvider,
&builder.FlowConfig.NetworkConfig.ResourceManagerConfig,
&builder.FlowConfig.NetworkConfig.GossipSubConfig.GossipSubRPCInspectorsConfig,
p2pconfig.PeerManagerDisableConfig(), // disable peer manager for observer node.
&p2p.DisallowListCacheConfig{
MaxSize: builder.FlowConfig.NetworkConfig.DisallowListNotificationCacheSize,
Metrics: metrics.DisallowListCacheMetricsFactory(builder.HeroCacheMetricsFactory(), network.PublicNetwork),
Expand All @@ -747,7 +743,6 @@ func (builder *ObserverServiceBuilder) initPublicLibp2pNode(networkKey crypto.Pr
SetStreamCreationRetryInterval(builder.FlowConfig.NetworkConfig.UnicastCreateStreamRetryDelay).
SetGossipSubTracer(meshTracer).
SetGossipSubScoreTracerInterval(builder.FlowConfig.NetworkConfig.GossipSubConfig.ScoreTracerInterval).
SetGossipSubRpcInspectorSuite(rpcInspectorSuite).
Build()

if err != nil {
Expand Down
26 changes: 8 additions & 18 deletions cmd/scaffold.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,11 @@ import (
"github.com/onflow/flow-go/network/p2p"
"github.com/onflow/flow-go/network/p2p/cache"
"github.com/onflow/flow-go/network/p2p/conduit"
"github.com/onflow/flow-go/network/p2p/connection"
"github.com/onflow/flow-go/network/p2p/dns"
"github.com/onflow/flow-go/network/p2p/middleware"
"github.com/onflow/flow-go/network/p2p/p2pbuilder"
p2pconfig "github.com/onflow/flow-go/network/p2p/p2pbuilder/config"
"github.com/onflow/flow-go/network/p2p/p2pbuilder/inspector"
"github.com/onflow/flow-go/network/p2p/ping"
"github.com/onflow/flow-go/network/p2p/subscription"
"github.com/onflow/flow-go/network/p2p/unicast/protocols"
Expand Down Expand Up @@ -336,6 +336,7 @@ func (fnb *FlowNodeBuilder) EnqueueNetworkInit() {
peerManagerCfg := &p2pconfig.PeerManagerConfig{
ConnectionPruning: fnb.FlowConfig.NetworkConfig.NetworkConnectionPruning,
UpdateInterval: fnb.FlowConfig.NetworkConfig.PeerUpdateInterval,
ConnectorFactory: connection.DefaultLibp2pBackoffConnectorFactory(),
}

fnb.Component(LibP2PNodeComponent, func(node *NodeConfig) (module.ReadyDoneAware, error) {
Expand All @@ -344,34 +345,23 @@ func (fnb *FlowNodeBuilder) EnqueueNetworkInit() {
myAddr = fnb.BaseConfig.BindAddr
}

metricsCfg := &p2pconfig.MetricsConfig{
Metrics: fnb.Metrics.Network,
HeroCacheFactory: fnb.HeroCacheMetricsFactory(),
}

rpcInspectorSuite, err := inspector.NewGossipSubInspectorBuilder(fnb.Logger, fnb.SporkID, &fnb.FlowConfig.NetworkConfig.GossipSubConfig.GossipSubRPCInspectorsConfig, fnb.IdentityProvider, fnb.Metrics.Network).
SetNetworkType(network.PrivateNetwork).
SetMetrics(metricsCfg).
Build()
if err != nil {
return nil, fmt.Errorf("failed to create gossipsub rpc inspectors for default libp2p node: %w", err)
}

fnb.GossipSubRpcInspectorSuite = rpcInspectorSuite

builder, err := p2pbuilder.DefaultNodeBuilder(
fnb.Logger,
myAddr,
network.PrivateNetwork,
fnb.NetworkKey,
fnb.SporkID,
fnb.IdentityProvider,
metricsCfg,
&p2pconfig.MetricsConfig{
Metrics: fnb.Metrics.Network,
HeroCacheFactory: fnb.HeroCacheMetricsFactory(),
},
fnb.Resolver,
fnb.BaseConfig.NodeRole,
connGaterCfg,
peerManagerCfg,
&fnb.FlowConfig.NetworkConfig.GossipSubConfig,
fnb.GossipSubRpcInspectorSuite,
&fnb.FlowConfig.NetworkConfig.GossipSubRPCInspectorsConfig,
&fnb.FlowConfig.NetworkConfig.ResourceManagerConfig,
uniCfg,
&fnb.FlowConfig.NetworkConfig.ConnectionManagerConfig,
Expand Down
6 changes: 6 additions & 0 deletions cmd/util/cmd/execution-state-extract/export_report.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"EpochCounter": 0,
"PreviousStateCommitment": "18eb0e8beef7ce851e552ecd29c813fde0a9e6f0c5614d7615642076602a48cf",
"CurrentStateCommitment": "18eb0e8beef7ce851e552ecd29c813fde0a9e6f0c5614d7615642076602a48cf",
"ReportSucceeded": true
}
2 changes: 1 addition & 1 deletion config/base_flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package config
import (
"github.com/spf13/pflag"

netconf "github.com/onflow/flow-go/config/network"
"github.com/onflow/flow-go/network/netconf"
)

const (
Expand Down
39 changes: 36 additions & 3 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
"github.com/spf13/pflag"
"github.com/spf13/viper"

"github.com/onflow/flow-go/config/network"
"github.com/onflow/flow-go/network/netconf"
)

var (
Expand All @@ -28,7 +28,7 @@ var (
type FlowConfig struct {
// ConfigFile used to set a path to a config.yml file used to override the default-config.yml file.
ConfigFile string `validate:"filepath" mapstructure:"config-file"`
NetworkConfig *network.Config `mapstructure:"network-config"`
NetworkConfig *netconf.Config `mapstructure:"network-config"`
}

// Validate checks validity of the Flow config. Errors indicate that either the configuration is broken,
Expand Down Expand Up @@ -184,12 +184,45 @@ func LogConfig(logger *zerolog.Event, flags *pflag.FlagSet) map[string]struct{}
// keys do not match the CLI flags 1:1. ie: networking-connection-pruning -> network-config.networking-connection-pruning. After aliases
// are set the conf store will override values with any CLI flag values that are set as expected.
func setAliases() {
err := network.SetAliases(conf)
err := SetAliases(conf)
if err != nil {
panic(fmt.Errorf("failed to set network aliases: %w", err))
}
}

// SetAliases this func sets an aliases for each CLI flag defined for network config overrides to it's corresponding
// full key in the viper config store. This is required because in our config.yml file all configuration values for the
// Flow network are stored one level down on the network-config property. When the default config is bootstrapped viper will
// store these values with the "network-config." prefix on the config key, because we do not want to use CLI flags like --network-config.networking-connection-pruning
// to override default values we instead use cleans flags like --networking-connection-pruning and create an alias from networking-connection-pruning -> network-config.networking-connection-pruning
// to ensure overrides happen as expected.
// Args:
// *viper.Viper: instance of the viper store to register network config aliases on.
// Returns:
// error: if a flag does not have a corresponding key in the viper store.
func SetAliases(conf *viper.Viper) error {
m := make(map[string]string)
// create map of key -> full pathkey
// ie: "networking-connection-pruning" -> "network-config.networking-connection-pruning"
for _, key := range conf.AllKeys() {
s := strings.Split(key, ".")
// check len of s, we expect all network keys to have a single prefix "network-config"
// s should always contain only 2 elements
if len(s) == 2 {
m[s[1]] = key
}
}
// each flag name should correspond to exactly one key in our config store after it is loaded with the default config
for _, flagName := range netconf.AllFlagNames() {
fullKey, ok := m[flagName]
if !ok {
return fmt.Errorf("invalid network configuration missing configuration key flag name %s check config file and cli flags", flagName)
}
conf.RegisterAlias(fullKey, flagName)
}
return nil
}

// overrideConfigFile overrides the default config file by reading in the config file at the path set
// by the --config-file flag in our viper config store.
//
Expand Down
11 changes: 9 additions & 2 deletions engine/access/rest/middleware/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,19 @@ import (
"github.com/onflow/flow-go/module"
)

func MetricsMiddleware(restCollector module.RestMetrics) mux.MiddlewareFunc {
func MetricsMiddleware(restCollector module.RestMetrics, urlToRoute func(string) (string, error)) mux.MiddlewareFunc {
metricsMiddleware := middleware.New(middleware.Config{Recorder: restCollector})
return func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
//urlToRoute transforms specific URL to generic url pattern
routeName, err := urlToRoute(req.URL.Path)
if err != nil {
// In case of an error, an empty route name filled with "unknown"
routeName = "unknown"
}

// This is a custom metric being called on every http request
restCollector.AddTotalRequests(req.Context(), req.Method, req.URL.Path)
restCollector.AddTotalRequests(req.Context(), req.Method, routeName)

// Modify the writer
respWriter := &responseWriter{w, http.StatusOK}
Expand Down
61 changes: 60 additions & 1 deletion engine/access/rest/router.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package rest

import (
"fmt"
"net/http"
"regexp"
"strings"

"github.com/gorilla/mux"
"github.com/rs/zerolog"
Expand All @@ -21,7 +24,7 @@ func newRouter(backend access.API, logger zerolog.Logger, chain flow.Chain, rest
v1SubRouter.Use(middleware.LoggingMiddleware(logger))
v1SubRouter.Use(middleware.QueryExpandable())
v1SubRouter.Use(middleware.QuerySelect())
v1SubRouter.Use(middleware.MetricsMiddleware(restCollector))
v1SubRouter.Use(middleware.MetricsMiddleware(restCollector, URLToRoute))

linkGenerator := models.NewLinkGeneratorImpl(v1SubRouter)

Expand Down Expand Up @@ -114,3 +117,59 @@ var Routes = []route{{
Name: "getNodeVersionInfo",
Handler: GetNodeVersionInfo,
}}

var routeUrlMap = map[string]string{}
var routeRE = regexp.MustCompile(`(?i)/v1/(\w+)(/(\w+)(/(\w+))?)?`)

func init() {
for _, r := range Routes {
routeUrlMap[r.Pattern] = r.Name
}
}

func URLToRoute(url string) (string, error) {
normalized, err := normalizeURL(url)
if err != nil {
return "", err
}

name, ok := routeUrlMap[normalized]
if !ok {
return "", fmt.Errorf("invalid url")
}
return name, nil
}

func normalizeURL(url string) (string, error) {
matches := routeRE.FindAllStringSubmatch(url, -1)
if len(matches) != 1 || len(matches[0]) != 6 {
return "", fmt.Errorf("invalid url")
}

// given a URL like
// /v1/blocks/1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef/payload
// groups [ 1 ] [ 3 ] [ 5 ]
// normalized form like /v1/blocks/{id}/payload

parts := []string{matches[0][1]}

switch len(matches[0][3]) {
case 0:
// top level resource. e.g. /v1/blocks
case 64:
// id based resource. e.g. /v1/blocks/1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef
parts = append(parts, "{id}")
case 16:
// address based resource. e.g. /v1/accounts/1234567890abcdef
parts = append(parts, "{address}")
default:
// named resource. e.g. /v1/network/parameters
parts = append(parts, matches[0][3])
}

if matches[0][5] != "" {
parts = append(parts, matches[0][5])
}

return "/" + strings.Join(parts, "/"), nil
}
Loading

0 comments on commit 9dbabf7

Please sign in to comment.