Skip to content

Commit

Permalink
feat/EN-2131-Broadcast-shard-block-which-is-not-notarized-in-metachain (
Browse files Browse the repository at this point in the history
#169)

* Fixed problem when early received messages (before initialization of current round) from a proposer (Header & Body) were not executed
* Removed duplicate code by extracting it in one method
* Added some print info
* Fixed test
* Improved GetUnnotarisedHeaders method
* Added guard condition
* Broadcast only finals blocks to metachain
* Changed some print info
* Refactored + fixed nil check for header
* Fixed integration tests
* Fixed type assertion with nil bug
* Changed maxThreads accepted in OS from 10k to 100k
* Fixed after code review
* Extracted duplicate code in a common file
* Removed unused code
* Refactor some methods name
* Fixed print messages
* Throtled txs from GenerateAndSendBulkTransactions method
* Improved throtled txs from GenerateAndSendBulkTransactions method
* Fixed after code review
* Added error as additional return parameter to some methods to be able to check the result before call other methods with these values (nil bot not nil)
* Fixed return nil instead err and added new check
  • Loading branch information
SebastianMarian authored May 28, 2019
1 parent dc8ee21 commit ef81f31
Show file tree
Hide file tree
Showing 58 changed files with 1,984 additions and 261 deletions.
21 changes: 21 additions & 0 deletions cmd/node/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"os"
"os/signal"
"path/filepath"
"runtime/debug"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -64,6 +65,7 @@ import (
"github.com/ElrondNetwork/elrond-go-sandbox/process/factory/metachain"
"github.com/ElrondNetwork/elrond-go-sandbox/process/factory/shard"
processSync "github.com/ElrondNetwork/elrond-go-sandbox/process/sync"
"github.com/ElrondNetwork/elrond-go-sandbox/process/track"
"github.com/ElrondNetwork/elrond-go-sandbox/process/transaction"
"github.com/ElrondNetwork/elrond-go-sandbox/sharding"
"github.com/ElrondNetwork/elrond-go-sandbox/storage"
Expand Down Expand Up @@ -253,6 +255,10 @@ func main() {
},
}

//TODO: The next line should be removed when the write in batches is done
// set the maximum allowed OS threads (not go routines) which can run in the same time (the default is 10000)
debug.SetMaxThreads(100000)

app.Action = func(c *cli.Context) error {
return startNode(c, log)
}
Expand Down Expand Up @@ -769,6 +775,11 @@ func createShardNode(
return nil, nil, nil, err
}

blockTracker, err := track.NewShardBlockTracker(datapool, marshalizer, shardCoordinator, store)
if err != nil {
return nil, nil, nil, err
}

blockProcessor, err := block.NewShardProcessor(
datapool,
store,
Expand All @@ -778,6 +789,7 @@ func createShardNode(
accountsAdapter,
shardCoordinator,
forkDetector,
blockTracker,
createTxRequestHandler(resolversFinder, factory.TransactionTopic, log),
createRequestHandler(resolversFinder, factory.MiniBlocksTopic, log),
)
Expand All @@ -800,6 +812,7 @@ func createShardNode(
node.WithConsensusGroupSize(int(nodesConfig.ConsensusGroupSize)),
node.WithSyncer(syncer),
node.WithBlockProcessor(blockProcessor),
node.WithBlockTracker(blockTracker),
node.WithGenesisTime(time.Unix(nodesConfig.StartTime, 0)),
node.WithRounder(rounder),
node.WithDataPool(datapool),
Expand All @@ -818,6 +831,7 @@ func createShardNode(
node.WithConsensusType(config.Consensus.Type),
node.WithTxSingleSigner(txSingleSigner),
node.WithActiveMetachain(nodesConfig.MetaChainActive),
node.WithTxStorageSize(config.TxStorage.Cache.Size),
)
if err != nil {
return nil, nil, nil, errors.New("error creating node: " + err.Error())
Expand Down Expand Up @@ -1049,6 +1063,11 @@ func createMetaNode(
return nil, nil, nil, err
}

blockTracker, err := track.NewMetaBlockTracker()
if err != nil {
return nil, nil, nil, err
}

shardsGenesisBlocks, err := generateGenesisHeadersForMetachainInit(
nodesConfig,
genesisConfig,
Expand Down Expand Up @@ -1093,6 +1112,7 @@ func createMetaNode(
node.WithConsensusGroupSize(int(nodesConfig.MetaChainConsensusGroupSize)),
node.WithSyncer(syncer),
node.WithBlockProcessor(metaProcessor),
node.WithBlockTracker(blockTracker),
node.WithGenesisTime(time.Unix(nodesConfig.StartTime, 0)),
node.WithRounder(rounder),
node.WithMetaDataPool(metaDatapool),
Expand All @@ -1110,6 +1130,7 @@ func createMetaNode(
node.WithResolversFinder(resolversFinder),
node.WithConsensusType(config.Consensus.Type),
node.WithTxSingleSigner(txSingleSigner),
node.WithTxStorageSize(config.TxStorage.Cache.Size),
)
if err != nil {
return nil, nil, nil, errors.New("error creating meta-node: " + err.Error())
Expand Down
10 changes: 0 additions & 10 deletions consensus/mock/blockProcessorMock.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,6 @@ type BlockProcessorMock struct {
DecodeBlockHeaderCalled func(dta []byte) data.HeaderHandler
}

// SetOnRequestTransaction mocks setting request transaction call back function
func (blProcMock *BlockProcessorMock) SetOnRequestTransaction(f func(destShardID uint32, txHash []byte)) {
blProcMock.SetOnRequestTransactionCalled(f)
}

// ProcessBlock mocks pocessing a block
func (blProcMock *BlockProcessorMock) ProcessBlock(blockChain data.ChainHandler, header data.HeaderHandler, body data.BodyHandler, haveTime func() time.Duration) error {
return blProcMock.ProcessBlockCalled(blockChain, header, body, haveTime)
Expand All @@ -42,11 +37,6 @@ func (blProcMock *BlockProcessorMock) RevertAccountState() {
blProcMock.RevertAccountStateCalled()
}

// CreateGenesisBlock mocks the creation of a genesis block body
func (blProcMock *BlockProcessorMock) CreateGenesisBlock(balances map[string]*big.Int) (data.HeaderHandler, error) {
return blProcMock.CreateGenesisBlockCalled(balances)
}

// CreateTxBlockBody mocks the creation of a transaction block body
func (blProcMock *BlockProcessorMock) CreateBlockBody(round int32, haveTime func() bool) (data.BodyHandler, error) {
return blProcMock.CreateBlockCalled(round, haveTime)
Expand Down
33 changes: 33 additions & 0 deletions consensus/mock/blocksTrackerMock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package mock

import (
"github.com/ElrondNetwork/elrond-go-sandbox/data"
)

type BlocksTrackerMock struct {
UnnotarisedBlocksCalled func() []data.HeaderHandler
RemoveNotarisedBlocksCalled func(headerHandler data.HeaderHandler) error
AddBlockCalled func(headerHandler data.HeaderHandler)
SetBlockBroadcastRoundCalled func(nonce uint64, round int32)
BlockBroadcastRoundCalled func(nonce uint64) int32
}

func (btm *BlocksTrackerMock) UnnotarisedBlocks() []data.HeaderHandler {
return btm.UnnotarisedBlocksCalled()
}

func (btm *BlocksTrackerMock) RemoveNotarisedBlocks(headerHandler data.HeaderHandler) error {
return btm.RemoveNotarisedBlocksCalled(headerHandler)
}

func (btm *BlocksTrackerMock) AddBlock(headerHandler data.HeaderHandler) {
btm.AddBlockCalled(headerHandler)
}

func (btm *BlocksTrackerMock) SetBlockBroadcastRound(nonce uint64, round int32) {
btm.SetBlockBroadcastRoundCalled(nonce, round)
}

func (btm *BlocksTrackerMock) BlockBroadcastRound(nonce uint64) int32 {
return btm.BlockBroadcastRoundCalled(nonce)
}
1 change: 1 addition & 0 deletions consensus/mock/mockTestInitializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ func InitBlockProcessorMock() *BlockProcessorMock {
blockProcessorMock.DecodeBlockHeaderCalled = func(dta []byte) data.HeaderHandler {
return &block.Header{}
}

return blockProcessorMock
}

Expand Down
6 changes: 6 additions & 0 deletions consensus/mock/sposWorkerMock.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ type SposWorkerMock struct {
ExtendCalled func(subroundId int)
GetConsensusStateChangedChannelsCalled func() chan bool
GetBroadcastBlockCalled func(data.BodyHandler, data.HeaderHandler) error
GetBroadcastHeaderCalled func(data.HeaderHandler) error
ExecuteStoredMessagesCalled func()
BroadcastUnnotarisedBlocksCalled func()
}

func (sposWorkerMock *SposWorkerMock) AddReceivedMessageCall(messageType consensus.MessageType,
Expand Down Expand Up @@ -50,3 +52,7 @@ func (sposWorkerMock *SposWorkerMock) BroadcastBlock(body data.BodyHandler, head
func (sposWorkerMock *SposWorkerMock) ExecuteStoredMessages() {
sposWorkerMock.ExecuteStoredMessagesCalled()
}

func (sposWorkerMock *SposWorkerMock) BroadcastUnnotarisedBlocks() {
sposWorkerMock.BroadcastUnnotarisedBlocksCalled()
}
1 change: 1 addition & 0 deletions consensus/spos/bls/blsSubroundsFactory.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ func (fct *factory) generateStartRoundSubround() error {
processingThresholdPercent,
getSubroundName,
fct.worker.ExecuteStoredMessages,
fct.worker.BroadcastUnnotarisedBlocks,
)
if err != nil {
return err
Expand Down
7 changes: 6 additions & 1 deletion consensus/spos/bls/subroundEndRound.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package bls

import (
"fmt"
"time"

"github.com/ElrondNetwork/elrond-go-sandbox/consensus/spos"
"github.com/ElrondNetwork/elrond-go-sandbox/data"
Expand Down Expand Up @@ -80,12 +81,16 @@ func (sr *subroundEndRound) doEndRoundJob() bool {
sr.Header.SetPubKeysBitmap(bitmap)
sr.Header.SetSignature(sig)

timeBefore := time.Now()
// Commit the block (commits also the account state)
err = sr.BlockProcessor().CommitBlock(sr.Blockchain(), sr.ConsensusState.Header, sr.ConsensusState.BlockBody)
if err != nil {
log.Error(err.Error())
return false
}
timeAfter := time.Now()

log.Info(fmt.Sprintf("time elapsed to commit block: %v sec\n", timeAfter.Sub(timeBefore).Seconds()))

sr.SetStatus(SrEndRound, spos.SsFinished)

Expand All @@ -95,7 +100,7 @@ func (sr *subroundEndRound) doEndRoundJob() bool {
log.Error(err.Error())
}

log.Info(fmt.Sprintf("%sStep 3: BlockBody and Header has been commited and broadcasted \n", sr.SyncTimer().FormattedCurrentTime()))
log.Info(fmt.Sprintf("%sStep 3: BlockBody and Header has been committed and broadcast\n", sr.SyncTimer().FormattedCurrentTime()))

msg := fmt.Sprintf("Added proposed block with nonce %d in blockchain", sr.Header.GetNonce())
log.Info(log.Headline(msg, sr.SyncTimer().FormattedCurrentTime(), "+"))
Expand Down
1 change: 1 addition & 0 deletions consensus/spos/bn/bnSubroundsFactory.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ func (fct *factory) generateStartRoundSubround() error {
processingThresholdPercent,
getSubroundName,
fct.worker.ExecuteStoredMessages,
fct.worker.BroadcastUnnotarisedBlocks,
)

if err != nil {
Expand Down
9 changes: 6 additions & 3 deletions consensus/spos/bn/subroundEndRound.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package bn

import (
"fmt"
"time"

"github.com/ElrondNetwork/elrond-go-sandbox/consensus/spos"
"github.com/ElrondNetwork/elrond-go-sandbox/data"
Expand Down Expand Up @@ -45,11 +46,9 @@ func checkNewSubroundEndRoundParams(
if baseSubround == nil {
return spos.ErrNilSubround
}

if baseSubround.ConsensusState == nil {
return spos.ErrNilConsensusState
}

if broadcastBlock == nil {
return spos.ErrNilBroadcastBlockFunction
}
Expand Down Expand Up @@ -77,12 +76,16 @@ func (sr *subroundEndRound) doEndRoundJob() bool {

sr.Header.SetSignature(sig)

timeBefore := time.Now()
// Commit the block (commits also the account state)
err = sr.BlockProcessor().CommitBlock(sr.Blockchain(), sr.ConsensusState.Header, sr.ConsensusState.BlockBody)
if err != nil {
log.Error(err.Error())
return false
}
timeAfter := time.Now()

log.Info(fmt.Sprintf("time elapsed to commit block: %v sec\n", timeAfter.Sub(timeBefore).Seconds()))

sr.SetStatus(SrEndRound, spos.SsFinished)

Expand All @@ -92,7 +95,7 @@ func (sr *subroundEndRound) doEndRoundJob() bool {
log.Error(err.Error())
}

log.Info(fmt.Sprintf("%sStep 6: TxBlockBody and Header has been commited and broadcasted \n", sr.SyncTimer().FormattedCurrentTime()))
log.Info(fmt.Sprintf("%sStep 6: TxBlockBody and Header has been committed and broadcast\n", sr.SyncTimer().FormattedCurrentTime()))

actionMsg := "synchronized"
if sr.IsSelfLeaderInCurrentRound() {
Expand Down
4 changes: 3 additions & 1 deletion consensus/spos/commonSubround/base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"time"

"github.com/ElrondNetwork/elrond-go-sandbox/consensus/mock"

"github.com/ElrondNetwork/elrond-go-sandbox/consensus/spos"
)

Expand Down Expand Up @@ -116,3 +115,6 @@ func getSubroundName(subroundId int) string {
// executeStoredMessages tries to execute all the messages received which are valid for execution
func executeStoredMessages() {
}

func broadcastUnnotarisedBlocks() {
}
16 changes: 13 additions & 3 deletions consensus/spos/commonSubround/subroundStartRound.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,9 @@ import (
"fmt"
"time"

"github.com/ElrondNetwork/elrond-go-sandbox/consensus/spos"
"github.com/ElrondNetwork/elrond-go-sandbox/core"
"github.com/ElrondNetwork/elrond-go-sandbox/core/logger"

"github.com/ElrondNetwork/elrond-go-sandbox/consensus/spos"
)

var log = logger.DefaultLogger()
Expand All @@ -19,6 +18,7 @@ type SubroundStartRound struct {
processingThresholdPercentage int
getSubroundName func(subroundId int) string
executeStoredMessages func()
broadcastUnnotarisedBlocks func()
}

// NewSubroundStartRound creates a SubroundStartRound object
Expand All @@ -28,9 +28,11 @@ func NewSubroundStartRound(
processingThresholdPercentage int,
getSubroundName func(subroundId int) string,
executeStoredMessages func(),
broadcastUnnotarisedBlocks func(),
) (*SubroundStartRound, error) {
err := checkNewSubroundStartRoundParams(
baseSubround,
broadcastUnnotarisedBlocks,
)
if err != nil {
return nil, err
Expand All @@ -41,6 +43,7 @@ func NewSubroundStartRound(
processingThresholdPercentage,
getSubroundName,
executeStoredMessages,
broadcastUnnotarisedBlocks,
}
srStartRound.Job = srStartRound.doStartRoundJob
srStartRound.Check = srStartRound.doStartRoundConsensusCheck
Expand All @@ -51,14 +54,17 @@ func NewSubroundStartRound(

func checkNewSubroundStartRoundParams(
baseSubround *spos.Subround,
broadcastUnnotarisedBlocks func(),
) error {
if baseSubround == nil {
return spos.ErrNilSubround
}

if baseSubround.ConsensusState == nil {
return spos.ErrNilConsensusState
}
if broadcastUnnotarisedBlocks == nil {
return spos.ErrNilBroadcastUnnotarisedBlocks
}

err := spos.ValidateConsensusCore(baseSubround.ConsensusCoreHandler)

Expand Down Expand Up @@ -156,6 +162,10 @@ func (sr *SubroundStartRound) initCurrentRound() bool {

sr.SetStatus(sr.Current(), spos.SsFinished)

if leader == sr.SelfPubKey() {
sr.broadcastUnnotarisedBlocks()
}

// execute stored messages which were received in this new round but before this initialisation
go sr.executeStoredMessages()

Expand Down
Loading

0 comments on commit ef81f31

Please sign in to comment.