[FAB-6277] Check if peer is eligible for a collection
This change set makes the peer check whether it is eligible for a given
collection when it iterates over the block.
If it isn't eligible for the collection, it skips the collection:
it doesn't attempt to look it up in the transient store, and it doesn't
fetch it from other peers.

Change-Id: Ifc1014a3b3281b37825497f8d9c4bd5ad7069f63
Signed-off-by: yacovm <yacovm@il.ibm.com>
yacovm committed Sep 25, 2017
1 parent 7c12814 commit 84e14de
Showing 5 changed files with 245 additions and 17 deletions.
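
In short, before a collection's private read-write set is treated as missing, the coordinator now resolves the collection's policy, parses it into a membership filter, and evaluates that filter against the peer's own signed data; only eligible collections are looked up in the transient store or fetched from peers. Below is a minimal, self-contained sketch of that gate using simplified stand-in types (SignedData, PolicyStore, and PolicyParser here are illustrative, not the real Fabric interfaces); the authoritative version is the isEligible helper in coordinator.go further down.

package main

import "fmt"

// Simplified stand-ins for the types used by the new check; illustrative only.
type SignedData struct{ Identity, Signature, Data []byte }

type CollectionCriteria struct{ Channel, Namespace, Collection, TxId string }

// Filter reports whether the given signed data satisfies a collection policy.
type Filter func(SignedData) bool

type PolicyStore interface {
	// CollectionPolicy returns the serialized policy, or nil if none is known.
	CollectionPolicy(CollectionCriteria) []byte
}

type PolicyParser interface {
	// Parse turns a serialized policy into a membership filter, or nil on failure.
	Parse([]byte) Filter
}

// isEligible mirrors the helper added to coordinator.go: resolve the policy,
// parse it into a filter, and evaluate the filter against our own signed data.
// Collections for which this returns false are skipped entirely: they are not
// searched in the transient store and not fetched from other peers.
func isEligible(store PolicyStore, parser PolicyParser, self SignedData, cc CollectionCriteria) bool {
	pol := store.CollectionPolicy(cc)
	if pol == nil {
		return false
	}
	filter := parser.Parse(pol)
	if filter == nil {
		return false
	}
	return filter(self)
}

// Trivial stubs so the sketch runs end to end.
type acceptAllStore struct{}

func (acceptAllStore) CollectionPolicy(CollectionCriteria) []byte { return []byte("any") }

type acceptAllParser struct{}

func (acceptAllParser) Parse([]byte) Filter { return func(SignedData) bool { return true } }

func main() {
	cc := CollectionCriteria{Channel: "test", Namespace: "ns3", Collection: "c3", TxId: "tx3"}
	self := SignedData{Identity: []byte{0, 1, 2}}
	fmt.Println(isEligible(acceptAllStore{}, acceptAllParser{}, self, cc)) // true
}
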
48 changes: 42 additions & 6 deletions gossip/privdata/coordinator.go
@@ -13,6 +13,7 @@ import (
util2 "github.com/hyperledger/fabric/common/util"
"github.com/hyperledger/fabric/core/committer"
"github.com/hyperledger/fabric/core/committer/txvalidator"
"github.com/hyperledger/fabric/core/common/privdata"
"github.com/hyperledger/fabric/core/ledger"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/rwsetutil"
"github.com/hyperledger/fabric/core/transientstore"
@@ -68,20 +69,27 @@ type Coordinator interface {
Close()
}

type fetcher interface {
type Fetcher interface {
fetch(req *gossip2.RemotePvtDataRequest) ([]*gossip2.PvtDataElement, error)
}

type coordinator struct {
type Support struct {
privdata.PolicyParser
privdata.PolicyStore
txvalidator.Validator
committer.Committer
TransientStore
gossipFetcher fetcher
Fetcher
}

type coordinator struct {
selfSignedData common.SignedData
Support
}

// NewCoordinator creates a new instance of coordinator
func NewCoordinator(committer committer.Committer, store TransientStore, gossipFetcher fetcher, validator txvalidator.Validator) Coordinator {
return &coordinator{Committer: committer, TransientStore: store, gossipFetcher: gossipFetcher, Validator: validator}
func NewCoordinator(support Support, selfSignedData common.SignedData) Coordinator {
return &coordinator{Support: support, selfSignedData: selfSignedData}
}

// StorePvtData used to persist private data into transient store
@@ -164,7 +172,7 @@ func (c *coordinator) fetchFromPeers(blockSeq uint64, missingKeys rwsetKeys, own
})
})

fetchedData, err := c.gossipFetcher.fetch(req)
fetchedData, err := c.fetch(req)
if err != nil {
logger.Warning("Failed fetching private data for block", blockSeq, "from peers:", err)
return
@@ -445,6 +453,9 @@ func (c *coordinator) listMissingPrivateData(block *common.Block, ownedRWsets ma

for _, ns := range txRWSet.NsRwSets {
for _, hashed := range ns.CollHashedRwSets {
if !c.isEligible(chdr, ns.NameSpace, hashed.CollectionName) {
continue
}
key := rwSetKey{
txID: chdr.TxId,
seqInBlock: uint64(seqInBlock),
@@ -476,6 +487,31 @@ func (c *coordinator) listMissingPrivateData(block *common.Block, ownedRWsets ma
return missing, nil
}

// isEligible checks if this peer is eligible for a collection in a given namespace
func (c *coordinator) isEligible(chdr *common.ChannelHeader, namespace string, col string) bool {
cp := rwset.CollectionCriteria{
Channel: chdr.ChannelId,
Namespace: namespace,
Collection: col,
TxId: chdr.TxId,
}
sp := c.PolicyStore.CollectionPolicy(cp)
if sp == nil {
logger.Warning("Failed obtaining policy for", cp, "skipping collection")
return false
}
filt := c.PolicyParser.Parse(sp)
if filt == nil {
logger.Warning("Failed parsing policy for", cp, "skipping collection")
return false
}
eligible := filt(c.selfSignedData)
if !eligible {
logger.Debug("Skipping", cp, "because we're not eligible for the private data")
}
return eligible
}

// GetPvtDataAndBlockByNum gets a block by number and also returns all related private data
// the order of private data in the slice of PvtDataCollections doesn't imply the order of
// transactions in the block related to these private data; to get the correct placement
184 changes: 176 additions & 8 deletions gossip/privdata/coordinator_test.go
@@ -7,13 +7,15 @@ SPDX-License-Identifier: Apache-2.0
package privdata

import (
"encoding/asn1"
"encoding/hex"
"errors"
"fmt"
"reflect"
"testing"

util2 "github.com/hyperledger/fabric/common/util"
"github.com/hyperledger/fabric/core/common/privdata"
"github.com/hyperledger/fabric/core/ledger"
"github.com/hyperledger/fabric/core/transientstore"
"github.com/hyperledger/fabric/gossip/util"
@@ -220,6 +222,75 @@ func (f *fetcherMock) fetch(req *proto.RemotePvtDataRequest) ([]*proto.PvtDataEl
return nil, args.Get(1).(error)
}

func createPolicyStore(selfSignedData common.SignedData) *policyStore {
return &policyStore{
selfSignedData: selfSignedData,
policies: make(map[serializedPolicy]rwset.CollectionCriteria),
store: make(map[rwset.CollectionCriteria]serializedPolicy),
}
}

type policyStore struct {
selfSignedData common.SignedData
acceptsAll bool
store map[rwset.CollectionCriteria]serializedPolicy
policies map[serializedPolicy]rwset.CollectionCriteria
}

func (ps *policyStore) thatAcceptsAll() *policyStore {
ps.acceptsAll = true
return ps
}

func (ps *policyStore) thatAccepts(cc rwset.CollectionCriteria) *policyStore {
sp := serializedPolicy{
ps: ps,
n: util.RandomUInt64(),
}
ps.store[cc] = sp
ps.policies[sp] = cc
return ps
}

func (ps *policyStore) CollectionPolicy(cc rwset.CollectionCriteria) privdata.SerializedPolicy {
if sp, exists := ps.store[cc]; exists {
return &sp
}
return &serializedPolicy{
ps: ps,
n: util.RandomUInt64(),
}
}

type serializedPolicy struct {
ps *policyStore
n uint64
}

func (*serializedPolicy) Channel() string {
panic("implement me")
}

func (*serializedPolicy) Raw() []byte {
panic("implement me")
}

type policyParser struct {
}

func (*policyParser) Parse(serializedPol privdata.SerializedPolicy) privdata.Filter {
sp := serializedPol.(*serializedPolicy)
return func(sd common.SignedData) bool {
that, _ := asn1.Marshal(sd)
this, _ := asn1.Marshal(sp.ps.selfSignedData)
if hex.EncodeToString(that) != hex.EncodeToString(this) {
panic("Self signed data passed isn't equal to expected")
}
_, exists := sp.ps.policies[*sp]
return exists || sp.ps.acceptsAll
}
}

func TestPvtDataCollections_FailOnEmptyPayload(t *testing.T) {
collection := &util.PvtDataCollections{
&ledger.TxPvtData{
@@ -437,11 +508,18 @@ var expectedCommittedPrivateData2 = map[uint64]*ledger.TxPvtData{
}

func TestCoordinatorStoreInvalidBlock(t *testing.T) {
peerSelfSignedData := common.SignedData{
Identity: []byte{0, 1, 2},
Signature: []byte{3, 4, 5},
Data: []byte{6, 7, 8},
}
hash := util2.ComputeSHA256([]byte("rws-pre-image"))
committer := &committerMock{}
committer.On("CommitWithPvtData", mock.Anything).Run(func(args mock.Arguments) {
t.Fatal("Shouldn't have committed")
}).Return(nil)
ps := createPolicyStore(peerSelfSignedData).thatAcceptsAll()
pp := &policyParser{}
store := &mockTransientStore{t: t}
fetcher := &fetcherMock{t: t}
pdFactory := &pvtDataFactory{}
@@ -452,23 +530,44 @@ func TestCoordinatorStoreInvalidBlock(t *testing.T) {
block := bf.withoutMetadata().create()
// Scenario I: Block we got doesn't have any metadata with it
pvtData := pdFactory.create()
coordinator := NewCoordinator(committer, store, fetcher, &validatorMock{})
coordinator := NewCoordinator(Support{
PolicyStore: ps,
PolicyParser: pp,
Committer: committer,
Fetcher: fetcher,
TransientStore: store,
Validator: &validatorMock{},
}, peerSelfSignedData)
err := coordinator.StoreBlock(block, pvtData)
assert.Error(t, err)
assert.Contains(t, err.Error(), "Block.Metadata is nil or Block.Metadata lacks a Tx filter bitmap")

// Scenario II: Validator has an error while validating the block
block = bf.create()
pvtData = pdFactory.create()
coordinator = NewCoordinator(committer, store, fetcher, &validatorMock{fmt.Errorf("failed validating block")})
coordinator = NewCoordinator(Support{
PolicyStore: ps,
PolicyParser: pp,
Committer: committer,
Fetcher: fetcher,
TransientStore: store,
Validator: &validatorMock{fmt.Errorf("failed validating block")},
}, peerSelfSignedData)
err = coordinator.StoreBlock(block, pvtData)
assert.Error(t, err)
assert.Contains(t, err.Error(), "failed validating block")

// Scenario III: Block we got contains an inadequate length of Tx filter in the metadata
block = bf.withMetadataSize(100).create()
pvtData = pdFactory.create()
coordinator = NewCoordinator(committer, store, fetcher, &validatorMock{})
coordinator = NewCoordinator(Support{
PolicyStore: ps,
PolicyParser: pp,
Committer: committer,
Fetcher: fetcher,
TransientStore: store,
Validator: &validatorMock{},
}, peerSelfSignedData)
err = coordinator.StoreBlock(block, pvtData)
assert.Error(t, err)
assert.Contains(t, err.Error(), "Block data size")
@@ -496,7 +595,14 @@ func TestCoordinatorStoreInvalidBlock(t *testing.T) {
}).Return(nil)
block = bf.withInvalidTxns(1).AddTxn("tx1", "ns1", hash, "c1", "c2").AddTxn("tx2", "ns2", hash, "c1").create()
pvtData = pdFactory.addRWSet().addNSRWSet("ns1", "c1", "c2").create()
coordinator = NewCoordinator(committer, store, fetcher, &validatorMock{})
coordinator = NewCoordinator(Support{
PolicyParser: pp,
PolicyStore: ps,
Committer: committer,
Fetcher: fetcher,
TransientStore: store,
Validator: &validatorMock{},
}, peerSelfSignedData)
err = coordinator.StoreBlock(block, pvtData)
assert.NoError(t, err)
assertCommitHappened()
@@ -507,15 +613,24 @@ func TestCoordinatorStoreInvalidBlock(t *testing.T) {
assert.Error(t, err)
assert.Contains(t, err.Error(), "Block header is nil")

// Scenario V: Block doesn't contain Data
// Scenario VI: Block doesn't contain Data
block.Data = nil
err = coordinator.StoreBlock(block, pvtData)
assert.Error(t, err)
assert.Contains(t, err.Error(), "Block data is empty")
}

func TestCoordinatorStoreBlock(t *testing.T) {
peerSelfSignedData := common.SignedData{
Identity: []byte{0, 1, 2},
Signature: []byte{3, 4, 5},
Data: []byte{6, 7, 8},
}
// Green path test, all private data should be obtained successfully

ps := createPolicyStore(peerSelfSignedData).thatAcceptsAll()
pp := &policyParser{}

var commitHappened bool
assertCommitHappened := func() {
assert.True(t, commitHappened)
@@ -541,7 +656,14 @@ func TestCoordinatorStoreBlock(t *testing.T) {
// If the coordinator tries fetching from the transientstore, or peers it would result in panic,
// because we didn't define yet the "On(...)" invocation of the transient store or other peers.
pvtData := pdFactory.addRWSet().addNSRWSet("ns1", "c1", "c2").addRWSet().addNSRWSet("ns2", "c1").create()
coordinator := NewCoordinator(committer, store, fetcher, &validatorMock{})
coordinator := NewCoordinator(Support{
PolicyStore: ps,
PolicyParser: pp,
Committer: committer,
Fetcher: fetcher,
TransientStore: store,
Validator: &validatorMock{},
}, peerSelfSignedData)
err := coordinator.StoreBlock(block, pvtData)
assert.NoError(t, err)
assertCommitHappened()
@@ -640,17 +762,63 @@ func TestCoordinatorStoreBlock(t *testing.T) {
assert.True(t, privateDataPassed2Ledger.Equal(expectedCommittedPrivateData2))
commitHappened = true
}).Return(nil)
coordinator = NewCoordinator(committer, store, fetcher, &validatorMock{})
coordinator = NewCoordinator(Support{
PolicyStore: ps,
PolicyParser: pp,
Committer: committer,
Fetcher: fetcher,
TransientStore: store,
Validator: &validatorMock{},
}, peerSelfSignedData)
err = coordinator.StoreBlock(block, nil)
assert.NoError(t, err)
assertCommitHappened()

// Scenario VI: Block contains 2 transactions, and the peer is eligible for only tx3-ns3-c3.
// Also, the block comes with private data for tx3-ns3-c3, so the peer won't have to fetch that
// private data from the transient store or peers. In fact, if it attempted to fetch data it's not
// eligible for from the transient store or from peers, the test would fail because the mock wasn't initialized.
block = bf.AddTxn("tx3", "ns3", hash, "c3", "c2", "c1").AddTxn("tx1", "ns1", hash, "c1").create()
ps = createPolicyStore(peerSelfSignedData).thatAccepts(rwset.CollectionCriteria{
TxId: "tx3",
Collection: "c3",
Namespace: "ns3",
Channel: "test",
})
pp = &policyParser{}
store = &mockTransientStore{t: t}
fetcher = &fetcherMock{t: t}
committer = &committerMock{}
committer.On("CommitWithPvtData", mock.Anything).Run(func(args mock.Arguments) {
var privateDataPassed2Ledger privateData = args.Get(0).(*ledger.BlockAndPvtData).BlockPvtData
assert.True(t, privateDataPassed2Ledger.Equal(expectedCommittedPrivateData2))
commitHappened = true
}).Return(nil)
coordinator = NewCoordinator(Support{
PolicyStore: ps,
PolicyParser: pp,
Committer: committer,
Fetcher: fetcher,
TransientStore: store,
Validator: &validatorMock{},
}, peerSelfSignedData)

pvtData = pdFactory.addRWSet().addNSRWSet("ns3", "c3").create()
err = coordinator.StoreBlock(block, pvtData)
assert.NoError(t, err)
assertCommitHappened()
}

func TestCoordinatorGetBlocks(t *testing.T) {
committer := &committerMock{}
store := &mockTransientStore{t: t}
fetcher := &fetcherMock{t: t}
coordinator := NewCoordinator(committer, store, fetcher, &validatorMock{})
coordinator := NewCoordinator(Support{
Committer: committer,
Fetcher: fetcher,
TransientStore: store,
Validator: &validatorMock{},
}, common.SignedData{})

// Bad path: block is not returned
committer.On("GetBlocks", mock.Anything).Return([]*common.Block{})
2 changes: 1 addition & 1 deletion gossip/privdata/util.go
@@ -96,7 +96,7 @@ func (bf *blockFactory) AddTxn(txID string, nsName string, hash []byte, collecti

func (bf *blockFactory) create() *common.Block {
defer func() {
*bf = blockFactory{}
*bf = blockFactory{channelID: bf.channelID}
}()
block := &common.Block{
Header: &common.BlockHeader{