From aea636a3097d595e5c7b2eea9fdddf0f4f5c8f99 Mon Sep 17 00:00:00 2001 From: Khalil Claybon Date: Mon, 12 Feb 2024 23:40:06 -0500 Subject: [PATCH 01/10] add invalid topic ID threshold configuration and flags --- config/default-config.yml | 6 ++++++ network/netconf/flags.go | 8 ++++++++ network/p2p/config/gossipsub_rpc_inspectors.go | 9 +++++++++ 3 files changed, 23 insertions(+) diff --git a/config/default-config.yml b/config/default-config.yml index c4d162331e9..54e96c35d52 100644 --- a/config/default-config.yml +++ b/config/default-config.yml @@ -160,6 +160,9 @@ network-config: # to avoid penalizing peers that are not malicious but are misbehaving due to bugs or other issues. # A topic id is considered duplicate if it appears more than once in a single GRAFT or PRUNE message. duplicate-topic-id-threshold: 50 + # Maximum number of total invalid topic ids in a single GRAFT or PRUNE message, ideally this should be 0 but we allow for some tolerance + # to avoid penalizing peers that are not malicious but are misbehaving due to bugs or other issues. + invalid-topic-id-threshold: 50 ihave: # The maximum allowed number of iHave messages in a single RPC message. # Each iHave message represents the list of message ids. When the total number of iHave messages @@ -181,6 +184,9 @@ network-config: # Ideally, an iHave message should not have any duplicate message IDs, hence a message id is considered duplicate when it is repeated more than once # within the same iHave message. When the total number of duplicate message ids in a single iHave message exceeds this threshold, the inspection of message will fail. duplicate-message-id-threshold: 100 + # Maximum number of total invalid topic ids in a single IHAVE message, ideally this should be 0 but we allow for some tolerance + # to avoid penalizing peers that are not malicious but are misbehaving due to bugs or other issues. 
+ invalid-topic-id-threshold: 50 iwant: # The maximum allowed number of iWant messages in a single RPC message. # Each iWant message represents the list of message ids. When the total number of iWant messages diff --git a/network/netconf/flags.go b/network/netconf/flags.go index d6e58a4b340..82c333be9dd 100644 --- a/network/netconf/flags.go +++ b/network/netconf/flags.go @@ -116,7 +116,9 @@ func AllFlagNames() []string { BuildFlagName(gossipsubKey, p2pconfig.RpcInspectorKey, p2pconfig.ValidationConfigKey, p2pconfig.IHaveConfigKey, p2pconfig.MessageIdCountThreshold), BuildFlagName(gossipsubKey, p2pconfig.RpcInspectorKey, p2pconfig.ValidationConfigKey, p2pconfig.IHaveConfigKey, p2pconfig.DuplicateTopicIdThresholdKey), BuildFlagName(gossipsubKey, p2pconfig.RpcInspectorKey, p2pconfig.ValidationConfigKey, p2pconfig.IHaveConfigKey, p2pconfig.DuplicateMessageIdThresholdKey), + BuildFlagName(gossipsubKey, p2pconfig.RpcInspectorKey, p2pconfig.ValidationConfigKey, p2pconfig.IHaveConfigKey, p2pconfig.InvalidTopicIdThresholdKey), BuildFlagName(gossipsubKey, p2pconfig.RpcInspectorKey, p2pconfig.ValidationConfigKey, p2pconfig.GraftPruneKey, p2pconfig.DuplicateTopicIdThresholdKey), + BuildFlagName(gossipsubKey, p2pconfig.RpcInspectorKey, p2pconfig.ValidationConfigKey, p2pconfig.GraftPruneKey, p2pconfig.InvalidTopicIdThresholdKey), BuildFlagName(gossipsubKey, p2pconfig.RpcInspectorKey, p2pconfig.ValidationConfigKey, p2pconfig.GraftPruneKey, p2pconfig.MessageCountThreshold), BuildFlagName(gossipsubKey, p2pconfig.RpcInspectorKey, p2pconfig.ValidationConfigKey, p2pconfig.IWantConfigKey, p2pconfig.MessageCountThreshold), BuildFlagName(gossipsubKey, p2pconfig.RpcInspectorKey, p2pconfig.ValidationConfigKey, p2pconfig.IWantConfigKey, p2pconfig.MessageIdCountThreshold), @@ -353,6 +355,9 @@ func InitializeNetworkFlags(flags *pflag.FlagSet, config *Config) { "the max allowed duplicate topic IDs across all ihave control messages in a single RPC message, if exceeded a misbehavior report will 
be created") flags.Int(BuildFlagName(gossipsubKey, p2pconfig.RpcInspectorKey, p2pconfig.ValidationConfigKey, p2pconfig.IHaveConfigKey, p2pconfig.DuplicateMessageIdThresholdKey), config.GossipSub.RpcInspector.Validation.IHave.DuplicateMessageIdThreshold, + "the max allowed invalid topics in a single ihave control message, if exceeded a misbehavior report will be created") + flags.Int(BuildFlagName(gossipsubKey, p2pconfig.RpcInspectorKey, p2pconfig.ValidationConfigKey, p2pconfig.IHaveConfigKey, p2pconfig.InvalidTopicIdThresholdKey), + config.GossipSub.RpcInspector.Validation.IHave.InvalidTopicThreshold, "the max allowed duplicate message IDs in a single ihave control message, if exceeded a misbehavior report will be created") flags.Int(BuildFlagName(gossipsubKey, p2pconfig.RpcInspectorKey, p2pconfig.ValidationConfigKey, p2pconfig.GraftPruneKey, p2pconfig.MessageCountThreshold), config.GossipSub.RpcInspector.Validation.GraftPrune.MessageCountThreshold, @@ -378,6 +383,9 @@ func InitializeNetworkFlags(flags *pflag.FlagSet, config *Config) { flags.Int(BuildFlagName(gossipsubKey, p2pconfig.RpcInspectorKey, p2pconfig.ValidationConfigKey, p2pconfig.GraftPruneKey, p2pconfig.DuplicateTopicIdThresholdKey), config.GossipSub.RpcInspector.Validation.GraftPrune.DuplicateTopicIdThreshold, "the max allowed duplicate topic IDs across all graft or prune control messages in a single RPC message, if exceeded a misbehavior report will be created") + flags.Int(BuildFlagName(gossipsubKey, p2pconfig.RpcInspectorKey, p2pconfig.ValidationConfigKey, p2pconfig.GraftPruneKey, p2pconfig.InvalidTopicIdThresholdKey), + config.GossipSub.RpcInspector.Validation.GraftPrune.InvalidTopicThreshold, + "the max allowed invalid topic across all graft or prune control messages in a single RPC message, if exceeded a misbehavior report will be created") flags.Duration(BuildFlagName(gossipsubKey, p2pconfig.SubscriptionProviderKey, p2pconfig.UpdateIntervalKey), 
config.GossipSub.SubscriptionProvider.UpdateInterval, diff --git a/network/p2p/config/gossipsub_rpc_inspectors.go b/network/p2p/config/gossipsub_rpc_inspectors.go index a2e4b9f180e..490d304534d 100644 --- a/network/p2p/config/gossipsub_rpc_inspectors.go +++ b/network/p2p/config/gossipsub_rpc_inspectors.go @@ -138,6 +138,10 @@ type GraftPruneRpcInspectionParameters struct { // Ideally, a GRAFT or PRUNE message should not have any duplicate topics, hence a topic ID is counted as a duplicate only if it is repeated more than once. // When the total number of duplicate topic ids in a single GRAFT or PRUNE message exceeds this threshold, the inspection of message will fail. DuplicateTopicIdThreshold int `validate:"gte=0" mapstructure:"duplicate-topic-id-threshold"` + + // InvalidTopicIdThreshold Maximum number of total invalid topic ids in a single GRAFT or PRUNE message, ideally this should be 0 but we allow for some tolerance + // to avoid penalizing peers that are not malicious but are misbehaving due to bugs or other issues. + InvalidTopicIdThreshold int `validate:"gte=0" mapstructure:"invalid-topic-id-threshold"` } const ( @@ -145,6 +149,7 @@ const ( MessageIdCountThreshold = "message-id-count-threshold" CacheMissThresholdKey = "cache-miss-threshold" DuplicateMsgIDThresholdKey = "duplicate-message-id-threshold" + InvalidTopicIdThresholdKey = "invalid-topic-id-threshold" ) // IWantRpcInspectionParameters contains the "numerical values" for iwant rpc control inspection. @@ -207,6 +212,10 @@ type IHaveRpcInspectionParameters struct { // Ideally, an iHave message should not have any duplicate message IDs, hence a message id is considered duplicate when it is repeated more than once // within the same iHave message. When the total number of duplicate message ids in a single iHave message exceeds this threshold, the inspection of message will fail. 
DuplicateMessageIdThreshold int `validate:"gte=0" mapstructure:"duplicate-message-id-threshold"` + + // InvalidTopicIdThreshold Maximum number of total invalid topic ids in a single IHAVE message, ideally this should be 0 but we allow for some tolerance + // to avoid penalizing peers that are not malicious but are misbehaving due to bugs or other issues. + InvalidTopicIdThreshold int `validate:"gte=0" mapstructure:"invalid-topic-id-threshold"` } const ( From 1d572c56f92b4f3d08d32ee8e4482328d45d042e Mon Sep 17 00:00:00 2001 From: Khalil Claybon Date: Tue, 13 Feb 2024 02:53:33 -0500 Subject: [PATCH 02/10] add invalid topic unit tests above/below threshold for each control message type --- .../gossipsub_rpc_validation_inspector.go | 2 +- network/netconf/flags.go | 10 +- .../control_message_validation_inspector.go | 18 +- ...ntrol_message_validation_inspector_test.go | 271 +++++++++++++++++- 4 files changed, 280 insertions(+), 21 deletions(-) diff --git a/module/metrics/gossipsub_rpc_validation_inspector.go b/module/metrics/gossipsub_rpc_validation_inspector.go index 6b79e8c477d..7460486b426 100644 --- a/module/metrics/gossipsub_rpc_validation_inspector.go +++ b/module/metrics/gossipsub_rpc_validation_inspector.go @@ -323,7 +323,7 @@ func (c *GossipSubRpcValidationInspectorMetrics) AsyncProcessingFinished(duratio c.rpcCtrlMsgAsyncProcessingTimeHistogram.Observe(duration.Seconds()) } -// OnControlMessageIDsTruncated tracks the number of times a control message was truncated. +// OnControlMessagesTruncated tracks the number of times a control message was truncated. 
// Args: // // messageType: the type of the control message that was truncated diff --git a/network/netconf/flags.go b/network/netconf/flags.go index 82c333be9dd..44767ca4969 100644 --- a/network/netconf/flags.go +++ b/network/netconf/flags.go @@ -355,10 +355,12 @@ func InitializeNetworkFlags(flags *pflag.FlagSet, config *Config) { "the max allowed duplicate topic IDs across all ihave control messages in a single RPC message, if exceeded a misbehavior report will be created") flags.Int(BuildFlagName(gossipsubKey, p2pconfig.RpcInspectorKey, p2pconfig.ValidationConfigKey, p2pconfig.IHaveConfigKey, p2pconfig.DuplicateMessageIdThresholdKey), config.GossipSub.RpcInspector.Validation.IHave.DuplicateMessageIdThreshold, - "the max allowed invalid topics in a single ihave control message, if exceeded a misbehavior report will be created") - flags.Int(BuildFlagName(gossipsubKey, p2pconfig.RpcInspectorKey, p2pconfig.ValidationConfigKey, p2pconfig.IHaveConfigKey, p2pconfig.InvalidTopicIdThresholdKey), - config.GossipSub.RpcInspector.Validation.IHave.InvalidTopicThreshold, "the max allowed duplicate message IDs in a single ihave control message, if exceeded a misbehavior report will be created") + flags.Int(BuildFlagName(gossipsubKey, p2pconfig.RpcInspectorKey, p2pconfig.ValidationConfigKey, p2pconfig.IHaveConfigKey, p2pconfig.InvalidTopicIdThresholdKey), + config.GossipSub.RpcInspector.Validation.IHave.InvalidTopicIdThreshold, + "the max allowed invalid topics in a single ihave control message, if exceeded a misbehavior report will be created", + ) + flags.Int(BuildFlagName(gossipsubKey, p2pconfig.RpcInspectorKey, p2pconfig.ValidationConfigKey, p2pconfig.GraftPruneKey, p2pconfig.MessageCountThreshold), config.GossipSub.RpcInspector.Validation.GraftPrune.MessageCountThreshold, "threshold for the number of graft or prune control messages to accept on a single RPC message, if exceeded the RPC message will be sampled and truncated") @@ -384,7 +386,7 @@ func 
InitializeNetworkFlags(flags *pflag.FlagSet, config *Config) { config.GossipSub.RpcInspector.Validation.GraftPrune.DuplicateTopicIdThreshold, "the max allowed duplicate topic IDs across all graft or prune control messages in a single RPC message, if exceeded a misbehavior report will be created") flags.Int(BuildFlagName(gossipsubKey, p2pconfig.RpcInspectorKey, p2pconfig.ValidationConfigKey, p2pconfig.GraftPruneKey, p2pconfig.InvalidTopicIdThresholdKey), - config.GossipSub.RpcInspector.Validation.GraftPrune.InvalidTopicThreshold, + config.GossipSub.RpcInspector.Validation.GraftPrune.InvalidTopicIdThreshold, "the max allowed invalid topic across all graft or prune control messages in a single RPC message, if exceeded a misbehavior report will be created") flags.Duration(BuildFlagName(gossipsubKey, p2pconfig.SubscriptionProviderKey, p2pconfig.UpdateIntervalKey), diff --git a/network/p2p/inspector/validation/control_message_validation_inspector.go b/network/p2p/inspector/validation/control_message_validation_inspector.go index 705e709a8cb..6547075c119 100644 --- a/network/p2p/inspector/validation/control_message_validation_inspector.go +++ b/network/p2p/inspector/validation/control_message_validation_inspector.go @@ -366,6 +366,7 @@ func (c *ControlMsgValidationInspector) inspectGraftMessages(from peer.ID, graft duplicateTopicTracker := make(duplicateStrTracker) totalDuplicateTopicIds := 0 + totalInvalidTopicIdErrs := 0 defer func() { // regardless of inspection result, update metrics c.metrics.OnGraftMessageInspected(totalDuplicateTopicIds) @@ -384,9 +385,12 @@ func (c *ControlMsgValidationInspector) inspectGraftMessages(from peer.ID, graft } err, ctrlMsgType := c.validateTopic(from, topic, activeClusterIDS) if err != nil { + totalInvalidTopicIdErrs++ // TODO: consider adding a threshold for this error similar to the duplicate topic id threshold. 
c.metrics.OnInvalidTopicIdDetectedForControlMessage(p2pmsg.CtrlMsgGraft) - return err, ctrlMsgType + if totalInvalidTopicIdErrs > c.config.GraftPrune.InvalidTopicIdThreshold { + return err, ctrlMsgType + } } } return nil, p2p.CtrlMsgNonClusterTopicType @@ -413,6 +417,7 @@ func (c *ControlMsgValidationInspector) inspectPruneMessages(from peer.ID, prune } tracker := make(duplicateStrTracker) totalDuplicateTopicIds := 0 + totalInvalidTopicIdErrs := 0 defer func() { // regardless of inspection result, update metrics c.metrics.OnPruneMessageInspected(totalDuplicateTopicIds) @@ -430,9 +435,12 @@ func (c *ControlMsgValidationInspector) inspectPruneMessages(from peer.ID, prune } err, ctrlMsgType := c.validateTopic(from, topic, activeClusterIDS) if err != nil { + totalInvalidTopicIdErrs++ // TODO: consider adding a threshold for this error similar to the duplicate topic id threshold. c.metrics.OnInvalidTopicIdDetectedForControlMessage(p2pmsg.CtrlMsgPrune) - return err, ctrlMsgType + if totalInvalidTopicIdErrs > c.config.GraftPrune.InvalidTopicIdThreshold { + return err, ctrlMsgType + } } } return nil, p2p.CtrlMsgNonClusterTopicType @@ -471,6 +479,7 @@ func (c *ControlMsgValidationInspector) inspectIHaveMessages(from peer.ID, ihave totalMessageIds := 0 totalDuplicateTopicIds := 0 totalDuplicateMessageIds := 0 + totalInvalidTopicIdErrs := 0 defer func() { // regardless of inspection result, update metrics c.metrics.OnIHaveMessagesInspected(totalDuplicateTopicIds, totalDuplicateMessageIds) @@ -483,9 +492,12 @@ func (c *ControlMsgValidationInspector) inspectIHaveMessages(from peer.ID, ihave // first check if the topic is valid, fail fast if it is not err, ctrlMsgType := c.validateTopic(from, channels.Topic(topic), activeClusterIDS) if err != nil { + totalInvalidTopicIdErrs++ // TODO: consider adding a threshold for this error similar to the duplicate topic id threshold. 
c.metrics.OnInvalidTopicIdDetectedForControlMessage(p2pmsg.CtrlMsgIHave) - return err, ctrlMsgType + if totalInvalidTopicIdErrs > c.config.IHave.InvalidTopicIdThreshold { + return err, ctrlMsgType + } } // then track the topic ensuring it is not beyond a duplicate threshold. diff --git a/network/p2p/inspector/validation/control_message_validation_inspector_test.go b/network/p2p/inspector/validation/control_message_validation_inspector_test.go index 02fc72f3a44..f5f8d1b8722 100644 --- a/network/p2p/inspector/validation/control_message_validation_inspector_test.go +++ b/network/p2p/inspector/validation/control_message_validation_inspector_test.go @@ -343,21 +343,79 @@ func TestControlMessageInspection_ValidRpc(t *testing.T) { unittest.RequireCloseBefore(t, inspector.Done(), 5*time.Second, "inspector did not stop") } -// TestGraftInspection_InvalidTopic ensures inspector disseminates an invalid control message notification for -// graft messages when the topic is invalid. -func TestGraftInspection_InvalidTopic(t *testing.T) { - inspector, signalerCtx, cancel, distributor, rpcTracker, sporkID, _, topicProviderOracle := inspectorFixture(t) - // create unknown topic - unknownTopic, malformedTopic, invalidSporkIDTopic := invalidTopics(t, sporkID) +// TestGraftInspection_InvalidTopic_BelowThreshold ensures inspector does not disseminate an invalid control message notification for +// graft messages when the invalid topic id count does not exceed the configured threshold. 
+func TestGraftInspection_InvalidTopic_BelowThreshold(t *testing.T) { + c, err := config.DefaultConfig() + require.NoError(t, err) + cfg := &c.NetworkConfig.GossipSub.RpcInspector.Validation + inspector, signalerCtx, cancel, distributor, rpcTracker, sporkID, _, topicProviderOracle := inspectorFixture(t, func(params *validation.InspectorParams) { + params.Config = cfg + }) + + var unknownTopicGrafts []*pubsub_pb.ControlGraft + var malformedTopicGrafts []*pubsub_pb.ControlGraft + var invalidSporkIDTopicGrafts []*pubsub_pb.ControlGraft + var allTopics []string + for i := 0; i < cfg.GraftPrune.InvalidTopicIdThreshold; i++ { + // create unknown topic + unknownTopic, malformedTopic, invalidSporkIDTopic := invalidTopics(t, sporkID) + allTopics = append(allTopics, unknownTopic, malformedTopic, invalidSporkIDTopic) + unknownTopicGrafts = append(unknownTopicGrafts, unittest.P2PRPCGraftFixture(&unknownTopic)) + malformedTopicGrafts = append(malformedTopicGrafts, unittest.P2PRPCGraftFixture(&malformedTopic)) + invalidSporkIDTopicGrafts = append(invalidSporkIDTopicGrafts, unittest.P2PRPCGraftFixture(&invalidSporkIDTopic)) + } // avoid unknown topics errors - topicProviderOracle.UpdateTopics([]string{unknownTopic, malformedTopic, invalidSporkIDTopic}) - unknownTopicGraft := unittest.P2PRPCGraftFixture(&unknownTopic) - malformedTopicGraft := unittest.P2PRPCGraftFixture(&malformedTopic) - invalidSporkIDTopicGraft := unittest.P2PRPCGraftFixture(&invalidSporkIDTopic) + topicProviderOracle.UpdateTopics(allTopics) + unknownTopicReq := unittest.P2PRPCFixture(unittest.WithGrafts(unknownTopicGrafts...)) + malformedTopicReq := unittest.P2PRPCFixture(unittest.WithGrafts(malformedTopicGrafts...)) + invalidSporkIDTopicReq := unittest.P2PRPCFixture(unittest.WithGrafts(invalidSporkIDTopicGrafts...)) - unknownTopicReq := unittest.P2PRPCFixture(unittest.WithGrafts(unknownTopicGraft)) - malformedTopicReq := unittest.P2PRPCFixture(unittest.WithGrafts(malformedTopicGraft)) - invalidSporkIDTopicReq 
:= unittest.P2PRPCFixture(unittest.WithGrafts(invalidSporkIDTopicGraft)) + from := unittest.PeerIdFixture(t) + rpcTracker.On("LastHighestIHaveRPCSize").Return(int64(100)).Maybe() + // no notification should be disseminated as long as the number of invalid topic ids does not exceed the threshold + distributor.AssertNotCalled(t, "Distribute", mock.AnythingOfType("*p2p.InvCtrlMsgNotif")) + inspector.Start(signalerCtx) + unittest.RequireComponentsReadyBefore(t, 1*time.Second, inspector) + + require.NoError(t, inspector.Inspect(from, unknownTopicReq)) + require.NoError(t, inspector.Inspect(from, malformedTopicReq)) + require.NoError(t, inspector.Inspect(from, invalidSporkIDTopicReq)) + + // wait 3 seconds to ensure all RPCs are processed + time.Sleep(3 * time.Second) + cancel() + unittest.RequireCloseBefore(t, inspector.Done(), 5*time.Second, "inspector did not stop") +} + +// TestGraftInspection_InvalidTopic_AboveThreshold ensures inspector disseminates an invalid control message notification for +// graft messages when the invalid topic id count exceeds the configured threshold.
+func TestGraftInspection_InvalidTopic_AboveThreshold(t *testing.T) { + c, err := config.DefaultConfig() + require.NoError(t, err) + cfg := &c.NetworkConfig.GossipSub.RpcInspector.Validation + inspector, signalerCtx, cancel, distributor, rpcTracker, sporkID, _, topicProviderOracle := inspectorFixture(t, func(params *validation.InspectorParams) { + params.Config = cfg + }) + + var unknownTopicGrafts []*pubsub_pb.ControlGraft + var malformedTopicGrafts []*pubsub_pb.ControlGraft + var invalidSporkIDTopicGrafts []*pubsub_pb.ControlGraft + var allTopics []string + for i := 0; i < cfg.GraftPrune.InvalidTopicIdThreshold+1; i++ { + // create unknown topic + unknownTopic, malformedTopic, invalidSporkIDTopic := invalidTopics(t, sporkID) + allTopics = append(allTopics, unknownTopic, malformedTopic, invalidSporkIDTopic) + unknownTopicGrafts = append(unknownTopicGrafts, unittest.P2PRPCGraftFixture(&unknownTopic)) + malformedTopicGrafts = append(malformedTopicGrafts, unittest.P2PRPCGraftFixture(&malformedTopic)) + invalidSporkIDTopicGrafts = append(invalidSporkIDTopicGrafts, unittest.P2PRPCGraftFixture(&invalidSporkIDTopic)) + } + + // avoid unknown topics errors + topicProviderOracle.UpdateTopics(allTopics) + unknownTopicReq := unittest.P2PRPCFixture(unittest.WithGrafts(unknownTopicGrafts...)) + malformedTopicReq := unittest.P2PRPCFixture(unittest.WithGrafts(malformedTopicGrafts...)) + invalidSporkIDTopicReq := unittest.P2PRPCFixture(unittest.WithGrafts(invalidSporkIDTopicGrafts...)) from := unittest.PeerIdFixture(t) checkNotification := checkNotificationFunc(t, from, p2pmsg.CtrlMsgGraft, channels.IsInvalidTopicErr, p2p.CtrlMsgNonClusterTopicType) @@ -370,6 +428,7 @@ func TestGraftInspection_InvalidTopic(t *testing.T) { require.NoError(t, inspector.Inspect(from, unknownTopicReq)) require.NoError(t, inspector.Inspect(from, malformedTopicReq)) require.NoError(t, inspector.Inspect(from, invalidSporkIDTopicReq)) + // sleep for 1 second to ensure rpc's is processed 
time.Sleep(time.Second) cancel() @@ -439,6 +498,99 @@ func TestGraftInspection_DuplicateTopicIds_AboveThreshold(t *testing.T) { unittest.RequireCloseBefore(t, inspector.Done(), 5*time.Second, "inspector did not stop") } +// TestPruneInspection_InvalidTopic_BelowThreshold ensures inspector does not disseminate an invalid control message notification for +// prune messages when the invalid topic id count does not exceed the configured threshold. +func TestPruneInspection_InvalidTopic_BelowThreshold(t *testing.T) { + c, err := config.DefaultConfig() + require.NoError(t, err) + cfg := &c.NetworkConfig.GossipSub.RpcInspector.Validation + inspector, signalerCtx, cancel, distributor, rpcTracker, sporkID, _, topicProviderOracle := inspectorFixture(t, func(params *validation.InspectorParams) { + params.Config = cfg + }) + + var unknownTopicPrunes []*pubsub_pb.ControlPrune + var malformedTopicPrunes []*pubsub_pb.ControlPrune + var invalidSporkIDTopicPrunes []*pubsub_pb.ControlPrune + var allTopics []string + for i := 0; i < cfg.GraftPrune.InvalidTopicIdThreshold; i++ { + // create unknown topic + unknownTopic, malformedTopic, invalidSporkIDTopic := invalidTopics(t, sporkID) + allTopics = append(allTopics, unknownTopic, malformedTopic, invalidSporkIDTopic) + unknownTopicPrunes = append(unknownTopicPrunes, unittest.P2PRPCPruneFixture(&unknownTopic)) + malformedTopicPrunes = append(malformedTopicPrunes, unittest.P2PRPCPruneFixture(&malformedTopic)) + invalidSporkIDTopicPrunes = append(invalidSporkIDTopicPrunes, unittest.P2PRPCPruneFixture(&invalidSporkIDTopic)) + } + + // avoid unknown topics errors + topicProviderOracle.UpdateTopics(allTopics) + unknownTopicReq := unittest.P2PRPCFixture(unittest.WithPrunes(unknownTopicPrunes...)) + malformedTopicReq := unittest.P2PRPCFixture(unittest.WithPrunes(malformedTopicPrunes...)) + invalidSporkIDTopicReq := unittest.P2PRPCFixture(unittest.WithPrunes(invalidSporkIDTopicPrunes...)) + + from := unittest.PeerIdFixture(t) + 
rpcTracker.On("LastHighestIHaveRPCSize").Return(int64(100)).Maybe() + // no notification should be disseminated as long as the number of invalid topic ids does not exceed the threshold + distributor.AssertNotCalled(t, "Distribute", mock.AnythingOfType("*p2p.InvCtrlMsgNotif")) + inspector.Start(signalerCtx) + unittest.RequireComponentsReadyBefore(t, 1*time.Second, inspector) + + require.NoError(t, inspector.Inspect(from, unknownTopicReq)) + require.NoError(t, inspector.Inspect(from, malformedTopicReq)) + require.NoError(t, inspector.Inspect(from, invalidSporkIDTopicReq)) + + // wait 2 seconds to ensure all RPCs are processed + time.Sleep(2 * time.Second) + cancel() + unittest.RequireCloseBefore(t, inspector.Done(), 5*time.Second, "inspector did not stop") +} + +// TestPruneInspection_InvalidTopic_AboveThreshold ensures inspector disseminates an invalid control message notification for +// prune messages when the invalid topic id count exceeds the configured threshold. +func TestPruneInspection_InvalidTopic_AboveThreshold(t *testing.T) { + c, err := config.DefaultConfig() + require.NoError(t, err) + cfg := &c.NetworkConfig.GossipSub.RpcInspector.Validation + inspector, signalerCtx, cancel, distributor, rpcTracker, sporkID, _, topicProviderOracle := inspectorFixture(t, func(params *validation.InspectorParams) { + params.Config = cfg + }) + + var unknownTopicPrunes []*pubsub_pb.ControlPrune + var malformedTopicPrunes []*pubsub_pb.ControlPrune + var invalidSporkIDTopicPrunes []*pubsub_pb.ControlPrune + var allTopics []string + for i := 0; i < cfg.GraftPrune.InvalidTopicIdThreshold+1; i++ { + // create one unknown, one malformed and one invalid-spork-ID topic + unknownTopic, malformedTopic, invalidSporkIDTopic := invalidTopics(t, sporkID) + allTopics = append(allTopics, unknownTopic, malformedTopic, invalidSporkIDTopic) + unknownTopicPrunes = append(unknownTopicPrunes, unittest.P2PRPCPruneFixture(&unknownTopic)) + malformedTopicPrunes = append(malformedTopicPrunes,
unittest.P2PRPCPruneFixture(&malformedTopic)) + invalidSporkIDTopicPrunes = append(invalidSporkIDTopicPrunes, unittest.P2PRPCPruneFixture(&invalidSporkIDTopic)) + } + + // avoid unknown topics errors + topicProviderOracle.UpdateTopics(allTopics) + unknownTopicReq := unittest.P2PRPCFixture(unittest.WithPrunes(unknownTopicPrunes...)) + malformedTopicReq := unittest.P2PRPCFixture(unittest.WithPrunes(malformedTopicPrunes...)) + invalidSporkIDTopicReq := unittest.P2PRPCFixture(unittest.WithPrunes(invalidSporkIDTopicPrunes...)) + + from := unittest.PeerIdFixture(t) + checkNotification := checkNotificationFunc(t, from, p2pmsg.CtrlMsgPrune, channels.IsInvalidTopicErr, p2p.CtrlMsgNonClusterTopicType) + rpcTracker.On("LastHighestIHaveRPCSize").Return(int64(100)).Maybe() + distributor.On("Distribute", mock.AnythingOfType("*p2p.InvCtrlMsgNotif")).Return(nil).Times(3).Run(checkNotification) + + inspector.Start(signalerCtx) + unittest.RequireComponentsReadyBefore(t, 1*time.Second, inspector) + + require.NoError(t, inspector.Inspect(from, unknownTopicReq)) + require.NoError(t, inspector.Inspect(from, malformedTopicReq)) + require.NoError(t, inspector.Inspect(from, invalidSporkIDTopicReq)) + + // sleep for 1 second to ensure rpc's is processed + time.Sleep(time.Second) + cancel() + unittest.RequireCloseBefore(t, inspector.Done(), 5*time.Second, "inspector did not stop") +} + // TestPruneInspection_DuplicateTopicIds_AboveThreshold ensures inspector disseminates an invalid control message notification for // prune messages when the number of duplicate topic ids is above the threshold. 
func TestPruneInspection_DuplicateTopicIds_AboveThreshold(t *testing.T) { @@ -572,6 +724,99 @@ func TestIHaveInspection_InvalidTopic(t *testing.T) { unittest.RequireCloseBefore(t, inspector.Done(), 5*time.Second, "inspector did not stop") } +// TestIHaveInspection_InvalidTopic_BelowThreshold ensures inspector does not disseminate an invalid control message notification for +// ihave messages when the invalid topic id count does not exceed the configured threshold. +func TestIHaveInspection_InvalidTopic_BelowThreshold(t *testing.T) { + c, err := config.DefaultConfig() + require.NoError(t, err) + cfg := &c.NetworkConfig.GossipSub.RpcInspector.Validation + inspector, signalerCtx, cancel, distributor, rpcTracker, sporkID, _, topicProviderOracle := inspectorFixture(t, func(params *validation.InspectorParams) { + params.Config = cfg + }) + + var unknownTopicIHaves []*pubsub_pb.ControlIHave + var malformedTopicIHaves []*pubsub_pb.ControlIHave + var invalidSporkIDTopicIHaves []*pubsub_pb.ControlIHave + var allTopics []string + for i := 0; i < cfg.GraftPrune.InvalidTopicIdThreshold; i++ { + // create unknown topic + unknownTopic, malformedTopic, invalidSporkIDTopic := invalidTopics(t, sporkID) + allTopics = append(allTopics, unknownTopic, malformedTopic, invalidSporkIDTopic) + unknownTopicIHaves = append(unknownTopicIHaves, unittest.P2PRPCIHaveFixture(&unknownTopic, unittest.IdentifierListFixture(5).Strings()...)) + malformedTopicIHaves = append(malformedTopicIHaves, unittest.P2PRPCIHaveFixture(&malformedTopic, unittest.IdentifierListFixture(5).Strings()...)) + invalidSporkIDTopicIHaves = append(invalidSporkIDTopicIHaves, unittest.P2PRPCIHaveFixture(&invalidSporkIDTopic, unittest.IdentifierListFixture(5).Strings()...)) + } + + // avoid unknown topics errors + topicProviderOracle.UpdateTopics(allTopics) + unknownTopicReq := unittest.P2PRPCFixture(unittest.WithIHaves(unknownTopicIHaves...)) + malformedTopicReq := 
unittest.P2PRPCFixture(unittest.WithIHaves(malformedTopicIHaves...)) + invalidSporkIDTopicReq := unittest.P2PRPCFixture(unittest.WithIHaves(invalidSporkIDTopicIHaves...)) + + from := unittest.PeerIdFixture(t) + rpcTracker.On("LastHighestIHaveRPCSize").Return(int64(100)).Maybe() + // no notification should be disseminated as long as the number of invalid topic ids does not exceed the threshold + distributor.AssertNotCalled(t, "Distribute", mock.AnythingOfType("*p2p.InvCtrlMsgNotif")) + inspector.Start(signalerCtx) + unittest.RequireComponentsReadyBefore(t, 1*time.Second, inspector) + + require.NoError(t, inspector.Inspect(from, unknownTopicReq)) + require.NoError(t, inspector.Inspect(from, malformedTopicReq)) + require.NoError(t, inspector.Inspect(from, invalidSporkIDTopicReq)) + + // sleep for 2 seconds to ensure the rpc's are processed + time.Sleep(2 * time.Second) + cancel() + unittest.RequireCloseBefore(t, inspector.Done(), 5*time.Second, "inspector did not stop") +} + +// TestIHaveInspection_InvalidTopic_AboveThreshold ensures inspector disseminates an invalid control message notification for +// ihave messages when the invalid topic id count exceeds the configured threshold. 
+func TestIHaveInspection_InvalidTopic_AboveThreshold(t *testing.T) { + c, err := config.DefaultConfig() + require.NoError(t, err) + cfg := &c.NetworkConfig.GossipSub.RpcInspector.Validation + inspector, signalerCtx, cancel, distributor, rpcTracker, sporkID, _, topicProviderOracle := inspectorFixture(t, func(params *validation.InspectorParams) { + params.Config = cfg + }) + + var unknownTopicIHaves []*pubsub_pb.ControlIHave + var malformedTopicIHaves []*pubsub_pb.ControlIHave + var invalidSporkIDTopicIHaves []*pubsub_pb.ControlIHave + var allTopics []string + for i := 0; i < cfg.GraftPrune.InvalidTopicIdThreshold+1; i++ { + // create unknown topic + unknownTopic, malformedTopic, invalidSporkIDTopic := invalidTopics(t, sporkID) + allTopics = append(allTopics, unknownTopic, malformedTopic, invalidSporkIDTopic) + unknownTopicIHaves = append(unknownTopicIHaves, unittest.P2PRPCIHaveFixture(&unknownTopic, unittest.IdentifierListFixture(5).Strings()...)) + malformedTopicIHaves = append(malformedTopicIHaves, unittest.P2PRPCIHaveFixture(&malformedTopic, unittest.IdentifierListFixture(5).Strings()...)) + invalidSporkIDTopicIHaves = append(invalidSporkIDTopicIHaves, unittest.P2PRPCIHaveFixture(&invalidSporkIDTopic, unittest.IdentifierListFixture(5).Strings()...)) + } + + // avoid unknown topics errors + topicProviderOracle.UpdateTopics(allTopics) + unknownTopicReq := unittest.P2PRPCFixture(unittest.WithIHaves(unknownTopicIHaves...)) + malformedTopicReq := unittest.P2PRPCFixture(unittest.WithIHaves(malformedTopicIHaves...)) + invalidSporkIDTopicReq := unittest.P2PRPCFixture(unittest.WithIHaves(invalidSporkIDTopicIHaves...)) + + from := unittest.PeerIdFixture(t) + rpcTracker.On("LastHighestIHaveRPCSize").Return(int64(100)).Maybe() + checkNotification := checkNotificationFunc(t, from, p2pmsg.CtrlMsgIHave, channels.IsInvalidTopicErr, p2p.CtrlMsgNonClusterTopicType) + distributor.On("Distribute", 
mock.AnythingOfType("*p2p.InvCtrlMsgNotif")).Return(nil).Times(3).Run(checkNotification) + + inspector.Start(signalerCtx) + unittest.RequireComponentsReadyBefore(t, 1*time.Second, inspector) + + require.NoError(t, inspector.Inspect(from, unknownTopicReq)) + require.NoError(t, inspector.Inspect(from, malformedTopicReq)) + require.NoError(t, inspector.Inspect(from, invalidSporkIDTopicReq)) + + // sleep for 1 second to ensure rpc's is processed + time.Sleep(2 * time.Second) + cancel() + unittest.RequireCloseBefore(t, inspector.Done(), 5*time.Second, "inspector did not stop") +} + // TestIHaveInspection_DuplicateTopicIds_BelowThreshold ensures inspector does not disseminate an invalid control message notification for // iHave messages when duplicate topic ids are below allowed threshold. func TestIHaveInspection_DuplicateTopicIds_BelowThreshold(t *testing.T) { From 46c1c7a56a3bb8691a6d5ec1c037f10a0349c95f Mon Sep 17 00:00:00 2001 From: Khalil Claybon Date: Tue, 13 Feb 2024 23:43:18 -0500 Subject: [PATCH 03/10] update tests and add metrics --- module/metrics.go | 21 ++++- .../gossipsub_rpc_validation_inspector.go | 81 ++++++++++++++++++- module/metrics/noop.go | 36 +++++---- .../control_message_validation_inspector.go | 9 +-- ...ntrol_message_validation_inspector_test.go | 75 ++--------------- 5 files changed, 124 insertions(+), 98 deletions(-) diff --git a/module/metrics.go b/module/metrics.go index 834a7ec04ef..2d23549f2f5 100644 --- a/module/metrics.go +++ b/module/metrics.go @@ -310,12 +310,17 @@ type GossipSubRpcValidationInspectorMetrics interface { // // duplicateTopicIds: the total number of duplicate topic ids received by the node on the iHave messages at the end of the async inspection of the RPC. // duplicateMessageIds: the number of duplicate message ids received by the node on the iHave messages at the end of the async inspection of the RPC. 
- OnIHaveMessagesInspected(duplicateTopicIds int, duplicateMessageIds int) + // invalidTopicIds: the number of invalid topic ids received by the node on the iHave messages at the end of the async inspection of the RPC. + OnIHaveMessagesInspected(duplicateTopicIds int, duplicateMessageIds, invalidTopicIds int) // OnIHaveDuplicateTopicIdsExceedThreshold tracks the number of times that the async inspection of iHave messages of a single RPC failed due to the total number of duplicate topic ids // received by the node on the iHave messages of that RPC exceeding the threshold, which results in a misbehaviour report. OnIHaveDuplicateTopicIdsExceedThreshold() + // OnIHaveInvalidTopicIdsExceedThreshold tracks the number of times that the async inspection of iHave messages of a single RPC failed due to the total number of invalid topic ids + // received by the node on the iHave messages of that RPC exceeding the threshold, which results in a misbehaviour report. + OnIHaveInvalidTopicIdsExceedThreshold() + // OnIHaveDuplicateMessageIdsExceedThreshold tracks the number of times that the async inspection of iHave messages of a single RPC failed due to the total number of duplicate message ids // received by the node on an iHave message exceeding the threshold, which results in a misbehaviour report. OnIHaveDuplicateMessageIdsExceedThreshold() @@ -343,19 +348,29 @@ type GossipSubRpcValidationInspectorMetrics interface { // received by the node on prune messages of the same RPC excesses threshold, which results in a misbehaviour report. OnPruneDuplicateTopicIdsExceedThreshold() + // OnPruneInvalidTopicIdsExceedThreshold tracks the number of times that the async inspection of prune messages for an RPC failed due to the number of invalid topic ids + // received by the node on prune messages of the same RPC exceeding the threshold, which results in a misbehaviour report. 
+ OnPruneInvalidTopicIdsExceedThreshold() + // OnPruneMessageInspected is called at the end of the async inspection of prune messages of the RPC, regardless of the result of the inspection. // Args: // duplicateTopicIds: the number of duplicate topic ids received by the node on the prune messages of the RPC at the end of the async inspection prunes. - OnPruneMessageInspected(duplicateTopicIds int) + // invalidTopicIds: the number of invalid topic ids received by the node on the prune messages at the end of the async inspection of a single RPC. + OnPruneMessageInspected(duplicateTopicIds, invalidTopicIds int) // OnGraftDuplicateTopicIdsExceedThreshold tracks the number of times that the async inspection of the graft messages of a single RPC failed due to the number of duplicate topic ids // received by the node on graft messages of the same RPC excesses threshold, which results in a misbehaviour report. OnGraftDuplicateTopicIdsExceedThreshold() + // OnGraftInvalidTopicIdsExceedThreshold tracks the number of times that the async inspection of the graft messages of a single RPC failed due to the number of invalid topic ids + // received by the node on graft messages of the same RPC excesses threshold, which results in a misbehaviour report. + OnGraftInvalidTopicIdsExceedThreshold() + // OnGraftMessageInspected is called at the end of the async inspection of graft messages of a single RPC, regardless of the result of the inspection. // Args: // duplicateTopicIds: the number of duplicate topic ids received by the node on the graft messages at the end of the async inspection of a single RPC. - OnGraftMessageInspected(duplicateTopicIds int) + // invalidTopicIds: the number of invalid topic ids received by the node on the graft messages at the end of the async inspection of a single RPC. 
+ OnGraftMessageInspected(duplicateTopicIds, invalidTopicIds int) // OnPublishMessageInspected is called at the end of the async inspection of publish messages of a single RPC, regardless of the result of the inspection. // It tracks the total number of errors detected during the async inspection of the rpc together with their individual breakdown. diff --git a/module/metrics/gossipsub_rpc_validation_inspector.go b/module/metrics/gossipsub_rpc_validation_inspector.go index 7460486b426..3dac6cbadc9 100644 --- a/module/metrics/gossipsub_rpc_validation_inspector.go +++ b/module/metrics/gossipsub_rpc_validation_inspector.go @@ -33,17 +33,23 @@ type GossipSubRpcValidationInspectorMetrics struct { // graft inspection graftDuplicateTopicIdsHistogram prometheus.Histogram + graftInvalidTopicIdsHistogram prometheus.Histogram graftDuplicateTopicIdsExceedThresholdCount prometheus.Counter + graftInvalidTopicIdsExceedThresholdCount prometheus.Counter // prune inspection pruneDuplicateTopicIdsHistogram prometheus.Histogram + pruneInvalidTopicIdsHistogram prometheus.Histogram pruneDuplicateTopicIdsExceedThresholdCount prometheus.Counter + pruneInvalidTopicIdsExceedThresholdCount prometheus.Counter // iHave inspection iHaveDuplicateMessageIdHistogram prometheus.Histogram iHaveDuplicateTopicIdHistogram prometheus.Histogram + iHaveInvalidTopicIdHistogram prometheus.Histogram iHaveDuplicateMessageIdExceedThresholdCount prometheus.Counter iHaveDuplicateTopicIdExceedThresholdCount prometheus.Counter + iHaveInvalidTopicIdExceedThresholdCount prometheus.Counter // iWant inspection iWantDuplicateMessageIdHistogram prometheus.Histogram @@ -167,6 +173,14 @@ func NewGossipSubRPCValidationInspectorMetrics(prefix string) *GossipSubRpcValid Help: "number of duplicate topic ids received from gossipsub protocol during the async inspection of a single RPC", }) + gc.iHaveInvalidTopicIdHistogram = promauto.NewHistogram(prometheus.HistogramOpts{ + Namespace: namespaceNetwork, + Subsystem: 
subsystemGossip, + Buckets: []float64{1, 100, 1000}, + Name: gc.prefix + "rpc_inspection_ihave_invalid_topic_ids_count", + Help: "number of invalid topic ids received from gossipsub protocol during the async inspection of a single RPC", + }) + gc.iHaveDuplicateMessageIdExceedThresholdCount = promauto.NewCounter(prometheus.CounterOpts{ Namespace: namespaceNetwork, Subsystem: subsystemGossip, @@ -181,6 +195,13 @@ func NewGossipSubRPCValidationInspectorMetrics(prefix string) *GossipSubRpcValid Help: "total number of times that the async inspection of iHave messages failed due to the number of duplicate topic ids exceeding the threshold", }) + gc.iHaveInvalidTopicIdExceedThresholdCount = promauto.NewCounter(prometheus.CounterOpts{ + Namespace: namespaceNetwork, + Subsystem: subsystemGossip, + Name: gc.prefix + "rpc_inspection_ihave_invalid_topic_ids_exceed_threshold_total", + Help: "total number of times that the async inspection of iHave messages failed due to the number of invalid topic ids exceeding the threshold", + }) + gc.iWantDuplicateMessageIdHistogram = promauto.NewHistogram(prometheus.HistogramOpts{ Namespace: namespaceNetwork, Subsystem: subsystemGossip, @@ -247,6 +268,14 @@ func NewGossipSubRPCValidationInspectorMetrics(prefix string) *GossipSubRpcValid Help: "number of duplicate topic ids on graft messages of a single RPC during the async inspection, regardless of the result of the inspection", }) + gc.graftInvalidTopicIdsHistogram = promauto.NewHistogram(prometheus.HistogramOpts{ + Namespace: namespaceNetwork, + Subsystem: subsystemGossip, + Name: gc.prefix + "rpc_inspection_graft_invalid_topic_ids_count", + Buckets: []float64{1, 100, 1000}, + Help: "number of invalid topic ids on graft messages of a single RPC during the async inspection, regardless of the result of the inspection", + }) + gc.graftDuplicateTopicIdsExceedThresholdCount = promauto.NewCounter(prometheus.CounterOpts{ Namespace: namespaceNetwork, Subsystem: subsystemGossip, @@ -254,6 +283,13 
@@ func NewGossipSubRPCValidationInspectorMetrics(prefix string) *GossipSubRpcValid Help: "number of times that the async inspection of graft messages of an rpc failed due to the number of duplicate topic ids exceeding the threshold", }) + gc.graftInvalidTopicIdsExceedThresholdCount = promauto.NewCounter(prometheus.CounterOpts{ + Namespace: namespaceNetwork, + Subsystem: subsystemGossip, + Name: gc.prefix + "rpc_inspection_graft_invalid_topic_ids_exceed_threshold_total", + Help: "number of times that the async inspection of graft messages of an rpc failed due to the number of invalid topic ids exceeding the threshold", + }) + gc.pruneDuplicateTopicIdsHistogram = promauto.NewHistogram(prometheus.HistogramOpts{ Namespace: namespaceNetwork, Subsystem: subsystemGossip, @@ -262,6 +298,14 @@ func NewGossipSubRPCValidationInspectorMetrics(prefix string) *GossipSubRpcValid Help: "number of duplicate topic ids on prune messages of a single RPC during the async inspection, regardless of the result of the inspection", }) + gc.pruneInvalidTopicIdsHistogram = promauto.NewHistogram(prometheus.HistogramOpts{ + Namespace: namespaceNetwork, + Subsystem: subsystemGossip, + Buckets: []float64{1, 100, 1000}, + Name: gc.prefix + "rpc_inspection_prune_invalid_topic_ids_count", + Help: "number of invalid topic ids on prune messages of a single RPC during the async inspection, regardless of the result of the inspection", + }) + gc.pruneDuplicateTopicIdsExceedThresholdCount = promauto.NewCounter(prometheus.CounterOpts{ Namespace: namespaceNetwork, Subsystem: subsystemGossip, @@ -269,6 +313,13 @@ func NewGossipSubRPCValidationInspectorMetrics(prefix string) *GossipSubRpcValid Help: "number of times that the async inspection of prune messages failed due to the number of duplicate topic ids exceeding the threshold", }) + gc.pruneInvalidTopicIdsExceedThresholdCount = promauto.NewCounter(prometheus.CounterOpts{ + Namespace: namespaceNetwork, + Subsystem: subsystemGossip, + Name: gc.prefix + 
"rpc_inspection_prune_invalid_topic_ids_exceed_threshold_total", + Help: "number of times that the async inspection of prune messages failed due to the number of invalid topic ids exceeding the threshold", + }) + gc.publishMessageInspectedErrHistogram = promauto.NewHistogram(prometheus.HistogramOpts{ Namespace: namespaceNetwork, Subsystem: subsystemGossip, @@ -407,9 +458,11 @@ func (c *GossipSubRpcValidationInspectorMetrics) OnIWantCacheMissMessageIdsExcee // // duplicateTopicIds: the total number of duplicate topic ids received by the node on the iHave messages at the end of the async inspection of the RPC. // duplicateMessageIds: the number of duplicate message ids received by the node on the iHave messages at the end of the async inspection of the RPC. -func (c *GossipSubRpcValidationInspectorMetrics) OnIHaveMessagesInspected(duplicateTopicIds int, duplicateMessageIds int) { +// invalidTopicIds: the number of invalid topic ids received by the node on the iHave messages at the end of the async inspection of the RPC. 
+func (c *GossipSubRpcValidationInspectorMetrics) OnIHaveMessagesInspected(duplicateTopicIds, duplicateMessageIds, invalidTopicIds int) { c.iHaveDuplicateTopicIdHistogram.Observe(float64(duplicateTopicIds)) c.iHaveDuplicateMessageIdHistogram.Observe(float64(duplicateMessageIds)) + c.iHaveInvalidTopicIdHistogram.Observe(float64(invalidTopicIds)) } // OnIHaveDuplicateTopicIdsExceedThreshold tracks the number of times that the async inspection of iHave messages of a single RPC failed due to the total number of duplicate topic ids @@ -424,6 +477,12 @@ func (c *GossipSubRpcValidationInspectorMetrics) OnIHaveDuplicateMessageIdsExcee c.iHaveDuplicateMessageIdExceedThresholdCount.Inc() } +// OnIHaveInvalidTopicIdsExceedThreshold tracks the number of times that the async inspection of iHave messages of a single RPC failed due to the total number of invalid topic ids +// received by the node on the iHave messages of that RPC exceeding the threshold, which results in a misbehaviour report. +func (c *GossipSubRpcValidationInspectorMetrics) OnIHaveInvalidTopicIdsExceedThreshold() { + c.iHaveInvalidTopicIdExceedThresholdCount.Inc() +} + // OnInvalidTopicIdDetectedForControlMessage tracks the number of times that the async inspection of a control message type on a single RPC failed due to an invalid topic id. // Args: // - messageType: the type of the control message that was truncated. @@ -455,12 +514,20 @@ func (c *GossipSubRpcValidationInspectorMetrics) OnPruneDuplicateTopicIdsExceedT c.pruneDuplicateTopicIdsExceedThresholdCount.Inc() } +// OnPruneInvalidTopicIdsExceedThreshold tracks the number of times that the async inspection of prune messages for an RPC failed due to the number of invalid topic ids +// received by the node on prune messages of the same RPC excesses threshold, which results in a misbehaviour report. 
+func (c *GossipSubRpcValidationInspectorMetrics) OnPruneInvalidTopicIdsExceedThreshold() { + c.pruneInvalidTopicIdsExceedThresholdCount.Inc() +} + // OnPruneMessageInspected is called at the end of the async inspection of prune messages of the RPC, regardless of the result of the inspection. // Args: // // duplicateTopicIds: the number of duplicate topic ids received by the node on the prune messages of the RPC at the end of the async inspection prunes. -func (c *GossipSubRpcValidationInspectorMetrics) OnPruneMessageInspected(duplicateTopicIds int) { +// invalidTopicIds: the number of invalid topic ids received by the node on the prune messages at the end of the async inspection of the RPC. +func (c *GossipSubRpcValidationInspectorMetrics) OnPruneMessageInspected(duplicateTopicIds, invalidTopicIds int) { c.pruneDuplicateTopicIdsHistogram.Observe(float64(duplicateTopicIds)) + c.pruneInvalidTopicIdsHistogram.Observe(float64(invalidTopicIds)) } // OnGraftDuplicateTopicIdsExceedThreshold tracks the number of times that the async inspection of a graft message failed due to the number of duplicate topic ids. @@ -469,12 +536,20 @@ func (c *GossipSubRpcValidationInspectorMetrics) OnGraftDuplicateTopicIdsExceedT c.graftDuplicateTopicIdsExceedThresholdCount.Inc() } +// OnGraftInvalidTopicIdsExceedThreshold tracks the number of times that the async inspection of the graft messages of a single RPC failed due to the number of invalid topic ids +// received by the node on graft messages of the same RPC exceeding the threshold, which results in a misbehaviour report. +func (c *GossipSubRpcValidationInspectorMetrics) OnGraftInvalidTopicIdsExceedThreshold() { + c.graftInvalidTopicIdsExceedThresholdCount.Inc() +} + // OnGraftMessageInspected is called at the end of the async inspection of graft messages of a single RPC, regardless of the result of the inspection. 
// Args: // // duplicateTopicIds: the number of duplicate topic ids received by the node on the graft messages at the end of the async inspection of a single RPC. -func (c *GossipSubRpcValidationInspectorMetrics) OnGraftMessageInspected(duplicateTopicIds int) { +// invalidTopicIds: the number of invalid topic ids received by the node on the graft messages at the end of the async inspection of the RPC. +func (c *GossipSubRpcValidationInspectorMetrics) OnGraftMessageInspected(duplicateTopicIds, invalidTopicIds int) { c.graftDuplicateTopicIdsHistogram.Observe(float64(duplicateTopicIds)) + c.graftInvalidTopicIdsHistogram.Observe(float64(invalidTopicIds)) } // OnPublishMessageInspected is called at the end of the async inspection of publish messages of a single RPC, regardless of the result of the inspection. diff --git a/module/metrics/noop.go b/module/metrics/noop.go index 04a6d80b70e..6610c3620a7 100644 --- a/module/metrics/noop.go +++ b/module/metrics/noop.go @@ -322,24 +322,28 @@ func (nc *NoopCollector) OnControlMessagesTruncated(messageType p2pmsg.ControlMe } func (nc *NoopCollector) OnIncomingRpcReceived(iHaveCount, iWantCount, graftCount, pruneCount, msgCount int) { } -func (nc *NoopCollector) AsyncProcessingStarted() {} -func (nc *NoopCollector) AsyncProcessingFinished(time.Duration) {} -func (nc *NoopCollector) OnIWantMessagesInspected(duplicateCount int, cacheMissCount int) {} -func (nc *NoopCollector) OnIWantDuplicateMessageIdsExceedThreshold() {} -func (nc *NoopCollector) OnIWantCacheMissMessageIdsExceedThreshold() {} -func (nc *NoopCollector) OnIHaveMessagesInspected(duplicateTopicIds int, duplicateMessageIds int) {} -func (nc *NoopCollector) OnIHaveDuplicateTopicIdsExceedThreshold() {} -func (nc *NoopCollector) OnIHaveDuplicateMessageIdsExceedThreshold() {} +func (nc *NoopCollector) AsyncProcessingStarted() {} +func (nc *NoopCollector) AsyncProcessingFinished(time.Duration) {} +func (nc *NoopCollector) OnIWantMessagesInspected(duplicateCount int, 
cacheMissCount int) {} +func (nc *NoopCollector) OnIWantDuplicateMessageIdsExceedThreshold() {} +func (nc *NoopCollector) OnIWantCacheMissMessageIdsExceedThreshold() {} +func (nc *NoopCollector) OnIHaveMessagesInspected(duplicateTopicIds int, duplicateMessageIds, invalidTopicIds int) { +} +func (nc *NoopCollector) OnIHaveDuplicateTopicIdsExceedThreshold() {} +func (nc *NoopCollector) OnIHaveInvalidTopicIdsExceedThreshold() {} +func (nc *NoopCollector) OnIHaveDuplicateMessageIdsExceedThreshold() {} func (nc *NoopCollector) OnInvalidTopicIdDetectedForControlMessage(messageType p2pmsg.ControlMessageType) { } -func (nc *NoopCollector) OnActiveClusterIDsNotSetErr() {} -func (nc *NoopCollector) OnUnstakedPeerInspectionFailed() {} -func (nc *NoopCollector) OnInvalidControlMessageNotificationSent() {} -func (nc *NoopCollector) OnPublishMessagesInspectionErrorExceedsThreshold() {} -func (nc *NoopCollector) OnPruneDuplicateTopicIdsExceedThreshold() {} -func (nc *NoopCollector) OnPruneMessageInspected(duplicateTopicIds int) {} -func (nc *NoopCollector) OnGraftDuplicateTopicIdsExceedThreshold() {} -func (nc *NoopCollector) OnGraftMessageInspected(duplicateTopicIds int) {} +func (nc *NoopCollector) OnActiveClusterIDsNotSetErr() {} +func (nc *NoopCollector) OnUnstakedPeerInspectionFailed() {} +func (nc *NoopCollector) OnInvalidControlMessageNotificationSent() {} +func (nc *NoopCollector) OnPublishMessagesInspectionErrorExceedsThreshold() {} +func (nc *NoopCollector) OnPruneDuplicateTopicIdsExceedThreshold() {} +func (nc *NoopCollector) OnPruneInvalidTopicIdsExceedThreshold() {} +func (nc *NoopCollector) OnPruneMessageInspected(duplicateTopicIds, invalidTopicIds int) {} +func (nc *NoopCollector) OnGraftDuplicateTopicIdsExceedThreshold() {} +func (nc *NoopCollector) OnGraftInvalidTopicIdsExceedThreshold() {} +func (nc *NoopCollector) OnGraftMessageInspected(duplicateTopicIds, invalidTopicIds int) {} func (nc *NoopCollector) OnPublishMessageInspected(totalErrCount int, 
invalidTopicIdsCount int, invalidSubscriptionsCount int, invalidSendersCount int) { } diff --git a/network/p2p/inspector/validation/control_message_validation_inspector.go b/network/p2p/inspector/validation/control_message_validation_inspector.go index 6547075c119..308869731af 100644 --- a/network/p2p/inspector/validation/control_message_validation_inspector.go +++ b/network/p2p/inspector/validation/control_message_validation_inspector.go @@ -369,7 +369,7 @@ func (c *ControlMsgValidationInspector) inspectGraftMessages(from peer.ID, graft totalInvalidTopicIdErrs := 0 defer func() { // regardless of inspection result, update metrics - c.metrics.OnGraftMessageInspected(totalDuplicateTopicIds) + c.metrics.OnGraftMessageInspected(totalDuplicateTopicIds, totalInvalidTopicIdErrs) }() for _, graft := range grafts { @@ -386,7 +386,6 @@ func (c *ControlMsgValidationInspector) inspectGraftMessages(from peer.ID, graft err, ctrlMsgType := c.validateTopic(from, topic, activeClusterIDS) if err != nil { totalInvalidTopicIdErrs++ - // TODO: consider adding a threshold for this error similar to the duplicate topic id threshold. 
c.metrics.OnInvalidTopicIdDetectedForControlMessage(p2pmsg.CtrlMsgGraft) if totalInvalidTopicIdErrs > c.config.GraftPrune.InvalidTopicIdThreshold { return err, ctrlMsgType @@ -420,7 +419,7 @@ func (c *ControlMsgValidationInspector) inspectPruneMessages(from peer.ID, prune totalInvalidTopicIdErrs := 0 defer func() { // regardless of inspection result, update metrics - c.metrics.OnPruneMessageInspected(totalDuplicateTopicIds) + c.metrics.OnPruneMessageInspected(totalDuplicateTopicIds, totalInvalidTopicIdErrs) }() for _, prune := range prunes { topic := channels.Topic(prune.GetTopicID()) @@ -436,7 +435,6 @@ func (c *ControlMsgValidationInspector) inspectPruneMessages(from peer.ID, prune err, ctrlMsgType := c.validateTopic(from, topic, activeClusterIDS) if err != nil { totalInvalidTopicIdErrs++ - // TODO: consider adding a threshold for this error similar to the duplicate topic id threshold. c.metrics.OnInvalidTopicIdDetectedForControlMessage(p2pmsg.CtrlMsgPrune) if totalInvalidTopicIdErrs > c.config.GraftPrune.InvalidTopicIdThreshold { return err, ctrlMsgType @@ -482,7 +480,7 @@ func (c *ControlMsgValidationInspector) inspectIHaveMessages(from peer.ID, ihave totalInvalidTopicIdErrs := 0 defer func() { // regardless of inspection result, update metrics - c.metrics.OnIHaveMessagesInspected(totalDuplicateTopicIds, totalDuplicateMessageIds) + c.metrics.OnIHaveMessagesInspected(totalDuplicateTopicIds, totalDuplicateMessageIds, totalInvalidTopicIdErrs) }() for _, ihave := range ihaves { messageIds := ihave.GetMessageIDs() @@ -493,7 +491,6 @@ func (c *ControlMsgValidationInspector) inspectIHaveMessages(from peer.ID, ihave err, ctrlMsgType := c.validateTopic(from, channels.Topic(topic), activeClusterIDS) if err != nil { totalInvalidTopicIdErrs++ - // TODO: consider adding a threshold for this error similar to the duplicate topic id threshold. 
c.metrics.OnInvalidTopicIdDetectedForControlMessage(p2pmsg.CtrlMsgIHave) if totalInvalidTopicIdErrs > c.config.IHave.InvalidTopicIdThreshold { return err, ctrlMsgType diff --git a/network/p2p/inspector/validation/control_message_validation_inspector_test.go b/network/p2p/inspector/validation/control_message_validation_inspector_test.go index f5f8d1b8722..6ebea061322 100644 --- a/network/p2p/inspector/validation/control_message_validation_inspector_test.go +++ b/network/p2p/inspector/validation/control_message_validation_inspector_test.go @@ -129,12 +129,11 @@ func TestControlMessageValidationInspector_truncateRPC(t *testing.T) { // topic validation is ignored set any topic oracle rpcTracker.On("LastHighestIHaveRPCSize").Return(int64(100)).Maybe() rpcTracker.On("WasIHaveRPCSent", mock.AnythingOfType("string")).Return(true).Maybe() - distributor.On("Distribute", mock.AnythingOfType("*p2p.InvCtrlMsgNotif")).Return(nil).Twice() + distributor.On("Distribute", mock.AnythingOfType("*p2p.InvCtrlMsgNotif")).Return(nil).Maybe() inspector.Start(signalerCtx) unittest.RequireComponentsReadyBefore(t, 1*time.Second, inspector) - // unittest.RequireCloseBefore(t, inspector.Ready(), 100*time.Millisecond, "inspector did not start") // topic validation not performed, so we can use random strings prunesGreaterThanMaxSampleSize := unittest.P2PRPCFixture(unittest.WithPrunes(unittest.P2PRPCPruneFixtures(unittest.IdentifierListFixture(2000).Strings()...)...)) require.Greater(t, len(prunesGreaterThanMaxSampleSize.GetControl().GetPrune()), graftPruneMessageMaxSampleSize) @@ -162,7 +161,7 @@ func TestControlMessageValidationInspector_truncateRPC(t *testing.T) { // topic validation is ignored set any topic oracle rpcTracker.On("LastHighestIHaveRPCSize").Return(int64(100)).Maybe() rpcTracker.On("WasIHaveRPCSent", mock.AnythingOfType("string")).Return(true).Maybe() - distributor.On("Distribute", mock.AnythingOfType("*p2p.InvCtrlMsgNotif")).Return(nil).Twice() + distributor.On("Distribute", 
mock.AnythingOfType("*p2p.InvCtrlMsgNotif")).Return(nil).Maybe() inspector.Start(signalerCtx) unittest.RequireComponentsReadyBefore(t, 1*time.Second, inspector) @@ -194,7 +193,7 @@ func TestControlMessageValidationInspector_truncateRPC(t *testing.T) { // topic validation is ignored set any topic oracle rpcTracker.On("LastHighestIHaveRPCSize").Return(int64(100)).Maybe() rpcTracker.On("WasIHaveRPCSent", mock.AnythingOfType("string")).Return(true).Maybe() - distributor.On("Distribute", mock.AnythingOfType("*p2p.InvCtrlMsgNotif")).Return(nil).Twice() + distributor.On("Distribute", mock.AnythingOfType("*p2p.InvCtrlMsgNotif")).Return(nil).Maybe() inspector.Start(signalerCtx) unittest.RequireComponentsReadyBefore(t, 1*time.Second, inspector) @@ -658,72 +657,6 @@ func TestPrueInspection_DuplicateTopicIds_BelowThreshold(t *testing.T) { unittest.RequireCloseBefore(t, inspector.Done(), 5*time.Second, "inspector did not stop") } -// TestPruneInspection_InvalidTopic ensures inspector disseminates an invalid control message notification for -// prune messages when the topic is invalid. 
-func TestPruneInspection_InvalidTopic(t *testing.T) { - inspector, signalerCtx, cancel, distributor, rpcTracker, sporkID, _, topicProviderOracle := inspectorFixture(t) - // create unknown topic - unknownTopic, malformedTopic, invalidSporkIDTopic := invalidTopics(t, sporkID) - unknownTopicPrune := unittest.P2PRPCPruneFixture(&unknownTopic) - malformedTopicPrune := unittest.P2PRPCPruneFixture(&malformedTopic) - invalidSporkIDTopicPrune := unittest.P2PRPCPruneFixture(&invalidSporkIDTopic) - // avoid unknown topics errors - topicProviderOracle.UpdateTopics([]string{unknownTopic, malformedTopic, invalidSporkIDTopic}) - unknownTopicRpc := unittest.P2PRPCFixture(unittest.WithPrunes(unknownTopicPrune)) - malformedTopicRpc := unittest.P2PRPCFixture(unittest.WithPrunes(malformedTopicPrune)) - invalidSporkIDTopicRpc := unittest.P2PRPCFixture(unittest.WithPrunes(invalidSporkIDTopicPrune)) - - from := unittest.PeerIdFixture(t) - checkNotification := checkNotificationFunc(t, from, p2pmsg.CtrlMsgPrune, channels.IsInvalidTopicErr, p2p.CtrlMsgNonClusterTopicType) - rpcTracker.On("LastHighestIHaveRPCSize").Return(int64(100)).Maybe() - - distributor.On("Distribute", mock.AnythingOfType("*p2p.InvCtrlMsgNotif")).Return(nil).Times(3).Run(checkNotification) - - inspector.Start(signalerCtx) - unittest.RequireComponentsReadyBefore(t, 1*time.Second, inspector) - - require.NoError(t, inspector.Inspect(from, unknownTopicRpc)) - require.NoError(t, inspector.Inspect(from, malformedTopicRpc)) - require.NoError(t, inspector.Inspect(from, invalidSporkIDTopicRpc)) - // sleep for 1 second to ensure rpc's is processed - time.Sleep(time.Second) - cancel() - unittest.RequireCloseBefore(t, inspector.Done(), 5*time.Second, "inspector did not stop") -} - -// TestIHaveInspection_InvalidTopic ensures inspector disseminates an invalid control message notification for -// iHave messages when the topic is invalid. 
-func TestIHaveInspection_InvalidTopic(t *testing.T) { - inspector, signalerCtx, cancel, distributor, rpcTracker, sporkID, _, topicProviderOracle := inspectorFixture(t) - // create unknown topic - unknownTopic, malformedTopic, invalidSporkIDTopic := invalidTopics(t, sporkID) - // avoid unknown topics errors - topicProviderOracle.UpdateTopics([]string{unknownTopic, malformedTopic, invalidSporkIDTopic}) - unknownTopicIhave := unittest.P2PRPCIHaveFixture(&unknownTopic, unittest.IdentifierListFixture(5).Strings()...) - malformedTopicIhave := unittest.P2PRPCIHaveFixture(&malformedTopic, unittest.IdentifierListFixture(5).Strings()...) - invalidSporkIDTopicIhave := unittest.P2PRPCIHaveFixture(&invalidSporkIDTopic, unittest.IdentifierListFixture(5).Strings()...) - - unknownTopicRpc := unittest.P2PRPCFixture(unittest.WithIHaves(unknownTopicIhave)) - malformedTopicRpc := unittest.P2PRPCFixture(unittest.WithIHaves(malformedTopicIhave)) - invalidSporkIDTopicRpc := unittest.P2PRPCFixture(unittest.WithIHaves(invalidSporkIDTopicIhave)) - - from := unittest.PeerIdFixture(t) - checkNotification := checkNotificationFunc(t, from, p2pmsg.CtrlMsgIHave, channels.IsInvalidTopicErr, p2p.CtrlMsgNonClusterTopicType) - rpcTracker.On("LastHighestIHaveRPCSize").Return(int64(100)).Maybe() - - distributor.On("Distribute", mock.AnythingOfType("*p2p.InvCtrlMsgNotif")).Return(nil).Times(3).Run(checkNotification) - inspector.Start(signalerCtx) - unittest.RequireComponentsReadyBefore(t, 1*time.Second, inspector) - - require.NoError(t, inspector.Inspect(from, unknownTopicRpc)) - require.NoError(t, inspector.Inspect(from, malformedTopicRpc)) - require.NoError(t, inspector.Inspect(from, invalidSporkIDTopicRpc)) - // sleep for 1 second to ensure rpc's is processed - time.Sleep(time.Second) - cancel() - unittest.RequireCloseBefore(t, inspector.Done(), 5*time.Second, "inspector did not stop") -} - // TestIHaveInspection_InvalidTopic_BelowThreshold ensures inspector does not disseminate an invalid control 
message notification for // ihave messages when the invalid topic id count does not exceed the configured threshold. func TestIHaveInspection_InvalidTopic_BelowThreshold(t *testing.T) { @@ -1342,6 +1275,8 @@ func TestNewControlMsgValidationInspector_validateClusterPrefixedTopic(t *testin inspector, signalerCtx, cancel, distributor, rpcTracker, sporkID, idProvider, topicProviderOracle := inspectorFixture(t, func(params *validation.InspectorParams) { // the 11th unknown cluster ID error should cause an error params.Config.ClusterPrefixedMessage.HardThreshold = 10 + // set the invalid topic id threshold to 0 so that the first invalid topic id always causes an error + params.Config.GraftPrune.InvalidTopicIdThreshold = 0 }) clusterID := flow.ChainID(unittest.IdentifierFixture().String()) clusterPrefixedTopic := channels.Topic(fmt.Sprintf("%s/%s", channels.SyncCluster(clusterID), sporkID)).String() From c03eb77fa6e30a1da4d579705fceae46b0213e41 Mon Sep 17 00:00:00 2001 From: Khalil Claybon Date: Wed, 21 Feb 2024 09:36:53 -0500 Subject: [PATCH 04/10] Update config/default-config.yml Co-authored-by: Yahya Hassanzadeh, Ph.D. --- config/default-config.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/config/default-config.yml b/config/default-config.yml index 54e96c35d52..ad9388d1292 100644 --- a/config/default-config.yml +++ b/config/default-config.yml @@ -160,7 +160,7 @@ network-config: # to avoid penalizing peers that are not malicious but are misbehaving due to bugs or other issues. # A topic id is considered duplicate if it appears more than once in a single GRAFT or PRUNE message. duplicate-topic-id-threshold: 50 - # Maximum number of total invalid topic ids in a single GRAFT or PRUNE message, ideally this should be 0 but we allow for some tolerance + # Maximum number of total invalid topic ids in GRAFTs/PRUNEs of a single RPC, ideally this should be 0 but we allow for some tolerance # to avoid penalizing peers that are not malicious but are misbehaving due to bugs or other issues.
invalid-topic-id-threshold: 50 ihave: From cc41ebc13428d734b6a0c6ab8d07c3657d5fbbad Mon Sep 17 00:00:00 2001 From: Khalil Claybon Date: Wed, 21 Feb 2024 09:36:59 -0500 Subject: [PATCH 05/10] Update config/default-config.yml Co-authored-by: Yahya Hassanzadeh, Ph.D. --- config/default-config.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/config/default-config.yml b/config/default-config.yml index ad9388d1292..ff549f4ff72 100644 --- a/config/default-config.yml +++ b/config/default-config.yml @@ -161,7 +161,7 @@ network-config: # A topic id is considered duplicate if it appears more than once in a single GRAFT or PRUNE message. duplicate-topic-id-threshold: 50 # Maximum number of total invalid topic ids in GRAFTs/PRUNEs of a single RPC, ideally this should be 0 but we allow for some tolerance - # to avoid penalizing peers that are not malicious but are misbehaving due to bugs or other issues. + # to avoid penalizing peers that are not malicious but are misbehaving due to bugs or other issues. Exceeding this threshold causes RPC inspection failure with an invalid control message notification (penalty). invalid-topic-id-threshold: 50 ihave: # The maximum allowed number of iHave messages in a single RPC message. From d1564abfebb439e228eaf7f266e9bad7123874bc Mon Sep 17 00:00:00 2001 From: Khalil Claybon Date: Wed, 21 Feb 2024 09:37:05 -0500 Subject: [PATCH 06/10] Update config/default-config.yml Co-authored-by: Yahya Hassanzadeh, Ph.D. --- config/default-config.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/config/default-config.yml b/config/default-config.yml index ff549f4ff72..acb009836d1 100644 --- a/config/default-config.yml +++ b/config/default-config.yml @@ -184,7 +184,7 @@ network-config: # Ideally, an iHave message should not have any duplicate message IDs, hence a message id is considered duplicate when it is repeated more than once # within the same iHave message. 
When the total number of duplicate message ids in a single iHave message exceeds this threshold, the inspection of message will fail. duplicate-message-id-threshold: 100 - # Maximum number of total invalid topic ids in a single IHAVE message, ideally this should be 0 but we allow for some tolerance + # Maximum number of total invalid topic ids in an IHAVE message on a single RPC, ideally this should be 0 but we allow for some tolerance # to avoid penalizing peers that are not malicious but are misbehaving due to bugs or other issues. invalid-topic-id-threshold: 50 iwant: From 0ada021f023eef58cf2159dbd5c1e5e1c986d4d7 Mon Sep 17 00:00:00 2001 From: Khalil Claybon Date: Wed, 21 Feb 2024 09:37:11 -0500 Subject: [PATCH 07/10] Update config/default-config.yml Co-authored-by: Yahya Hassanzadeh, Ph.D. --- config/default-config.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/config/default-config.yml b/config/default-config.yml index acb009836d1..e7108afb20d 100644 --- a/config/default-config.yml +++ b/config/default-config.yml @@ -185,7 +185,7 @@ network-config: # within the same iHave message. When the total number of duplicate message ids in a single iHave message exceeds this threshold, the inspection of message will fail. duplicate-message-id-threshold: 100 # Maximum number of total invalid topic ids in an IHAVE message on a single RPC, ideally this should be 0 but we allow for some tolerance - # to avoid penalizing peers that are not malicious but are misbehaving due to bugs or other issues. + # to avoid penalizing peers that are not malicious but are misbehaving due to bugs or other issues. Exceeding this threshold causes RPC inspection failure with an invalid control message notification (penalty). invalid-topic-id-threshold: 50 iwant: # The maximum allowed number of iWant messages in a single RPC message. 
From a6cd7ba676f405adf8e9b0354b31aee06e960391 Mon Sep 17 00:00:00 2001 From: Khalil Claybon Date: Wed, 21 Feb 2024 10:38:26 -0500 Subject: [PATCH 08/10] add *ThresholdExceeded errors for invalid topic ID and duplicate topic ID --- .../validation_inspector_test.go | 2 +- .../control_message_validation_inspector.go | 12 +++--- ...ntrol_message_validation_inspector_test.go | 14 +++--- network/p2p/inspector/validation/errors.go | 43 +++++++++++++++++++ .../p2p/inspector/validation/errors_test.go | 24 +++++++++++ 5 files changed, 81 insertions(+), 14 deletions(-) diff --git a/insecure/integration/functional/test/gossipsub/rpc_inspector/validation_inspector_test.go b/insecure/integration/functional/test/gossipsub/rpc_inspector/validation_inspector_test.go index e8cf9bb1566..e4f4b7e61d6 100644 --- a/insecure/integration/functional/test/gossipsub/rpc_inspector/validation_inspector_test.go +++ b/insecure/integration/functional/test/gossipsub/rpc_inspector/validation_inspector_test.go @@ -199,7 +199,7 @@ func TestValidationInspector_DuplicateTopicId_Detection(t *testing.T) { notification, ok := args[0].(*p2p.InvCtrlMsgNotif) require.True(t, ok) require.Equal(t, notification.TopicType, p2p.CtrlMsgNonClusterTopicType, "IsClusterPrefixed is expected to be false, no RPC with cluster prefixed topic sent in this test") - require.True(t, validation.IsDuplicateTopicErr(notification.Error)) + require.True(t, validation.IsDuplicateTopicIDThresholdExceeded(notification.Error)) require.Equal(t, spammer.SpammerNode.ID(), notification.PeerID) switch notification.MsgType { case p2pmsg.CtrlMsgGraft: diff --git a/network/p2p/inspector/validation/control_message_validation_inspector.go b/network/p2p/inspector/validation/control_message_validation_inspector.go index 308869731af..09e94143004 100644 --- a/network/p2p/inspector/validation/control_message_validation_inspector.go +++ b/network/p2p/inspector/validation/control_message_validation_inspector.go @@ -380,7 +380,7 @@ func (c 
*ControlMsgValidationInspector) inspectGraftMessages(from peer.ID, graft // check if the total number of duplicates exceeds the configured threshold. if totalDuplicateTopicIds > c.config.GraftPrune.DuplicateTopicIdThreshold { c.metrics.OnGraftDuplicateTopicIdsExceedThreshold() - return NewDuplicateTopicErr(topic.String(), totalDuplicateTopicIds, p2pmsg.CtrlMsgGraft), p2p.CtrlMsgNonClusterTopicType + return NewDuplicateTopicIDThresholdExceeded(totalDuplicateTopicIds, len(grafts), c.config.GraftPrune.DuplicateTopicIdThreshold), p2p.CtrlMsgNonClusterTopicType } } err, ctrlMsgType := c.validateTopic(from, topic, activeClusterIDS) @@ -388,7 +388,7 @@ func (c *ControlMsgValidationInspector) inspectGraftMessages(from peer.ID, graft totalInvalidTopicIdErrs++ c.metrics.OnInvalidTopicIdDetectedForControlMessage(p2pmsg.CtrlMsgGraft) if totalInvalidTopicIdErrs > c.config.GraftPrune.InvalidTopicIdThreshold { - return err, ctrlMsgType + return NewInvalidTopicIDThresholdExceeded(totalInvalidTopicIdErrs, c.config.GraftPrune.InvalidTopicIdThreshold), ctrlMsgType } } } @@ -429,7 +429,7 @@ func (c *ControlMsgValidationInspector) inspectPruneMessages(from peer.ID, prune // check if the total number of duplicates exceeds the configured threshold. 
if totalDuplicateTopicIds > c.config.GraftPrune.DuplicateTopicIdThreshold { c.metrics.OnPruneDuplicateTopicIdsExceedThreshold() - return NewDuplicateTopicErr(topic.String(), totalDuplicateTopicIds, p2pmsg.CtrlMsgPrune), p2p.CtrlMsgNonClusterTopicType + return NewDuplicateTopicIDThresholdExceeded(totalDuplicateTopicIds, len(prunes), c.config.GraftPrune.DuplicateTopicIdThreshold), p2p.CtrlMsgNonClusterTopicType } } err, ctrlMsgType := c.validateTopic(from, topic, activeClusterIDS) @@ -437,7 +437,7 @@ func (c *ControlMsgValidationInspector) inspectPruneMessages(from peer.ID, prune totalInvalidTopicIdErrs++ c.metrics.OnInvalidTopicIdDetectedForControlMessage(p2pmsg.CtrlMsgPrune) if totalInvalidTopicIdErrs > c.config.GraftPrune.InvalidTopicIdThreshold { - return err, ctrlMsgType + return NewInvalidTopicIDThresholdExceeded(totalInvalidTopicIdErrs, c.config.GraftPrune.InvalidTopicIdThreshold), ctrlMsgType } } } @@ -493,7 +493,7 @@ func (c *ControlMsgValidationInspector) inspectIHaveMessages(from peer.ID, ihave totalInvalidTopicIdErrs++ c.metrics.OnInvalidTopicIdDetectedForControlMessage(p2pmsg.CtrlMsgIHave) if totalInvalidTopicIdErrs > c.config.IHave.InvalidTopicIdThreshold { - return err, ctrlMsgType + return NewInvalidTopicIDThresholdExceeded(totalInvalidTopicIdErrs, c.config.IHave.InvalidTopicIdThreshold), ctrlMsgType } } @@ -503,7 +503,7 @@ func (c *ControlMsgValidationInspector) inspectIHaveMessages(from peer.ID, ihave // the topic is duplicated, check if the total number of duplicates exceeds the configured threshold if totalDuplicateTopicIds > c.config.IHave.DuplicateTopicIdThreshold { c.metrics.OnIHaveDuplicateTopicIdsExceedThreshold() - return NewDuplicateTopicErr(topic, totalDuplicateTopicIds, p2pmsg.CtrlMsgIHave), p2p.CtrlMsgNonClusterTopicType + return NewDuplicateTopicIDThresholdExceeded(totalDuplicateTopicIds, len(ihaves), c.config.IHave.DuplicateTopicIdThreshold), p2p.CtrlMsgNonClusterTopicType } } diff --git 
a/network/p2p/inspector/validation/control_message_validation_inspector_test.go b/network/p2p/inspector/validation/control_message_validation_inspector_test.go index 6ebea061322..63e29be0fe7 100644 --- a/network/p2p/inspector/validation/control_message_validation_inspector_test.go +++ b/network/p2p/inspector/validation/control_message_validation_inspector_test.go @@ -417,7 +417,7 @@ func TestGraftInspection_InvalidTopic_AboveThreshold(t *testing.T) { invalidSporkIDTopicReq := unittest.P2PRPCFixture(unittest.WithGrafts(invalidSporkIDTopicGrafts...)) from := unittest.PeerIdFixture(t) - checkNotification := checkNotificationFunc(t, from, p2pmsg.CtrlMsgGraft, channels.IsInvalidTopicErr, p2p.CtrlMsgNonClusterTopicType) + checkNotification := checkNotificationFunc(t, from, p2pmsg.CtrlMsgGraft, validation.IsInvalidTopicIDThresholdExceeded, p2p.CtrlMsgNonClusterTopicType) rpcTracker.On("LastHighestIHaveRPCSize").Return(int64(100)).Maybe() distributor.On("Distribute", mock.AnythingOfType("*p2p.InvCtrlMsgNotif")).Return(nil).Times(3).Run(checkNotification) @@ -484,7 +484,7 @@ func TestGraftInspection_DuplicateTopicIds_AboveThreshold(t *testing.T) { require.Equal(t, notification.TopicType, p2p.CtrlMsgNonClusterTopicType, "expected p2p.CtrlMsgNonClusterTopicType notification type, no RPC with cluster prefixed topic sent in this test") require.Equal(t, from, notification.PeerID) require.Equal(t, p2pmsg.CtrlMsgGraft, notification.MsgType) - require.True(t, validation.IsDuplicateTopicErr(notification.Error)) + require.True(t, validation.IsDuplicateTopicIDThresholdExceeded(notification.Error)) }) inspector.Start(signalerCtx) @@ -573,7 +573,7 @@ func TestPruneInspection_InvalidTopic_AboveThreshold(t *testing.T) { invalidSporkIDTopicReq := unittest.P2PRPCFixture(unittest.WithPrunes(invalidSporkIDTopicPrunes...)) from := unittest.PeerIdFixture(t) - checkNotification := checkNotificationFunc(t, from, p2pmsg.CtrlMsgPrune, channels.IsInvalidTopicErr, p2p.CtrlMsgNonClusterTopicType) + 
checkNotification := checkNotificationFunc(t, from, p2pmsg.CtrlMsgPrune, validation.IsInvalidTopicIDThresholdExceeded, p2p.CtrlMsgNonClusterTopicType) rpcTracker.On("LastHighestIHaveRPCSize").Return(int64(100)).Maybe() distributor.On("Distribute", mock.AnythingOfType("*p2p.InvCtrlMsgNotif")).Return(nil).Times(3).Run(checkNotification) @@ -614,7 +614,7 @@ func TestPruneInspection_DuplicateTopicIds_AboveThreshold(t *testing.T) { require.Equal(t, notification.TopicType, p2p.CtrlMsgNonClusterTopicType, "expected p2p.CtrlMsgNonClusterTopicType notification type, no RPC with cluster prefixed topic sent in this test") require.Equal(t, from, notification.PeerID) require.Equal(t, p2pmsg.CtrlMsgPrune, notification.MsgType) - require.True(t, validation.IsDuplicateTopicErr(notification.Error)) + require.True(t, validation.IsDuplicateTopicIDThresholdExceeded(notification.Error)) }) inspector.Start(signalerCtx) @@ -734,7 +734,7 @@ func TestIHaveInspection_InvalidTopic_AboveThreshold(t *testing.T) { from := unittest.PeerIdFixture(t) rpcTracker.On("LastHighestIHaveRPCSize").Return(int64(100)).Maybe() - checkNotification := checkNotificationFunc(t, from, p2pmsg.CtrlMsgIHave, channels.IsInvalidTopicErr, p2p.CtrlMsgNonClusterTopicType) + checkNotification := checkNotificationFunc(t, from, p2pmsg.CtrlMsgIHave, validation.IsInvalidTopicIDThresholdExceeded, p2p.CtrlMsgNonClusterTopicType) distributor.On("Distribute", mock.AnythingOfType("*p2p.InvCtrlMsgNotif")).Return(nil).Times(3).Run(checkNotification) inspector.Start(signalerCtx) @@ -806,7 +806,7 @@ func TestIHaveInspection_DuplicateTopicIds_AboveThreshold(t *testing.T) { rpcTracker.On("LastHighestIHaveRPCSize").Return(int64(100)).Maybe() // one notification should be disseminated for invalid messages when the number of duplicates exceeds the threshold - checkNotification := checkNotificationFunc(t, from, p2pmsg.CtrlMsgIHave, validation.IsDuplicateTopicErr, p2p.CtrlMsgNonClusterTopicType) + checkNotification := 
checkNotificationFunc(t, from, p2pmsg.CtrlMsgIHave, validation.IsDuplicateTopicIDThresholdExceeded, p2p.CtrlMsgNonClusterTopicType) distributor.On("Distribute", mock.AnythingOfType("*p2p.InvCtrlMsgNotif")).Return(nil).Once().Run(checkNotification) inspector.Start(signalerCtx) unittest.RequireComponentsReadyBefore(t, 1*time.Second, inspector) @@ -1284,7 +1284,7 @@ func TestNewControlMsgValidationInspector_validateClusterPrefixedTopic(t *testin from := unittest.PeerIdFixture(t) identity := unittest.IdentityFixture() idProvider.On("ByPeerID", from).Return(identity, true).Times(11) - checkNotification := checkNotificationFunc(t, from, p2pmsg.CtrlMsgGraft, channels.IsUnknownClusterIDErr, p2p.CtrlMsgTopicTypeClusterPrefixed) + checkNotification := checkNotificationFunc(t, from, p2pmsg.CtrlMsgGraft, validation.IsInvalidTopicIDThresholdExceeded, p2p.CtrlMsgTopicTypeClusterPrefixed) inspectMsgRpc := unittest.P2PRPCFixture(unittest.WithGrafts(unittest.P2PRPCGraftFixture(&clusterPrefixedTopic))) inspector.ActiveClustersChanged(flow.ChainIDList{flow.ChainID(unittest.IdentifierFixture().String())}) rpcTracker.On("LastHighestIHaveRPCSize").Return(int64(100)).Maybe() diff --git a/network/p2p/inspector/validation/errors.go b/network/p2p/inspector/validation/errors.go index bb73f9cba9b..be58d768d59 100644 --- a/network/p2p/inspector/validation/errors.go +++ b/network/p2p/inspector/validation/errors.go @@ -173,3 +173,46 @@ func IsInvalidRpcPublishMessagesErr(err error) bool { var e InvalidRpcPublishMessagesErr return errors.As(err, &e) } + +// DuplicateTopicIDThresholdExceeded indicates that the number of duplicate topic IDs exceeds the allowed threshold. 
+type DuplicateTopicIDThresholdExceeded struct { + duplicates int + sampleSize int + threshold int +} + +func (e DuplicateTopicIDThresholdExceeded) Error() string { + return fmt.Sprintf("%d/%d duplicate topic IDs exceed the allowed threshold: %d", e.duplicates, e.sampleSize, e.threshold) +} + +// NewDuplicateTopicIDThresholdExceeded returns a new DuplicateTopicIDThresholdExceeded error. +func NewDuplicateTopicIDThresholdExceeded(duplicates int, sampleSize int, threshold int) DuplicateTopicIDThresholdExceeded { + return DuplicateTopicIDThresholdExceeded{duplicates, sampleSize, threshold} +} + +// IsDuplicateTopicIDThresholdExceeded returns true if an error is DuplicateTopicIDThresholdExceeded +func IsDuplicateTopicIDThresholdExceeded(err error) bool { + var e DuplicateTopicIDThresholdExceeded + return errors.As(err, &e) +} + +// InvalidTopicIDThresholdExceeded indicates that the number of invalid topic IDs exceeds the allowed threshold. +type InvalidTopicIDThresholdExceeded struct { + invalidCount int + threshold int +} + +func (e InvalidTopicIDThresholdExceeded) Error() string { + return fmt.Sprintf("%d invalid topic IDs exceed the allowed threshold: %d", e.invalidCount, e.threshold) +} + +// NewInvalidTopicIDThresholdExceeded returns a new InvalidTopicIDThresholdExceeded error. +func NewInvalidTopicIDThresholdExceeded(invalidCount, threshold int) InvalidTopicIDThresholdExceeded { + return InvalidTopicIDThresholdExceeded{invalidCount, threshold} +} + +// IsInvalidTopicIDThresholdExceeded returns true if an error is InvalidTopicIDThresholdExceeded. 
+func IsInvalidTopicIDThresholdExceeded(err error) bool { + var e InvalidTopicIDThresholdExceeded + return errors.As(err, &e) +} diff --git a/network/p2p/inspector/validation/errors_test.go b/network/p2p/inspector/validation/errors_test.go index 29072fbd5f7..cc56ca52fde 100644 --- a/network/p2p/inspector/validation/errors_test.go +++ b/network/p2p/inspector/validation/errors_test.go @@ -105,3 +105,27 @@ func TestInvalidRpcPublishMessagesErrRoundTrip(t *testing.T) { dummyErr := fmt.Errorf("dummy error") assert.False(t, IsInvalidRpcPublishMessagesErr(dummyErr), "IsInvalidRpcPublishMessagesErr should return false for non-InvalidRpcPublishMessagesErr error") } + +// TestDuplicateTopicIDThresholdExceededRoundTrip ensures correct error formatting for DuplicateTopicIDThresholdExceeded. +func TestDuplicateTopicIDThresholdExceededRoundTrip(t *testing.T) { + expectedErrorMsg := "3/5 duplicate topic IDs exceed the allowed threshold: 2" + err := NewDuplicateTopicIDThresholdExceeded(3, 5, 2) + assert.Equal(t, expectedErrorMsg, err.Error(), "the error message should be correctly formatted") + // tests the IsDuplicateTopicIDThresholdExceeded function. + assert.True(t, IsDuplicateTopicIDThresholdExceeded(err), "IsDuplicateTopicIDThresholdExceeded should return true for DuplicateTopicIDThresholdExceeded error") + // test IsDuplicateTopicIDThresholdExceeded with a different error type. + dummyErr := fmt.Errorf("dummy error") + assert.False(t, IsDuplicateTopicIDThresholdExceeded(dummyErr), "IsDuplicateTopicIDThresholdExceeded should return false for non-DuplicateTopicIDThresholdExceeded error") +} + +// TestInvalidTopicIDThresholdExceededRoundTrip ensures correct error formatting for InvalidTopicIDThresholdExceeded.
+func TestInvalidTopicIDThresholdExceededRoundTrip(t *testing.T) { + expectedErrorMsg := "8 invalid topic IDs exceed the allowed threshold: 5" + err := NewInvalidTopicIDThresholdExceeded(8, 5) + assert.Equal(t, expectedErrorMsg, err.Error(), "the error message should be correctly formatted") + // tests the IsInvalidTopicIDThresholdExceeded function. + assert.True(t, IsInvalidTopicIDThresholdExceeded(err), "IsInvalidTopicIDThresholdExceeded should return true for InvalidTopicIDThresholdExceeded error") + // test IsInvalidTopicIDThresholdExceeded with a different error type. + dummyErr := fmt.Errorf("dummy error") + assert.False(t, IsInvalidTopicIDThresholdExceeded(dummyErr), "IsInvalidTopicIDThresholdExceeded should return false for non-InvalidTopicIDThresholdExceeded error") +} From fd268a3d0c6e11e44e48be9d3058202da8539d43 Mon Sep 17 00:00:00 2001 From: Khalil Claybon Date: Wed, 21 Feb 2024 10:56:17 -0500 Subject: [PATCH 09/10] update mocks --- module/mock/gossip_sub_metrics.go | 33 ++++++++++++++----- ...ip_sub_rpc_validation_inspector_metrics.go | 33 ++++++++++++++----- module/mock/lib_p2_p_metrics.go | 33 ++++++++++++++----- module/mock/network_metrics.go | 33 ++++++++++++++----- 4 files changed, 96 insertions(+), 36 deletions(-) diff --git a/module/mock/gossip_sub_metrics.go b/module/mock/gossip_sub_metrics.go index f7e057ea5ba..f2650ceefc3 100644 --- a/module/mock/gossip_sub_metrics.go +++ b/module/mock/gossip_sub_metrics.go @@ -56,9 +56,14 @@ func (_m *GossipSubMetrics) OnGraftDuplicateTopicIdsExceedThreshold() { _m.Called() } -// OnGraftMessageInspected provides a mock function with given fields: duplicateTopicIds -func (_m *GossipSubMetrics) OnGraftMessageInspected(duplicateTopicIds int) { - _m.Called(duplicateTopicIds) +// OnGraftInvalidTopicIdsExceedThreshold provides a mock function with given fields: +func (_m *GossipSubMetrics) OnGraftInvalidTopicIdsExceedThreshold() { + _m.Called() +} + +// OnGraftMessageInspected provides a mock function with given 
fields: duplicateTopicIds, invalidTopicIds +func (_m *GossipSubMetrics) OnGraftMessageInspected(duplicateTopicIds int, invalidTopicIds int) { + _m.Called(duplicateTopicIds, invalidTopicIds) } // OnIHaveControlMessageIdsTruncated provides a mock function with given fields: diff @@ -76,14 +81,19 @@ func (_m *GossipSubMetrics) OnIHaveDuplicateTopicIdsExceedThreshold() { _m.Called() } +// OnIHaveInvalidTopicIdsExceedThreshold provides a mock function with given fields: +func (_m *GossipSubMetrics) OnIHaveInvalidTopicIdsExceedThreshold() { + _m.Called() +} + // OnIHaveMessageIDsReceived provides a mock function with given fields: channel, msgIdCount func (_m *GossipSubMetrics) OnIHaveMessageIDsReceived(channel string, msgIdCount int) { _m.Called(channel, msgIdCount) } -// OnIHaveMessagesInspected provides a mock function with given fields: duplicateTopicIds, duplicateMessageIds -func (_m *GossipSubMetrics) OnIHaveMessagesInspected(duplicateTopicIds int, duplicateMessageIds int) { - _m.Called(duplicateTopicIds, duplicateMessageIds) +// OnIHaveMessagesInspected provides a mock function with given fields: duplicateTopicIds, duplicateMessageIds, invalidTopicIds +func (_m *GossipSubMetrics) OnIHaveMessagesInspected(duplicateTopicIds int, duplicateMessageIds int, invalidTopicIds int) { + _m.Called(duplicateTopicIds, duplicateMessageIds, invalidTopicIds) } // OnIPColocationFactorUpdated provides a mock function with given fields: _a0 @@ -216,9 +226,14 @@ func (_m *GossipSubMetrics) OnPruneDuplicateTopicIdsExceedThreshold() { _m.Called() } -// OnPruneMessageInspected provides a mock function with given fields: duplicateTopicIds -func (_m *GossipSubMetrics) OnPruneMessageInspected(duplicateTopicIds int) { - _m.Called(duplicateTopicIds) +// OnPruneInvalidTopicIdsExceedThreshold provides a mock function with given fields: +func (_m *GossipSubMetrics) OnPruneInvalidTopicIdsExceedThreshold() { + _m.Called() +} + +// OnPruneMessageInspected provides a mock function with given fields: 
duplicateTopicIds, invalidTopicIds +func (_m *GossipSubMetrics) OnPruneMessageInspected(duplicateTopicIds int, invalidTopicIds int) { + _m.Called(duplicateTopicIds, invalidTopicIds) } // OnPublishMessageInspected provides a mock function with given fields: totalErrCount, invalidTopicIdsCount, invalidSubscriptionsCount, invalidSendersCount diff --git a/module/mock/gossip_sub_rpc_validation_inspector_metrics.go b/module/mock/gossip_sub_rpc_validation_inspector_metrics.go index 84eef02f7ea..b9624e8a32a 100644 --- a/module/mock/gossip_sub_rpc_validation_inspector_metrics.go +++ b/module/mock/gossip_sub_rpc_validation_inspector_metrics.go @@ -40,9 +40,14 @@ func (_m *GossipSubRpcValidationInspectorMetrics) OnGraftDuplicateTopicIdsExceed _m.Called() } -// OnGraftMessageInspected provides a mock function with given fields: duplicateTopicIds -func (_m *GossipSubRpcValidationInspectorMetrics) OnGraftMessageInspected(duplicateTopicIds int) { - _m.Called(duplicateTopicIds) +// OnGraftInvalidTopicIdsExceedThreshold provides a mock function with given fields: +func (_m *GossipSubRpcValidationInspectorMetrics) OnGraftInvalidTopicIdsExceedThreshold() { + _m.Called() +} + +// OnGraftMessageInspected provides a mock function with given fields: duplicateTopicIds, invalidTopicIds +func (_m *GossipSubRpcValidationInspectorMetrics) OnGraftMessageInspected(duplicateTopicIds int, invalidTopicIds int) { + _m.Called(duplicateTopicIds, invalidTopicIds) } // OnIHaveControlMessageIdsTruncated provides a mock function with given fields: diff @@ -60,14 +65,19 @@ func (_m *GossipSubRpcValidationInspectorMetrics) OnIHaveDuplicateTopicIdsExceed _m.Called() } +// OnIHaveInvalidTopicIdsExceedThreshold provides a mock function with given fields: +func (_m *GossipSubRpcValidationInspectorMetrics) OnIHaveInvalidTopicIdsExceedThreshold() { + _m.Called() +} + // OnIHaveMessageIDsReceived provides a mock function with given fields: channel, msgIdCount func (_m *GossipSubRpcValidationInspectorMetrics) 
OnIHaveMessageIDsReceived(channel string, msgIdCount int) { _m.Called(channel, msgIdCount) } -// OnIHaveMessagesInspected provides a mock function with given fields: duplicateTopicIds, duplicateMessageIds -func (_m *GossipSubRpcValidationInspectorMetrics) OnIHaveMessagesInspected(duplicateTopicIds int, duplicateMessageIds int) { - _m.Called(duplicateTopicIds, duplicateMessageIds) +// OnIHaveMessagesInspected provides a mock function with given fields: duplicateTopicIds, duplicateMessageIds, invalidTopicIds +func (_m *GossipSubRpcValidationInspectorMetrics) OnIHaveMessagesInspected(duplicateTopicIds int, duplicateMessageIds int, invalidTopicIds int) { + _m.Called(duplicateTopicIds, duplicateMessageIds, invalidTopicIds) } // OnIWantCacheMissMessageIdsExceedThreshold provides a mock function with given fields: @@ -115,9 +125,14 @@ func (_m *GossipSubRpcValidationInspectorMetrics) OnPruneDuplicateTopicIdsExceed _m.Called() } -// OnPruneMessageInspected provides a mock function with given fields: duplicateTopicIds -func (_m *GossipSubRpcValidationInspectorMetrics) OnPruneMessageInspected(duplicateTopicIds int) { - _m.Called(duplicateTopicIds) +// OnPruneInvalidTopicIdsExceedThreshold provides a mock function with given fields: +func (_m *GossipSubRpcValidationInspectorMetrics) OnPruneInvalidTopicIdsExceedThreshold() { + _m.Called() +} + +// OnPruneMessageInspected provides a mock function with given fields: duplicateTopicIds, invalidTopicIds +func (_m *GossipSubRpcValidationInspectorMetrics) OnPruneMessageInspected(duplicateTopicIds int, invalidTopicIds int) { + _m.Called(duplicateTopicIds, invalidTopicIds) } // OnPublishMessageInspected provides a mock function with given fields: totalErrCount, invalidTopicIdsCount, invalidSubscriptionsCount, invalidSendersCount diff --git a/module/mock/lib_p2_p_metrics.go b/module/mock/lib_p2_p_metrics.go index 298a16fd936..3f5f2dc6e2f 100644 --- a/module/mock/lib_p2_p_metrics.go +++ b/module/mock/lib_p2_p_metrics.go @@ -187,9 +187,14 
@@ func (_m *LibP2PMetrics) OnGraftDuplicateTopicIdsExceedThreshold() { _m.Called() } -// OnGraftMessageInspected provides a mock function with given fields: duplicateTopicIds -func (_m *LibP2PMetrics) OnGraftMessageInspected(duplicateTopicIds int) { - _m.Called(duplicateTopicIds) +// OnGraftInvalidTopicIdsExceedThreshold provides a mock function with given fields: +func (_m *LibP2PMetrics) OnGraftInvalidTopicIdsExceedThreshold() { + _m.Called() +} + +// OnGraftMessageInspected provides a mock function with given fields: duplicateTopicIds, invalidTopicIds +func (_m *LibP2PMetrics) OnGraftMessageInspected(duplicateTopicIds int, invalidTopicIds int) { + _m.Called(duplicateTopicIds, invalidTopicIds) } // OnIHaveControlMessageIdsTruncated provides a mock function with given fields: diff @@ -207,14 +212,19 @@ func (_m *LibP2PMetrics) OnIHaveDuplicateTopicIdsExceedThreshold() { _m.Called() } +// OnIHaveInvalidTopicIdsExceedThreshold provides a mock function with given fields: +func (_m *LibP2PMetrics) OnIHaveInvalidTopicIdsExceedThreshold() { + _m.Called() +} + // OnIHaveMessageIDsReceived provides a mock function with given fields: channel, msgIdCount func (_m *LibP2PMetrics) OnIHaveMessageIDsReceived(channel string, msgIdCount int) { _m.Called(channel, msgIdCount) } -// OnIHaveMessagesInspected provides a mock function with given fields: duplicateTopicIds, duplicateMessageIds -func (_m *LibP2PMetrics) OnIHaveMessagesInspected(duplicateTopicIds int, duplicateMessageIds int) { - _m.Called(duplicateTopicIds, duplicateMessageIds) +// OnIHaveMessagesInspected provides a mock function with given fields: duplicateTopicIds, duplicateMessageIds, invalidTopicIds +func (_m *LibP2PMetrics) OnIHaveMessagesInspected(duplicateTopicIds int, duplicateMessageIds int, invalidTopicIds int) { + _m.Called(duplicateTopicIds, duplicateMessageIds, invalidTopicIds) } // OnIPColocationFactorUpdated provides a mock function with given fields: _a0 @@ -357,9 +367,14 @@ func (_m *LibP2PMetrics) 
OnPruneDuplicateTopicIdsExceedThreshold() { _m.Called() } -// OnPruneMessageInspected provides a mock function with given fields: duplicateTopicIds -func (_m *LibP2PMetrics) OnPruneMessageInspected(duplicateTopicIds int) { - _m.Called(duplicateTopicIds) +// OnPruneInvalidTopicIdsExceedThreshold provides a mock function with given fields: +func (_m *LibP2PMetrics) OnPruneInvalidTopicIdsExceedThreshold() { + _m.Called() +} + +// OnPruneMessageInspected provides a mock function with given fields: duplicateTopicIds, invalidTopicIds +func (_m *LibP2PMetrics) OnPruneMessageInspected(duplicateTopicIds int, invalidTopicIds int) { + _m.Called(duplicateTopicIds, invalidTopicIds) } // OnPublishMessageInspected provides a mock function with given fields: totalErrCount, invalidTopicIdsCount, invalidSubscriptionsCount, invalidSendersCount diff --git a/module/mock/network_metrics.go b/module/mock/network_metrics.go index ad77ea288f0..325a028e088 100644 --- a/module/mock/network_metrics.go +++ b/module/mock/network_metrics.go @@ -217,9 +217,14 @@ func (_m *NetworkMetrics) OnGraftDuplicateTopicIdsExceedThreshold() { _m.Called() } -// OnGraftMessageInspected provides a mock function with given fields: duplicateTopicIds -func (_m *NetworkMetrics) OnGraftMessageInspected(duplicateTopicIds int) { - _m.Called(duplicateTopicIds) +// OnGraftInvalidTopicIdsExceedThreshold provides a mock function with given fields: +func (_m *NetworkMetrics) OnGraftInvalidTopicIdsExceedThreshold() { + _m.Called() +} + +// OnGraftMessageInspected provides a mock function with given fields: duplicateTopicIds, invalidTopicIds +func (_m *NetworkMetrics) OnGraftMessageInspected(duplicateTopicIds int, invalidTopicIds int) { + _m.Called(duplicateTopicIds, invalidTopicIds) } // OnIHaveControlMessageIdsTruncated provides a mock function with given fields: diff @@ -237,14 +242,19 @@ func (_m *NetworkMetrics) OnIHaveDuplicateTopicIdsExceedThreshold() { _m.Called() } +// OnIHaveInvalidTopicIdsExceedThreshold provides 
a mock function with given fields: +func (_m *NetworkMetrics) OnIHaveInvalidTopicIdsExceedThreshold() { + _m.Called() +} + // OnIHaveMessageIDsReceived provides a mock function with given fields: channel, msgIdCount func (_m *NetworkMetrics) OnIHaveMessageIDsReceived(channel string, msgIdCount int) { _m.Called(channel, msgIdCount) } -// OnIHaveMessagesInspected provides a mock function with given fields: duplicateTopicIds, duplicateMessageIds -func (_m *NetworkMetrics) OnIHaveMessagesInspected(duplicateTopicIds int, duplicateMessageIds int) { - _m.Called(duplicateTopicIds, duplicateMessageIds) +// OnIHaveMessagesInspected provides a mock function with given fields: duplicateTopicIds, duplicateMessageIds, invalidTopicIds +func (_m *NetworkMetrics) OnIHaveMessagesInspected(duplicateTopicIds int, duplicateMessageIds int, invalidTopicIds int) { + _m.Called(duplicateTopicIds, duplicateMessageIds, invalidTopicIds) } // OnIPColocationFactorUpdated provides a mock function with given fields: _a0 @@ -392,9 +402,14 @@ func (_m *NetworkMetrics) OnPruneDuplicateTopicIdsExceedThreshold() { _m.Called() } -// OnPruneMessageInspected provides a mock function with given fields: duplicateTopicIds -func (_m *NetworkMetrics) OnPruneMessageInspected(duplicateTopicIds int) { - _m.Called(duplicateTopicIds) +// OnPruneInvalidTopicIdsExceedThreshold provides a mock function with given fields: +func (_m *NetworkMetrics) OnPruneInvalidTopicIdsExceedThreshold() { + _m.Called() +} + +// OnPruneMessageInspected provides a mock function with given fields: duplicateTopicIds, invalidTopicIds +func (_m *NetworkMetrics) OnPruneMessageInspected(duplicateTopicIds int, invalidTopicIds int) { + _m.Called(duplicateTopicIds, invalidTopicIds) } // OnPublishMessageInspected provides a mock function with given fields: totalErrCount, invalidTopicIdsCount, invalidSubscriptionsCount, invalidSendersCount From 3e393bb26badee5f510fbc317431fea7041b6bb0 Mon Sep 17 00:00:00 2001 From: Khalil Claybon Date: Thu, 22 Feb 
2024 20:16:20 -0500 Subject: [PATCH 10/10] merge master --- .../validation_inspector_test.go | 2 +- module/metrics.go | 2 +- ...ntrol_message_validation_inspector_test.go | 136 ++++++++++++++++++ 3 files changed, 138 insertions(+), 2 deletions(-) diff --git a/insecure/integration/functional/test/gossipsub/rpc_inspector/validation_inspector_test.go b/insecure/integration/functional/test/gossipsub/rpc_inspector/validation_inspector_test.go index 088ab934022..22847df96ca 100644 --- a/insecure/integration/functional/test/gossipsub/rpc_inspector/validation_inspector_test.go +++ b/insecure/integration/functional/test/gossipsub/rpc_inspector/validation_inspector_test.go @@ -71,7 +71,7 @@ func TestValidationInspector_InvalidTopicId_Detection(t *testing.T) { require.True(t, ok) require.Equal(t, notification.TopicType, p2p.CtrlMsgNonClusterTopicType, "IsClusterPrefixed is expected to be false, no RPC with cluster prefixed topic sent in this test") require.Equal(t, spammer.SpammerNode.ID(), notification.PeerID) - require.True(t, channels.IsInvalidTopicErr(notification.Error)) + require.True(t, validation.IsInvalidTopicIDThresholdExceeded(notification.Error)) switch notification.MsgType { case p2pmsg.CtrlMsgGraft: invGraftNotifCount.Inc() diff --git a/module/metrics.go b/module/metrics.go index 44b0acda605..39342be16b1 100644 --- a/module/metrics.go +++ b/module/metrics.go @@ -320,7 +320,7 @@ type GossipSubRpcValidationInspectorMetrics interface { // OnIHaveInvalidTopicIdsExceedThreshold tracks the number of times that the async inspection of iHave messages of a single RPC failed due to the total number of invalid topic ids // received by the node on the iHave messages of that RPC exceeding the threshold, which results in a misbehaviour report. 
OnIHaveInvalidTopicIdsExceedThreshold() - + // OnIHaveDuplicateMessageIdsExceedThreshold tracks the number of times that the async inspection of iHave messages of a single RPC failed due to the total number of duplicate message ids // received by the node on an iHave message exceeding the threshold, which results in a misbehaviour report. OnIHaveDuplicateMessageIdsExceedThreshold() diff --git a/network/p2p/inspector/validation/control_message_validation_inspector_test.go b/network/p2p/inspector/validation/control_message_validation_inspector_test.go index 93289a3836d..0cd0bfb02dc 100644 --- a/network/p2p/inspector/validation/control_message_validation_inspector_test.go +++ b/network/p2p/inspector/validation/control_message_validation_inspector_test.go @@ -342,6 +342,50 @@ func TestControlMessageInspection_ValidRpc(t *testing.T) { unittest.RequireCloseBefore(t, inspector.Done(), 5*time.Second, "inspector did not stop") } +// TestGraftInspection_InvalidTopic_BelowThreshold ensures inspector does not disseminate an invalid control message notification for +// graft messages when the invalid topic id count does not exceed the configured threshold. 
+func TestGraftInspection_InvalidTopic_BelowThreshold(t *testing.T) { + c, err := config.DefaultConfig() + require.NoError(t, err) + cfg := &c.NetworkConfig.GossipSub.RpcInspector.Validation + inspector, signalerCtx, cancel, consumer, rpcTracker, sporkID, _, topicProviderOracle := inspectorFixture(t, func(params *validation.InspectorParams) { + params.Config = cfg + }) + + var unknownTopicGrafts []*pubsub_pb.ControlGraft + var malformedTopicGrafts []*pubsub_pb.ControlGraft + var invalidSporkIDTopicGrafts []*pubsub_pb.ControlGraft + var allTopics []string + for i := 0; i < cfg.GraftPrune.InvalidTopicIdThreshold; i++ { + // create one unknown, one malformed and one invalid spork ID topic; each kind is collected into its own list of grafts + unknownTopic, malformedTopic, invalidSporkIDTopic := invalidTopics(t, sporkID) + allTopics = append(allTopics, unknownTopic, malformedTopic, invalidSporkIDTopic) + unknownTopicGrafts = append(unknownTopicGrafts, unittest.P2PRPCGraftFixture(&unknownTopic)) + malformedTopicGrafts = append(malformedTopicGrafts, unittest.P2PRPCGraftFixture(&malformedTopic)) + invalidSporkIDTopicGrafts = append(invalidSporkIDTopicGrafts, unittest.P2PRPCGraftFixture(&invalidSporkIDTopic)) + } + // avoid unknown topics errors + topicProviderOracle.UpdateTopics(allTopics) + unknownTopicReq := unittest.P2PRPCFixture(unittest.WithGrafts(unknownTopicGrafts...)) + malformedTopicReq := unittest.P2PRPCFixture(unittest.WithGrafts(malformedTopicGrafts...)) + invalidSporkIDTopicReq := unittest.P2PRPCFixture(unittest.WithGrafts(invalidSporkIDTopicGrafts...)) + + from := unittest.PeerIdFixture(t) + rpcTracker.On("LastHighestIHaveRPCSize").Return(int64(100)).Maybe() + consumer.AssertNotCalled(t, "OnInvalidControlMessageNotification", mock.AnythingOfType("*p2p.InvCtrlMsgNotif")) + inspector.Start(signalerCtx) + unittest.RequireComponentsReadyBefore(t, 1*time.Second, inspector) + + require.NoError(t, inspector.Inspect(from, unknownTopicReq)) + require.NoError(t, inspector.Inspect(from, malformedTopicReq)) + require.NoError(t, inspector.Inspect(from, 
invalidSporkIDTopicReq)) + + // sleep for 3 seconds to ensure the RPCs are processed + time.Sleep(3 * time.Second) + cancel() + unittest.RequireCloseBefore(t, inspector.Done(), 5*time.Second, "inspector did not stop") +} + // TestGraftInspection_InvalidTopic_AboveThreshold ensures inspector disseminates an invalid control message notification for // graft messages when the invalid topic id count exceeds the configured threshold. func TestGraftInspection_InvalidTopic_AboveThreshold(t *testing.T) { @@ -612,6 +656,98 @@ func TestPruneInspection_DuplicateTopicIds_BelowThreshold(t *testing.T) { unittest.RequireCloseBefore(t, inspector.Done(), 5*time.Second, "inspector did not stop") } +// TestIHaveInspection_InvalidTopic_AboveThreshold ensures inspector disseminates an invalid control message notification for +// ihave messages when the invalid topic id count exceeds the configured threshold. +func TestIHaveInspection_InvalidTopic_AboveThreshold(t *testing.T) { + c, err := config.DefaultConfig() + require.NoError(t, err) + cfg := &c.NetworkConfig.GossipSub.RpcInspector.Validation + inspector, signalerCtx, cancel, consumer, rpcTracker, sporkID, _, topicProviderOracle := inspectorFixture(t, func(params *validation.InspectorParams) { + params.Config = cfg + }) + + var unknownTopicIHaves []*pubsub_pb.ControlIHave + var malformedTopicIHaves []*pubsub_pb.ControlIHave + var invalidSporkIDTopicIHaves []*pubsub_pb.ControlIHave + var allTopics []string + for i := 0; i < cfg.IHave.InvalidTopicIdThreshold+1; i++ { + // create one unknown, one malformed and one invalid spork ID topic; the loop runs one past the iHave threshold so each RPC exceeds it + unknownTopic, malformedTopic, invalidSporkIDTopic := invalidTopics(t, sporkID) + allTopics = append(allTopics, unknownTopic, malformedTopic, invalidSporkIDTopic) + unknownTopicIHaves = append(unknownTopicIHaves, unittest.P2PRPCIHaveFixture(&unknownTopic, unittest.IdentifierListFixture(5).Strings()...)) + malformedTopicIHaves = append(malformedTopicIHaves, unittest.P2PRPCIHaveFixture(&malformedTopic, 
unittest.IdentifierListFixture(5).Strings()...)) + invalidSporkIDTopicIHaves = append(invalidSporkIDTopicIHaves, unittest.P2PRPCIHaveFixture(&invalidSporkIDTopic, unittest.IdentifierListFixture(5).Strings()...)) + } + + // avoid unknown topics errors + topicProviderOracle.UpdateTopics(allTopics) + unknownTopicReq := unittest.P2PRPCFixture(unittest.WithIHaves(unknownTopicIHaves...)) + malformedTopicReq := unittest.P2PRPCFixture(unittest.WithIHaves(malformedTopicIHaves...)) + invalidSporkIDTopicReq := unittest.P2PRPCFixture(unittest.WithIHaves(invalidSporkIDTopicIHaves...)) + + from := unittest.PeerIdFixture(t) + rpcTracker.On("LastHighestIHaveRPCSize").Return(int64(100)).Maybe() + checkNotification := checkNotificationFunc(t, from, p2pmsg.CtrlMsgIHave, validation.IsInvalidTopicIDThresholdExceeded, p2p.CtrlMsgNonClusterTopicType) + consumer.On("OnInvalidControlMessageNotification", mock.AnythingOfType("*p2p.InvCtrlMsgNotif")).Return(nil).Times(3).Run(checkNotification) + inspector.Start(signalerCtx) + unittest.RequireComponentsReadyBefore(t, 1*time.Second, inspector) + + require.NoError(t, inspector.Inspect(from, unknownTopicReq)) + require.NoError(t, inspector.Inspect(from, malformedTopicReq)) + require.NoError(t, inspector.Inspect(from, invalidSporkIDTopicReq)) + + // sleep for 2 seconds to ensure the RPCs are processed + time.Sleep(2 * time.Second) + cancel() + unittest.RequireCloseBefore(t, inspector.Done(), 5*time.Second, "inspector did not stop") +} + +// TestIHaveInspection_InvalidTopic_BelowThreshold ensures inspector does not disseminate an invalid control message notification for +// ihave messages when the invalid topic id count does not exceed the configured threshold. 
+func TestIHaveInspection_InvalidTopic_BelowThreshold(t *testing.T) { + c, err := config.DefaultConfig() + require.NoError(t, err) + cfg := &c.NetworkConfig.GossipSub.RpcInspector.Validation + inspector, signalerCtx, cancel, consumer, rpcTracker, sporkID, _, topicProviderOracle := inspectorFixture(t, func(params *validation.InspectorParams) { + params.Config = cfg + }) + + var unknownTopicIHaves []*pubsub_pb.ControlIHave + var malformedTopicIHaves []*pubsub_pb.ControlIHave + var invalidSporkIDTopicIHaves []*pubsub_pb.ControlIHave + var allTopics []string + for i := 0; i < cfg.IHave.InvalidTopicIdThreshold; i++ { + // create one unknown, one malformed and one invalid spork ID topic; the loop stops at the iHave threshold so no RPC exceeds it + unknownTopic, malformedTopic, invalidSporkIDTopic := invalidTopics(t, sporkID) + allTopics = append(allTopics, unknownTopic, malformedTopic, invalidSporkIDTopic) + unknownTopicIHaves = append(unknownTopicIHaves, unittest.P2PRPCIHaveFixture(&unknownTopic, unittest.IdentifierListFixture(5).Strings()...)) + malformedTopicIHaves = append(malformedTopicIHaves, unittest.P2PRPCIHaveFixture(&malformedTopic, unittest.IdentifierListFixture(5).Strings()...)) + invalidSporkIDTopicIHaves = append(invalidSporkIDTopicIHaves, unittest.P2PRPCIHaveFixture(&invalidSporkIDTopic, unittest.IdentifierListFixture(5).Strings()...)) + } + + // avoid unknown topics errors + topicProviderOracle.UpdateTopics(allTopics) + unknownTopicReq := unittest.P2PRPCFixture(unittest.WithIHaves(unknownTopicIHaves...)) + malformedTopicReq := unittest.P2PRPCFixture(unittest.WithIHaves(malformedTopicIHaves...)) + invalidSporkIDTopicReq := unittest.P2PRPCFixture(unittest.WithIHaves(invalidSporkIDTopicIHaves...)) + + from := unittest.PeerIdFixture(t) + rpcTracker.On("LastHighestIHaveRPCSize").Return(int64(100)).Maybe() + // no notification should be disseminated as long as the number of invalid topic ids in an RPC does not exceed the threshold + consumer.AssertNotCalled(t, "OnInvalidControlMessageNotification", mock.AnythingOfType("*p2p.InvCtrlMsgNotif")) + 
inspector.Start(signalerCtx) + unittest.RequireComponentsReadyBefore(t, 1*time.Second, inspector) + + require.NoError(t, inspector.Inspect(from, unknownTopicReq)) + require.NoError(t, inspector.Inspect(from, malformedTopicReq)) + require.NoError(t, inspector.Inspect(from, invalidSporkIDTopicReq)) + + // sleep for 2 seconds to ensure the RPCs are processed + time.Sleep(2 * time.Second) + cancel() + unittest.RequireCloseBefore(t, inspector.Done(), 5*time.Second, "inspector did not stop") +} + // TestIHaveInspection_DuplicateTopicIds_BelowThreshold ensures inspector does not disseminate an invalid control message notification for // iHave messages when duplicate topic ids are below allowed threshold. func TestIHaveInspection_DuplicateTopicIds_BelowThreshold(t *testing.T) {