Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimized peer authentication messages management #4420

Merged
merged 13 commits into from
Sep 1, 2022
Merged
36 changes: 17 additions & 19 deletions dataRetriever/factory/resolverscontainer/args.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,30 +7,28 @@ import (
"github.com/ElrondNetwork/elrond-go/config"
"github.com/ElrondNetwork/elrond-go/dataRetriever"
"github.com/ElrondNetwork/elrond-go/p2p"
"github.com/ElrondNetwork/elrond-go/process"
"github.com/ElrondNetwork/elrond-go/sharding"
)

// FactoryArgs will hold the arguments for ResolversContainerFactory for both shard and meta
type FactoryArgs struct {
ResolverConfig config.ResolverConfig
NumConcurrentResolvingJobs int32
ShardCoordinator sharding.Coordinator
Messenger dataRetriever.TopicMessageHandler
Store dataRetriever.StorageService
Marshalizer marshal.Marshalizer
DataPools dataRetriever.PoolsHolder
Uint64ByteSliceConverter typeConverters.Uint64ByteSliceConverter
DataPacker dataRetriever.DataPacker
TriesContainer common.TriesHolder
InputAntifloodHandler dataRetriever.P2PAntifloodHandler
OutputAntifloodHandler dataRetriever.P2PAntifloodHandler
CurrentNetworkEpochProvider dataRetriever.CurrentNetworkEpochProviderHandler
PreferredPeersHolder p2p.PreferredPeersHolderHandler
PeersRatingHandler dataRetriever.PeersRatingHandler
SizeCheckDelta uint32
IsFullHistoryNode bool
ResolverConfig config.ResolverConfig
NumConcurrentResolvingJobs int32
ShardCoordinator sharding.Coordinator
Messenger dataRetriever.TopicMessageHandler
Store dataRetriever.StorageService
Marshalizer marshal.Marshalizer
DataPools dataRetriever.PoolsHolder
Uint64ByteSliceConverter typeConverters.Uint64ByteSliceConverter
DataPacker dataRetriever.DataPacker
TriesContainer common.TriesHolder
InputAntifloodHandler dataRetriever.P2PAntifloodHandler
OutputAntifloodHandler dataRetriever.P2PAntifloodHandler
CurrentNetworkEpochProvider dataRetriever.CurrentNetworkEpochProviderHandler
PreferredPeersHolder p2p.PreferredPeersHolderHandler
PeersRatingHandler dataRetriever.PeersRatingHandler
SizeCheckDelta uint32
IsFullHistoryNode bool
NodesCoordinator dataRetriever.NodesCoordinator
MaxNumOfPeerAuthenticationInResponse int
PeerShardMapper process.PeerShardMapper
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"github.com/ElrondNetwork/elrond-go/dataRetriever"
"github.com/ElrondNetwork/elrond-go/dataRetriever/resolvers"
"github.com/ElrondNetwork/elrond-go/dataRetriever/resolvers/topicResolverSender"
"github.com/ElrondNetwork/elrond-go/process"
"github.com/ElrondNetwork/elrond-go/process/factory"
"github.com/ElrondNetwork/elrond-go/sharding"
)
Expand Down Expand Up @@ -49,7 +48,6 @@ type baseResolversContainerFactory struct {
numFullHistoryPeers int
nodesCoordinator dataRetriever.NodesCoordinator
maxNumOfPeerAuthenticationInResponse int
peerShardMapper process.PeerShardMapper
}

func (brcf *baseResolversContainerFactory) checkParams() error {
Expand Down Expand Up @@ -111,9 +109,6 @@ func (brcf *baseResolversContainerFactory) checkParams() error {
return fmt.Errorf("%w for maxNumOfPeerAuthenticationInResponse, expected %d, received %d",
dataRetriever.ErrInvalidValue, minNumOfPeerAuthentication, brcf.maxNumOfPeerAuthenticationInResponse)
}
if check.IfNil(brcf.peerShardMapper) {
return dataRetriever.ErrNilPeerShardMapper
}

return nil
}
Expand Down Expand Up @@ -298,7 +293,6 @@ func (brcf *baseResolversContainerFactory) generatePeerAuthenticationResolver()
PeerAuthenticationPool: brcf.dataPools.PeerAuthentications(),
NodesCoordinator: brcf.nodesCoordinator,
MaxNumOfPeerAuthenticationInResponse: brcf.maxNumOfPeerAuthenticationInResponse,
PeerShardMapper: brcf.peerShardMapper,
DataPacker: brcf.dataPacker,
}
peerAuthResolver, err := resolvers.NewPeerAuthenticationResolver(arg)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ func NewMetaResolversContainerFactory(
numFullHistoryPeers: int(args.ResolverConfig.NumFullHistoryPeers),
nodesCoordinator: args.NodesCoordinator,
maxNumOfPeerAuthenticationInResponse: args.MaxNumOfPeerAuthenticationInResponse,
peerShardMapper: args.PeerShardMapper,
}

err = base.checkParams()
Expand Down Expand Up @@ -164,7 +163,7 @@ func (mrcf *metaResolversContainerFactory) AddShardTrieNodeResolvers(container d
return container.AddMultiple(keys, resolversSlice)
}

//------- Shard header resolvers
// ------- Shard header resolvers

func (mrcf *metaResolversContainerFactory) generateShardHeaderResolvers() error {
shardC := mrcf.shardCoordinator
Expand Down Expand Up @@ -203,7 +202,7 @@ func (mrcf *metaResolversContainerFactory) createShardHeaderResolver(
return nil, err
}

//TODO change this data unit creation method through a factory or func
// TODO change this data unit creation method through a factory or func
hdrNonceHashDataUnit := dataRetriever.ShardHdrNonceHashDataUnit + dataRetriever.UnitType(shardID)
hdrNonceStore := mrcf.store.GetStorer(hdrNonceHashDataUnit)
arg := resolvers.ArgHeaderResolver{
Expand Down Expand Up @@ -233,7 +232,7 @@ func (mrcf *metaResolversContainerFactory) createShardHeaderResolver(
return resolver, nil
}

//------- Meta header resolvers
// ------- Meta header resolvers

func (mrcf *metaResolversContainerFactory) generateMetaChainHeaderResolvers() error {
identifierHeader := factory.MetachainBlocksTopic
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func createTriesHolderForMeta() common.TriesHolder {
return triesHolder
}

//------- NewResolversContainerFactory
// ------- NewResolversContainerFactory

func TestNewMetaResolversContainerFactory_NilShardCoordinatorShouldErr(t *testing.T) {
t.Parallel()
Expand Down Expand Up @@ -290,17 +290,6 @@ func TestNewMetaResolversContainerFactory_InvalidNumFullHistoryPeersShouldErr(t
assert.True(t, errors.Is(err, dataRetriever.ErrInvalidValue))
}

func TestNewMetaResolversContainerFactory_NilPeerShardMapperShouldErr(t *testing.T) {
t.Parallel()

args := getArgumentsMeta()
args.PeerShardMapper = nil
rcf, err := resolverscontainer.NewMetaResolversContainerFactory(args)

assert.Nil(t, rcf)
assert.Equal(t, dataRetriever.ErrNilPeerShardMapper, err)
}

func TestNewMetaResolversContainerFactory_NilNodesCoordinatorShouldErr(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -336,7 +325,7 @@ func TestNewMetaResolversContainerFactory_ShouldWork(t *testing.T) {
assert.Equal(t, int(args.ResolverConfig.NumFullHistoryPeers), rcf.NumFullHistoryPeers())
}

//------- Create
// ------- Create

func TestMetaResolversContainerFactory_CreateRegisterShardHeadersForMetachainFailsShouldErr(t *testing.T) {
t.Parallel()
Expand Down Expand Up @@ -418,6 +407,5 @@ func getArgumentsMeta() resolverscontainer.FactoryArgs {
PeersRatingHandler: &p2pmocks.PeersRatingHandlerStub{},
NodesCoordinator: &shardingMocks.NodesCoordinatorStub{},
MaxNumOfPeerAuthenticationInResponse: 5,
PeerShardMapper: &p2pmocks.NetworkShardingCollectorStub{},
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ func NewShardResolversContainerFactory(
numFullHistoryPeers: int(args.ResolverConfig.NumFullHistoryPeers),
nodesCoordinator: args.NodesCoordinator,
maxNumOfPeerAuthenticationInResponse: args.MaxNumOfPeerAuthenticationInResponse,
peerShardMapper: args.PeerShardMapper,
}

err = base.checkParams()
Expand Down Expand Up @@ -131,7 +130,7 @@ func (srcf *shardResolversContainerFactory) Create() (dataRetriever.ResolversCon
return srcf.container, nil
}

//------- Hdr resolver
// ------- Hdr resolver

func (srcf *shardResolversContainerFactory) generateHeaderResolvers() error {
shardC := srcf.shardCoordinator
Expand Down Expand Up @@ -174,7 +173,7 @@ func (srcf *shardResolversContainerFactory) generateHeaderResolvers() error {
return srcf.container.Add(identifierHdr, resolver)
}

//------- MetaBlockHeaderResolvers
// ------- MetaBlockHeaderResolvers

func (srcf *shardResolversContainerFactory) generateMetablockHeaderResolvers() error {
// only one metachain header block topic
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func createTriesHolderForShard() common.TriesHolder {
return triesHolder
}

//------- NewResolversContainerFactory
// ------- NewResolversContainerFactory

func TestNewShardResolversContainerFactory_NilShardCoordinatorShouldErr(t *testing.T) {
t.Parallel()
Expand Down Expand Up @@ -322,17 +322,6 @@ func TestNewShardResolversContainerFactory_NilCurrentNetworkEpochProviderShouldE
assert.Equal(t, dataRetriever.ErrNilCurrentNetworkEpochProvider, err)
}

func TestNewShardResolversContainerFactory_NilPeerShardMapperShouldErr(t *testing.T) {
t.Parallel()

args := getArgumentsShard()
args.PeerShardMapper = nil
rcf, err := resolverscontainer.NewShardResolversContainerFactory(args)

assert.Nil(t, rcf)
assert.Equal(t, dataRetriever.ErrNilPeerShardMapper, err)
}

func TestNewShardResolversContainerFactory_ShouldWork(t *testing.T) {
t.Parallel()

Expand All @@ -347,7 +336,7 @@ func TestNewShardResolversContainerFactory_ShouldWork(t *testing.T) {
assert.Equal(t, int(args.ResolverConfig.NumFullHistoryPeers), rcf.NumFullHistoryPeers())
}

//------- Create
// ------- Create

func TestShardResolversContainerFactory_CreateRegisterTxFailsShouldErr(t *testing.T) {
t.Parallel()
Expand Down Expand Up @@ -478,7 +467,6 @@ func getArgumentsShard() resolverscontainer.FactoryArgs {
},
NodesCoordinator: &shardingMocks.NodesCoordinatorStub{},
MaxNumOfPeerAuthenticationInResponse: 5,
PeerShardMapper: &p2pmocks.NetworkShardingCollectorStub{},
PeersRatingHandler: &p2pmocks.PeersRatingHandlerStub{},
}
}
16 changes: 2 additions & 14 deletions dataRetriever/resolvers/peerAuthenticationResolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
logger "github.com/ElrondNetwork/elrond-go-logger"
"github.com/ElrondNetwork/elrond-go/dataRetriever"
"github.com/ElrondNetwork/elrond-go/p2p"
"github.com/ElrondNetwork/elrond-go/process"
"github.com/ElrondNetwork/elrond-go/storage"
)

Expand All @@ -27,7 +26,6 @@ type ArgPeerAuthenticationResolver struct {
ArgBaseResolver
PeerAuthenticationPool storage.Cacher
NodesCoordinator dataRetriever.NodesCoordinator
PeerShardMapper process.PeerShardMapper
DataPacker dataRetriever.DataPacker
MaxNumOfPeerAuthenticationInResponse int
}
Expand All @@ -38,7 +36,6 @@ type peerAuthenticationResolver struct {
messageProcessor
peerAuthenticationPool storage.Cacher
nodesCoordinator dataRetriever.NodesCoordinator
peerShardMapper process.PeerShardMapper
dataPacker dataRetriever.DataPacker
maxNumOfPeerAuthenticationInResponse int
}
Expand All @@ -62,7 +59,6 @@ func NewPeerAuthenticationResolver(arg ArgPeerAuthenticationResolver) (*peerAuth
},
peerAuthenticationPool: arg.PeerAuthenticationPool,
nodesCoordinator: arg.NodesCoordinator,
peerShardMapper: arg.PeerShardMapper,
dataPacker: arg.DataPacker,
maxNumOfPeerAuthenticationInResponse: arg.MaxNumOfPeerAuthenticationInResponse,
}, nil
Expand All @@ -79,9 +75,6 @@ func checkArgPeerAuthenticationResolver(arg ArgPeerAuthenticationResolver) error
if check.IfNil(arg.NodesCoordinator) {
return dataRetriever.ErrNilNodesCoordinator
}
if check.IfNil(arg.PeerShardMapper) {
return dataRetriever.ErrNilPeerShardMapper
}
if check.IfNil(arg.DataPacker) {
return dataRetriever.ErrNilDataPacker
}
Expand Down Expand Up @@ -247,7 +240,7 @@ func (res *peerAuthenticationResolver) resolveMultipleHashesRequest(hashesBuff [

peerAuthsForHashes, err := res.fetchPeerAuthenticationSlicesForPublicKeys(hashes)
if err != nil {
return fmt.Errorf("resolveMultipleHashesRequest error %w from buff %s", err, hashesBuff)
return fmt.Errorf("resolveMultipleHashesRequest error %w from buff %x", err, hashesBuff)
}

return res.sendPeerAuthsForHashes(peerAuthsForHashes, pid)
Expand Down Expand Up @@ -298,12 +291,7 @@ func (res *peerAuthenticationResolver) fetchPeerAuthenticationSlicesForPublicKey

// fetchPeerAuthenticationAsByteSlice returns the value from authentication pool if exists
func (res *peerAuthenticationResolver) fetchPeerAuthenticationAsByteSlice(pk []byte) ([]byte, error) {
pid, ok := res.peerShardMapper.GetLastKnownPeerID(pk)
if !ok {
return nil, dataRetriever.ErrPeerAuthNotFound
}

value, ok := res.peerAuthenticationPool.Peek(pid.Bytes())
value, ok := res.peerAuthenticationPool.Peek(pk)
if ok {
return res.marshalizer.Marshal(value)
}
Expand Down
29 changes: 2 additions & 27 deletions dataRetriever/resolvers/peerAuthenticationResolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"github.com/ElrondNetwork/elrond-go/dataRetriever/mock"
"github.com/ElrondNetwork/elrond-go/dataRetriever/resolvers"
"github.com/ElrondNetwork/elrond-go/p2p"
processMock "github.com/ElrondNetwork/elrond-go/process/mock"
"github.com/ElrondNetwork/elrond-go/testscommon"
"github.com/ElrondNetwork/elrond-go/testscommon/shardingMocks"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -51,12 +50,7 @@ func createMockArgPeerAuthenticationResolver() resolvers.ArgPeerAuthenticationRe
},
},
MaxNumOfPeerAuthenticationInResponse: 5,
PeerShardMapper: &processMock.PeerShardMapperStub{
GetLastKnownPeerIDCalled: func(pk []byte) (core.PeerID, bool) {
return "pid", true
},
},
DataPacker: &mock.DataPackerStub{},
DataPacker: &mock.DataPackerStub{},
}
}

Expand Down Expand Up @@ -149,15 +143,6 @@ func TestNewPeerAuthenticationResolver(t *testing.T) {
assert.Equal(t, dataRetriever.ErrInvalidNumOfPeerAuthentication, err)
assert.Nil(t, res)
})
t.Run("nil PeerShardMapper should error", func(t *testing.T) {
t.Parallel()

arg := createMockArgPeerAuthenticationResolver()
arg.PeerShardMapper = nil
res, err := resolvers.NewPeerAuthenticationResolver(arg)
assert.Equal(t, dataRetriever.ErrNilPeerShardMapper, err)
assert.Nil(t, res)
})
t.Run("should work", func(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -436,7 +421,7 @@ func TestPeerAuthenticationResolver_ProcessReceivedMessage(t *testing.T) {
providedHashes, err := arg.Marshaller.Marshal(batch.Batch{Data: hashes})
assert.Nil(t, err)
err = res.ProcessReceivedMessage(createRequestMsg(dataRetriever.HashArrayType, providedHashes), fromConnectedPeer)
expectedSubstrErr := fmt.Sprintf("%s %s", "from buff", providedHashes)
expectedSubstrErr := fmt.Sprintf("%s %x", "from buff", providedHashes)
assert.True(t, strings.Contains(fmt.Sprintf("%s", err), expectedSubstrErr))
assert.False(t, wasSent)
})
Expand Down Expand Up @@ -481,11 +466,6 @@ func TestPeerAuthenticationResolver_ProcessReceivedMessage(t *testing.T) {
return nil
},
}
arg.PeerShardMapper = &processMock.PeerShardMapperStub{
GetLastKnownPeerIDCalled: func(pk []byte) (core.PeerID, bool) {
return core.PeerID(pk), true
},
}
arg.DataPacker, _ = partitioning.NewSizeDataPacker(arg.Marshaller)

res, err := resolvers.NewPeerAuthenticationResolver(arg)
Expand Down Expand Up @@ -554,11 +534,6 @@ func TestPeerAuthenticationResolver_ProcessReceivedMessage(t *testing.T) {
return nil
},
}
arg.PeerShardMapper = &processMock.PeerShardMapperStub{
GetLastKnownPeerIDCalled: func(pk []byte) (core.PeerID, bool) {
return core.PeerID(pk), true
},
}
// split data into 2 packs
arg.DataPacker = &mock.DataPackerStub{
PackDataInChunksCalled: func(data [][]byte, limit int) ([][]byte, error) {
Expand Down
Loading