From 2b4c2570968bce5d7f43abcfd11d0b29ea763c4a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ecl=C3=A9sio=20Junior?= Date: Thu, 3 Nov 2022 15:44:42 -0400 Subject: [PATCH] fix(dot/sync): remove block announcement in `bootstrap` sync mode (#2906) --- dot/core/service.go | 6 +- dot/network/notifications_test.go | 6 + dot/sync/chain_processor.go | 49 +++++--- dot/sync/chain_processor_test.go | 183 +++++++++++++++++++++++++--- dot/sync/chain_sync.go | 2 +- dot/sync/chain_sync_test.go | 4 +- dot/sync/interface.go | 2 +- dot/sync/mocks_test.go | 8 +- dot/sync/syncer.go | 20 ++- dot/sync/syncer_integration_test.go | 4 +- 10 files changed, 237 insertions(+), 47 deletions(-) diff --git a/dot/core/service.go b/dot/core/service.go index 528bda8c4f..581b1a343a 100644 --- a/dot/core/service.go +++ b/dot/core/service.go @@ -127,12 +127,16 @@ func (s *Service) StorageRoot() (common.Hash, error) { } // HandleBlockImport handles a block that was imported via the network -func (s *Service) HandleBlockImport(block *types.Block, state *rtstorage.TrieState) error { +func (s *Service) HandleBlockImport(block *types.Block, state *rtstorage.TrieState, announce bool) error { err := s.handleBlock(block, state) if err != nil { return fmt.Errorf("handling block: %w", err) } + if !announce { + return nil + } + bestBlockHash := s.blockState.BestBlockHash() isBestBlock := bestBlockHash.Equal(block.Header.Hash()) diff --git a/dot/network/notifications_test.go b/dot/network/notifications_test.go index 179f5c63c3..88cd0f3582 100644 --- a/dot/network/notifications_test.go +++ b/dot/network/notifications_test.go @@ -5,6 +5,7 @@ package network import ( "errors" + "fmt" "reflect" "testing" "time" @@ -196,6 +197,11 @@ func TestCreateNotificationsMessageHandler_BlockAnnounceHandshake(t *testing.T) err = handler(stream, testHandshake) require.ErrorIs(t, err, errCannotValidateHandshake) + + expectedErrorMessage := fmt.Sprintf("handling handshake: %s from peer %s using protocol %s: genesis hash mismatch", + errCannotValidateHandshake, testPeerID, info.protocolID) + require.EqualError(t, err, expectedErrorMessage) + data := info.peersData.getInboundHandshakeData(testPeerID) require.NotNil(t, data) require.True(t, data.received) diff --git a/dot/sync/chain_processor.go b/dot/sync/chain_processor.go index 9f0ba79ba8..abdbcff5ba 100644 --- a/dot/sync/chain_processor.go +++ b/dot/sync/chain_processor.go @@ -25,6 +25,8 @@ type chainProcessor struct { ctx context.Context cancel context.CancelFunc + chainSync ChainSync + // blocks that are ready for processing. ie. their parent is known, or their parent is ahead // of them within this channel and thus will be processed first readyBlocks *blockQueue @@ -42,24 +44,35 @@ type chainProcessor struct { telemetry telemetry.Client } -func newChainProcessor(readyBlocks *blockQueue, pendingBlocks DisjointBlockSet, - blockState BlockState, storageState StorageState, - transactionState TransactionState, babeVerifier BabeVerifier, - finalityGadget FinalityGadget, blockImportHandler BlockImportHandler, telemetry telemetry.Client) *chainProcessor { +type chainProcessorConfig struct { + readyBlocks *blockQueue + pendingBlocks DisjointBlockSet + syncer ChainSync + blockState BlockState + storageState StorageState + transactionState TransactionState + babeVerifier BabeVerifier + finalityGadget FinalityGadget + blockImportHandler BlockImportHandler + telemetry telemetry.Client +} + +func newChainProcessor(cfg chainProcessorConfig) *chainProcessor { ctx, cancel := context.WithCancel(context.Background()) return &chainProcessor{ ctx: ctx, cancel: cancel, - readyBlocks: readyBlocks, - pendingBlocks: pendingBlocks, - blockState: blockState, - storageState: storageState, - transactionState: transactionState, - babeVerifier: babeVerifier, - finalityGadget: finalityGadget, - blockImportHandler: blockImportHandler, - telemetry: telemetry, + readyBlocks: cfg.readyBlocks, + pendingBlocks: cfg.pendingBlocks, + chainSync: cfg.syncer, + blockState: cfg.blockState, + storageState: cfg.storageState, + transactionState: cfg.transactionState, + babeVerifier: cfg.babeVerifier, + finalityGadget: cfg.finalityGadget, + blockImportHandler: cfg.blockImportHandler, + telemetry: cfg.telemetry, } } @@ -109,6 +122,8 @@ func (s *chainProcessor) processBlockData(bd *types.BlockData) error { return fmt.Errorf("failed to check block state has body for hash %s: %w", bd.Hash, err) } + // while in bootstrap mode we don't need to broadcast block announcements + announceImportedBlock := s.chainSync.syncState() == tip if hasHeader && hasBody { // TODO: fix this; sometimes when the node shuts down the "best block" isn't stored properly, // so when the node restarts it has blocks higher than what it thinks is the best, causing it not to sync @@ -149,7 +164,7 @@ func (s *chainProcessor) processBlockData(bd *types.BlockData) error { return err } - if err := s.blockImportHandler.HandleBlockImport(block, state); err != nil { + if err := s.blockImportHandler.HandleBlockImport(block, state, announceImportedBlock); err != nil { logger.Warnf("failed to handle block import: %s", err) } @@ -170,7 +185,7 @@ func (s *chainProcessor) processBlockData(bd *types.BlockData) error { Body: *bd.Body, } - if err := s.handleBlock(block); err != nil { + if err := s.handleBlock(block, announceImportedBlock); err != nil { logger.Debugf("failed to handle block number %d: %s", block.Header.Number, err) return err } @@ -201,7 +216,7 @@ func (s *chainProcessor) handleBody(body *types.Body) { } // handleHeader handles blocks (header+body) included in BlockResponses -func (s *chainProcessor) handleBlock(block *types.Block) error { +func (s *chainProcessor) handleBlock(block *types.Block, announceImportedBlock bool) error { parent, err := s.blockState.GetHeader(block.Header.ParentHash) if err != nil { return fmt.Errorf("%w: %s", errFailedToGetParent, err) @@ -233,7 +248,7 @@ func (s *chainProcessor) handleBlock(block *types.Block) error { return fmt.Errorf("failed to execute block %d: %w", block.Header.Number, err) } - if err = s.blockImportHandler.HandleBlockImport(block, ts); err != nil { + if err = s.blockImportHandler.HandleBlockImport(block, ts, announceImportedBlock); err != nil { return err } diff --git a/dot/sync/chain_processor_test.go b/dot/sync/chain_processor_test.go index f9671d843f..86942df81b 100644 --- a/dot/sync/chain_processor_test.go +++ b/dot/sync/chain_processor_test.go @@ -26,6 +26,7 @@ func Test_chainProcessor_handleBlock(t *testing.T) { tests := map[string]struct { chainProcessorBuilder func(ctrl *gomock.Controller) chainProcessor block *types.Block + announce bool wantErr error }{ "handle getHeader error": { @@ -122,7 +123,7 @@ func Test_chainProcessor_handleBlock(t *testing.T) { chainProcessor.storageState = mockStorageState mockBlockImportHandler := NewMockBlockImportHandler(ctrl) mockBlockImportHandler.EXPECT().HandleBlockImport(mockBlock, - trieState).Return(mockError) + trieState, false).Return(mockError) chainProcessor.blockImportHandler = mockBlockImportHandler return }, @@ -156,7 +157,47 @@ func Test_chainProcessor_handleBlock(t *testing.T) { mockStorageState.EXPECT().TrieState(&trie.EmptyHash).Return(trieState, nil) chainProcessor.storageState = mockStorageState mockBlockImportHandler := NewMockBlockImportHandler(ctrl) - mockBlockImportHandler.EXPECT().HandleBlockImport(mockBlock, trieState).Return(nil) + mockBlockImportHandler.EXPECT().HandleBlockImport(mockBlock, trieState, false).Return(nil) + chainProcessor.blockImportHandler = mockBlockImportHandler + mockTelemetry := NewMockClient(ctrl) + mockTelemetry.EXPECT().SendMessage(gomock.Any()) + chainProcessor.telemetry = mockTelemetry + return + }, + block: &types.Block{ + Header: types.Header{ + Number: 0, + }, + Body: types.Body{}, + }, + }, + "import block and announce": { + announce: true, + chainProcessorBuilder: func(ctrl *gomock.Controller) (chainProcessor chainProcessor) { + mockBlock := &types.Block{ + Body: types.Body{}, // empty slice of extrinsics + } + trieState := storage.NewTrieState(nil) + mockBlockState := NewMockBlockState(ctrl) + mockHeader := &types.Header{ + Number: 0, + StateRoot: trie.EmptyHash, + } + mockHeaderHash := mockHeader.Hash() + mockBlockState.EXPECT().GetHeader(common.Hash{}).Return(mockHeader, nil) + + mockInstance := NewMockRuntimeInstance(ctrl) + mockInstance.EXPECT().SetContextStorage(trieState) + mockInstance.EXPECT().ExecuteBlock(mockBlock).Return(nil, nil) + mockBlockState.EXPECT().GetRuntime(&mockHeaderHash).Return(mockInstance, nil) + chainProcessor.blockState = mockBlockState + mockStorageState := NewMockStorageState(ctrl) + mockStorageState.EXPECT().Lock() + mockStorageState.EXPECT().Unlock() + mockStorageState.EXPECT().TrieState(&trie.EmptyHash).Return(trieState, nil) + chainProcessor.storageState = mockStorageState + mockBlockImportHandler := NewMockBlockImportHandler(ctrl) + mockBlockImportHandler.EXPECT().HandleBlockImport(mockBlock, trieState, true).Return(nil) chainProcessor.blockImportHandler = mockBlockImportHandler mockTelemetry := NewMockClient(ctrl) mockTelemetry.EXPECT().SendMessage(gomock.Any()) @@ -178,7 +219,7 @@ func Test_chainProcessor_handleBlock(t *testing.T) { ctrl := gomock.NewController(t) s := tt.chainProcessorBuilder(ctrl) - err := s.handleBlock(tt.block) + err := s.handleBlock(tt.block, tt.announce) assert.ErrorIs(t, err, tt.wantErr) }) } @@ -205,7 +246,7 @@ func Test_chainProcessor_handleBlock(t *testing.T) { } const expectedPanicValue = "parent state root does not match snapshot state root" assert.PanicsWithValue(t, expectedPanicValue, func() { - _ = chainProcessor.handleBlock(bock) + _ = chainProcessor.handleBlock(bock, false) }) }) } @@ -346,6 +387,7 @@ func Test_chainProcessor_processBlockData(t *testing.T) { chainProcessorBuilder: func(ctrl *gomock.Controller) chainProcessor { mockBlockState := NewMockBlockState(ctrl) mockBlockState.EXPECT().HasHeader(common.Hash{}).Return(false, mockError) + return chainProcessor{ blockState: mockBlockState, } @@ -371,8 +413,12 @@ func Test_chainProcessor_processBlockData(t *testing.T) { mockBlockState.EXPECT().HasHeader(common.Hash{}).Return(true, nil) mockBlockState.EXPECT().HasBlockBody(common.Hash{}).Return(true, nil) mockBlockState.EXPECT().GetBlockByHash(common.Hash{}).Return(nil, mockError) + + mockChainSync := NewMockChainSync(ctrl) + mockChainSync.EXPECT().syncState().Return(bootstrap) return chainProcessor{ blockState: mockBlockState, + chainSync: mockChainSync, } }, blockData: &types.BlockData{}, @@ -394,7 +440,11 @@ func Test_chainProcessor_processBlockData(t *testing.T) { mockFinalityGadget := NewMockFinalityGadget(ctrl) mockStorageState := NewMockStorageState(ctrl) mockBlockImportHandler := NewMockBlockImportHandler(ctrl) + + mockChainSync := NewMockChainSync(ctrl) + mockChainSync.EXPECT().syncState().Return(bootstrap) return chainProcessor{ + chainSync: mockChainSync, blockState: mockBlockState, finalityGadget: mockFinalityGadget, storageState: mockStorageState, @@ -427,7 +477,11 @@ func Test_chainProcessor_processBlockData(t *testing.T) { VerifyBlockJustification(headerHash, []byte{1, 2, 3}). Return(nil, mockError) + mockChainSync := NewMockChainSync(ctrl) + mockChainSync.EXPECT().syncState().Return(bootstrap) + return chainProcessor{ + chainSync: mockChainSync, blockState: blockState, finalityGadget: finalityGadget, } @@ -438,7 +492,7 @@ func Test_chainProcessor_processBlockData(t *testing.T) { }, expectedError: mockError, }, - "handle block data justification != nil": { + "handle block data justification successfully": { chainProcessorBuilder: func(ctrl *gomock.Controller) chainProcessor { mockBlock := &types.Block{ Header: types.Header{ @@ -459,10 +513,19 @@ func Test_chainProcessor_processBlockData(t *testing.T) { 3}).Return([]byte{1, 2, 3}, nil) mockStorageState := NewMockStorageState(ctrl) mockStorageState.EXPECT().TrieState(&common.Hash{}).Return(nil, nil) + + // given our current chain sync state is `tip` + // the `HandleBlockImport` method should expect + // true as the announce parameter + mockChainSync := NewMockChainSync(ctrl) + mockChainSync.EXPECT().syncState().Return(tip) + mockBlockImportHandler := NewMockBlockImportHandler(ctrl) mockBlockImportHandler.EXPECT().HandleBlockImport(mockBlock, - nil).Return(nil) + nil, true).Return(nil) + return chainProcessor{ + chainSync: mockChainSync, blockState: mockBlockState, finalityGadget: mockFinalityGadget, storageState: mockStorageState, @@ -494,7 +557,11 @@ func Test_chainProcessor_processBlockData(t *testing.T) { 3}).Return([]byte{1, 2, 3}, nil) mockStorageState := NewMockStorageState(ctrl) mockStorageState.EXPECT().TrieState(&common.Hash{}).Return(nil, mockError) + + mockChainSync := NewMockChainSync(ctrl) + mockChainSync.EXPECT().syncState().Return(bootstrap) return chainProcessor{ + chainSync: mockChainSync, blockState: mockBlockState, finalityGadget: mockFinalityGadget, storageState: mockStorageState, @@ -526,10 +593,18 @@ func Test_chainProcessor_processBlockData(t *testing.T) { 3}).Return([]byte{1, 2, 3}, nil) mockStorageState := NewMockStorageState(ctrl) mockStorageState.EXPECT().TrieState(&common.Hash{}).Return(nil, nil) + + // given our current chain sync state is `bootstrap` + // the `HandleBlockImport` method should expect + // false as the announce parameter + mockChainSync := NewMockChainSync(ctrl) + mockChainSync.EXPECT().syncState().Return(bootstrap) + mockBlockImportHandler := NewMockBlockImportHandler(ctrl) mockBlockImportHandler.EXPECT().HandleBlockImport(mockBlock, - nil).Return(mockError) + nil, false).Return(mockError) return chainProcessor{ + chainSync: mockChainSync, blockState: mockBlockState, finalityGadget: mockFinalityGadget, storageState: mockStorageState, @@ -546,7 +621,12 @@ func Test_chainProcessor_processBlockData(t *testing.T) { mockBlockState.EXPECT().HasHeader(common.Hash{}).Return(false, nil) mockBlockState.EXPECT().HasBlockBody(common.Hash{}).Return(false, nil) mockBlockState.EXPECT().CompareAndSetBlockData(&types.BlockData{}).Return(nil) + + mockChainSync := NewMockChainSync(ctrl) + mockChainSync.EXPECT().syncState().Return(bootstrap) + return chainProcessor{ + chainSync: mockChainSync, blockState: mockBlockState, } }, @@ -559,7 +639,12 @@ func Test_chainProcessor_processBlockData(t *testing.T) { mockBlockState.EXPECT().HasBlockBody(common.Hash{}).Return(false, nil) mockBabeVerifier := NewMockBabeVerifier(ctrl) mockBabeVerifier.EXPECT().VerifyBlock(&types.Header{}).Return(mockError) + + mockChainSync := NewMockChainSync(ctrl) + mockChainSync.EXPECT().syncState().Return(bootstrap) + return chainProcessor{ + chainSync: mockChainSync, blockState: mockBlockState, babeVerifier: mockBabeVerifier, } @@ -578,7 +663,12 @@ func Test_chainProcessor_processBlockData(t *testing.T) { mockBlockState.EXPECT().GetHeader(common.Hash{}).Return(nil, mockError) mockBabeVerifier := NewMockBabeVerifier(ctrl) mockBabeVerifier.EXPECT().VerifyBlock(&types.Header{}).Return(nil) + + mockChainSync := NewMockChainSync(ctrl) + mockChainSync.EXPECT().syncState().Return(bootstrap) + return chainProcessor{ + chainSync: mockChainSync, blockState: mockBlockState, babeVerifier: mockBabeVerifier, } @@ -601,7 +691,11 @@ func Test_chainProcessor_processBlockData(t *testing.T) { }, nil) mockBlockState.EXPECT().AddBlockToBlockTree(&types.Block{ Header: types.Header{Number: 1}}).Return(mockError) + + mockChainSync := NewMockChainSync(ctrl) + mockChainSync.EXPECT().syncState().Return(bootstrap) return chainProcessor{ + chainSync: mockChainSync, blockState: mockBlockState, } }, @@ -624,9 +718,16 @@ func Test_chainProcessor_processBlockData(t *testing.T) { mockBlockState.EXPECT().AddBlockToBlockTree(&types.Block{Header: types.Header{Number: 1}}).Return(nil) mockStorageState := NewMockStorageState(ctrl) mockStorageState.EXPECT().TrieState(&common.Hash{}).Return(mockTrieState, nil) + + mockChainSync := NewMockChainSync(ctrl) + mockChainSync.EXPECT().syncState().Return(bootstrap) + mockBlockImportHandler := NewMockBlockImportHandler(ctrl) - mockBlockImportHandler.EXPECT().HandleBlockImport(&types.Block{Header: types.Header{Number: 1}}, mockTrieState) + mockBlockImportHandler.EXPECT().HandleBlockImport(&types.Block{Header: types.Header{Number: 1}}, + mockTrieState, false) + return chainProcessor{ + chainSync: mockChainSync, blockState: mockBlockState, storageState: mockStorageState, blockImportHandler: mockBlockImportHandler, @@ -649,7 +750,11 @@ func Test_chainProcessor_processBlockData(t *testing.T) { VerifyBlockJustification(expectedBlockDataHeaderHash, []byte{1, 2, 3}). Return(nil, mockError) + mockChainSync := NewMockChainSync(ctrl) + mockChainSync.EXPECT().syncState().Return(bootstrap) + return chainProcessor{ + chainSync: mockChainSync, blockState: blockState, finalityGadget: finalityGadget, } @@ -667,7 +772,11 @@ func Test_chainProcessor_processBlockData(t *testing.T) { mockBlockState.EXPECT().HasHeader(common.Hash{}).Return(false, nil) mockBlockState.EXPECT().HasBlockBody(common.Hash{}).Return(false, nil) mockBlockState.EXPECT().CompareAndSetBlockData(&types.BlockData{}).Return(mockError) + + mockChainSync := NewMockChainSync(ctrl) + mockChainSync.EXPECT().syncState().Return(bootstrap) return chainProcessor{ + chainSync: mockChainSync, blockState: mockBlockState, } }, @@ -684,6 +793,7 @@ func Test_chainProcessor_processBlockData(t *testing.T) { mockInstance := NewMockRuntimeInstance(ctrl) mockInstance.EXPECT().SetContextStorage(mockTrieState) mockInstance.EXPECT().ExecuteBlock(mockBlock).Return(nil, nil) + mockBlockState := NewMockBlockState(ctrl) mockBlockState.EXPECT().HasHeader(common.Hash{}).Return(false, nil) mockBlockState.EXPECT().HasBlockBody(common.Hash{}).Return(false, nil) @@ -693,17 +803,25 @@ func Test_chainProcessor_processBlockData(t *testing.T) { }, nil) mockBlockState.EXPECT().CompareAndSetBlockData(&types.BlockData{Header: &types.Header{}, Body: &types.Body{}}) mockBlockState.EXPECT().GetRuntime(&runtimeHash).Return(mockInstance, nil) + mockBabeVerifier := NewMockBabeVerifier(ctrl) - mockBabeVerifier.EXPECT().VerifyBlock(&types.Header{}) + mockBabeVerifier.EXPECT().VerifyBlock(&types.Header{}).Return(nil) + mockStorageState := NewMockStorageState(ctrl) mockStorageState.EXPECT().Lock() mockStorageState.EXPECT().TrieState(&stateRootHash).Return(mockTrieState, nil) mockStorageState.EXPECT().Unlock() + + mockChainSync := NewMockChainSync(ctrl) + mockChainSync.EXPECT().syncState().Return(tip) + mockBlockImportHandler := NewMockBlockImportHandler(ctrl) - mockBlockImportHandler.EXPECT().HandleBlockImport(mockBlock, mockTrieState) + mockBlockImportHandler.EXPECT().HandleBlockImport(mockBlock, mockTrieState, true) + mockTelemetry := NewMockClient(ctrl) mockTelemetry.EXPECT().SendMessage(gomock.Any()).AnyTimes() return chainProcessor{ + chainSync: mockChainSync, blockState: mockBlockState, babeVerifier: mockBabeVerifier, storageState: mockStorageState, @@ -745,8 +863,13 @@ func Test_chainProcessor_processBlockData(t *testing.T) { mockStorageState.EXPECT().Lock() mockStorageState.EXPECT().TrieState(&stateRootHash).Return(mockTrieState, nil) mockStorageState.EXPECT().Unlock() + + mockChainSync := NewMockChainSync(ctrl) + mockChainSync.EXPECT().syncState().Return(bootstrap) + mockBlockImportHandler := NewMockBlockImportHandler(ctrl) - mockBlockImportHandler.EXPECT().HandleBlockImport(mockBlock, mockTrieState) + mockBlockImportHandler.EXPECT().HandleBlockImport(mockBlock, mockTrieState, false) + mockTelemetry := NewMockClient(ctrl) mockTelemetry.EXPECT().SendMessage(gomock.Any()).AnyTimes() mockFinalityGadget := NewMockFinalityGadget(ctrl) @@ -754,6 +877,7 @@ func Test_chainProcessor_processBlockData(t *testing.T) { common.MustHexToHash("0xdcdd89927d8a348e00257e1ecc8617f45edb5118efff3ea2f9961b2ad9b7690a"), justification).Return(justification, nil) return chainProcessor{ + chainSync: mockChainSync, blockState: mockBlockState, babeVerifier: mockBabeVerifier, storageState: mockStorageState, @@ -788,6 +912,7 @@ func Test_chainProcessor_processReadyBlocks(t *testing.T) { t.Parallel() mockError := errors.New("test mock error") tests := map[string]struct { + chainSyncBuilder func(ctrl *gomock.Controller) ChainSync blockStateBuilder func(ctrl *gomock.Controller, done chan struct{}) BlockState blockData *types.BlockData babeVerifierBuilder func(ctrl *gomock.Controller) BabeVerifier @@ -795,6 +920,11 @@ func Test_chainProcessor_processReadyBlocks(t *testing.T) { storageStateBuilder func(ctrl *gomock.Controller, done chan struct{}) StorageState }{ "base case": { + chainSyncBuilder: func(ctrl *gomock.Controller) ChainSync { + cs := NewMockChainSync(ctrl) + cs.EXPECT().syncState().Return(bootstrap) + return cs + }, blockStateBuilder: func(ctrl *gomock.Controller, done chan struct{}) BlockState { mockBlockState := NewMockBlockState(ctrl) mockBlockState.EXPECT().HasHeader(common.Hash{}).Return(false, nil) @@ -820,6 +950,11 @@ func Test_chainProcessor_processReadyBlocks(t *testing.T) { }, }, "add block": { + chainSyncBuilder: func(ctrl *gomock.Controller) ChainSync { + cs := NewMockChainSync(ctrl) + cs.EXPECT().syncState().Return(bootstrap) + return cs + }, blockStateBuilder: func(ctrl *gomock.Controller, done chan struct{}) BlockState { mockBlockState := NewMockBlockState(ctrl) mockBlockState.EXPECT().HasHeader(common.Hash{}).Return(false, nil) @@ -853,6 +988,11 @@ func Test_chainProcessor_processReadyBlocks(t *testing.T) { }, }, "error in process block": { + chainSyncBuilder: func(ctrl *gomock.Controller) ChainSync { + cs := NewMockChainSync(ctrl) + cs.EXPECT().syncState().Return(bootstrap) + return cs + }, blockStateBuilder: func(ctrl *gomock.Controller, done chan struct{}) BlockState { mockBlockState := NewMockBlockState(ctrl) mockBlockState.EXPECT().HasHeader(common.Hash{}).Return(false, nil) @@ -886,6 +1026,11 @@ func Test_chainProcessor_processReadyBlocks(t *testing.T) { }, }, "add block error": { + chainSyncBuilder: func(ctrl *gomock.Controller) ChainSync { + cs := NewMockChainSync(ctrl) + cs.EXPECT().syncState().Return(bootstrap) + return cs + }, blockStateBuilder: func(ctrl *gomock.Controller, done chan struct{}) BlockState { mockBlockState := NewMockBlockState(ctrl) mockBlockState.EXPECT().HasHeader(common.Hash{}).Return(false, nil) @@ -932,6 +1077,7 @@ func Test_chainProcessor_processReadyBlocks(t *testing.T) { ctx: ctx, cancel: cancel, readyBlocks: readyBlock, + chainSync: tt.chainSyncBuilder(ctrl), blockState: tt.blockStateBuilder(ctrl, done), babeVerifier: tt.babeVerifierBuilder(ctrl), pendingBlocks: tt.pendingBlockBuilder(ctrl, done), @@ -1002,9 +1148,18 @@ func Test_newChainProcessor(t *testing.T) { tt := tt t.Run(tt.name, func(t *testing.T) { t.Parallel() - got := newChainProcessor(tt.args.readyBlocks, tt.args.pendingBlocks, tt.args.blockState, - tt.args.storageState, tt.args.transactionState, tt.args.babeVerifier, tt.args.finalityGadget, - tt.args.blockImportHandler, nil) + cpCfg := chainProcessorConfig{ + readyBlocks: tt.args.readyBlocks, + pendingBlocks: tt.args.pendingBlocks, + blockState: tt.args.blockState, + storageState: tt.args.storageState, + transactionState: tt.args.transactionState, + babeVerifier: tt.args.babeVerifier, + finalityGadget: tt.args.finalityGadget, + blockImportHandler: tt.args.blockImportHandler, + } + + got := newChainProcessor(cpCfg) assert.NotNil(t, got.ctx) got.ctx = nil assert.NotNil(t, got.cancel) diff --git a/dot/sync/chain_sync.go b/dot/sync/chain_sync.go index 7ca4d6de50..7eb6ced84f 100644 --- a/dot/sync/chain_sync.go +++ b/dot/sync/chain_sync.go @@ -176,7 +176,7 @@ type chainSyncConfig struct { slotDuration time.Duration } -func newChainSync(cfg *chainSyncConfig) *chainSync { +func newChainSync(cfg chainSyncConfig) *chainSync { ctx, cancel := context.WithCancel(context.Background()) const syncSamplesToKeep = 30 const logSyncPeriod = 5 * time.Second diff --git a/dot/sync/chain_sync_test.go b/dot/sync/chain_sync_test.go index b6daf18edf..196c97720c 100644 --- a/dot/sync/chain_sync_test.go +++ b/dot/sync/chain_sync_test.go @@ -865,7 +865,7 @@ func TestChainSync_validateResponse(t *testing.T) { t.Parallel() ctrl := gomock.NewController(t) - cfg := &chainSyncConfig{ + cfg := chainSyncConfig{ bs: tt.blockStateBuilder(ctrl), pendingBlocks: newDisjointBlockSet(pendingBlocksLimit), readyBlocks: newBlockQueue(maxResponseSize), @@ -1601,7 +1601,7 @@ func newTestChainSyncWithReadyBlocks(ctrl *gomock.Controller, readyBlocks *block mockBlockState := NewMockBlockState(ctrl) mockBlockState.EXPECT().GetFinalisedNotifierChannel().Return(make(chan *types.FinalisationInfo)) - cfg := &chainSyncConfig{ + cfg := chainSyncConfig{ bs: mockBlockState, readyBlocks: readyBlocks, pendingBlocks: newDisjointBlockSet(pendingBlocksLimit), diff --git a/dot/sync/interface.go b/dot/sync/interface.go index 7da7fa9854..c3752820b6 100644 --- a/dot/sync/interface.go +++ b/dot/sync/interface.go @@ -107,7 +107,7 @@ type FinalityGadget interface { // BlockImportHandler is the interface for the handler of newly imported blocks type BlockImportHandler interface { - HandleBlockImport(block *types.Block, state *rtstorage.TrieState) error + HandleBlockImport(block *types.Block, state *rtstorage.TrieState, announce bool) error } // Network is the interface for the network diff --git a/dot/sync/mocks_test.go b/dot/sync/mocks_test.go index 84eff8ba51..c8712a727b 100644 --- a/dot/sync/mocks_test.go +++ b/dot/sync/mocks_test.go @@ -698,17 +698,17 @@ func (m *MockBlockImportHandler) EXPECT() *MockBlockImportHandlerMockRecorder { } // HandleBlockImport mocks base method. -func (m *MockBlockImportHandler) HandleBlockImport(arg0 *types.Block, arg1 *storage.TrieState) error { +func (m *MockBlockImportHandler) HandleBlockImport(arg0 *types.Block, arg1 *storage.TrieState, arg2 bool) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "HandleBlockImport", arg0, arg1) + ret := m.ctrl.Call(m, "HandleBlockImport", arg0, arg1, arg2) ret0, _ := ret[0].(error) return ret0 } // HandleBlockImport indicates an expected call of HandleBlockImport. -func (mr *MockBlockImportHandlerMockRecorder) HandleBlockImport(arg0, arg1 interface{}) *gomock.Call { +func (mr *MockBlockImportHandlerMockRecorder) HandleBlockImport(arg0, arg1, arg2 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "HandleBlockImport", reflect.TypeOf((*MockBlockImportHandler)(nil).HandleBlockImport), arg0, arg1) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "HandleBlockImport", reflect.TypeOf((*MockBlockImportHandler)(nil).HandleBlockImport), arg0, arg1, arg2) } // MockNetwork is a mock of Network interface. diff --git a/dot/sync/syncer.go b/dot/sync/syncer.go index b199c1e645..b161a3e838 100644 --- a/dot/sync/syncer.go +++ b/dot/sync/syncer.go @@ -46,7 +46,7 @@ func NewService(cfg *Config) (*Service, error) { readyBlocks := newBlockQueue(maxResponseSize * 30) pendingBlocks := newDisjointBlockSet(pendingBlocksLimit) - csCfg := &chainSyncConfig{ + csCfg := chainSyncConfig{ bs: cfg.BlockState, net: cfg.Network, readyBlocks: readyBlocks, @@ -55,11 +55,21 @@ func NewService(cfg *Config) (*Service, error) { maxPeers: cfg.MaxPeers, slotDuration: cfg.SlotDuration, } - chainSync := newChainSync(csCfg) - chainProcessor := newChainProcessor(readyBlocks, pendingBlocks, - cfg.BlockState, cfg.StorageState, cfg.TransactionState, - cfg.BabeVerifier, cfg.FinalityGadget, cfg.BlockImportHandler, cfg.Telemetry) + + cpCfg := chainProcessorConfig{ + readyBlocks: readyBlocks, + pendingBlocks: pendingBlocks, + syncer: chainSync, + blockState: cfg.BlockState, + storageState: cfg.StorageState, + transactionState: cfg.TransactionState, + babeVerifier: cfg.BabeVerifier, + finalityGadget: cfg.FinalityGadget, + blockImportHandler: cfg.BlockImportHandler, + telemetry: cfg.Telemetry, + } + chainProcessor := newChainProcessor(cpCfg) return &Service{ blockState: cfg.BlockState, diff --git a/dot/sync/syncer_integration_test.go b/dot/sync/syncer_integration_test.go index 6b349ee82e..c717b542b9 100644 --- a/dot/sync/syncer_integration_test.go +++ b/dot/sync/syncer_integration_test.go @@ -83,8 +83,8 @@ func newTestSyncer(t *testing.T) *Service { cfg.BlockState.StoreRuntime(cfg.BlockState.BestBlockHash(), instance) blockImportHandler := NewMockBlockImportHandler(ctrl) blockImportHandler.EXPECT().HandleBlockImport(gomock.AssignableToTypeOf(&types.Block{}), - gomock.AssignableToTypeOf(&rtstorage.TrieState{})).DoAndReturn( - func(block *types.Block, ts *rtstorage.TrieState) error { + gomock.AssignableToTypeOf(&rtstorage.TrieState{}), false).DoAndReturn( + func(block *types.Block, ts *rtstorage.TrieState, _ bool) error { // store updates state trie nodes in database if err = stateSrvc.Storage.StoreTrie(ts, &block.Header); err != nil { logger.Warnf("failed to store state trie for imported block %s: %s", block.Header.Hash(), err)