Skip to content

Commit

Permalink
fixes after review
Browse files Browse the repository at this point in the history
  • Loading branch information
sstanculeanu committed Feb 7, 2022
1 parent 8e75c02 commit 6a36170
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 80 deletions.
67 changes: 18 additions & 49 deletions dataRetriever/resolvers/peerAuthenticationResolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,28 +167,12 @@ func (res *peerAuthenticationResolver) resolveChunkRequest(chunkIndex int, epoch
return err
}

var lastErr error
errorsFound := 0
dataSlice := make([][]byte, 0, res.maxNumOfPeerAuthenticationInResponse)
for _, pk := range pksChunk {
peerAuth, tmpErr := res.fetchPeerAuthenticationAsByteSlice(pk)
if tmpErr != nil {
lastErr = fmt.Errorf("%w for public key %s", tmpErr, logger.DisplayByteSlice(pk))
errorsFound++
continue
}
dataSlice = append(dataSlice, peerAuth)
}

err = res.sendData(dataSlice, nil, chunkIndex, maxChunks, pid)
dataSlice, err := res.fetchPeerAuthenticationSlicesForPublicKeys(pksChunk)
if err != nil {
return err
return fmt.Errorf("resolveChunkRequest error %w from chunk %d", err, chunkIndex)
}

if lastErr != nil {
lastErr = fmt.Errorf("resolveChunkRequest last error %w from %d encountered errors", lastErr, errorsFound)
}
return lastErr
return res.sendData(dataSlice, nil, chunkIndex, maxChunks, pid)
}

// getSortedValidatorsKeys returns the sorted slice of validators keys from all shards
Expand Down Expand Up @@ -234,29 +218,12 @@ func (res *peerAuthenticationResolver) resolveMultipleHashesRequest(hashesBuff [
}
hashes := b.Data

var lastErr error
errorsFound := 0
peerAuthsForHashes := make([][]byte, 0)
for _, hash := range hashes {
peerAuthSlicesForHash := res.fetchPeerAuthenticationSlicesForHash(hash)
if peerAuthSlicesForHash == nil {
lastErr = fmt.Errorf("could not find any peerAuthentication for hash %s", logger.DisplayByteSlice(hash))
errorsFound++
continue
}

peerAuthsForHashes = append(peerAuthsForHashes, peerAuthSlicesForHash...)
}

err = res.sendPeerAuthsForHashes(peerAuthsForHashes, hashesBuff, pid)
peerAuthsForHashes, err := res.fetchPeerAuthenticationSlicesForPublicKeys(hashes)
if err != nil {
return err
return fmt.Errorf("resolveMultipleHashesRequest error %w from buff %s", err, hashesBuff)
}

if lastErr != nil {
lastErr = fmt.Errorf("resolveMultipleHashes last error %w from %d encountered errors", lastErr, errorsFound)
}
return lastErr
return res.sendPeerAuthsForHashes(peerAuthsForHashes, hashesBuff, pid)
}

// sendPeerAuthsForHashes sends multiple peer authentication messages for specific hashes
Expand Down Expand Up @@ -309,19 +276,21 @@ func (res *peerAuthenticationResolver) sendData(dataSlice [][]byte, reference []
return res.Send(buffToSend, pid)
}

// fetchPeerAuthenticationSlicesForHash fetches all peer authentications for the matching pks to hash
func (res *peerAuthenticationResolver) fetchPeerAuthenticationSlicesForHash(hash []byte) [][]byte {
var messages [][]byte

keys := res.peerAuthenticationPool.Keys()
for _, key := range keys {
if bytes.Equal(hash, key[:len(hash)]) {
peerAuth, _ := res.fetchPeerAuthenticationAsByteSlice(key)
messages = append(messages, peerAuth)
// fetchPeerAuthenticationSlicesForPublicKeys fetches all peer authentications for all pks
func (res *peerAuthenticationResolver) fetchPeerAuthenticationSlicesForPublicKeys(pks [][]byte) ([][]byte, error) {
peerAuths := make([][]byte, 0)
for _, pk := range pks {
peerAuthForHash, _ := res.fetchPeerAuthenticationAsByteSlice(pk)
if peerAuthForHash != nil {
peerAuths = append(peerAuths, peerAuthForHash)
}
}

return messages
if len(peerAuths) == 0 {
return nil, dataRetriever.ErrNotFound
}

return peerAuths, nil
}

// fetchPeerAuthenticationAsByteSlice returns the value from authentication pool if exists
Expand Down
53 changes: 22 additions & 31 deletions dataRetriever/resolvers/peerAuthenticationResolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,10 +264,6 @@ func Test_peerAuthenticationResolver_ProcessReceivedMessage(t *testing.T) {
wasSent := false
arg.SenderResolver = &mock.TopicResolverSenderStub{
SendCalled: func(buff []byte, peer core.PeerID) error {
b := &batch.Batch{}
err := arg.Marshalizer.Unmarshal(b, buff)
assert.Nil(t, err)
assert.Equal(t, 0, len(b.Data))
wasSent = true
return nil
},
Expand All @@ -280,19 +276,19 @@ func Test_peerAuthenticationResolver_ProcessReceivedMessage(t *testing.T) {
chunkIndex := uint32(0)
err = res.ProcessReceivedMessage(createRequestMsgWithChunkIndex(dataRetriever.ChunkType, []byte(""), epoch, chunkIndex), fromConnectedPeer)
assert.True(t, errors.Is(err, dataRetriever.ErrNotFound))
expectedSubstrErr := fmt.Sprintf("%s %d %s", "from", arg.MaxNumOfPeerAuthenticationInResponse, "encountered errors")
expectedSubstrErr := fmt.Sprintf("%s %d", "from chunk", chunkIndex)
assert.True(t, strings.Contains(fmt.Sprintf("%s", err), expectedSubstrErr))
assert.True(t, wasSent)
assert.False(t, wasSent)
})
t.Run("resolveChunkRequest: some data not found in cache should error", func(t *testing.T) {
t.Run("resolveChunkRequest: some data not found in cache should work", func(t *testing.T) {
t.Parallel()

expectedNumOfErrors := 3
expectedNumOfMissing := 3
cache := testscommon.NewCacherStub()
errorsCount := 0
missingCount := 0
cache.PeekCalled = func(key []byte) (value interface{}, ok bool) {
if errorsCount < expectedNumOfErrors {
errorsCount++
if missingCount < expectedNumOfMissing {
missingCount++
return nil, false
}
return key, true
Expand All @@ -303,6 +299,11 @@ func Test_peerAuthenticationResolver_ProcessReceivedMessage(t *testing.T) {
messagesSent := 0
arg.SenderResolver = &mock.TopicResolverSenderStub{
SendCalled: func(buff []byte, peer core.PeerID) error {
b := &batch.Batch{}
err := arg.Marshalizer.Unmarshal(b, buff)
assert.Nil(t, err)
expectedDataLen := arg.MaxNumOfPeerAuthenticationInResponse - expectedNumOfMissing
assert.Equal(t, expectedDataLen, len(b.Data))
messagesSent++
return nil
},
Expand All @@ -314,9 +315,7 @@ func Test_peerAuthenticationResolver_ProcessReceivedMessage(t *testing.T) {
epoch := uint32(0)
chunkIndex := uint32(0)
err = res.ProcessReceivedMessage(createRequestMsgWithChunkIndex(dataRetriever.ChunkType, []byte(""), epoch, chunkIndex), fromConnectedPeer)
assert.True(t, errors.Is(err, dataRetriever.ErrNotFound))
expectedSubstrErr := fmt.Sprintf("%s %d %s", "from", expectedNumOfErrors, "encountered errors")
assert.True(t, strings.Contains(fmt.Sprintf("%s", err), expectedSubstrErr))
assert.Nil(t, err)
assert.Equal(t, 1, messagesSent)
})
t.Run("resolveChunkRequest: Send returns error", func(t *testing.T) {
Expand Down Expand Up @@ -382,7 +381,7 @@ func Test_peerAuthenticationResolver_ProcessReceivedMessage(t *testing.T) {
err = res.ProcessReceivedMessage(createRequestMsg(dataRetriever.HashArrayType, []byte("invalid data")), fromConnectedPeer)
assert.NotNil(t, err)
})
t.Run("resolveMultipleHashesRequest: all hashes missing from cache", func(t *testing.T) {
t.Run("resolveMultipleHashesRequest: all hashes missing from cache should error", func(t *testing.T) {
t.Parallel()

cache := testscommon.NewCacherStub()
Expand All @@ -395,10 +394,6 @@ func Test_peerAuthenticationResolver_ProcessReceivedMessage(t *testing.T) {
wasSent := false
arg.SenderResolver = &mock.TopicResolverSenderStub{
SendCalled: func(buff []byte, peer core.PeerID) error {
b := &batch.Batch{}
err := arg.Marshalizer.Unmarshal(b, buff)
assert.Nil(t, err)
assert.Equal(t, 0, len(b.Data))
wasSent = true
return nil
},
Expand All @@ -411,11 +406,11 @@ func Test_peerAuthenticationResolver_ProcessReceivedMessage(t *testing.T) {
providedHashes, err := arg.Marshalizer.Marshal(batch.Batch{Data: hashes})
assert.Nil(t, err)
err = res.ProcessReceivedMessage(createRequestMsg(dataRetriever.HashArrayType, providedHashes), fromConnectedPeer)
expectedSubstrErr := fmt.Sprintf("%s %d %s", "from", len(hashes), "encountered errors")
expectedSubstrErr := fmt.Sprintf("%s %s", "from buff", providedHashes)
assert.True(t, strings.Contains(fmt.Sprintf("%s", err), expectedSubstrErr))
assert.True(t, wasSent)
assert.False(t, wasSent)
})
t.Run("resolveMultipleHashesRequest: some data missing from cache", func(t *testing.T) {
t.Run("resolveMultipleHashesRequest: some data missing from cache should work", func(t *testing.T) {
t.Parallel()

arg := createMockArgPeerAuthenticationResolver()
Expand All @@ -430,8 +425,8 @@ func Test_peerAuthenticationResolver_ProcessReceivedMessage(t *testing.T) {
pks = append(pks, []byte(pk2))

hashes := make([][]byte, 0)
hashes = append(hashes, []byte("pk0")) // 2 entries, both pk1 and pk2
hashes = append(hashes, []byte("pk1")) // no entries
hashes = append(hashes, []byte("pk01")) // exists in cache
hashes = append(hashes, []byte("pk1")) // no entries
providedHashes, err := arg.Marshalizer.Marshal(batch.Batch{Data: hashes})
assert.Nil(t, err)

Expand All @@ -451,7 +446,7 @@ func Test_peerAuthenticationResolver_ProcessReceivedMessage(t *testing.T) {
b := &batch.Batch{}
err = arg.Marshalizer.Unmarshal(b, buff)
assert.Nil(t, err)
assert.Equal(t, 2, len(b.Data)) // 2 entries for one of the hashes in the keys
assert.Equal(t, 1, len(b.Data)) // 1 entry for provided hashes
wasSent = true
return nil
},
Expand All @@ -461,8 +456,7 @@ func Test_peerAuthenticationResolver_ProcessReceivedMessage(t *testing.T) {
assert.False(t, res.IsInterfaceNil())

err = res.ProcessReceivedMessage(createRequestMsg(dataRetriever.HashArrayType, providedHashes), fromConnectedPeer)
expectedSubstrErr := fmt.Sprintf("%s %d %s", "from", 1, "encountered errors")
assert.True(t, strings.Contains(fmt.Sprintf("%s", err), expectedSubstrErr))
assert.Nil(t, err)
assert.True(t, wasSent)
})
t.Run("resolveMultipleHashesRequest: Send returns error", func(t *testing.T) {
Expand Down Expand Up @@ -533,10 +527,7 @@ func Test_peerAuthenticationResolver_ProcessReceivedMessage(t *testing.T) {

epoch := uint32(0)
chunkIndex := uint32(0)
hashes := make([][]byte, 0)
hashes = append(hashes, []byte("pk")) // all entries start with pk, so we should have len(pksMap) = 9 entries

providedHashes, err := arg.Marshalizer.Marshal(batch.Batch{Data: hashes})
providedHashes, err := arg.Marshalizer.Marshal(batch.Batch{Data: providedKeys})
assert.Nil(t, err)
err = res.ProcessReceivedMessage(createRequestMsgWithChunkIndex(dataRetriever.HashArrayType, providedHashes, epoch, chunkIndex), fromConnectedPeer)
assert.Nil(t, err)
Expand Down

0 comments on commit 6a36170

Please sign in to comment.