diff --git a/cmd/node/factory/structs.go b/cmd/node/factory/structs.go
index ea9ea8e02a9..40787da4a02 100644
--- a/cmd/node/factory/structs.go
+++ b/cmd/node/factory/structs.go
@@ -85,6 +85,8 @@ const (
 	MaxTxsToRequest = 100
 )
 
+var log = logger.DefaultLogger()
+
 // Network struct holds the network components of the Elrond protocol
 type Network struct {
 	NetMessenger p2p.Messenger
@@ -863,57 +865,57 @@ func createShardDataPoolFromConfig(
 	uint64ByteSliceConverter typeConverters.Uint64ByteSliceConverter,
 ) (dataRetriever.PoolsHolder, error) {
 
-	fmt.Println("creatingShardDataPool from config")
+	log.Info("creatingShardDataPool from config")
 
 	txPool, err := shardedData.NewShardedData(getCacherFromConfig(config.TxDataPool))
 	if err != nil {
-		fmt.Println("error creating txpool")
+		log.Info("error creating txpool")
 		return nil, err
 	}
 
 	uTxPool, err := shardedData.NewShardedData(getCacherFromConfig(config.UnsignedTransactionDataPool))
 	if err != nil {
-		fmt.Println("error creating smart contract result")
+		log.Info("error creating smart contract result")
 		return nil, err
 	}
 
 	cacherCfg := getCacherFromConfig(config.BlockHeaderDataPool)
 	hdrPool, err := storageUnit.NewCache(cacherCfg.Type, cacherCfg.Size, cacherCfg.Shards)
 	if err != nil {
-		fmt.Println("error creating hdrpool")
+		log.Info("error creating hdrpool")
 		return nil, err
 	}
 
 	cacherCfg = getCacherFromConfig(config.MetaBlockBodyDataPool)
 	metaBlockBody, err := storageUnit.NewCache(cacherCfg.Type, cacherCfg.Size, cacherCfg.Shards)
 	if err != nil {
-		fmt.Println("error creating metaBlockBody")
+		log.Info("error creating metaBlockBody")
 		return nil, err
 	}
 
 	cacherCfg = getCacherFromConfig(config.BlockHeaderNoncesDataPool)
 	hdrNoncesCacher, err := storageUnit.NewCache(cacherCfg.Type, cacherCfg.Size, cacherCfg.Shards)
 	if err != nil {
-		fmt.Println("error creating hdrNoncesCacher")
+		log.Info("error creating hdrNoncesCacher")
 		return nil, err
 	}
 	hdrNonces, err := dataPool.NewNonceSyncMapCacher(hdrNoncesCacher, uint64ByteSliceConverter)
 	if err != nil {
-		fmt.Println("error creating hdrNonces")
+		log.Info("error creating hdrNonces")
 		return nil, err
 	}
 
 	cacherCfg = getCacherFromConfig(config.TxBlockBodyDataPool)
 	txBlockBody, err := storageUnit.NewCache(cacherCfg.Type, cacherCfg.Size, cacherCfg.Shards)
 	if err != nil {
-		fmt.Println("error creating txBlockBody")
+		log.Info("error creating txBlockBody")
 		return nil, err
 	}
 
 	cacherCfg = getCacherFromConfig(config.PeerBlockBodyDataPool)
 	peerChangeBlockBody, err := storageUnit.NewCache(cacherCfg.Type, cacherCfg.Size, cacherCfg.Shards)
 	if err != nil {
-		fmt.Println("error creating peerChangeBlockBody")
+		log.Info("error creating peerChangeBlockBody")
 		return nil, err
 	}
 
@@ -935,31 +937,31 @@ func createMetaDataPoolFromConfig(
 	cacherCfg := getCacherFromConfig(config.MetaBlockBodyDataPool)
 	metaBlockBody, err := storageUnit.NewCache(cacherCfg.Type, cacherCfg.Size, cacherCfg.Shards)
 	if err != nil {
-		fmt.Println("error creating metaBlockBody")
+		log.Info("error creating metaBlockBody")
 		return nil, err
 	}
 
 	miniBlockHashes, err := shardedData.NewShardedData(getCacherFromConfig(config.MiniBlockHeaderHashesDataPool))
 	if err != nil {
-		fmt.Println("error creating miniBlockHashes")
+		log.Info("error creating miniBlockHashes")
 		return nil, err
 	}
 
 	cacherCfg = getCacherFromConfig(config.ShardHeadersDataPool)
 	shardHeaders, err := storageUnit.NewCache(cacherCfg.Type, cacherCfg.Size, cacherCfg.Shards)
 	if err != nil {
-		fmt.Println("error creating shardHeaders")
+		log.Info("error creating shardHeaders")
 		return nil, err
 	}
 
 	headersNoncesCacher, err := storageUnit.NewCache(cacherCfg.Type, cacherCfg.Size, cacherCfg.Shards)
 	if err != nil {
-		fmt.Println("error creating shard headers nonces pool")
+		log.Info("error creating shard headers nonces pool")
 		return nil, err
 	}
 	headersNonces, err := dataPool.NewNonceSyncMapCacher(headersNoncesCacher, uint64ByteSliceConverter)
 	if err != nil {
-		fmt.Println("error creating shard headers nonces pool")
+		log.Info("error creating shard headers nonces pool")
 		return nil, err
 	}
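Note on the structs.go hunks above: every raw fmt.Println is routed through a package-level logger instead. A minimal sketch of the pattern, assuming only the logger API already visible in this diff (logger.DefaultLogger, log.Info); the helper function is illustrative, not code from the repository:

    package factory

    import "github.com/ElrondNetwork/elrond-go/logger"

    var log = logger.DefaultLogger()

    // reportPoolError (hypothetical) shows the pattern: messages go through
    // the node's configured logger rather than straight to stdout.
    func reportPoolError(poolName string, err error) {
        log.Info("error creating " + poolName + ": " + err.Error())
    }

Routing through the logger keeps output subject to the node's log levels and sinks, which plain fmt.Println bypasses.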
diff --git a/cmd/node/main.go b/cmd/node/main.go
index df78b999a72..d6331ee2ec2 100644
--- a/cmd/node/main.go
+++ b/cmd/node/main.go
@@ -530,6 +530,9 @@ func startNode(ctx *cli.Context, log *logger.Logger, version string) error {
 	coreComponents.StatusHandler.SetStringValue(core.MetricNodeType, string(nodeType))
 	coreComponents.StatusHandler.SetUInt64Value(core.MetricRoundTime, nodesConfig.RoundDuration/milisecondsInSecond)
 	coreComponents.StatusHandler.SetStringValue(core.MetricAppVersion, version)
+	coreComponents.StatusHandler.SetUInt64Value(core.MetricCountConsensus, 0)
+	coreComponents.StatusHandler.SetUInt64Value(core.MetricCountLeader, 0)
+	coreComponents.StatusHandler.SetUInt64Value(core.MetricCountAcceptedBlocks, 0)
 
 	dataArgs := factory.NewDataComponentsFactoryArgs(generalConfig, shardCoordinator, coreComponents, uniqueDBFolder)
 	dataComponents, err := factory.DataComponentsFactory(dataArgs)
diff --git a/consensus/chronology/chronology.go b/consensus/chronology/chronology.go
index bc1425935ba..a14f5ae2c37 100644
--- a/consensus/chronology/chronology.go
+++ b/consensus/chronology/chronology.go
@@ -135,8 +135,6 @@ func (chr *chronology) startRound() {
 	msg := fmt.Sprintf("SUBROUND %s BEGINS", sr.Name())
 	log.Info(log.Headline(msg, chr.syncTimer.FormattedCurrentTime(), "."))
 
-	chr.appStatusHandler.SetUInt64Value(core.MetricCurrentRound, uint64(chr.rounder.Index()))
-
 	if !sr.DoWork(chr.rounder) {
 		chr.subroundId = srBeforeStartRound
 		return
@@ -168,6 +166,7 @@ func (chr *chronology) initRound() {
 
 	if hasSubroundsAndGenesisTimePassed {
 		chr.subroundId = chr.subroundHandlers[0].Current()
+		chr.appStatusHandler.SetUInt64Value(core.MetricCurrentRound, uint64(chr.rounder.Index()))
 	}
 
 	chr.mutSubrounds.RUnlock()
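Note on the main.go hunk above: the three consensus counters are seeded with 0 at startup so that any reader of the status handler (e.g. the termui view) sees a defined value before the first increment. A self-contained sketch of that idea with a hypothetical in-memory handler; the metric key string is illustrative only:

    package main

    import "fmt"

    type statusHandler struct {
        metrics map[string]uint64
    }

    func (sh *statusHandler) SetUInt64Value(key string, value uint64) {
        sh.metrics[key] = value
    }

    func (sh *statusHandler) Increment(key string) {
        sh.metrics[key]++
    }

    func main() {
        sh := &statusHandler{metrics: map[string]uint64{}}
        sh.SetUInt64Value("count_consensus", 0) // seeded at startup, visible immediately
        sh.Increment("count_consensus")         // first consensus participation
        fmt.Println(sh.metrics["count_consensus"]) // 1
    }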
diff --git a/consensus/spos/bn/bnSubroundsFactory.go b/consensus/spos/bn/bnSubroundsFactory.go
index 2c3d138d06e..f71121171ea 100644
--- a/consensus/spos/bn/bnSubroundsFactory.go
+++ b/consensus/spos/bn/bnSubroundsFactory.go
@@ -156,6 +156,11 @@ func (fct *factory) generateStartRoundSubround() error {
 		return err
 	}
 
+	err = subroundStartRound.SetAppStatusHandler(fct.appStatusHandler)
+	if err != nil {
+		return err
+	}
+
 	fct.consensusCore.Chronology().AddSubround(subroundStartRound)
 
 	return nil
diff --git a/consensus/spos/commonSubround/subroundStartRound.go b/consensus/spos/commonSubround/subroundStartRound.go
index 7380268e6fc..d15832a74b1 100644
--- a/consensus/spos/commonSubround/subroundStartRound.go
+++ b/consensus/spos/commonSubround/subroundStartRound.go
@@ -135,6 +135,7 @@ func (sr *SubroundStartRound) initCurrentRound() bool {
 
 	msg := ""
 	if leader == sr.SelfPubKey() {
+		sr.appStatusHandler.Increment(core.MetricCountLeader)
 		msg = " (my turn)"
 	}
 
@@ -153,6 +154,8 @@ func (sr *SubroundStartRound) initCurrentRound() bool {
 		return false
 	}
 
+	sr.appStatusHandler.Increment(core.MetricCountConsensus)
+
 	err = sr.MultiSigner().Reset(pubKeys, uint16(selfIndex))
 	if err != nil {
 		log.Error(err.Error())
@@ -177,14 +180,9 @@ func (sr *SubroundStartRound) initCurrentRound() bool {
 	sr.SetStatus(sr.Current(), spos.SsFinished)
 
 	if leader == sr.SelfPubKey() {
-		sr.appStatusHandler.Increment(core.MetricCountLeader)
 		//TODO: Should be analyzed if call of sr.broadcastUnnotarisedBlocks() is still necessary
 	}
 
-	sr.appStatusHandler.Increment(core.MetricCountConsensus)
-
-	//TODO rollback decrement
-
 	// execute stored messages which were received in this new round but before this initialisation
 	go sr.executeStoredMessages()
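Note on the factory hunk above: the start-round subround now receives the status handler explicitly, and the returned error is checked so a bad handler fails subround construction rather than panicking later. A sketch of how such a setter typically looks in this codebase, assuming it rejects nil; the error variable and interface shown here are illustrative:

    package spos

    import "errors"

    var ErrNilAppStatusHandler = errors.New("nil AppStatusHandler")

    type AppStatusHandler interface {
        Increment(key string)
        SetUInt64Value(key string, value uint64)
    }

    type SubroundStartRound struct {
        appStatusHandler AppStatusHandler
    }

    // SetAppStatusHandler stores the handler used for consensus metrics;
    // failing fast on nil avoids nil dereferences in initCurrentRound.
    func (sr *SubroundStartRound) SetAppStatusHandler(ash AppStatusHandler) error {
        if ash == nil {
            return ErrNilAppStatusHandler
        }
        sr.appStatusHandler = ash
        return nil
    }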
diff --git a/node/nodeTesting.go b/node/nodeTesting.go
index d4795a7556d..649c688804c 100644
--- a/node/nodeTesting.go
+++ b/node/nodeTesting.go
@@ -14,6 +14,7 @@ import (
 	"github.com/ElrondNetwork/elrond-go/process"
 	"github.com/ElrondNetwork/elrond-go/process/factory"
 	"github.com/ElrondNetwork/elrond-go/sharding"
+	"github.com/ElrondNetwork/elrond-go/storage"
 )
 
 // maxLoadThresholdPercent specifies the max load percent accepted from txs storage size when generates new txs
@@ -25,12 +26,12 @@ const maxGoRoutinesSendMessage = 30
 
 // GenerateAndSendBulkTransactions is a method for generating and propagating a set
 // of transactions to be processed. It is mainly used for demo purposes
-func (n *Node) GenerateAndSendBulkTransactions(receiverHex string, value *big.Int, noOfTx uint64) error {
+func (n *Node) GenerateAndSendBulkTransactions(receiverHex string, value *big.Int, noOfTxs uint64) error {
 	if atomic.LoadInt32(&n.currentSendingGoRoutines) >= maxGoRoutinesSendMessage {
 		return ErrSystemBusyGeneratingTransactions
 	}
 
-	err := n.generateBulkTransactionsChecks(noOfTx)
+	err := n.generateBulkTransactionsChecks(noOfTxs)
 	if err != nil {
 		return err
 	}
@@ -42,27 +43,35 @@ func (n *Node) GenerateAndSendBulkTransactions(receiverHex string, value *big.In
 		return ErrNilTransactionPool
 	}
 
-	maxNoOfTx := int64(0)
 	txStorageSize := uint64(n.txStorageSize) * maxLoadThresholdPercent / 100
+	selfId := n.shardCoordinator.SelfId()
+	strCache := process.ShardCacherIdentifier(selfId, selfId)
+	txStore := txPool.ShardDataStore(strCache)
+	if txStore != nil {
+		noOfTxs = getMaxNoOfTxsToGenerate(txStore, txStorageSize, noOfTxs)
+	}
+
 	for i := uint32(0); i < n.shardCoordinator.NumberOfShards(); i++ {
-		strCache := process.ShardCacherIdentifier(n.shardCoordinator.SelfId(), i)
-		txStore := txPool.ShardDataStore(strCache)
-		if txStore == nil {
+		if i == selfId {
 			continue
 		}
 
-		txStoreLen := uint64(txStore.Len())
-		if txStoreLen+noOfTx > txStorageSize {
-			maxNoOfTx = int64(txStorageSize) - int64(txStoreLen)
-			if int64(noOfTx) > maxNoOfTx {
-				if maxNoOfTx <= 0 {
-					return ErrTooManyTransactionsInPool
-				}
+		strCache = process.ShardCacherIdentifier(i, selfId)
+		txStore = txPool.ShardDataStore(strCache)
+		if txStore != nil {
+			noOfTxs = getMaxNoOfTxsToGenerate(txStore, txStorageSize, noOfTxs)
+		}
 
-				noOfTx = uint64(maxNoOfTx)
-			}
+		strCache = process.ShardCacherIdentifier(selfId, i)
+		txStore = txPool.ShardDataStore(strCache)
+		if txStore != nil {
+			noOfTxs = getMaxNoOfTxsToGenerate(txStore, txStorageSize, noOfTxs)
 		}
 	}
+
+	if noOfTxs == 0 {
+		return ErrTooManyTransactionsInPool
+	}
 
 	newNonce, senderAddressBytes, recvAddressBytes, senderShardId, err := n.generateBulkTransactionsPrepareParams(receiverHex)
@@ -71,7 +80,7 @@ func (n *Node) GenerateAndSendBulkTransactions(receiverHex string, value *big.In
 	}
 
 	wg := sync.WaitGroup{}
-	wg.Add(int(noOfTx))
+	wg.Add(int(noOfTxs))
 
 	mutTransactions := sync.RWMutex{}
 	transactions := make([][]byte, 0)
@@ -84,7 +93,7 @@ func (n *Node) GenerateAndSendBulkTransactions(receiverHex string, value *big.In
 		return err
 	}
 
-	for nonce := newNonce; nonce < newNonce+noOfTx; nonce++ {
+	for nonce := newNonce; nonce < newNonce+noOfTxs; nonce++ {
 		go func(crtNonce uint64) {
 			_, signedTxBuff, err := n.generateAndSignSingleTx(
 				crtNonce,
@@ -116,8 +125,8 @@ func (n *Node) GenerateAndSendBulkTransactions(receiverHex string, value *big.In
 		return errFound
 	}
 
-	if len(transactions) != int(noOfTx) {
-		return errors.New(fmt.Sprintf("generated only %d from required %d transactions", len(transactions), noOfTx))
+	if len(transactions) != int(noOfTxs) {
+		return errors.New(fmt.Sprintf("generated only %d from required %d transactions", len(transactions), noOfTxs))
 	}
 
 	//the topic identifier is made of the current shard id and sender's shard id
@@ -144,6 +153,25 @@ func (n *Node) GenerateAndSendBulkTransactions(receiverHex string, value *big.In
 	return nil
 }
 
+func getMaxNoOfTxsToGenerate(
+	txStore storage.Cacher,
+	txStorageSize uint64,
+	noOfTxs uint64,
+) uint64 {
+
+	txStoreLen := uint64(txStore.Len())
+	if txStoreLen >= txStorageSize {
+		return 0
+	}
+
+	maxNoOfTxs := txStorageSize - txStoreLen
+	if maxNoOfTxs < noOfTxs {
+		return maxNoOfTxs
+	}
+
+	return noOfTxs
+}
+
 // GenerateAndSendBulkTransactionsOneByOne is a method for generating and propagating a set
 // of transactions to be processed. It is mainly used for demo purposes
 func (n *Node) GenerateAndSendBulkTransactionsOneByOne(receiverHex string, value *big.Int, noOfTx uint64) error {
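Note on getMaxNoOfTxsToGenerate above: it trims the requested transaction count to the room left under the pool-load threshold, returning 0 when the pool is already full. A self-contained illustration of that rule, using a fakeCacher that stands in for storage.Cacher (only Len() matters here):

    package main

    import "fmt"

    type fakeCacher struct{ length int }

    func (f *fakeCacher) Len() int { return f.length }

    // same logic as the diff's getMaxNoOfTxsToGenerate, over the fake store
    func getMaxNoOfTxsToGenerate(txStore *fakeCacher, txStorageSize uint64, noOfTxs uint64) uint64 {
        txStoreLen := uint64(txStore.Len())
        if txStoreLen >= txStorageSize {
            return 0 // pool already at (or over) the accepted load
        }
        maxNoOfTxs := txStorageSize - txStoreLen
        if maxNoOfTxs < noOfTxs {
            return maxNoOfTxs // trim the request to the remaining room
        }
        return noOfTxs
    }

    func main() {
        fmt.Println(getMaxNoOfTxsToGenerate(&fakeCacher{900}, 1000, 500))  // 100: trimmed
        fmt.Println(getMaxNoOfTxsToGenerate(&fakeCacher{1000}, 1000, 500)) // 0: pool full
        fmt.Println(getMaxNoOfTxsToGenerate(&fakeCacher{100}, 1000, 500))  // 500: fits
    }

Because the caller applies this per relevant cache and only errors when the result reaches 0, a partially loaded pool degrades the request gradually instead of rejecting it outright.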
diff --git a/p2p/libp2p/netMessenger.go b/p2p/libp2p/netMessenger.go
index ec9ae2d44b3..1e3e7c05c6d 100644
--- a/p2p/libp2p/netMessenger.go
+++ b/p2p/libp2p/netMessenger.go
@@ -164,7 +164,7 @@ func createMessenger(
 	}(pb, netMes.outgoingPLB)
 
 	for _, address := range netMes.ctxProvider.Host().Addrs() {
-		fmt.Println(address.String() + "/p2p/" + netMes.ID().Pretty())
+		log.Info(address.String() + "/p2p/" + netMes.ID().Pretty())
 	}
 
 	return &netMes, nil
diff --git a/process/block/displayBlock.go b/process/block/displayBlock.go
index 5be0ca46677..5771eea6649 100644
--- a/process/block/displayBlock.go
+++ b/process/block/displayBlock.go
@@ -30,8 +30,8 @@ func NewTransactionCounter() *transactionCounter {
 	}
 }
 
-// getNumTxsWithDst returns the number of transactions for a certain destination shard
-func (txc *transactionCounter) getNumTxsWithDst(dstShardId uint32, dataPool dataRetriever.PoolsHolder, nrShards uint32) int {
+// getNumTxsFromPool returns the number of transactions from pool for a given shard
+func (txc *transactionCounter) getNumTxsFromPool(shardId uint32, dataPool dataRetriever.PoolsHolder, nrShards uint32) int {
 	txPool := dataPool.Transactions()
 	if txPool == nil {
 		return 0
@@ -39,13 +39,28 @@ func (txc *transactionCounter) getNumTxsWithDst(dstShardId uint32, dataPool data
 
 	sumTxs := 0
 
+	strCache := process.ShardCacherIdentifier(shardId, shardId)
+	txStore := txPool.ShardDataStore(strCache)
+	if txStore != nil {
+		sumTxs += txStore.Len()
+	}
+
 	for i := uint32(0); i < nrShards; i++ {
-		strCache := process.ShardCacherIdentifier(i, dstShardId)
-		txStore := txPool.ShardDataStore(strCache)
-		if txStore == nil {
+		if i == shardId {
 			continue
 		}
-		sumTxs += txStore.Len()
+
+		strCache = process.ShardCacherIdentifier(i, shardId)
+		txStore = txPool.ShardDataStore(strCache)
+		if txStore != nil {
+			sumTxs += txStore.Len()
+		}
+
+		strCache = process.ShardCacherIdentifier(shardId, i)
+		txStore = txPool.ShardDataStore(strCache)
+		if txStore != nil {
+			sumTxs += txStore.Len()
+		}
 	}
 
 	return sumTxs
@@ -82,7 +97,7 @@ func (txc *transactionCounter) displayLogInfo(
 		core.ToB64(headerHash),
 		txc.totalTxs,
 		txc.currentBlockTxs,
-		txc.getNumTxsWithDst(selfId, dataPool, numShards),
+		txc.getNumTxsFromPool(selfId, dataPool, numShards),
 		numShards,
 		selfId)
 	txc.mutex.RUnlock()
diff --git a/process/block/shardblock.go b/process/block/shardblock.go
index 332594c6851..a2e39461a0b 100644
--- a/process/block/shardblock.go
+++ b/process/block/shardblock.go
@@ -187,7 +187,7 @@ func (sp *shardProcessor) ProcessBlock(
 		return err
 	}
 
-	numTxWithDst := sp.txCounter.getNumTxsWithDst(header.ShardId, sp.dataPool, sp.shardCoordinator.NumberOfShards())
+	numTxWithDst := sp.txCounter.getNumTxsFromPool(header.ShardId, sp.dataPool, sp.shardCoordinator.NumberOfShards())
 
 	sp.appStatusHandler.SetUInt64Value(core.MetricTxPoolLoad, uint64(numTxWithDst))
diff --git a/statusHandler/termuiStatusHandler.go b/statusHandler/termuiStatusHandler.go
index 49bb3b4c848..419f71e2a11 100644
--- a/statusHandler/termuiStatusHandler.go
+++ b/statusHandler/termuiStatusHandler.go
@@ -72,7 +72,6 @@ func (tsh *TermuiStatusHandler) Increment(key string) {
 
 	keyValue++
 	tsh.termuiConsoleMetrics.Store(key, keyValue)
-
 }
 
 // Decrement - will decrement the value of a key
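Note on the enumeration pattern now shared by GenerateAndSendBulkTransactions and getNumTxsFromPool: for a shard S, the relevant sharded caches are the intra-shard pair (S,S) plus, for every other shard i, the incoming pair (i,S) and the outgoing pair (S,i). A sketch of that enumeration; the identifier format below is an assumption for illustration, not necessarily the repository's exact encoding:

    package main

    import "fmt"

    // hypothetical stand-in for process.ShardCacherIdentifier
    func shardCacherIdentifier(senderShardId, destShardId uint32) string {
        return fmt.Sprintf("%d_%d", senderShardId, destShardId)
    }

    func cacheIdsForShard(selfId, nrShards uint32) []string {
        ids := []string{shardCacherIdentifier(selfId, selfId)} // intra-shard txs
        for i := uint32(0); i < nrShards; i++ {
            if i == selfId {
                continue
            }
            ids = append(ids, shardCacherIdentifier(i, selfId)) // cross-shard, incoming
            ids = append(ids, shardCacherIdentifier(selfId, i)) // cross-shard, outgoing
        }
        return ids
    }

    func main() {
        fmt.Println(cacheIdsForShard(0, 3)) // [0_0 1_0 0_1 2_0 0_2]
    }

This is why the renamed getNumTxsFromPool reports whole-pool load for the shard rather than only transactions destined for it, matching its new use for core.MetricTxPoolLoad.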