diff --git a/core/common/privdata/collection.go b/core/common/privdata/collection.go index 6dca1299bdc..5f22d1d82f9 100644 --- a/core/common/privdata/collection.go +++ b/core/common/privdata/collection.go @@ -40,6 +40,10 @@ type CollectionAccessPolicy interface { // RequiredExternalPeerCount returns the minimum number of internal peers // required to send private data to RequiredInternalPeerCount() int + + // MemberOrgs returns the collection's members as MSP IDs. This serves as + // a human-readable way of quickly identifying who is part of a collection. + MemberOrgs() []string } // Filter defines a rule that filters peers according to data signed by them. diff --git a/gossip/privdata/coordinator.go b/gossip/privdata/coordinator.go index d5bb459ae6a..ec43b2166c1 100644 --- a/gossip/privdata/coordinator.go +++ b/gossip/privdata/coordinator.go @@ -12,6 +12,7 @@ import ( "fmt" "time" + "github.com/golang/protobuf/proto" util2 "github.com/hyperledger/fabric/common/util" "github.com/hyperledger/fabric/core/committer" "github.com/hyperledger/fabric/core/committer/txvalidator" @@ -23,6 +24,7 @@ import ( "github.com/hyperledger/fabric/protos/common" gossip2 "github.com/hyperledger/fabric/protos/gossip" "github.com/hyperledger/fabric/protos/ledger/rwset" + msp "github.com/hyperledger/fabric/protos/msp" "github.com/hyperledger/fabric/protos/peer" "github.com/hyperledger/fabric/protos/utils" "github.com/op/go-logging" @@ -77,8 +79,18 @@ type Coordinator interface { Close() } +type dig2sources map[*gossip2.PvtDataDigest][]*peer.Endorsement + +func (d2s dig2sources) keys() []*gossip2.PvtDataDigest { + var res []*gossip2.PvtDataDigest + for dig := range d2s { + res = append(res, dig) + } + return res +} + type Fetcher interface { - fetch(req *gossip2.RemotePvtDataRequest) ([]*gossip2.PvtDataElement, error) + fetch(dig2src dig2sources) ([]*gossip2.PvtDataElement, error) } type Support struct { @@ -132,35 +144,24 @@ func (c *coordinator) StoreBlock(block *common.Block, privateDataSets util.PvtDa } logger.Info("Got block", block.Header.Number, "with", len(privateDataSets), "rwsets") - missing, txList, err := c.listMissingPrivateData(block, ownedRWsets) + privateInfo, err := c.listMissingPrivateData(block, ownedRWsets) if err != nil { logger.Warning(err) return err } - logger.Debug("Missing", len(missing), "rwsets") - - // Put into ownedRWsets RW sets that are missing and found in the transient store - c.fetchMissingFromTransientStore(missing, ownedRWsets) - - missingKeys := missing.flatten() - // Remove all keys we already own - missingKeys.exclude(func(key rwSetKey) bool { - _, exists := ownedRWsets[key] - return exists - }) retryThresh := viper.GetDuration("peer.gossip.pvtData.pullRetryThreshold") - logger.Debug("Fetching", len(missingKeys), "rwsets from peers for a maximum duration of", retryThresh) + logger.Debug("Fetching", len(privateInfo.missingKeys), "rwsets from peers for a maximum duration of", retryThresh) start := time.Now() limit := start.Add(retryThresh) - for len(missingKeys) > 0 && time.Now().Before(limit) { - c.fetchFromPeers(block.Header.Number, missingKeys, ownedRWsets) + for len(privateInfo.missingKeys) > 0 && time.Now().Before(limit) { + c.fetchFromPeers(block.Header.Number, ownedRWsets, privateInfo) time.Sleep(pullRetrySleepInterval) } - if len(missingKeys) == 0 { + if len(privateInfo.missingKeys) == 0 { logger.Debug("Fetched all missing rwsets from peers") } else { - logger.Warning("Missing", missingKeys) + logger.Warning("Missing", privateInfo.missingKeys) } // populate the private RWSets passed to the ledger @@ -174,7 +175,7 @@ func (c *coordinator) StoreBlock(block *common.Block, privateDataSets util.PvtDa } // populate missing RWSets to be passed to the ledger - for missingRWS := range missingKeys { + for missingRWS := range privateInfo.missingKeys { blockAndPvtData.Missing = append(blockAndPvtData.Missing, ledger.MissingPrivateData{ TxId: missingRWS.txID, Namespace: missingRWS.namespace, @@ -191,28 +192,30 @@ func (c *coordinator) StoreBlock(block *common.Block, privateDataSets util.PvtDa if len(blockAndPvtData.BlockPvtData) > 0 { // Finally, purge all transactions in block - valid or not valid. - if err := c.PurgeByTxids(txList); err != nil { - logger.Error("Purging transactions", txList, "failed:", err) + if err := c.PurgeByTxids(privateInfo.txns); err != nil { + logger.Error("Purging transactions", privateInfo.txns, "failed:", err) } } return nil } -func (c *coordinator) fetchFromPeers(blockSeq uint64, missingKeys rwsetKeys, ownedRWsets map[rwSetKey][]byte) { - req := &gossip2.RemotePvtDataRequest{} - missingKeys.foreach(func(k rwSetKey) { - req.Digests = append(req.Digests, &gossip2.PvtDataDigest{ +type dataSources map[rwSetKey][]*peer.Endorsement + +func (c *coordinator) fetchFromPeers(blockSeq uint64, ownedRWsets map[rwSetKey][]byte, privateInfo *privateDataInfo) { + dig2src := make(map[*gossip2.PvtDataDigest][]*peer.Endorsement) + privateInfo.missingKeys.foreach(func(k rwSetKey) { + logger.Debug("Fetching", k, "from peers") + dig := &gossip2.PvtDataDigest{ TxId: k.txID, SeqInBlock: k.seqInBlock, Collection: k.collection, Namespace: k.namespace, BlockSeq: blockSeq, - }) + } + dig2src[dig] = privateInfo.sources[k] }) - - logger.Debug("Fetching", req.Digests, "from peers") - fetchedData, err := c.fetch(req) + fetchedData, err := c.fetch(dig2src) if err != nil { logger.Warning("Failed fetching private data for block", blockSeq, "from peers:", err) return @@ -230,12 +233,12 @@ func (c *coordinator) fetchFromPeers(blockSeq uint64, missingKeys rwsetKeys, own seqInBlock: dig.SeqInBlock, hash: hash, } - if _, isMissing := missingKeys[key]; !isMissing { + if _, isMissing := privateInfo.missingKeys[key]; !isMissing { logger.Debug("Ignoring", key, "because it wasn't found in the block") continue } ownedRWsets[key] = rws - delete(missingKeys, key) + delete(privateInfo.missingKeys, key) // TODO Pass received at block height instead of 0 c.TransientStore.Persist(dig.TxId, 0, key.toTxPvtReadWriteSet(rws)) logger.Debug("Fetched", key) @@ -461,8 +464,9 @@ func (k *rwSetKey) toTxPvtReadWriteSet(rws []byte) *rwset.TxPvtReadWriteSet { type txns []string type blockData [][]byte +type blockConsumer func(seqInBlock uint64, chdr *common.ChannelHeader, txRWSet *rwsetutil.TxRwSet, endorsers []*peer.Endorsement) -func (data blockData) forEachTxn(txsFilter txValidationFlags, consumer func(seqInBlock uint64, chdr *common.ChannelHeader, txRWSet *rwsetutil.TxRwSet)) (txns, error) { +func (data blockData) forEachTxn(txsFilter txValidationFlags, consumer blockConsumer) (txns, error) { var txList []string for seqInBlock, envBytes := range data { env, err := utils.GetEnvelopeFromBlock(envBytes) @@ -500,54 +504,81 @@ func (data blockData) forEachTxn(txsFilter txValidationFlags, consumer func(seqI continue } + tx, err := utils.GetTransaction(payload.Data) + if err != nil { + logger.Warning("Invalid transaction in payload data for tx ", chdr.TxId, ":", err) + continue + } + + ccActionPayload, err := utils.GetChaincodeActionPayload(tx.Actions[0].Payload) + if err != nil { + logger.Warning("Invalid chaincode action in payload for tx", chdr.TxId, ":", err) + continue + } + + if ccActionPayload.Action == nil { + logger.Warning("Action in ChaincodeActionPayload for", chdr.TxId, "is nil") + continue + } + txRWSet := &rwsetutil.TxRwSet{} if err = txRWSet.FromProtoBytes(respPayload.Results); err != nil { logger.Warning("Failed obtaining TxRwSet from ChaincodeAction's results", err) continue } - consumer(uint64(seqInBlock), chdr, txRWSet) + consumer(uint64(seqInBlock), chdr, txRWSet, ccActionPayload.Action.Endorsements) } return txList, nil } -func (c *coordinator) listMissingPrivateData(block *common.Block, ownedRWsets map[rwSetKey][]byte) (rwSetKeysByTxIDs, txns, error) { +func endorsersFromOrgs(ns string, col string, endorsers []*peer.Endorsement, orgs []string) []*peer.Endorsement { + var res []*peer.Endorsement + for _, e := range endorsers { + sId := &msp.SerializedIdentity{} + err := proto.Unmarshal(e.Endorser, sId) + if err != nil { + logger.Warning("Failed unmarshalling endorser:", err) + continue + } + if !util.Contains(sId.Mspid, orgs) { + logger.Debug(sId.Mspid, "isn't among the collection's orgs:", orgs, "for namespace", ns, ",collection", col) + continue + } + res = append(res, e) + } + return res +} + +type privateDataInfo struct { + sources map[rwSetKey][]*peer.Endorsement + missingKeysByTxIDs rwSetKeysByTxIDs + missingKeys rwsetKeys + txns txns +} + +func (c *coordinator) listMissingPrivateData(block *common.Block, ownedRWsets map[rwSetKey][]byte) (*privateDataInfo, error) { if block.Metadata == nil || len(block.Metadata.Metadata) <= int(common.BlockMetadataIndex_TRANSACTIONS_FILTER) { - return nil, nil, errors.New("Block.Metadata is nil or Block.Metadata lacks a Tx filter bitmap") + return nil, errors.New("Block.Metadata is nil or Block.Metadata lacks a Tx filter bitmap") } txsFilter := txValidationFlags(block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER]) if len(txsFilter) != len(block.Data.Data) { - return nil, nil, errors.Errorf("Block data size(%d) is different from Tx filter size(%d)", len(block.Data.Data), len(txsFilter)) + return nil, errors.Errorf("Block data size(%d) is different from Tx filter size(%d)", len(block.Data.Data), len(txsFilter)) } + sources := make(map[rwSetKey][]*peer.Endorsement) privateRWsetsInBlock := make(map[rwSetKey]struct{}) missing := make(rwSetKeysByTxIDs) data := blockData(block.Data.Data) - txList, err := data.forEachTxn(txsFilter, func(seqInBlock uint64, chdr *common.ChannelHeader, txRWSet *rwsetutil.TxRwSet) { - for _, ns := range txRWSet.NsRwSets { - for _, hashed := range ns.CollHashedRwSets { - if !c.isEligible(chdr, ns.NameSpace, hashed.CollectionName) { - continue - } - key := rwSetKey{ - txID: chdr.TxId, - seqInBlock: seqInBlock, - hash: hex.EncodeToString(hashed.PvtRwSetHash), - namespace: ns.NameSpace, - collection: hashed.CollectionName, - } - privateRWsetsInBlock[key] = struct{}{} - if _, exists := ownedRWsets[key]; !exists { - txAndSeq := txAndSeqInBlock{ - txID: chdr.TxId, - seqInBlock: seqInBlock, - } - missing[txAndSeq] = append(missing[txAndSeq], key) - } - } // for all hashed RW sets - } // for all RW sets - }) + bi := &transactionInspector{ + sources: sources, + missingKeys: missing, + ownedRWsets: ownedRWsets, + privateRWsetsInBlock: privateRWsetsInBlock, + coordinator: c, + } + txList, err := data.forEachTxn(txsFilter, bi.inspectTransaction) if err != nil { - return nil, nil, errors.WithStack(err) + return nil, errors.WithStack(err) } // In the end, iterate over the ownedRWsets, and if the key doesn't exist in // the privateRWsetsInBlock - delete it from the ownedRWsets @@ -558,11 +589,68 @@ func (c *coordinator) listMissingPrivateData(block *common.Block, ownedRWsets ma } } - return missing, txList, nil + privateInfo := &privateDataInfo{ + sources: sources, + missingKeysByTxIDs: missing, + txns: txList, + } + + logger.Debug("Missing", len(privateInfo.missingKeysByTxIDs), "rwsets") + + // Put into ownedRWsets RW sets that are missing and found in the transient store + c.fetchMissingFromTransientStore(privateInfo.missingKeysByTxIDs, ownedRWsets) + + privateInfo.missingKeys = privateInfo.missingKeysByTxIDs.flatten() + // Remove all keys we already own + privateInfo.missingKeys.exclude(func(key rwSetKey) bool { + _, exists := ownedRWsets[key] + return exists + }) + + return privateInfo, nil +} + +type transactionInspector struct { + *coordinator + privateRWsetsInBlock map[rwSetKey]struct{} + missingKeys rwSetKeysByTxIDs + sources map[rwSetKey][]*peer.Endorsement + ownedRWsets map[rwSetKey][]byte +} + +func (bi *transactionInspector) inspectTransaction(seqInBlock uint64, chdr *common.ChannelHeader, txRWSet *rwsetutil.TxRwSet, endorsers []*peer.Endorsement) { + for _, ns := range txRWSet.NsRwSets { + for _, hashed := range ns.CollHashedRwSets { + policy := bi.accessPolicyForCollection(chdr, ns.NameSpace, hashed.CollectionName) + if policy == nil { + continue + } + if !bi.isEligible(policy, ns.NameSpace, hashed.CollectionName) { + continue + } + key := rwSetKey{ + txID: chdr.TxId, + seqInBlock: seqInBlock, + hash: hex.EncodeToString(hashed.PvtRwSetHash), + namespace: ns.NameSpace, + collection: hashed.CollectionName, + } + bi.privateRWsetsInBlock[key] = struct{}{} + if _, exists := bi.ownedRWsets[key]; !exists { + txAndSeq := txAndSeqInBlock{ + txID: chdr.TxId, + seqInBlock: seqInBlock, + } + bi.missingKeys[txAndSeq] = append(bi.missingKeys[txAndSeq], key) + bi.sources[key] = endorsersFromOrgs(ns.NameSpace, hashed.CollectionName, endorsers, policy.MemberOrgs()) + } + } // for all hashed RW sets + } // for all RW sets } -// isEligible checks if this peer is eligible for a collection in a given namespace -func (c *coordinator) isEligible(chdr *common.ChannelHeader, namespace string, col string) bool { +// accessPolicyForCollection retrieves a CollectionAccessPolicy for a given namespace, collection name +// that corresponds to a given ChannelHeader +func (c *coordinator) accessPolicyForCollection(chdr *common.ChannelHeader, namespace string, col string) privdata.CollectionAccessPolicy { cp := common.CollectionCriteria{ Channel: chdr.ChannelId, Namespace: namespace, @@ -572,16 +660,21 @@ func (c *coordinator) isEligible(chdr *common.ChannelHeader, namespace string, c sp := c.CollectionStore.RetrieveCollectionAccessPolicy(cp) if sp == nil { logger.Warning("Failed obtaining policy for", cp, "skipping collection") - return false + return nil } - filt := sp.AccessFilter() + return sp +} + +// isEligible checks if this peer is eligible for a given CollectionAccessPolicy +func (c *coordinator) isEligible(ap privdata.CollectionAccessPolicy, namespace string, col string) bool { + filt := ap.AccessFilter() if filt == nil { - logger.Warning("Failed parsing policy for", cp, "skipping collection") + logger.Warning("Failed parsing policy for namespace", namespace, "collection", col, "skipping collection") return false } eligible := filt(c.selfSignedData) if !eligible { - logger.Debug("Skipping", cp, "because we're not eligible for the private data") + logger.Debug("Skipping namespace", namespace, "collection", col, "because we're not eligible for the private data") } return eligible } @@ -638,7 +731,7 @@ func (c *coordinator) GetPvtDataAndBlockByNum(seqNum uint64, peerAuthInfo common seqs2Namespaces := aggregatedCollections(make(map[seqAndDataModel]map[string][]*rwset.CollectionPvtReadWriteSet)) data := blockData(blockAndPvtData.Block.Data.Data) - _, err = data.forEachTxn(make(txValidationFlags, len(data)), func(seqInBlock uint64, chdr *common.ChannelHeader, txRWSet *rwsetutil.TxRwSet) { + _, err = data.forEachTxn(make(txValidationFlags, len(data)), func(seqInBlock uint64, chdr *common.ChannelHeader, txRWSet *rwsetutil.TxRwSet, _ []*peer.Endorsement) { item, exists := blockAndPvtData.BlockPvtData[seqInBlock] if !exists { return diff --git a/gossip/privdata/coordinator_test.go b/gossip/privdata/coordinator_test.go index b06704e45c8..322d72df171 100644 --- a/gossip/privdata/coordinator_test.go +++ b/gossip/privdata/coordinator_test.go @@ -15,6 +15,7 @@ import ( "testing" "time" + pb "github.com/golang/protobuf/proto" util2 "github.com/hyperledger/fabric/common/util" "github.com/hyperledger/fabric/core/common/privdata" "github.com/hyperledger/fabric/core/ledger" @@ -23,6 +24,7 @@ import ( "github.com/hyperledger/fabric/protos/common" proto "github.com/hyperledger/fabric/protos/gossip" "github.com/hyperledger/fabric/protos/ledger/rwset" + "github.com/hyperledger/fabric/protos/msp" "github.com/spf13/viper" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" @@ -206,8 +208,21 @@ type fetchCall struct { *mock.Call } -func (fc *fetchCall) expectingReq(req *proto.RemotePvtDataRequest) *fetchCall { - fc.fetcher.expectedReq = req +func (fc *fetchCall) expectingEndorsers(orgs ...string) *fetchCall { + if fc.fetcher.expectedEndorsers == nil { + fc.fetcher.expectedEndorsers = make(map[string]struct{}) + } + for _, org := range orgs { + sId := &msp.SerializedIdentity{Mspid: org, IdBytes: []byte(fmt.Sprintf("p0%s", org))} + b, _ := pb.Marshal(sId) + fc.fetcher.expectedEndorsers[string(b)] = struct{}{} + } + + return fc +} + +func (fc *fetchCall) expectingDigests(dig []*proto.PvtDataDigest) *fetchCall { + fc.fetcher.expectedDigests = dig return fc } @@ -219,7 +234,8 @@ func (fc *fetchCall) Return(returnArguments ...interface{}) *mock.Call { type fetcherMock struct { t *testing.T mock.Mock - expectedReq *proto.RemotePvtDataRequest + expectedDigests []*proto.PvtDataDigest + expectedEndorsers map[string]struct{} } func (f *fetcherMock) On(methodName string, arguments ...interface{}) *fetchCall { @@ -229,9 +245,20 @@ func (f *fetcherMock) On(methodName string, arguments ...interface{}) *fetchCall } } -func (f *fetcherMock) fetch(req *proto.RemotePvtDataRequest) ([]*proto.PvtDataElement, error) { - assert.True(f.t, digests(req.Digests).Equal(digests(f.expectedReq.Digests))) - args := f.Called(req) +func (f *fetcherMock) fetch(dig2src dig2sources) ([]*proto.PvtDataElement, error) { + for _, endorsements := range dig2src { + for _, endorsement := range endorsements { + _, exists := f.expectedEndorsers[string(endorsement.Endorser)] + if !exists { + f.t.Fatalf("Encountered a non-expected endorser: %s", string(endorsement.Endorser)) + } + // Else, it exists, so delete it so we will end up with an empty expected map at the end of the call + delete(f.expectedEndorsers, string(endorsement.Endorser)) + } + } + assert.True(f.t, digests(dig2src.keys()).Equal(digests(f.expectedDigests))) + assert.Empty(f.t, f.expectedEndorsers) + args := f.Called(dig2src) if args.Get(1) == nil { return args.Get(0).([]*proto.PvtDataElement), nil } @@ -287,6 +314,10 @@ type collectionAccessPolicy struct { n uint64 } +func (cap *collectionAccessPolicy) MemberOrgs() []string { + return []string{"org0", "org1"} +} + func (cap *collectionAccessPolicy) RequiredInternalPeerCount() int { return viper.GetInt("peer.gossip.pvtData.minInternalPeers") } @@ -693,7 +724,7 @@ func TestCoordinatorStoreBlock(t *testing.T) { bf := &blockFactory{ channelID: "test", } - block := bf.AddTxn("tx1", "ns1", hash, "c1", "c2").AddTxn("tx2", "ns2", hash, "c1").create() + block := bf.AddTxnWithEndorsement("tx1", "ns1", hash, "org1", "c1", "c2").AddTxnWithEndorsement("tx2", "ns2", hash, "org2", "c1").create() // Scenario I: Block we got has sufficient private data alongside it. // If the coordinator tries fetching from the transientstore, or peers it would result in panic, @@ -729,17 +760,17 @@ func TestCoordinatorStoreBlock(t *testing.T) { // Scenario III: Block doesn't have sufficient private data alongside it, // it is missing ns1: c2, and the data exists in the transient store, - // but it is also missing ns2: c1, and that data doesn't exist in the transient store - but in a peer - fetcher.On("fetch", mock.Anything).expectingReq(&proto.RemotePvtDataRequest{ - Digests: []*proto.PvtDataDigest{ - { - TxId: "tx1", Namespace: "ns1", Collection: "c2", BlockSeq: 1, - }, - { - TxId: "tx2", Namespace: "ns2", Collection: "c1", BlockSeq: 1, SeqInBlock: 1, - }, + // but it is also missing ns2: c1, and that data doesn't exist in the transient store - but in a peer. + // Additionally, the coordinator should pass an endorser identity of org1, but not of org2, since + // the MemberOrgs() call doesn't return org2 but only org0 and org1. + fetcher.On("fetch", mock.Anything).expectingDigests([]*proto.PvtDataDigest{ + { + TxId: "tx1", Namespace: "ns1", Collection: "c2", BlockSeq: 1, }, - }).Return([]*proto.PvtDataElement{ + { + TxId: "tx2", Namespace: "ns2", Collection: "c1", BlockSeq: 1, SeqInBlock: 1, + }, + }).expectingEndorsers("org1").Return([]*proto.PvtDataElement{ { Digest: &proto.PvtDataDigest{ BlockSeq: 1, @@ -782,11 +813,9 @@ func TestCoordinatorStoreBlock(t *testing.T) { // In this case, we should try to fetch data from peers. block = bf.AddTxn("tx3", "ns3", hash, "c3").create() fetcher = &fetcherMock{t: t} - fetcher.On("fetch", mock.Anything).expectingReq(&proto.RemotePvtDataRequest{ - Digests: []*proto.PvtDataDigest{ - { - TxId: "tx3", Namespace: "ns3", Collection: "c3", BlockSeq: 1, - }, + fetcher.On("fetch", mock.Anything).expectingDigests([]*proto.PvtDataDigest{ + { + TxId: "tx3", Namespace: "ns3", Collection: "c3", BlockSeq: 1, }, }).Return([]*proto.PvtDataElement{ { @@ -811,8 +840,6 @@ func TestCoordinatorStoreBlock(t *testing.T) { committer.On("CommitWithPvtData", mock.Anything).Run(func(args mock.Arguments) { var privateDataPassed2Ledger privateData = args.Get(0).(*ledger.BlockAndPvtData).BlockPvtData assert.True(t, privateDataPassed2Ledger.Equal(expectedCommittedPrivateData2)) - fmt.Println(privateDataPassed2Ledger) - fmt.Println(expectedCommittedPrivateData2) commitHappened = true }).Return(nil) coordinator = NewCoordinator(Support{ @@ -914,11 +941,9 @@ func TestProceedWithoutPrivateData(t *testing.T) { fetcher := &fetcherMock{t: t} // Have the peer return in response to the pull, a private data with a non matching hash - fetcher.On("fetch", mock.Anything).expectingReq(&proto.RemotePvtDataRequest{ - Digests: []*proto.PvtDataDigest{ - { - TxId: "tx1", Namespace: "ns3", Collection: "c2", BlockSeq: 1, - }, + fetcher.On("fetch", mock.Anything).expectingDigests([]*proto.PvtDataDigest{ + { + TxId: "tx1", Namespace: "ns3", Collection: "c2", BlockSeq: 1, }, }).Return([]*proto.PvtDataElement{ { diff --git a/gossip/privdata/pull.go b/gossip/privdata/pull.go index d6e0f0bffcf..0e0fd1426c3 100644 --- a/gossip/privdata/pull.go +++ b/gossip/privdata/pull.go @@ -200,9 +200,9 @@ func (p *puller) waitForMembership() []discovery.NetworkMember { return members } -func (p *puller) fetch(req *proto.RemotePvtDataRequest) ([]*proto.PvtDataElement, error) { +func (p *puller) fetch(dig2src dig2sources) ([]*proto.PvtDataElement, error) { // computeFilters returns a map from a digest to a routing filter - dig2Filter, err := p.computeFilters(req) + dig2Filter, err := p.computeFilters(dig2src.keys()) if err != nil { return nil, errors.WithStack(err) } @@ -355,9 +355,9 @@ func (dig2Filter digestToFilterMapping) String() string { return buffer.String() } -func (p *puller) computeFilters(req *proto.RemotePvtDataRequest) (digestToFilterMapping, error) { +func (p *puller) computeFilters(digests []*proto.PvtDataDigest) (digestToFilterMapping, error) { filters := make(map[proto.PvtDataDigest]filter.RoutingFilter) - for _, digest := range req.Digests { + for _, digest := range digests { collection := p.cs.RetrieveCollectionAccessPolicy(fcommon.CollectionCriteria{ Channel: p.channel, TxId: digest.TxId, diff --git a/gossip/privdata/pull_test.go b/gossip/privdata/pull_test.go index 06413ab4d7b..e29d165c787 100644 --- a/gossip/privdata/pull_test.go +++ b/gossip/privdata/pull_test.go @@ -9,9 +9,8 @@ package privdata import ( "bytes" "crypto/rand" - "testing" - "sync" + "testing" "github.com/hyperledger/fabric/core/common/privdata" "github.com/hyperledger/fabric/gossip/api" @@ -78,6 +77,10 @@ func (mc *mockCollectionAccess) thatMapsTo(peers ...string) *mockCollectionStore return mc.cs } +func (mc *mockCollectionAccess) MemberOrgs() []string { + return nil +} + func (mc *mockCollectionAccess) AccessFilter() privdata.Filter { policyLock.Lock() defer policyLock.Unlock() @@ -237,9 +240,9 @@ func TestPullerFromOnly1Peer(t *testing.T) { t.Fatal("p3 shouldn't have been selected for pull") }) - fetchedMessages, err := p1.fetch(&proto.RemotePvtDataRequest{ - Digests: []*proto.PvtDataDigest{dig}, - }) + dasf := &digestsAndSourceFactory{} + + fetchedMessages, err := p1.fetch(dasf.mapDigest(dig).toSources().create()) rws1 := util.PrivateRWSet(fetchedMessages[0].Payload[0]) rws2 := util.PrivateRWSet(fetchedMessages[0].Payload[1]) fetched := []util.PrivateRWSet{rws1, rws2} @@ -270,9 +273,8 @@ func TestPullerDataNotAvailable(t *testing.T) { t.Fatal("p3 shouldn't have been selected for pull") }) - fetchedMessages, err := p1.fetch(&proto.RemotePvtDataRequest{ - Digests: []*proto.PvtDataDigest{dig}, - }) + dasf := &digestsAndSourceFactory{} + fetchedMessages, err := p1.fetch(dasf.mapDigest(dig).toSources().create()) assert.Empty(t, fetchedMessages) assert.NoError(t, err) } @@ -283,9 +285,9 @@ func TestPullerNoPeersKnown(t *testing.T) { gn := &gossipNetwork{} policyStore := newCollectionStore().withPolicy("col1").thatMapsTo("p2").withPolicy("col1").thatMapsTo("p3") p1 := gn.newPuller("p1", policyStore) - fetchedMessages, err := p1.fetch(&proto.RemotePvtDataRequest{ - Digests: []*proto.PvtDataDigest{{Collection: "col1", TxId: "txID1", Namespace: "ns1"}}, - }) + dasf := &digestsAndSourceFactory{} + d2s := dasf.mapDigest(&proto.PvtDataDigest{Collection: "col1", TxId: "txID1", Namespace: "ns1"}).toSources().create() + fetchedMessages, err := p1.fetch(d2s) assert.Empty(t, fetchedMessages) assert.Error(t, err) assert.Contains(t, err.Error(), "Empty membership") @@ -298,9 +300,9 @@ func TestPullPeerFilterError(t *testing.T) { policyStore := newCollectionStore().withPolicy("col1").thatMapsTo("p2") p1 := gn.newPuller("p1", policyStore) gn.peers[0].On("PeerFilter", mock.Anything, mock.Anything).Return(nil, errors.New("Failed obtaining filter")) - fetchedMessages, err := p1.fetch(&proto.RemotePvtDataRequest{ - Digests: []*proto.PvtDataDigest{{Collection: "col1", TxId: "txID1", Namespace: "ns1"}}, - }) + dasf := &digestsAndSourceFactory{} + d2s := dasf.mapDigest(&proto.PvtDataDigest{Collection: "col1", TxId: "txID1", Namespace: "ns1"}).toSources().create() + fetchedMessages, err := p1.fetch(d2s) assert.Error(t, err) assert.Contains(t, err.Error(), "Failed obtaining filter") assert.Empty(t, fetchedMessages) @@ -332,10 +334,9 @@ func TestPullerPeerNotEligible(t *testing.T) { p3.PrivateDataRetriever.(*dataRetrieverMock).On("CollectionRWSet", dig).Run(func(_ mock.Arguments) { t.Fatal("p3 shouldn't have approved the pull") }) - - fetchedMessages, err := p1.fetch(&proto.RemotePvtDataRequest{ - Digests: []*proto.PvtDataDigest{{Collection: "col1", TxId: "txID1"}}, - }) + dasf := &digestsAndSourceFactory{} + d2s := dasf.mapDigest(&proto.PvtDataDigest{Collection: "col1", TxId: "txID1"}).toSources().create() + fetchedMessages, err := p1.fetch(d2s) assert.Empty(t, fetchedMessages) assert.NoError(t, err) } @@ -371,9 +372,8 @@ func TestPullerDifferentPeersDifferentCollections(t *testing.T) { p3.PrivateDataRetriever.(*dataRetrieverMock).On("CollectionRWSet", dig2).Return(p3TransientStore) - fetchedMessages, err := p1.fetch(&proto.RemotePvtDataRequest{ - Digests: []*proto.PvtDataDigest{dig1, dig2}, - }) + dasf := &digestsAndSourceFactory{} + fetchedMessages, err := p1.fetch(dasf.mapDigest(dig1).toSources().mapDigest(dig2).toSources().create()) assert.NoError(t, err) rws1 := util.PrivateRWSet(fetchedMessages[0].Payload[0]) rws2 := util.PrivateRWSet(fetchedMessages[0].Payload[1]) @@ -426,9 +426,8 @@ func TestPullerRetries(t *testing.T) { p5.PrivateDataRetriever.(*dataRetrieverMock).On("CollectionRWSet", dig).Return(transientStore) // Fetch from someone - fetchedMessages, err := p1.fetch(&proto.RemotePvtDataRequest{ - Digests: []*proto.PvtDataDigest{dig}, - }) + dasf := &digestsAndSourceFactory{} + fetchedMessages, err := p1.fetch(dasf.mapDigest(dig).toSources().create()) assert.NoError(t, err) rws1 := util.PrivateRWSet(fetchedMessages[0].Payload[0]) rws2 := util.PrivateRWSet(fetchedMessages[0].Payload[1]) diff --git a/gossip/privdata/util.go b/gossip/privdata/util.go index 454e7104de9..62625dba1d7 100644 --- a/gossip/privdata/util.go +++ b/gossip/privdata/util.go @@ -7,12 +7,16 @@ SPDX-License-Identifier: Apache-2.0 package privdata import ( + "fmt" + "github.com/golang/protobuf/proto" "github.com/hyperledger/fabric/core/ledger" "github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/rwsetutil" "github.com/hyperledger/fabric/protos/common" + gossip2 "github.com/hyperledger/fabric/protos/gossip" "github.com/hyperledger/fabric/protos/ledger/rwset" "github.com/hyperledger/fabric/protos/ledger/rwset/kvrwset" + "github.com/hyperledger/fabric/protos/msp" "github.com/hyperledger/fabric/protos/peer" ) @@ -27,6 +31,10 @@ type blockFactory struct { } func (bf *blockFactory) AddTxn(txID string, nsName string, hash []byte, collections ...string) *blockFactory { + return bf.AddTxnWithEndorsement(txID, nsName, hash, "", collections...) +} + +func (bf *blockFactory) AddTxnWithEndorsement(txID string, nsName string, hash []byte, org string, collections ...string) *blockFactory { txn := &peer.Transaction{ Actions: []*peer.TransactionAction{ {}, @@ -63,6 +71,16 @@ func (bf *blockFactory) AddTxn(txID string, nsName string, hash []byte, collecti }, } + if org != "" { + sId := &msp.SerializedIdentity{Mspid: org, IdBytes: []byte(fmt.Sprintf("p0%s", org))} + b, _ := proto.Marshal(sId) + ccPayload.Action.Endorsements = []*peer.Endorsement{ + { + Endorser: b, + }, + } + } + ccPayloadBytes, err := proto.Marshal(ccPayload) if err != nil { panic(err) @@ -221,3 +239,36 @@ func (df *pvtDataFactory) create() []*ledger.TxPvtData { }() return df.data } + +type digestsAndSourceFactory struct { + d2s dig2sources + lastDig *gossip2.PvtDataDigest +} + +func (f *digestsAndSourceFactory) mapDigest(dig *gossip2.PvtDataDigest) *digestsAndSourceFactory { + f.lastDig = dig + return f +} + +func (f *digestsAndSourceFactory) toSources(orgs ...string) *digestsAndSourceFactory { + if f.d2s == nil { + f.d2s = make(dig2sources) + } + var endorsements []*peer.Endorsement + for i, org := range orgs { + sId := &msp.SerializedIdentity{ + Mspid: org, + IdBytes: []byte(fmt.Sprintf("p%d.%s", i, org)), + } + b, _ := proto.Marshal(sId) + endorsements = append(endorsements, &peer.Endorsement{ + Endorser: b, + }) + } + f.d2s[f.lastDig] = endorsements + return f +} + +func (f *digestsAndSourceFactory) create() dig2sources { + return f.d2s +} diff --git a/gossip/util/misc.go b/gossip/util/misc.go index e192b45dd25..791e0bc7899 100644 --- a/gossip/util/misc.go +++ b/gossip/util/misc.go @@ -25,6 +25,16 @@ type Equals func(a interface{}, b interface{}) bool var viperLock sync.RWMutex +// Contains returns whether a given slice a contains a string s +func Contains(s string, a []string) bool { + for _, e := range a { + if e == s { + return true + } + } + return false +} + // IndexInSlice returns the index of given object o in array func IndexInSlice(array interface{}, o interface{}, equals Equals) int { arr := reflect.ValueOf(array) diff --git a/gossip/util/misc_test.go b/gossip/util/misc_test.go index 5026f01eb1a..f9946586b2d 100644 --- a/gossip/util/misc_test.go +++ b/gossip/util/misc_test.go @@ -27,6 +27,11 @@ func testHappyPath(t *testing.T) { assert.NotEqual(t, n3, n4) } +func TestContains(t *testing.T) { + assert.True(t, Contains("foo", []string{"bar", "foo", "baz"})) + assert.False(t, Contains("foo", []string{"bar", "baz"})) +} + func TestGetRandomInt(t *testing.T) { testHappyPath(t) }