diff --git a/gossip/privdata/coordinator.go b/gossip/privdata/coordinator.go
index ab5d6eb4ddd..7ba31d2321d 100644
--- a/gossip/privdata/coordinator.go
+++ b/gossip/privdata/coordinator.go
@@ -13,6 +13,7 @@ import (
 	util2 "github.com/hyperledger/fabric/common/util"
 	"github.com/hyperledger/fabric/core/committer"
 	"github.com/hyperledger/fabric/core/committer/txvalidator"
+	"github.com/hyperledger/fabric/core/common/privdata"
 	"github.com/hyperledger/fabric/core/ledger"
 	"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/rwsetutil"
 	"github.com/hyperledger/fabric/core/transientstore"
@@ -68,20 +69,27 @@ type Coordinator interface {
 	Close()
 }
 
-type fetcher interface {
+type Fetcher interface {
 	fetch(req *gossip2.RemotePvtDataRequest) ([]*gossip2.PvtDataElement, error)
 }
 
-type coordinator struct {
+type Support struct {
+	privdata.PolicyParser
+	privdata.PolicyStore
 	txvalidator.Validator
 	committer.Committer
 	TransientStore
-	gossipFetcher fetcher
+	Fetcher
+}
+
+type coordinator struct {
+	selfSignedData common.SignedData
+	Support
 }
 
 // NewCoordinator creates a new instance of coordinator
-func NewCoordinator(committer committer.Committer, store TransientStore, gossipFetcher fetcher, validator txvalidator.Validator) Coordinator {
-	return &coordinator{Committer: committer, TransientStore: store, gossipFetcher: gossipFetcher, Validator: validator}
+func NewCoordinator(support Support, selfSignedData common.SignedData) Coordinator {
+	return &coordinator{Support: support, selfSignedData: selfSignedData}
 }
 
 // StorePvtData used to persist private date into transient store
@@ -164,7 +172,7 @@ func (c *coordinator) fetchFromPeers(blockSeq uint64, missingKeys rwsetKeys, own
 		})
 	})
 
-	fetchedData, err := c.gossipFetcher.fetch(req)
+	fetchedData, err := c.fetch(req)
 	if err != nil {
 		logger.Warning("Failed fetching private data for block", blockSeq, "from peers:", err)
 		return
@@ -445,6 +453,9 @@ func (c *coordinator) listMissingPrivateData(block *common.Block, ownedRWsets ma
 
 		for _, ns := range txRWSet.NsRwSets {
 			for _, hashed := range ns.CollHashedRwSets {
+				if !c.isEligible(chdr, ns.NameSpace, hashed.CollectionName) {
+					continue
+				}
 				key := rwSetKey{
 					txID:       chdr.TxId,
 					seqInBlock: uint64(seqInBlock),
@@ -476,6 +487,31 @@ func (c *coordinator) listMissingPrivateData(block *common.Block, ownedRWsets ma
 	return missing, nil
 }
 
+// isEligible checks if this peer is eligible for a collection in a given namespace
+func (c *coordinator) isEligible(chdr *common.ChannelHeader, namespace string, col string) bool {
+	cp := rwset.CollectionCriteria{
+		Channel:    chdr.ChannelId,
+		Namespace:  namespace,
+		Collection: col,
+		TxId:       chdr.TxId,
+	}
+	sp := c.PolicyStore.CollectionPolicy(cp)
+	if sp == nil {
+		logger.Warning("Failed obtaining policy for", cp, "skipping collection")
+		return false
+	}
+	filt := c.PolicyParser.Parse(sp)
+	if filt == nil {
+		logger.Warning("Failed parsing policy for", cp, "skipping collection")
+		return false
+	}
+	eligible := filt(c.selfSignedData)
+	if !eligible {
+		logger.Debug("Skipping", cp, "because we're not eligible for the private data")
+	}
+	return eligible
+}
+
 // GetPvtDataAndBlockByNum get block by number and returns also all related private data
 // the order of private data in slice of PvtDataCollections doesn't implies the order of
 // transactions in the block related to these private data, to get the correct placement
diff --git a/gossip/privdata/coordinator_test.go b/gossip/privdata/coordinator_test.go
index f737b4cec82..489780d160b 100644
--- a/gossip/privdata/coordinator_test.go
+++ b/gossip/privdata/coordinator_test.go
@@ -7,6 +7,7 @@ SPDX-License-Identifier: Apache-2.0
 package privdata
 
 import (
+	"encoding/asn1"
 	"encoding/hex"
 	"errors"
 	"fmt"
@@ -14,6 +15,7 @@ import (
 	"testing"
 
 	util2 "github.com/hyperledger/fabric/common/util"
+	"github.com/hyperledger/fabric/core/common/privdata"
 	"github.com/hyperledger/fabric/core/ledger"
 	"github.com/hyperledger/fabric/core/transientstore"
 	"github.com/hyperledger/fabric/gossip/util"
@@ -220,6 +222,75 @@ func (f *fetcherMock) fetch(req *proto.RemotePvtDataRequest) ([]*proto.PvtDataEl
 	return nil, args.Get(1).(error)
 }
 
+func createPolicyStore(selfSignedData common.SignedData) *policyStore {
+	return &policyStore{
+		selfSignedData: selfSignedData,
+		policies:       make(map[serializedPolicy]rwset.CollectionCriteria),
+		store:          make(map[rwset.CollectionCriteria]serializedPolicy),
+	}
+}
+
+type policyStore struct {
+	selfSignedData common.SignedData
+	acceptsAll     bool
+	store          map[rwset.CollectionCriteria]serializedPolicy
+	policies       map[serializedPolicy]rwset.CollectionCriteria
+}
+
+func (ps *policyStore) thatAcceptsAll() *policyStore {
+	ps.acceptsAll = true
+	return ps
+}
+
+func (ps *policyStore) thatAccepts(cc rwset.CollectionCriteria) *policyStore {
+	sp := serializedPolicy{
+		ps: ps,
+		n:  util.RandomUInt64(),
+	}
+	ps.store[cc] = sp
+	ps.policies[sp] = cc
+	return ps
+}
+
+func (ps *policyStore) CollectionPolicy(cc rwset.CollectionCriteria) privdata.SerializedPolicy {
+	if sp, exists := ps.store[cc]; exists {
+		return &sp
+	}
+	return &serializedPolicy{
+		ps: ps,
+		n:  util.RandomUInt64(),
+	}
+}
+
+type serializedPolicy struct {
+	ps *policyStore
+	n  uint64
+}
+
+func (*serializedPolicy) Channel() string {
+	panic("implement me")
+}
+
+func (*serializedPolicy) Raw() []byte {
+	panic("implement me")
+}
+
+type policyParser struct {
+}
+
+func (*policyParser) Parse(serializedPol privdata.SerializedPolicy) privdata.Filter {
+	sp := serializedPol.(*serializedPolicy)
+	return func(sd common.SignedData) bool {
+		that, _ := asn1.Marshal(sd)
+		this, _ := asn1.Marshal(sp.ps.selfSignedData)
+		if hex.EncodeToString(that) != hex.EncodeToString(this) {
+			panic("Self signed data passed isn't equal to expected")
+		}
+		_, exists := sp.ps.policies[*sp]
+		return exists || sp.ps.acceptsAll
+	}
+}
+
 func TestPvtDataCollections_FailOnEmptyPayload(t *testing.T) {
 	collection := &util.PvtDataCollections{
 		&ledger.TxPvtData{
@@ -437,11 +508,18 @@ var expectedCommittedPrivateData2 = map[uint64]*ledger.TxPvtData{
 }
 
 func TestCoordinatorStoreInvalidBlock(t *testing.T) {
+	peerSelfSignedData := common.SignedData{
+		Identity:  []byte{0, 1, 2},
+		Signature: []byte{3, 4, 5},
+		Data:      []byte{6, 7, 8},
+	}
 	hash := util2.ComputeSHA256([]byte("rws-pre-image"))
 	committer := &committerMock{}
 	committer.On("CommitWithPvtData", mock.Anything).Run(func(args mock.Arguments) {
 		t.Fatal("Shouldn't have committed")
 	}).Return(nil)
+	ps := createPolicyStore(peerSelfSignedData).thatAcceptsAll()
+	pp := &policyParser{}
 	store := &mockTransientStore{t: t}
 	fetcher := &fetcherMock{t: t}
 	pdFactory := &pvtDataFactory{}
@@ -452,7 +530,14 @@ func TestCoordinatorStoreInvalidBlock(t *testing.T) {
 	block := bf.withoutMetadata().create()
 	// Scenario I: Block we got doesn't have any metadata with it
 	pvtData := pdFactory.create()
-	coordinator := NewCoordinator(committer, store, fetcher, &validatorMock{})
+	coordinator := NewCoordinator(Support{
+		PolicyStore:    ps,
+		PolicyParser:   pp,
+		Committer:      committer,
+		Fetcher:        fetcher,
+		TransientStore: store,
+		Validator:      &validatorMock{},
+	}, peerSelfSignedData)
 	err := coordinator.StoreBlock(block, pvtData)
 	assert.Error(t, err)
 	assert.Contains(t, err.Error(), "Block.Metadata is nil or Block.Metadata lacks a Tx filter bitmap")
@@ -460,7 +545,14 @@ func TestCoordinatorStoreInvalidBlock(t *testing.T) {
 	// Scenario II: Validator has an error while validating the block
 	block = bf.create()
 	pvtData = pdFactory.create()
-	coordinator = NewCoordinator(committer, store, fetcher, &validatorMock{fmt.Errorf("failed validating block")})
+	coordinator = NewCoordinator(Support{
+		PolicyStore:    ps,
+		PolicyParser:   pp,
+		Committer:      committer,
+		Fetcher:        fetcher,
+		TransientStore: store,
+		Validator:      &validatorMock{fmt.Errorf("failed validating block")},
+	}, peerSelfSignedData)
 	err = coordinator.StoreBlock(block, pvtData)
 	assert.Error(t, err)
 	assert.Contains(t, err.Error(), "failed validating block")
@@ -468,7 +560,14 @@ func TestCoordinatorStoreInvalidBlock(t *testing.T) {
 	// Scenario III: Block we got contains an inadequate length of Tx filter in the metadata
 	block = bf.withMetadataSize(100).create()
 	pvtData = pdFactory.create()
-	coordinator = NewCoordinator(committer, store, fetcher, &validatorMock{})
+	coordinator = NewCoordinator(Support{
+		PolicyStore:    ps,
+		PolicyParser:   pp,
+		Committer:      committer,
+		Fetcher:        fetcher,
+		TransientStore: store,
+		Validator:      &validatorMock{},
+	}, peerSelfSignedData)
 	err = coordinator.StoreBlock(block, pvtData)
 	assert.Error(t, err)
 	assert.Contains(t, err.Error(), "Block data size")
@@ -496,7 +595,14 @@ func TestCoordinatorStoreInvalidBlock(t *testing.T) {
 	}).Return(nil)
 	block = bf.withInvalidTxns(1).AddTxn("tx1", "ns1", hash, "c1", "c2").AddTxn("tx2", "ns2", hash, "c1").create()
 	pvtData = pdFactory.addRWSet().addNSRWSet("ns1", "c1", "c2").create()
-	coordinator = NewCoordinator(committer, store, fetcher, &validatorMock{})
+	coordinator = NewCoordinator(Support{
+		PolicyParser:   pp,
+		PolicyStore:    ps,
+		Committer:      committer,
+		Fetcher:        fetcher,
+		TransientStore: store,
+		Validator:      &validatorMock{},
+	}, peerSelfSignedData)
 	err = coordinator.StoreBlock(block, pvtData)
 	assert.NoError(t, err)
 	assertCommitHappened()
@@ -507,7 +613,7 @@ func TestCoordinatorStoreInvalidBlock(t *testing.T) {
 	assert.Error(t, err)
 	assert.Contains(t, err.Error(), "Block header is nil")
 
-	// Scenario V: Block doesn't contain Data
+	// Scenario VI: Block doesn't contain Data
 	block.Data = nil
 	err = coordinator.StoreBlock(block, pvtData)
 	assert.Error(t, err)
@@ -515,7 +621,16 @@ func TestCoordinatorStoreInvalidBlock(t *testing.T) {
 }
 
 func TestCoordinatorStoreBlock(t *testing.T) {
+	peerSelfSignedData := common.SignedData{
+		Identity:  []byte{0, 1, 2},
+		Signature: []byte{3, 4, 5},
+		Data:      []byte{6, 7, 8},
+	}
 	// Green path test, all private data should be obtained successfully
+
+	ps := createPolicyStore(peerSelfSignedData).thatAcceptsAll()
+	pp := &policyParser{}
+
 	var commitHappened bool
 	assertCommitHappened := func() {
 		assert.True(t, commitHappened)
 	}
@@ -541,7 +656,14 @@ func TestCoordinatorStoreBlock(t *testing.T) {
 	// If the coordinator tries fetching from the transientstore, or peers it would result in panic,
 	// because we didn't define yet the "On(...)" invocation of the transient store or other peers.
 	pvtData := pdFactory.addRWSet().addNSRWSet("ns1", "c1", "c2").addRWSet().addNSRWSet("ns2", "c1").create()
-	coordinator := NewCoordinator(committer, store, fetcher, &validatorMock{})
+	coordinator := NewCoordinator(Support{
+		PolicyStore:    ps,
+		PolicyParser:   pp,
+		Committer:      committer,
+		Fetcher:        fetcher,
+		TransientStore: store,
+		Validator:      &validatorMock{},
+	}, peerSelfSignedData)
 	err := coordinator.StoreBlock(block, pvtData)
 	assert.NoError(t, err)
 	assertCommitHappened()
@@ -640,17 +762,63 @@ func TestCoordinatorStoreBlock(t *testing.T) {
 		assert.True(t, privateDataPassed2Ledger.Equal(expectedCommittedPrivateData2))
 		commitHappened = true
 	}).Return(nil)
-	coordinator = NewCoordinator(committer, store, fetcher, &validatorMock{})
+	coordinator = NewCoordinator(Support{
+		PolicyStore:    ps,
+		PolicyParser:   pp,
+		Committer:      committer,
+		Fetcher:        fetcher,
+		TransientStore: store,
+		Validator:      &validatorMock{},
+	}, peerSelfSignedData)
 	err = coordinator.StoreBlock(block, nil)
 	assert.NoError(t, err)
 	assertCommitHappened()
+
+	// Scenario VI: Block contains 2 transactions, and the peer is eligible for only tx3-ns3-c3.
+	// Also, the blocks comes with a private data for tx3-ns3-c3 so that the peer won't have to fetch the
+	// private data from the transient store or peers, and in fact- if it attempts to fetch the data it's not eligible
+	// for from the transient store or from peers - the test would fail because the Mock wasn't initialized.
+	block = bf.AddTxn("tx3", "ns3", hash, "c3", "c2", "c1").AddTxn("tx1", "ns1", hash, "c1").create()
+	ps = createPolicyStore(peerSelfSignedData).thatAccepts(rwset.CollectionCriteria{
+		TxId:       "tx3",
+		Collection: "c3",
+		Namespace:  "ns3",
+		Channel:    "test",
+	})
+	pp = &policyParser{}
+	store = &mockTransientStore{t: t}
+	fetcher = &fetcherMock{t: t}
+	committer = &committerMock{}
+	committer.On("CommitWithPvtData", mock.Anything).Run(func(args mock.Arguments) {
+		var privateDataPassed2Ledger privateData = args.Get(0).(*ledger.BlockAndPvtData).BlockPvtData
+		assert.True(t, privateDataPassed2Ledger.Equal(expectedCommittedPrivateData2))
+		commitHappened = true
+	}).Return(nil)
+	coordinator = NewCoordinator(Support{
+		PolicyStore:    ps,
+		PolicyParser:   pp,
+		Committer:      committer,
+		Fetcher:        fetcher,
+		TransientStore: store,
+		Validator:      &validatorMock{},
+	}, peerSelfSignedData)
+
+	pvtData = pdFactory.addRWSet().addNSRWSet("ns3", "c3").create()
+	err = coordinator.StoreBlock(block, pvtData)
+	assert.NoError(t, err)
+	assertCommitHappened()
 }
 
 func TestCoordinatorGetBlocks(t *testing.T) {
 	committer := &committerMock{}
 	store := &mockTransientStore{t: t}
 	fetcher := &fetcherMock{t: t}
-	coordinator := NewCoordinator(committer, store, fetcher, &validatorMock{})
+	coordinator := NewCoordinator(Support{
+		Committer:      committer,
+		Fetcher:        fetcher,
+		TransientStore: store,
+		Validator:      &validatorMock{},
+	}, common.SignedData{})
 	// Bad path: block is not returned
 	committer.On("GetBlocks", mock.Anything).Return([]*common.Block{})
diff --git a/gossip/privdata/util.go b/gossip/privdata/util.go
index ede6238b8d7..454e7104de9 100644
--- a/gossip/privdata/util.go
+++ b/gossip/privdata/util.go
@@ -96,7 +96,7 @@ func (bf *blockFactory) AddTxn(txID string, nsName string, hash []byte, collecti
 
 func (bf *blockFactory) create() *common.Block {
 	defer func() {
-		*bf = blockFactory{}
+		*bf = blockFactory{channelID: bf.channelID}
 	}()
 	block := &common.Block{
 		Header: &common.BlockHeader{
diff --git a/gossip/service/gossip_service.go b/gossip/service/gossip_service.go
index 633631067bb..8873d62e369 100644
--- a/gossip/service/gossip_service.go
+++ b/gossip/service/gossip_service.go
@@ -217,7 +217,14 @@ func (g *gossipServiceImpl) InitializeChannel(chainID string, endpoints []string
 	logger.Debug("Creating state provider for chainID", chainID)
 	servicesAdapter := &state.ServicesMediator{GossipAdapter: g, MCSAdapter: g.mcs}
 	fetcher := privdata2.NewPuller(support.Ps, support.Pp, g.gossipSvc, NewDataRetriever(support.Store), chainID)
-	coordinator := privdata2.NewCoordinator(support.Committer, support.Store, fetcher, support.Validator)
+	coordinator := privdata2.NewCoordinator(privdata2.Support{
+		PolicyParser:   support.Pp,
+		PolicyStore:    support.Ps,
+		Validator:      support.Validator,
+		TransientStore: support.Store,
+		Committer:      support.Committer,
+		Fetcher:        fetcher,
+	}, g.createSelfSignedData())
 	g.privateHandlers[chainID] = privateHandler{
 		support:     support,
 		coordinator: coordinator,
@@ -262,6 +269,19 @@ func (g *gossipServiceImpl) InitializeChannel(chainID string, endpoints []string
 	}
 }
 
+func (g *gossipServiceImpl) createSelfSignedData() common.SignedData {
+	msg := make([]byte, 32)
+	sig, err := g.mcs.Sign(msg)
+	if err != nil {
+		logger.Panicf("Failed creating self signed data because message signing failed: %v", err)
+	}
+	return common.SignedData{
+		Data:      msg,
+		Signature: sig,
+		Identity:  g.peerIdentity,
+	}
+}
+
 // configUpdated constructs a joinChannelMessage and sends it to the gossipSvc
 func (g *gossipServiceImpl) configUpdated(config Config) {
 	myOrg := string(g.secAdv.OrgByPeerIdentity(api.PeerIdentityType(g.peerIdentity)))
diff --git a/gossip/state/state_test.go b/gossip/state/state_test.go
index 9787f27fe12..5d4be9e45ea 100644
--- a/gossip/state/state_test.go
+++ b/gossip/state/state_test.go
@@ -264,7 +264,11 @@ func newPeerNodeWithGossip(config *gossip.Config, committer committer.Committer,
 
 	// basic parts
 	servicesAdapater := &ServicesMediator{GossipAdapter: g, MCSAdapter: cs}
-	coord := privdata.NewCoordinator(committer, &mockTransientStore{}, nil, &validator.MockValidator{})
+	coord := privdata.NewCoordinator(privdata.Support{
+		Validator:      &validator.MockValidator{},
+		TransientStore: &mockTransientStore{},
+		Committer:      committer,
+	}, pcomm.SignedData{})
 	sp := NewGossipStateProvider(util.GetTestChainID(), servicesAdapater, coord)
 	if sp == nil {
 		return nil