Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix missing storage resolver for validator info #4566

Merged
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -240,3 +240,28 @@ func (brcf *baseResolversContainerFactory) generatePeerAuthenticationResolver()

return brcf.container.Add(identifierPeerAuth, peerAuthResolver)
}

func (brcf *baseResolversContainerFactory) generateValidatorInfoResolver() error {
validatorInfoStorer, err := brcf.store.GetStorer(dataRetriever.UnsignedTransactionUnit)
if err != nil {
return err
}

identifierValidatorInfo := common.ValidatorInfoTopic
arg := storageResolvers.ArgSliceResolver{
Messenger: brcf.messenger,
ResponseTopicName: identifierValidatorInfo,
Storage: validatorInfoStorer,
DataPacker: brcf.dataPacker,
Marshalizer: brcf.marshalizer,
ManualEpochStartNotifier: brcf.manualEpochStartNotifier,
ChanGracefullyClose: brcf.chanGracefullyClose,
DelayBeforeGracefulClose: defaultBeforeGracefulClose,
}
validatorInfoResolver, err := storageResolvers.NewSliceResolver(arg)
if err != nil {
return err
}

return brcf.container.Add(identifierValidatorInfo, validatorInfoResolver)
}
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,11 @@ func (mrcf *metaResolversContainerFactory) Create() (dataRetriever.ResolversCont
return nil, err
}

err = mrcf.generateValidatorInfoResolver()
if err != nil {
return nil, err
}

return mrcf.container, nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,10 @@ func TestMetaResolversContainerFactory_With4ShardsShouldWork(t *testing.T) {
numResolversTxs := noOfShards + 1
numResolversTrieNodes := 2
numPeerAuthentication := 1
numValidatorInfo := 1
totalResolvers := numResolversShardHeadersForMetachain + numResolverMetablocks + numResolversMiniBlocks +
numResolversUnsigned + numResolversTxs + numResolversTrieNodes + numResolversRewards + numPeerAuthentication
numResolversUnsigned + numResolversTxs + numResolversTrieNodes + numResolversRewards + numPeerAuthentication +
numValidatorInfo

assert.Equal(t, totalResolvers, container.Len())
assert.Equal(t, totalResolvers, container.Len())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,11 @@ func (srcf *shardResolversContainerFactory) Create() (dataRetriever.ResolversCon
return nil, err
}

err = srcf.generateValidatorInfoResolver()
if err != nil {
return nil, err
}

return srcf.container, nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,10 @@ func TestShardResolversContainerFactory_With4ShardsShouldWork(t *testing.T) {
numResolverMetaBlockHeaders := 1
numResolverTrieNodes := 1
numPeerAuthentication := 1
numValidatorInfo := 1
totalResolvers := numResolverTxs + numResolverHeaders + numResolverMiniBlocks +
numResolverMetaBlockHeaders + numResolverSCRs + numResolverRewardTxs + numResolverTrieNodes + numPeerAuthentication
numResolverMetaBlockHeaders + numResolverSCRs + numResolverRewardTxs + numResolverTrieNodes +
numPeerAuthentication + numValidatorInfo

assert.Equal(t, totalResolvers, container.Len())
}
Expand Down
54 changes: 0 additions & 54 deletions dataRetriever/resolvers/disabled/validatorInfoResolver.go

This file was deleted.

47 changes: 0 additions & 47 deletions dataRetriever/resolvers/disabled/validatorInfoResolver_test.go

This file was deleted.

8 changes: 5 additions & 3 deletions dataRetriever/storageResolvers/sliceResolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,10 @@ func (sliceRes *sliceResolver) RequestDataFromHashArray(hashes [][]byte, _ uint3
mb, errTemp := sliceRes.storage.Get(hash)
if errTemp != nil {
errFetch = fmt.Errorf("%w for hash %s", errTemp, logger.DisplayByteSlice(hash))
log.Trace("fetchMbAsByteSlice missing",
log.Trace("fetchByteSlice missing",
"error", errFetch.Error(),
"hash", hash)
"hash", hash,
"topic", sliceRes.responseTopicName)
errorsFound++

continue
Expand All @@ -121,7 +122,8 @@ func (sliceRes *sliceResolver) RequestDataFromHashArray(hashes [][]byte, _ uint3
}

if errFetch != nil {
errFetch = fmt.Errorf("resolveMbRequestByHashArray last error %w from %d encountered errors", errFetch, errorsFound)
errFetch = fmt.Errorf("resolveRequestByHashArray on topic %s, last error %w from %d encountered errors",
sliceRes.responseTopicName, errFetch, errorsFound)
sliceRes.signalGracefullyClose()
}

Expand Down
4 changes: 2 additions & 2 deletions epochStart/shardchain/peerMiniBlocksSyncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,13 +277,13 @@ func (p *peerMiniBlockSyncer) setMissingValidatorsInfo(miniBlock *block.MiniBloc
for _, txHash := range miniBlock.TxHashes {
p.mapAllValidatorsInfo[string(txHash)] = nil

validatorInfoObjectFound, ok := p.validatorsInfoPool.SearchFirstData(txHash)
val, ok := p.validatorsInfoPool.SearchFirstData(txHash)
if !ok {
numMissingValidatorsInfo++
continue
}

validatorInfo, ok := validatorInfoObjectFound.(*state.ShardValidatorInfo)
validatorInfo, ok := val.(*state.ShardValidatorInfo)
if !ok {
numMissingValidatorsInfo++
continue
Expand Down
9 changes: 5 additions & 4 deletions process/block/preprocess/basePreProcess.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,21 +250,22 @@ func (bpp *basePreProcess) saveTransactionToStorage(
forBlock.mutTxsForBlock.RUnlock()

if txInfoFromMap == nil || txInfoFromMap.tx == nil {
log.Warn("basePreProcess.saveTransactionToStorage", "type", dataUnit, "txHash", txHash, "error", process.ErrMissingTransaction.Error())
log.Warn("basePreProcess.saveTransactionToStorage", "txHash", txHash, "dataUnit", dataUnit, "error", process.ErrMissingTransaction)
return
}

buff, err := bpp.marshalizer.Marshal(txInfoFromMap.tx)
if err != nil {
log.Warn("basePreProcess.saveTransactionToStorage", "txHash", txHash, "error", err.Error())
log.Warn("basePreProcess.saveTransactionToStorage: Marshal", "txHash", txHash, "error", err)
return
}

errNotCritical := store.Put(dataUnit, txHash, buff)
if errNotCritical != nil {
log.Debug("store.Put",
"error", errNotCritical.Error(),
log.Debug("basePreProcess.saveTransactionToStorage: Put",
"txHash", txHash,
"dataUnit", dataUnit,
"error", errNotCritical,
)
}
}
Expand Down
48 changes: 46 additions & 2 deletions process/block/preprocess/validatorInfoPreProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,11 +165,55 @@ func (vip *validatorInfoPreprocessor) ProcessBlockTransactions(
return nil
}

// SaveTxsToStorage does nothing
func (vip *validatorInfoPreprocessor) SaveTxsToStorage(_ *block.Body) error {
// SaveTxsToStorage saves validator info from body into storage
func (vip *validatorInfoPreprocessor) SaveTxsToStorage(body *block.Body) error {
if check.IfNil(body) {
return process.ErrNilBlockBody
}

for i := 0; i < len(body.MiniBlocks); i++ {
miniBlock := body.MiniBlocks[i]
if miniBlock.Type != block.PeerBlock {
continue
}

vip.saveValidatorInfoToStorage(miniBlock)
}

return nil
}

func (vip *validatorInfoPreprocessor) saveValidatorInfoToStorage(miniBlock *block.MiniBlock) {
for _, txHash := range miniBlock.TxHashes {
val, ok := vip.validatorsInfoPool.SearchFirstData(txHash)
if !ok {
log.Debug("validatorInfoPreprocessor.saveValidatorInfoToStorage: SearchFirstData: tx not found in validator info pool", "txHash", txHash)
continue
}

validatorInfo, ok := val.(*state.ShardValidatorInfo)
if !ok {
log.Warn("validatorInfoPreprocessor.saveValidatorInfoToStorage: wrong type assertion", "txHash", txHash)
continue
}

buff, err := vip.marshalizer.Marshal(validatorInfo)
if err != nil {
log.Warn("validatorInfoPreprocessor.saveValidatorInfoToStorage: Marshal", "txHash", txHash, "error", err)
continue
}

err = vip.storage.Put(dataRetriever.UnsignedTransactionUnit, txHash, buff)
if err != nil {
log.Debug("validatorInfoPreprocessor.saveValidatorInfoToStorage: Put",
"txHash", txHash,
"dataUnit", dataRetriever.UnsignedTransactionUnit,
"error", err,
)
}
}
}

// CreateBlockStarted cleans the local cache map for processed/created validators info at this round
func (vip *validatorInfoPreprocessor) CreateBlockStarted() {
_ = core.EmptyChannel(vip.chReceivedAllValidatorsInfo)
Expand Down
74 changes: 74 additions & 0 deletions process/block/preprocess/validatorInfoPreProcessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/ElrondNetwork/elrond-go-core/core"
"github.com/ElrondNetwork/elrond-go-core/data/block"
"github.com/ElrondNetwork/elrond-go-core/data/rewardTx"
"github.com/ElrondNetwork/elrond-go/dataRetriever"
"github.com/ElrondNetwork/elrond-go/process"
"github.com/ElrondNetwork/elrond-go/state"
Expand All @@ -15,6 +16,7 @@ import (
"github.com/ElrondNetwork/elrond-go/testscommon/hashingMocks"
"github.com/ElrondNetwork/elrond-go/testscommon/storage"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestNewValidatorInfoPreprocessor_NilHasherShouldErr(t *testing.T) {
Expand Down Expand Up @@ -495,3 +497,75 @@ func TestNewValidatorInfoPreprocessor_RestoreValidatorsInfo(t *testing.T) {
assert.True(t, wasCalledWithExpectedKey)
})
}

func TestValidatorInfoPreprocessor_SaveTxsToStorageShouldWork(t *testing.T) {
t.Parallel()

txHash1 := []byte("txHash1")
txHash2 := []byte("txHash2")
txHash3 := []byte("txHash3")
txHash4 := []byte("txHash4")

tdp := initDataPool()

tdp.ValidatorsInfoCalled = func() dataRetriever.ShardedDataCacherNotifier {
return &testscommon.ShardedDataStub{
SearchFirstDataCalled: func(key []byte) (value interface{}, ok bool) {
if bytes.Equal(key, txHash1) {
return nil, false
}
if bytes.Equal(key, txHash2) {
return &state.ValidatorInfo{}, true
}
if bytes.Equal(key, txHash3) {
return &state.ShardValidatorInfo{}, true
}
if bytes.Equal(key, txHash4) {
return &rewardTx.RewardTx{}, true
}
return nil, false
},
}
}

putHashes := make([][]byte, 0)
storer := &storage.ChainStorerStub{
PutCalled: func(unitType dataRetriever.UnitType, key []byte, value []byte) error {
putHashes = append(putHashes, key)
return nil
},
}

vip, _ := NewValidatorInfoPreprocessor(
&hashingMocks.HasherMock{},
&testscommon.MarshalizerMock{},
&testscommon.BlockSizeComputationStub{},
tdp.ValidatorsInfo(),
storer,
&testscommon.EnableEpochsHandlerStub{},
)

err := vip.SaveTxsToStorage(nil)
assert.Equal(t, process.ErrNilBlockBody, err)

peersHashes := [][]byte{txHash1, txHash2, txHash3}
rewardsHashes := [][]byte{txHash4}

mb1 := block.MiniBlock{
TxHashes: rewardsHashes,
Type: block.RewardsBlock,
}
mb2 := block.MiniBlock{
TxHashes: peersHashes,
Type: block.PeerBlock,
}

blockBody := &block.Body{}
blockBody.MiniBlocks = append(blockBody.MiniBlocks, &mb1, &mb2)

err = vip.SaveTxsToStorage(blockBody)

assert.Nil(t, err)
require.Equal(t, 1, len(putHashes))
assert.Equal(t, txHash3, putHashes[0])
}