diff --git a/cmd/access/node_builder/access_node_builder.go b/cmd/access/node_builder/access_node_builder.go index e87f1e6327d..345696c1922 100644 --- a/cmd/access/node_builder/access_node_builder.go +++ b/cmd/access/node_builder/access_node_builder.go @@ -410,6 +410,7 @@ func (builder *FlowAccessNodeBuilder) buildSyncEngine() *FlowAccessNodeBuilder { builder.FollowerEng, builder.SyncCore, builder.SyncEngineParticipantsProviderFactory(), + synceng.NewSpamDetectionConfig(), ) if err != nil { return nil, fmt.Errorf("could not create synchronization engine: %w", err) diff --git a/cmd/collection/main.go b/cmd/collection/main.go index 27faa959ca2..e53762ffb35 100644 --- a/cmd/collection/main.go +++ b/cmd/collection/main.go @@ -366,6 +366,7 @@ func main() { followerEng, mainChainSyncCore, node.SyncEngineIdentifierProvider, + consync.NewSpamDetectionConfig(), ) if err != nil { return nil, fmt.Errorf("could not create synchronization engine: %w", err) diff --git a/cmd/consensus/main.go b/cmd/consensus/main.go index 8fb294fedf4..2ef9380f661 100644 --- a/cmd/consensus/main.go +++ b/cmd/consensus/main.go @@ -849,6 +849,7 @@ func main() { comp, syncCore, node.SyncEngineIdentifierProvider, + synceng.NewSpamDetectionConfig(), ) if err != nil { return nil, fmt.Errorf("could not initialize synchronization engine: %w", err) diff --git a/cmd/execution_builder.go b/cmd/execution_builder.go index 0c60c2cec0b..01d22346437 100644 --- a/cmd/execution_builder.go +++ b/cmd/execution_builder.go @@ -1081,6 +1081,7 @@ func (exeNode *ExecutionNode) LoadSynchronizationEngine( exeNode.followerEng, exeNode.syncCore, node.SyncEngineIdentifierProvider, + synchronization.NewSpamDetectionConfig(), ) if err != nil { return nil, fmt.Errorf("could not initialize synchronization engine: %w", err) diff --git a/cmd/observer/node_builder/observer_builder.go b/cmd/observer/node_builder/observer_builder.go index 1e294e70d01..9ff2a3fd61e 100644 --- a/cmd/observer/node_builder/observer_builder.go +++ b/cmd/observer/node_builder/observer_builder.go @@ -396,6 +396,7 @@ func (builder *ObserverServiceBuilder) buildSyncEngine() *ObserverServiceBuilder builder.FollowerEng, builder.SyncCore, builder.SyncEngineParticipantsProviderFactory(), + synceng.NewSpamDetectionConfig(), ) if err != nil { return nil, fmt.Errorf("could not create synchronization engine: %w", err) diff --git a/cmd/verification_builder.go b/cmd/verification_builder.go index 948857fb196..e7e42d224f7 100644 --- a/cmd/verification_builder.go +++ b/cmd/verification_builder.go @@ -405,6 +405,7 @@ func (v *VerificationNodeBuilder) LoadComponentsAndModules() { followerEng, syncCore, node.SyncEngineIdentifierProvider, + commonsync.NewSpamDetectionConfig(), ) if err != nil { return nil, fmt.Errorf("could not create synchronization engine: %w", err) diff --git a/consensus/integration/nodes_test.go b/consensus/integration/nodes_test.go index 79fc57a56f1..ddfad8ee52a 100644 --- a/consensus/integration/nodes_test.go +++ b/consensus/integration/nodes_test.go @@ -638,6 +638,7 @@ func createNode( comp, syncCore, idProvider, + synceng.NewSpamDetectionConfig(), func(cfg *synceng.Config) { // use a small pool and scan interval for sync engine cfg.ScanInterval = 500 * time.Millisecond diff --git a/engine/common/synchronization/config.go b/engine/common/synchronization/config.go index 59d8c4dc1ea..646238098c0 100644 --- a/engine/common/synchronization/config.go +++ b/engine/common/synchronization/config.go @@ -36,3 +36,19 @@ func WithScanInterval(interval time.Duration) OptionFunc { cfg.ScanInterval = interval } } + +// spamProbabilityMultiplier is used to convert probability factor to an integer as well as a maximum value - 1 +// random number that can be generated by the random number generator. +const spamProbabilityMultiplier = 1001 + +type SpamDetectionConfig struct { + syncRequestProbability float32 +} + +func NewSpamDetectionConfig() *SpamDetectionConfig { + return &SpamDetectionConfig{ + // create misbehavior report 1/100 message requests + // TODO: make this configurable as a start up flag for the engine + syncRequestProbability: 0.01, + } +} diff --git a/engine/common/synchronization/engine.go b/engine/common/synchronization/engine.go index ec3f2e941dd..5ee427546c2 100644 --- a/engine/common/synchronization/engine.go +++ b/engine/common/synchronization/engine.go @@ -24,9 +24,11 @@ import ( "github.com/onflow/flow-go/module/irrecoverable" "github.com/onflow/flow-go/module/metrics" "github.com/onflow/flow-go/network" + "github.com/onflow/flow-go/network/alsp" "github.com/onflow/flow-go/network/channels" "github.com/onflow/flow-go/state/protocol" "github.com/onflow/flow-go/storage" + "github.com/onflow/flow-go/utils/logging" "github.com/onflow/flow-go/utils/rand" ) @@ -54,7 +56,8 @@ type Engine struct { core module.SyncCore participantsProvider module.IdentifierProvider - requestHandler *RequestHandler // component responsible for handling requests + requestHandler *RequestHandler // component responsible for handling requests + spamDetectionConfig *SpamDetectionConfig pendingSyncResponses engine.MessageStore // message store for *message.SyncResponse pendingBlockResponses engine.MessageStore // message store for *message.BlockResponse @@ -75,6 +78,7 @@ func New( comp consensus.Compliance, core module.SyncCore, participantsProvider module.IdentifierProvider, + spamDetectionConfig *SpamDetectionConfig, opts ...OptionFunc, ) (*Engine, error) { @@ -105,6 +109,7 @@ func New( pollInterval: opt.PollInterval, scanInterval: opt.ScanInterval, participantsProvider: participantsProvider, + spamDetectionConfig: spamDetectionConfig, } // register the engine with the network layer and store the conduit @@ -202,9 +207,73 @@ func (e *Engine) Process(channel channels.Channel, originID flow.Identifier, eve // - All other errors are potential symptoms of internal state corruption or bugs (fatal). func (e *Engine) process(channel channels.Channel, originID flow.Identifier, event interface{}) error { switch event.(type) { - case *messages.RangeRequest, *messages.BatchRequest, *messages.SyncRequest: + case *messages.BatchRequest: + report, misbehavior := e.validateBatchRequestForALSP(channel, originID, event) + if misbehavior { + e.con.ReportMisbehavior(report) // report misbehavior to ALSP + e.log. + Warn(). + Hex("origin_id", logging.ID(originID)). + Str(logging.KeySuspicious, "true"). + Msgf("received invalid batch request from %x: %v", originID[:], misbehavior) + e.metrics.InboundMessageDropped(metrics.EngineSynchronization, metrics.MessageBatchRequest) + return nil + } + return e.requestHandler.Process(channel, originID, event) + case *messages.RangeRequest: + report, misbehavior := e.validateRangeRequestForALSP(channel, originID, event) + if misbehavior { + e.con.ReportMisbehavior(report) // report misbehavior to ALSP + e.log. + Warn(). + Hex("origin_id", logging.ID(originID)). + Str(logging.KeySuspicious, "true"). + Msgf("received invalid range request from %x: %v", originID[:], misbehavior) + e.metrics.InboundMessageDropped(metrics.EngineSynchronization, metrics.MessageRangeRequest) + return nil + } + return e.requestHandler.Process(channel, originID, event) + + case *messages.SyncRequest: + report, misbehavior := e.validateSyncRequestForALSP(channel, originID, event) + if misbehavior { + e.con.ReportMisbehavior(report) // report misbehavior to ALSP + e.log. + Warn(). + Hex("origin_id", logging.ID(originID)). + Str(logging.KeySuspicious, "true"). + Msgf("received invalid sync request from %x: %v", originID[:], misbehavior) + e.metrics.InboundMessageDropped(metrics.EngineSynchronization, metrics.MessageSyncRequest) + return nil + } return e.requestHandler.Process(channel, originID, event) - case *messages.SyncResponse, *messages.BlockResponse: + + case *messages.BlockResponse: + report, misbehavior := e.validateBlockResponseForALSP(channel, originID, event) + if misbehavior { + e.con.ReportMisbehavior(report) // report misbehavior to ALSP + e.log. + Warn(). + Hex("origin_id", logging.ID(originID)). + Str(logging.KeySuspicious, "true"). + Msgf("received invalid block response from %x: %v", originID[:], misbehavior) + e.metrics.InboundMessageDropped(metrics.EngineSynchronization, metrics.MessageBlockResponse) + return nil + } + return e.responseMessageHandler.Process(originID, event) + + case *messages.SyncResponse: + report, misbehavior := e.validateSyncResponseForALSP(channel, originID, event) + if misbehavior { + e.con.ReportMisbehavior(report) // report misbehavior to ALSP + e.log. + Warn(). + Hex("origin_id", logging.ID(originID)). + Str(logging.KeySuspicious, "true"). + Msgf("received invalid sync response from %x: %v", originID[:], misbehavior) + e.metrics.InboundMessageDropped(metrics.EngineSynchronization, metrics.MessageSyncResponse) + return nil + } return e.responseMessageHandler.Process(originID, event) default: return fmt.Errorf("received input with type %T from %x: %w", event, originID[:], engine.IncompatibleInputTypeError) @@ -424,3 +493,61 @@ func (e *Engine) sendRequests(participants flow.IdentifierList, ranges []chainsy e.log.Warn().Err(err).Msg("sending range and batch requests failed") } } + +// TODO: implement spam reporting similar to validateSyncRequestForALSP +func (e *Engine) validateBatchRequestForALSP(channel channels.Channel, id flow.Identifier, event interface{}) (*alsp.MisbehaviorReport, bool) { + return nil, false +} + +// TODO: implement spam reporting similar to validateSyncRequestForALSP +func (e *Engine) validateBlockResponseForALSP(channel channels.Channel, id flow.Identifier, event interface{}) (*alsp.MisbehaviorReport, bool) { + return nil, false +} + +// TODO: implement spam reporting similar to validateSyncRequestForALSP +func (e *Engine) validateRangeRequestForALSP(channel channels.Channel, id flow.Identifier, event interface{}) (*alsp.MisbehaviorReport, bool) { + return nil, false +} + +// validateSyncRequestForALSP checks if a sync request should be reported as spam. It returns a misbehavior report and a boolean indicating whether the request should be reported. +func (e *Engine) validateSyncRequestForALSP(channel channels.Channel, originID flow.Identifier, event interface{}) (*alsp.MisbehaviorReport, bool) { + // Generate a random integer between 1 and spamProbabilityMultiplier (exclusive) + n, err := rand.Uint32n(spamProbabilityMultiplier) + + if err != nil { + // failing to generate a random number is unlikely. If an error is encountered while + // generating a random number it indicates a bug and processing can not proceed. + e.log.Fatal(). + Err(err). + Bool(logging.KeyNetworkingSecurity, true). + Str("originID", originID.String()). + Msg("failed to generate random number") + } + + // to avoid creating a misbehavior report for every sync request received, use a probabilistic approach. + // Create a report with a probability of spamDetectionConfig.syncRequestProbability + if float32(n) < e.spamDetectionConfig.syncRequestProbability*spamProbabilityMultiplier { + // create a misbehavior report + e.log.Info().Str("originID", originID.String()).Msg("creating misbehavior report") + report, err := alsp.NewMisbehaviorReport(originID, alsp.ResourceIntensiveRequest) + + if err != nil { + // failing to create the misbehavior report is unlikely. If an error is encountered while + // creating the misbehavior report it indicates a bug and processing can not proceed. + e.log.Fatal(). + Err(err). + Bool(logging.KeyNetworkingSecurity, true). + Str("originID", originID.String()). + Msg("failed to create misbehavior report") + } + return report, true + } + + // most of the time, don't report a misbehavior + return nil, false +} + +// TODO: implement spam reporting similar to validateSyncRequestForALSP +func (e *Engine) validateSyncResponseForALSP(channel channels.Channel, id flow.Identifier, event interface{}) (*alsp.MisbehaviorReport, bool) { + return nil, false +} diff --git a/engine/common/synchronization/engine_test.go b/engine/common/synchronization/engine_test.go index 128c4684376..0c81d3fda5e 100644 --- a/engine/common/synchronization/engine_test.go +++ b/engine/common/synchronization/engine_test.go @@ -2,9 +2,9 @@ package synchronization import ( "context" + "fmt" "io" "math" - "math/rand" "testing" "time" @@ -15,6 +15,7 @@ import ( "github.com/stretchr/testify/suite" mockconsensus "github.com/onflow/flow-go/engine/consensus/mock" + "github.com/onflow/flow-go/model/chainsync" "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/model/flow/filter" "github.com/onflow/flow-go/model/messages" @@ -32,6 +33,7 @@ import ( protocol "github.com/onflow/flow-go/state/protocol/mock" storerr "github.com/onflow/flow-go/storage" storage "github.com/onflow/flow-go/storage/mock" + "github.com/onflow/flow-go/utils/rand" "github.com/onflow/flow-go/utils/unittest" ) @@ -174,37 +176,255 @@ func (ss *SyncSuite) SetupTest() { filter.Not(filter.HasNodeID(ss.me.NodeID())), ), idCache, - )) + ), + NewSpamDetectionConfig()) require.NoError(ss.T(), err, "should pass engine initialization") ss.e = e } -func (ss *SyncSuite) TestOnSyncRequest() { - +// TestOnSyncRequest_LowerThanReceiver_WithinTolerance tests that a sync request that's within tolerance of the receiver doesn't trigger +// a response, even if request height is lower than receiver. +func (ss *SyncSuite) TestOnSyncRequest_LowerThanReceiver_WithinTolerance() { + nonce, err := rand.Uint64() + require.NoError(ss.T(), err, "should generate nonce") // generate origin and request message originID := unittest.IdentifierFixture() req := &messages.SyncRequest{ - Nonce: rand.Uint64(), + Nonce: nonce, Height: 0, } // regardless of request height, if within tolerance, we should not respond ss.core.On("HandleHeight", ss.head, req.Height) ss.core.On("WithinTolerance", ss.head, req.Height).Return(true) - err := ss.e.requestHandler.onSyncRequest(originID, req) - ss.Assert().NoError(err, "same height sync request should pass") + ss.Assert().NoError(ss.e.requestHandler.onSyncRequest(originID, req)) ss.con.AssertNotCalled(ss.T(), "Unicast", mock.Anything, mock.Anything) + ss.core.AssertExpectations(ss.T()) +} + +// TestOnSyncRequest_HigherThanReceiver_OutsideTolerance tests that a sync request that's higher +// than the receiver's height doesn't trigger a response, even if outside tolerance. +func (ss *SyncSuite) TestOnSyncRequest_HigherThanReceiver_OutsideTolerance() { + nonce, err := rand.Uint64() + require.NoError(ss.T(), err, "should generate nonce") + // generate origin and request message + originID := unittest.IdentifierFixture() + req := &messages.SyncRequest{ + Nonce: nonce, + Height: 0, + } // if request height is higher than local finalized, we should not respond req.Height = ss.head.Height + 1 + ss.core.On("HandleHeight", ss.head, req.Height) ss.core.On("WithinTolerance", ss.head, req.Height).Return(false) - err = ss.e.requestHandler.onSyncRequest(originID, req) - ss.Assert().NoError(err, "same height sync request should pass") + ss.Assert().NoError(ss.e.requestHandler.onSyncRequest(originID, req)) + ss.con.AssertNotCalled(ss.T(), "Unicast", mock.Anything, mock.Anything) + ss.core.AssertExpectations(ss.T()) +} + +// TestProcess_SyncRequest_HigherThanReceiver_OutsideTolerance_NoMisbehaviorReport tests that a sync request that's higher +// than the receiver's height doesn't trigger a response, even if outside tolerance and does not generate ALSP +// spamming misbehavior report (simulating the most likely probability). +func (ss *SyncSuite) TestProcess_SyncRequest_HigherThanReceiver_OutsideTolerance_NoMisbehaviorReport() { + ctx, cancel := irrecoverable.NewMockSignalerContextWithCancel(ss.T(), context.Background()) + ss.e.Start(ctx) + unittest.AssertClosesBefore(ss.T(), ss.e.Ready(), time.Second) + defer cancel() + + // generate origin and request message + originID := unittest.IdentifierFixture() + + nonce, err := rand.Uint64() + require.NoError(ss.T(), err, "should generate nonce") + + req := &messages.SyncRequest{ + Nonce: nonce, + Height: 0, + } + + // if request height is higher than local finalized, we should not respond + req.Height = ss.head.Height + 1 + + ss.core.On("HandleHeight", ss.head, req.Height).Once() + ss.core.On("WithinTolerance", ss.head, req.Height).Return(false).Once() + ss.con.AssertNotCalled(ss.T(), "Unicast", mock.Anything, mock.Anything) - // if the request height is lower than head and outside tolerance, we should submit correct response + ss.e.spamDetectionConfig.syncRequestProbability = 0.0 // force not creating misbehavior report + + require.NoError(ss.T(), ss.e.Process(channels.SyncCommittee, originID, req)) + + // give at least some time to process items + time.Sleep(time.Millisecond * 100) + + ss.core.AssertExpectations(ss.T()) + ss.con.AssertExpectations(ss.T()) +} + +// TestLoad_Process_SyncRequest_HigherThanReceiver_OutsideTolerance_AlwaysReportSpam tests that a sync request that's higher +// than the receiver's height doesn't trigger a response, even if outside tolerance and generates ALSP +// spamming misbehavior report (simulating the unlikely probability). +// This load test ensures that a misbehavior report is generated every time when the probability factor is set to 1.0. +func (ss *SyncSuite) TestLoad_Process_SyncRequest_HigherThanReceiver_OutsideTolerance_AlwaysReportSpam() { + ctx, cancel := irrecoverable.NewMockSignalerContextWithCancel(ss.T(), context.Background()) + ss.e.Start(ctx) + unittest.AssertClosesBefore(ss.T(), ss.e.Ready(), time.Second) + defer cancel() + + load := 1000 + + // reset misbehavior report counter for each subtest + misbehaviorsCounter := 0 + + for i := 0; i < load; i++ { + // generate origin and request message + originID := unittest.IdentifierFixture() + + nonce, err := rand.Uint64() + require.NoError(ss.T(), err, "should generate nonce") + + req := &messages.SyncRequest{ + Nonce: nonce, + Height: 0, + } + + // if request height is higher than local finalized, we should not respond + req.Height = ss.head.Height + 1 + + // assert that HandleHeight, WithinTolerance are not called because misbehavior is reported + // also, check that response is never sent + ss.core.AssertNotCalled(ss.T(), "HandleHeight") + ss.core.AssertNotCalled(ss.T(), "WithinTolerance") + ss.con.AssertNotCalled(ss.T(), "Unicast", mock.Anything, mock.Anything) + + // count misbehavior reports over the course of a load test + ss.con.On("ReportMisbehavior", mock.Anything).Return(mock.Anything).Run( + func(args mock.Arguments) { + misbehaviorsCounter++ + }, + ) + + // force creating misbehavior report by setting syncRequestProbability to 1.0 (i.e. report misbehavior 100% of the time) + ss.e.spamDetectionConfig.syncRequestProbability = 1.0 + + require.NoError(ss.T(), ss.e.Process(channels.SyncCommittee, originID, req)) + } + + ss.core.AssertExpectations(ss.T()) + ss.con.AssertExpectations(ss.T()) + assert.Equal(ss.T(), misbehaviorsCounter, load) // should generate misbehavior report every time +} + +// TestLoad_Process_SyncRequest_HigherThanReceiver_OutsideTolerance_SometimesReportSpam load tests that a sync request that's higher +// than the receiver's height doesn't trigger a response, even if outside tolerance. It checks that an ALSP +// spam misbehavior report was generated and that the number of misbehavior reports is within a reasonable range. +// This load test ensures that a misbehavior report is generated an appropriate range of times when the probability factor is set to different values. +func (ss *SyncSuite) TestLoad_Process_SyncRequest_HigherThanReceiver_OutsideTolerance_SometimesReportSpam() { + ctx, cancel := irrecoverable.NewMockSignalerContextWithCancel(ss.T(), context.Background()) + ss.e.Start(ctx) + unittest.AssertClosesBefore(ss.T(), ss.e.Ready(), time.Second) + defer cancel() + + load := 1000 + + type loadGroup struct { + syncRequestProbabilityFactor float32 + expectedMisbehaviorsLower int + expectedMisbehaviorsUpper int + } + + loadGroups := []loadGroup{} + + // expect to never get misbehavior report + loadGroups = append(loadGroups, loadGroup{0.0, 0, 0}) + + // expect to get misbehavior report between 10% of the time + loadGroups = append(loadGroups, loadGroup{0.1, 75, 140}) + + // expect to get misbehavior report between 1% of the time + loadGroups = append(loadGroups, loadGroup{0.01, 5, 15}) + + // expect to get misbehavior report between 0.1% of the time (1 in 1000 requests) + loadGroups = append(loadGroups, loadGroup{0.001, 0, 7}) + + // expect to get misbehavior report between 50% of the time + loadGroups = append(loadGroups, loadGroup{0.5, 450, 550}) + + // expect to get misbehavior report between 90% of the time + loadGroups = append(loadGroups, loadGroup{0.9, 850, 950}) + + // reset misbehavior report counter for each subtest + misbehaviorsCounter := 0 + + for _, loadGroup := range loadGroups { + ss.T().Run(fmt.Sprintf("load test; pfactor=%f lower=%d upper=%d", loadGroup.syncRequestProbabilityFactor, loadGroup.expectedMisbehaviorsLower, loadGroup.expectedMisbehaviorsUpper), func(t *testing.T) { + for i := 0; i < load; i++ { + ss.T().Log("load iteration", i) + nonce, err := rand.Uint64() + require.NoError(ss.T(), err, "should generate nonce") + + // generate origin and request message + originID := unittest.IdentifierFixture() + req := &messages.SyncRequest{ + Nonce: nonce, + Height: 0, + } + + // if request height is higher than local finalized, we should not respond + req.Height = ss.head.Height + 1 + + ss.core.On("HandleHeight", ss.head, req.Height) + ss.core.On("WithinTolerance", ss.head, req.Height).Return(false) + ss.con.AssertNotCalled(ss.T(), "Unicast", mock.Anything, mock.Anything) + + // maybe function calls that might or might not occur over the course of the load test + ss.core.On("ScanPending", ss.head).Return([]chainsync.Range{}, []chainsync.Batch{}).Maybe() + ss.con.On("Multicast", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe() + + // count misbehavior reports over the course of a load test + ss.con.On("ReportMisbehavior", mock.Anything).Return(mock.Anything).Maybe().Run( + func(args mock.Arguments) { + misbehaviorsCounter++ + }, + ) + ss.e.spamDetectionConfig.syncRequestProbability = loadGroup.syncRequestProbabilityFactor + require.NoError(ss.T(), ss.e.Process(channels.SyncCommittee, originID, req)) + } + + // check function call expectations at the end of the load test; otherwise, load test would take much longer + ss.core.AssertExpectations(ss.T()) + ss.con.AssertExpectations(ss.T()) + + // check that correct range of misbehavior reports were generated (between 1-2 reports per 1000 requests) + // since we're using a random method to generate misbehavior reports, we can't guarantee the exact number, so we + // check that it's within a larger range, but that at least 1 misbehavior report was generated + + ss.T().Logf("misbehaviors counter after load test: %d (expected lower bound: %d expected upper bound: %d)", misbehaviorsCounter, loadGroup.expectedMisbehaviorsLower, loadGroup.expectedMisbehaviorsUpper) + assert.GreaterOrEqual(ss.T(), misbehaviorsCounter, loadGroup.expectedMisbehaviorsLower) + assert.LessOrEqual(ss.T(), misbehaviorsCounter, loadGroup.expectedMisbehaviorsUpper) // too many reports would indicate a bug + + misbehaviorsCounter = 0 // reset counter for next subtest + }) + } +} + +// TestOnSyncRequest_LowerThanReceiver_OutsideTolerance tests that a sync request that's outside tolerance and +// lower than the receiver's height triggers a response. +func (ss *SyncSuite) TestOnSyncRequest_LowerThanReceiver_OutsideTolerance() { + nonce, err := rand.Uint64() + require.NoError(ss.T(), err, "should generate nonce") + + // generate origin and request message + originID := unittest.IdentifierFixture() + req := &messages.SyncRequest{ + Nonce: nonce, + Height: 0, + } + + // if the request height is lower than head and outside tolerance, we should expect correct response req.Height = ss.head.Height - 1 ss.core.On("HandleHeight", ss.head, req.Height) ss.core.On("WithinTolerance", ss.head, req.Height).Return(false) @@ -224,12 +444,17 @@ func (ss *SyncSuite) TestOnSyncRequest() { } func (ss *SyncSuite) TestOnSyncResponse() { + nonce, err := rand.Uint64() + require.NoError(ss.T(), err, "should generate nonce") + + height, err := rand.Uint64() + require.NoError(ss.T(), err, "should generate height") // generate origin ID and response message originID := unittest.IdentifierFixture() res := &messages.SyncResponse{ - Nonce: rand.Uint64(), - Height: rand.Uint64(), + Nonce: nonce, + Height: height, } // the height should be handled @@ -239,11 +464,13 @@ func (ss *SyncSuite) TestOnSyncResponse() { } func (ss *SyncSuite) TestOnRangeRequest() { + nonce, err := rand.Uint64() + require.NoError(ss.T(), err, "should generate nonce") // generate originID and range request originID := unittest.IdentifierFixture() req := &messages.RangeRequest{ - Nonce: rand.Uint64(), + Nonce: nonce, FromHeight: 0, ToHeight: 0, } @@ -357,11 +584,13 @@ func (ss *SyncSuite) TestOnRangeRequest() { } func (ss *SyncSuite) TestOnBatchRequest() { + nonce, err := rand.Uint64() + require.NoError(ss.T(), err, "should generate nonce") // generate origin ID and batch request originID := unittest.IdentifierFixture() req := &messages.BatchRequest{ - Nonce: rand.Uint64(), + Nonce: nonce, BlockIDs: nil, } @@ -434,11 +663,13 @@ func (ss *SyncSuite) TestOnBatchRequest() { } func (ss *SyncSuite) TestOnBlockResponse() { + nonce, err := rand.Uint64() + require.NoError(ss.T(), err, "should generate nonce") // generate origin and block response originID := unittest.IdentifierFixture() res := &messages.BlockResponse{ - Nonce: rand.Uint64(), + Nonce: nonce, Blocks: []messages.UntrustedBlock{}, } diff --git a/engine/testutil/nodes.go b/engine/testutil/nodes.go index 8639e1ee1f7..4e38cefc458 100644 --- a/engine/testutil/nodes.go +++ b/engine/testutil/nodes.go @@ -790,6 +790,7 @@ func ExecutionNode(t *testing.T, hub *stub.Hub, identity *flow.Identity, identit ), idCache, ), + synchronization.NewSpamDetectionConfig(), synchronization.WithPollInterval(time.Duration(0)), ) require.NoError(t, err) diff --git a/follower/follower_builder.go b/follower/follower_builder.go index 4b561916fe4..4986a07416d 100644 --- a/follower/follower_builder.go +++ b/follower/follower_builder.go @@ -296,6 +296,7 @@ func (builder *FollowerServiceBuilder) buildSyncEngine() *FollowerServiceBuilder builder.FollowerEng, builder.SyncCore, builder.SyncEngineParticipantsProviderFactory(), + synceng.NewSpamDetectionConfig(), ) if err != nil { return nil, fmt.Errorf("could not create synchronization engine: %w", err) diff --git a/module/chainsync/core.go b/module/chainsync/core.go index 08514b1b513..c493df79cb9 100644 --- a/module/chainsync/core.go +++ b/module/chainsync/core.go @@ -112,7 +112,7 @@ func (c *Core) HandleBlock(header *flow.Header) bool { } // HandleHeight handles receiving a new highest finalized height from another node. -// If the height difference between local and the reported height, we do nothing. +// If the height difference between local and the reported height is outside tolerance, we do nothing. // Otherwise, we queue each missing height. func (c *Core) HandleHeight(final *flow.Header, height uint64) { log := c.log.With().Uint64("final_height", final.Height).Uint64("recv_height", height).Logger() diff --git a/module/synchronization.go b/module/synchronization.go index ec7e893699f..eda7145e891 100644 --- a/module/synchronization.go +++ b/module/synchronization.go @@ -17,7 +17,7 @@ type BlockRequester interface { // RequestHeight indicates that the given block height should be queued for retrieval. RequestHeight(height uint64) - // Manually Prune requests + // Prune manually prunes requests Prune(final *flow.Header) } diff --git a/network/internal/testutils/testUtil.go b/network/internal/testutils/testUtil.go index c9fe52f2f4e..2f1fc839ca5 100644 --- a/network/internal/testutils/testUtil.go +++ b/network/internal/testutils/testUtil.go @@ -112,7 +112,7 @@ func NewTagWatchingConnManager(log zerolog.Logger, metrics module.LibP2PConnecti } // LibP2PNodeForMiddlewareFixture is a test helper that generate flow identities with a valid port and libp2p nodes. -// Note that the LibP2PNode created by this fixture is meant to used with a middleware component. +// Note that the LibP2PNode created by this fixture is meant to be used with a middleware component. // If you want to create a standalone LibP2PNode without network and middleware components, please use p2ptest.NodeFixture. // Args: // diff --git a/utils/rand/rand.go b/utils/rand/rand.go index 9a577f7afec..e56e2e3df60 100644 --- a/utils/rand/rand.go +++ b/utils/rand/rand.go @@ -114,7 +114,7 @@ func Uint() (uint, error) { return uint(r), err } -// returns a random uint strictly less than `n`. +// Uintn returns a random uint strictly less than `n`. // `n` has to be a strictly positive integer. // // It returns an error: