diff --git a/core/chaincode/lifecycle/cache.go b/core/chaincode/lifecycle/cache.go index d08cb941b02..1fa9e943f16 100644 --- a/core/chaincode/lifecycle/cache.go +++ b/core/chaincode/lifecycle/cache.go @@ -179,6 +179,11 @@ func (c *Cache) InitializeLocalChaincodes() error { return nil } +// Name returns the name of the listener +func (c *Cache) Name() string { + return "lifecycle cache listener" +} + // Initialize will populate the set of currently committed chaincode definitions // for a channel into the cache. Note, this looks like a bit of a DRY violation // with respect to 'Update', but the error handling is quite different and attempting diff --git a/core/ledger/cceventmgmt/lsccstate_listener.go b/core/ledger/cceventmgmt/lsccstate_listener.go index 987c300511a..f1f58e587ae 100644 --- a/core/ledger/cceventmgmt/lsccstate_listener.go +++ b/core/ledger/cceventmgmt/lsccstate_listener.go @@ -16,6 +16,11 @@ type KVLedgerLSCCStateListener struct { DeployedChaincodeInfoProvider ledger.DeployedChaincodeInfoProvider } +// Name returns the name of the listener +func (listener *KVLedgerLSCCStateListener) Name() string { + return "lscc state listener" +} + func (listener *KVLedgerLSCCStateListener) Initialize(ledgerID string, qe ledger.SimpleQueryExecutor) error { // Noop return nil diff --git a/core/ledger/confighistory/mgr.go b/core/ledger/confighistory/mgr.go index 986f371dd18..b94e8c1d684 100644 --- a/core/ledger/confighistory/mgr.go +++ b/core/ledger/confighistory/mgr.go @@ -45,6 +45,11 @@ func NewMgr(dbPath string, ccInfoProvider ledger.DeployedChaincodeInfoProvider) return &mgr{ccInfoProvider, p}, nil } +// Name returns the name of the listener +func (m *mgr) Name() string { + return "collection configuration history listener" +} + func (m *mgr) Initialize(ledgerID string, qe ledger.SimpleQueryExecutor) error { // Noop return nil diff --git a/core/ledger/kvledger/coll_elg_notifier.go b/core/ledger/kvledger/coll_elg_notifier.go index 8ce001e31d5..b8ddd686e25 100644 --- a/core/ledger/kvledger/coll_elg_notifier.go +++ b/core/ledger/kvledger/coll_elg_notifier.go @@ -20,6 +20,11 @@ type collElgNotifier struct { listeners map[string]collElgListener } +// Name returns the name of the listener +func (n *collElgNotifier) Name() string { + return "collection eligibility listener" +} + func (n *collElgNotifier) Initialize(ledgerID string, qe ledger.SimpleQueryExecutor) error { // Noop return nil diff --git a/core/ledger/kvledger/hashcheck_pvtdata.go b/core/ledger/kvledger/hashcheck_pvtdata.go index 94eaf0be2ee..63aecb5323e 100644 --- a/core/ledger/kvledger/hashcheck_pvtdata.go +++ b/core/ledger/kvledger/hashcheck_pvtdata.go @@ -10,16 +10,16 @@ import ( "bytes" "github.com/hyperledger/fabric-protos-go/ledger/rwset" + "github.com/hyperledger/fabric/common/ledger/blkstorage" "github.com/hyperledger/fabric/common/util" "github.com/hyperledger/fabric/core/ledger" "github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/rwsetutil" - "github.com/hyperledger/fabric/core/ledger/ledgerstorage" "github.com/hyperledger/fabric/protoutil" ) // constructValidAndInvalidPvtData computes the valid pvt data and hash mismatch list // from a received pvt data list of old blocks.
-func constructValidAndInvalidPvtData(reconciledPvtdata []*ledger.ReconciledPvtdata, blockStore *ledgerstorage.Store) ( +func constructValidAndInvalidPvtData(reconciledPvtdata []*ledger.ReconciledPvtdata, blockStore blkstorage.BlockStore) ( map[uint64][]*ledger.TxPvtData, []*ledger.PvtdataHashMismatch, error, ) { // for each block, for each transaction, retrieve the txEnvelope to @@ -42,7 +42,7 @@ func constructValidAndInvalidPvtData(reconciledPvtdata []*ledger.ReconciledPvtda return validPvtData, invalidPvtData, nil } -func findValidAndInvalidPvtdata(reconciledPvtdata *ledger.ReconciledPvtdata, blockStore *ledgerstorage.Store) ( +func findValidAndInvalidPvtdata(reconciledPvtdata *ledger.ReconciledPvtdata, blockStore blkstorage.BlockStore) ( []*ledger.TxPvtData, []*ledger.PvtdataHashMismatch, error, ) { var validPvtData []*ledger.TxPvtData @@ -70,7 +70,7 @@ func findValidAndInvalidPvtdata(reconciledPvtdata *ledger.ReconciledPvtdata, blo return validPvtData, invalidPvtData, nil } -func retrieveRwsetForTx(blkNum uint64, txNum uint64, blockStore *ledgerstorage.Store) (*rwsetutil.TxRwSet, error) { +func retrieveRwsetForTx(blkNum uint64, txNum uint64, blockStore blkstorage.BlockStore) (*rwsetutil.TxRwSet, error) { // retrieve the txEnvelope from the block store so that the hash of // the pvtData can be retrieved for comparison txEnvelope, err := blockStore.RetrieveTxByBlockNumTranNum(blkNum, txNum) diff --git a/core/ledger/kvledger/hashcheck_pvtdata_test.go b/core/ledger/kvledger/hashcheck_pvtdata_test.go index 2e390f35036..a55526058ed 100644 --- a/core/ledger/kvledger/hashcheck_pvtdata_test.go +++ b/core/ledger/kvledger/hashcheck_pvtdata_test.go @@ -16,6 +16,7 @@ import ( "github.com/hyperledger/fabric/common/ledger/testutil" "github.com/hyperledger/fabric/core/ledger" "github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/rwsetutil" + "github.com/hyperledger/fabric/core/ledger/mock" "github.com/hyperledger/fabric/protoutil" "github.com/stretchr/testify/assert" ) @@ -23,7 +24,7 @@ import ( func TestConstructValidInvalidBlocksPvtData(t *testing.T) { conf, cleanup := testConfig(t) defer cleanup() - provider := testutilNewProvider(conf, t) + provider := testutilNewProvider(conf, t, &mock.DeployedChaincodeInfoProvider{}) defer provider.Close() _, gb := testutil.NewBlockGenerator(t, "testLedger", false) @@ -82,7 +83,7 @@ func TestConstructValidInvalidBlocksPvtData(t *testing.T) { Block: blk1, PvtData: pvtDataBlk1, MissingPvtData: missingData} - assert.NoError(t, lg.(*kvLedger).blockStore.CommitWithPvtData(blockAndPvtData1)) + assert.NoError(t, lg.(*kvLedger).commitToPvtAndBlockStore(blockAndPvtData1)) // construct pvtData from missing data in tx3, tx6, and tx7 pvtdata := []*ledger.ReconciledPvtdata{ diff --git a/core/ledger/kvledger/kv_ledger.go b/core/ledger/kvledger/kv_ledger.go index 9e6b3b3e9b0..f965ee9a21a 100644 --- a/core/ledger/kvledger/kv_ledger.go +++ b/core/ledger/kvledger/kv_ledger.go @@ -9,6 +9,7 @@ package kvledger import ( "fmt" "sync" + "sync/atomic" "time" "github.com/golang/protobuf/proto" @@ -16,6 +17,7 @@ import ( "github.com/hyperledger/fabric-protos-go/peer" "github.com/hyperledger/fabric/common/flogging" commonledger "github.com/hyperledger/fabric/common/ledger" + "github.com/hyperledger/fabric/common/ledger/blkstorage" "github.com/hyperledger/fabric/common/util" "github.com/hyperledger/fabric/core/ledger" "github.com/hyperledger/fabric/core/ledger/cceventmgmt" @@ -25,8 +27,8 @@ import ( "github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/privacyenabledstate" 
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/txmgr" "github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/txmgr/lockbasedtxmgr" - "github.com/hyperledger/fabric/core/ledger/ledgerstorage" "github.com/hyperledger/fabric/core/ledger/pvtdatapolicy" + "github.com/hyperledger/fabric/core/ledger/pvtdatastorage" "github.com/hyperledger/fabric/internal/pkg/txflags" "github.com/hyperledger/fabric/protoutil" "github.com/pkg/errors" @@ -38,19 +40,24 @@ var logger = flogging.MustGetLogger("kvledger") // This implementation provides a key-value based data model type kvLedger struct { ledgerID string - blockStore *ledgerstorage.Store + blockStore blkstorage.BlockStore + pvtdataStore pvtdatastorage.Store txtmgmt txmgr.TxMgr historyDB *history.DB configHistoryRetriever ledger.ConfigHistoryRetriever blockAPIsRWLock *sync.RWMutex stats *ledgerStats commitHash []byte + // isPvtDataStoreAheadOfBlockStore is read during missing pvtData + // reconciliation and may be updated during a regular block commit. + // Hence, we use atomic value to ensure consistent read. + isPvtstoreAheadOfBlkstore atomic.Value } -// newKVLedger constructs new `KVLedger` func newKVLedger( ledgerID string, - blockStore *ledgerstorage.Store, + blockStore blkstorage.BlockStore, + pvtdataStore pvtdatastorage.Store, versionedDB privacyenabledstate.DB, historyDB *history.DB, configHistoryMgr confighistory.Mgr, @@ -63,9 +70,13 @@ func newKVLedger( hasher ledger.Hasher, ) (*kvLedger, error) { logger.Debugf("Creating KVLedger ledgerID=%s: ", ledgerID) - // Create a kvLedger for this chain/ledger, which encapsulates the underlying - // id store, blockstore, txmgr (state database), history database - l := &kvLedger{ledgerID: ledgerID, blockStore: blockStore, historyDB: historyDB, blockAPIsRWLock: &sync.RWMutex{}} + l := &kvLedger{ + ledgerID: ledgerID, + blockStore: blockStore, + pvtdataStore: pvtdataStore, + historyDB: historyDB, + blockAPIsRWLock: &sync.RWMutex{}, + } btlPolicy := pvtdatapolicy.ConstructBTLPolicy(&collectionInfoRetriever{ledgerID, l, ccInfoProvider}) @@ -80,16 +91,22 @@ func newKVLedger( ); err != nil { return nil, err } + // btlPolicy internally uses queryexecuter and indirectly ends up using txmgr. + // Hence, we need to init the pvtdataStore once the txmgr is initiated. 
+ l.pvtdataStore.Init(btlPolicy) - l.initBlockStore(btlPolicy) - - // Retrieves the current commit hash from the blockstore var err error l.commitHash, err = l.lastPersistedCommitHash() if err != nil { return nil, err } + isAhead, err := l.isPvtDataStoreAheadOfBlockStore() + if err != nil { + return nil, err + } + l.isPvtstoreAheadOfBlkstore.Store(isAhead) + // TODO Move the function `GetChaincodeEventListener` to ledger interface and // this functionality of registering for events to ledgermgmt package so that this // is reused across other future ledger implementations @@ -149,10 +166,6 @@ func (l *kvLedger) initTxMgr( return err } -func (l *kvLedger) initBlockStore(btlPolicy pvtdatapolicy.BTLPolicy) { - l.blockStore.Init(btlPolicy) -} - func (l *kvLedger) lastPersistedCommitHash() ([]byte, error) { bcInfo, err := l.GetBlockchainInfo() if err != nil { @@ -182,14 +195,24 @@ func (l *kvLedger) lastPersistedCommitHash() ([]byte, error) { return commitHash.Value, nil } -//Recover the state database and history database (if exist) -//by recommitting last valid blocks +func (l *kvLedger) isPvtDataStoreAheadOfBlockStore() (bool, error) { + blockStoreInfo, err := l.blockStore.GetBlockchainInfo() + if err != nil { + return false, err + } + pvtstoreHeight, err := l.pvtdataStore.LastCommittedBlockHeight() + if err != nil { + return false, err + } + return pvtstoreHeight > blockStoreInfo.Height, nil +} + func (l *kvLedger) recoverDBs() error { logger.Debugf("Entering recoverDB()") if err := l.syncStateAndHistoryDBWithBlockstore(); err != nil { return err } - if err := l.syncStateDBWithPvtdatastore(); err != nil { + if err := l.syncStateDBWithOldBlkPvtdata(); err != nil { return err } return nil @@ -256,24 +279,22 @@ func (l *kvLedger) syncStateAndHistoryDBWithBlockstore() error { recoverers[0].recoverable, recoverers[1].recoverable) } -func (l *kvLedger) syncStateDBWithPvtdatastore() error { - // TODO: So far, the design philosophy was that the scope of block storage is - // limited to storing and retrieving blocks data with certain guarantees and statedb is - // for the state management. The higher layer, 'kvledger', coordinates the acts between - // the two. However, with maintaining the state of the consumption of blocks (i.e, - // lastUpdatedOldBlockList for pvtstore reconciliation) within private data block storage - // breaks that assumption. The knowledge of what blocks have been consumed for the purpose - // of state update should not lie with the source (i.e., pvtdatastorage). A potential fix - // is mentioned in FAB-12731 +func (l *kvLedger) syncStateDBWithOldBlkPvtdata() error { + // TODO: syncStateDBWithOldBlkPvtdata, GetLastUpdatedOldBlocksPvtData(), + // and ResetLastUpdatedOldBlocksList() can be removed in releases after the v2 LTS. + // From v2.0 onwards, we do not store the last updatedBlksList. + // Only to support the rolling upgrade from the v1.4 LTS to the v2 LTS, we + // retain these three functions in v2.0 - FAB-16294. - blocksPvtData, err := l.blockStore.GetLastUpdatedOldBlocksPvtData() + blocksPvtData, err := l.pvtdataStore.GetLastUpdatedOldBlocksPvtData() if err != nil { return err } - // as the pvtdataStore can contain pvtData of yet to be committed blocks, - // we need to filter them before passing it to the transaction manager for - // stateDB updates. + // Assume that the peer has restarted after a rollback or a reset. + // As the pvtdataStore can contain pvtData of yet to be committed blocks, + // we need to filter them before passing them to the transaction manager + // for stateDB updates.
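+ // For example, after a rollback to block 5, the pvtdataStore may still hold + // pvtData for blocks 6 onwards; those entries are filtered out here because the + // corresponding blocks are yet to be (re)committed to the blockStore and stateDB.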
if err := l.filterYetToCommitBlocks(blocksPvtData); err != nil { return err } @@ -282,7 +303,7 @@ func (l *kvLedger) syncStateDBWithPvtdatastore() error { return err } - l.blockStore.ResetLastUpdatedOldBlocksList() + l.pvtdataStore.ResetLastUpdatedOldBlocksList() return nil } @@ -323,6 +344,8 @@ func (l *kvLedger) recommitLostBlocks(firstBlockNum uint64, lastBlockNum uint64, // GetTransactionByID retrieves a transaction by id func (l *kvLedger) GetTransactionByID(txID string) (*peer.ProcessedTransaction, error) { + l.blockAPIsRWLock.RLock() + defer l.blockAPIsRWLock.RUnlock() tranEnv, err := l.blockStore.RetrieveTxByID(txID) if err != nil { return nil, err @@ -332,25 +355,23 @@ func (l *kvLedger) GetTransactionByID(txID string) (*peer.ProcessedTransaction, return nil, err } processedTran := &peer.ProcessedTransaction{TransactionEnvelope: tranEnv, ValidationCode: int32(txVResult)} - l.blockAPIsRWLock.RLock() - l.blockAPIsRWLock.RUnlock() return processedTran, nil } // GetBlockchainInfo returns basic info about blockchain func (l *kvLedger) GetBlockchainInfo() (*common.BlockchainInfo, error) { - bcInfo, err := l.blockStore.GetBlockchainInfo() l.blockAPIsRWLock.RLock() defer l.blockAPIsRWLock.RUnlock() + bcInfo, err := l.blockStore.GetBlockchainInfo() return bcInfo, err } // GetBlockByNumber returns block at a given height // blockNumber of math.MaxUint64 will return last block func (l *kvLedger) GetBlockByNumber(blockNumber uint64) (*common.Block, error) { - block, err := l.blockStore.RetrieveBlockByNumber(blockNumber) l.blockAPIsRWLock.RLock() - l.blockAPIsRWLock.RUnlock() + defer l.blockAPIsRWLock.RUnlock() + block, err := l.blockStore.RetrieveBlockByNumber(blockNumber) return block, err } @@ -375,16 +396,16 @@ func (l *kvLedger) GetBlockByHash(blockHash []byte) (*common.Block, error) { // GetBlockByTxID returns a block which contains a transaction func (l *kvLedger) GetBlockByTxID(txID string) (*common.Block, error) { - block, err := l.blockStore.RetrieveBlockByTxID(txID) l.blockAPIsRWLock.RLock() - l.blockAPIsRWLock.RUnlock() + defer l.blockAPIsRWLock.RUnlock() + block, err := l.blockStore.RetrieveBlockByTxID(txID) return block, err } func (l *kvLedger) GetTxValidationCodeByTxID(txID string) (peer.TxValidationCode, error) { - txValidationCode, err := l.blockStore.RetrieveTxValidationCodeByTxID(txID) l.blockAPIsRWLock.RLock() - l.blockAPIsRWLock.RUnlock() + defer l.blockAPIsRWLock.RUnlock() + txValidationCode, err := l.blockStore.RetrieveTxValidationCodeByTxID(txID) return txValidationCode, err } @@ -431,7 +452,7 @@ func (l *kvLedger) CommitLegacy(pvtdataAndBlock *ledger.BlockAndPvtData, commitO // and no longer available in pvtdataStore, eventually these // pvtdata would get expired in the stateDB as well (though it // would miss the pvtData until then) - txPvtData, err := l.blockStore.GetPvtDataByNum(blockNo, nil) + txPvtData, err := l.pvtdataStore.GetPvtDataByBlockNum(blockNo, nil) if err != nil { return err } @@ -454,10 +475,10 @@ func (l *kvLedger) CommitLegacy(pvtdataAndBlock *ledger.BlockAndPvtData, commitO l.addBlockCommitHash(pvtdataAndBlock.Block, updateBatchBytes) } - logger.Debugf("[%s] Committing block [%d] to storage", l.ledgerID, blockNo) + logger.Debugf("[%s] Committing pvtdata and block [%d] to storage", l.ledgerID, blockNo) l.blockAPIsRWLock.Lock() defer l.blockAPIsRWLock.Unlock() - if err = l.blockStore.CommitWithPvtData(pvtdataAndBlock); err != nil { + if err = l.commitToPvtAndBlockStore(pvtdataAndBlock); err != nil { return err } elapsedBlockstorageAndPvtdataCommit := 
time.Since(startBlockstorageAndPvtdataCommit) @@ -496,6 +517,47 @@ func (l *kvLedger) CommitLegacy(pvtdataAndBlock *ledger.BlockAndPvtData, commitO return nil } +func (l *kvLedger) commitToPvtAndBlockStore(blockAndPvtdata *ledger.BlockAndPvtData) error { + pvtdataStoreHt, err := l.pvtdataStore.LastCommittedBlockHeight() + if err != nil { + return err + } + blockNum := blockAndPvtdata.Block.Header.Number + + if !l.isPvtstoreAheadOfBlkstore.Load().(bool) { + logger.Debugf("Writing block [%d] to pvt data store", blockNum) + // If a state fork occurs during a regular block commit, + // we have a mechanism to drop all blocks followed by refetching of blocks + // and re-processing them. In the current way of doing this, we only drop + // the block files (and related artifacts) but we do not drop/overwrite the + // pvtdatastorage as that might lead to data loss. + // During block reprocessing, a transaction with invalid pvtdata may become + // valid. Hence, we store the pvtdata of invalid transactions too in the + // pvtdataStore, just as the blockStore stores the public data of invalid + // transactions, and we pass all pvtData present in the block to the + // pvtdataStore committer. + pvtData, missingPvtData := constructPvtDataAndMissingData(blockAndPvtdata) + if err := l.pvtdataStore.Commit(blockNum, pvtData, missingPvtData); err != nil { + return err + } + } else { + logger.Debugf("Skipping writing pvtData to pvt data store as it is ahead of the block store") + } + + if err := l.blockStore.AddBlock(blockAndPvtdata.Block); err != nil { + return err + } + + if pvtdataStoreHt == blockNum+1 { + // We reach here only if the pvtdataStore was ahead of the blockStore + // at ledger initialization time. The pvtdataStore would be ahead of + // the blockstore when the peer restarts after a reset or rollback. + l.isPvtstoreAheadOfBlkstore.Store(false) + } + + return nil +} + func convertTxPvtDataArrayToMap(txPvtData []*ledger.TxPvtData) ledger.TxPvtDataMap { txPvtDataMap := make(ledger.TxPvtDataMap) for _, pvtData := range txPvtData { @@ -520,13 +582,17 @@ func (l *kvLedger) updateBlockStats( // most recent `maxBlock` blocks which miss at least a private data of an eligible collection. func (l *kvLedger) GetMissingPvtDataInfoForMostRecentBlocks(maxBlock int) (ledger.MissingPvtDataInfo, error) { // the missing pvtData info in the pvtdataStore could belong to a block which is yet - // to be processed and committed to the blockStore and stateDB. - // In such cases, we cannot return missing pvtData info. Otherwise, we would end up in - // an inconsistent state database. - if l.blockStore.IsPvtStoreAheadOfBlockStore() { + // to be processed and committed to the blockStore and stateDB (such a scenario is possible + // after a peer rollback). In such cases, we cannot return missing pvtData info. Otherwise, + // we would end up in an inconsistent state database. + if l.isPvtstoreAheadOfBlkstore.Load().(bool) { return nil, nil } - return l.blockStore.GetMissingPvtDataInfoForMostRecentBlocks(maxBlock) + // it is safe to not acquire a read lock on l.blockAPIsRWLock. Without a lock, the value of + // lastCommittedBlock can change due to a new block commit. As a result, we may not + // be able to fetch the missing data info of truly the most recent blocks. This + // decision was made to ensure that the regular block commit rate is not affected.
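+ // At worst, the returned info lags behind by whatever blocks get committed + // concurrently, which merely delays reconciliation of the newest blocks.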
+ return l.pvtdataStore.GetMissingPvtDataInfoForMostRecentBlocks(maxBlock) } func (l *kvLedger) addBlockCommitHash(block *common.Block, updateBatchBytes []byte) { @@ -545,28 +611,49 @@ func (l *kvLedger) addBlockCommitHash(block *common.Block, updateBatchBytes []by // GetPvtDataAndBlockByNum returns the block and the corresponding pvt data. // The pvt data is filtered by the list of 'collections' supplied func (l *kvLedger) GetPvtDataAndBlockByNum(blockNum uint64, filter ledger.PvtNsCollFilter) (*ledger.BlockAndPvtData, error) { - blockAndPvtdata, err := l.blockStore.GetPvtDataAndBlockByNum(blockNum, filter) l.blockAPIsRWLock.RLock() - l.blockAPIsRWLock.RUnlock() - return blockAndPvtdata, err + defer l.blockAPIsRWLock.RUnlock() + + var block *common.Block + var pvtdata []*ledger.TxPvtData + var err error + + if block, err = l.blockStore.RetrieveBlockByNumber(blockNum); err != nil { + return nil, err + } + + if pvtdata, err = l.pvtdataStore.GetPvtDataByBlockNum(blockNum, filter); err != nil { + return nil, err + } + + return &ledger.BlockAndPvtData{Block: block, PvtData: constructPvtdataMap(pvtdata)}, nil } // GetPvtDataByNum returns only the pvt data corresponding to the given block number // The pvt data is filtered by the list of 'collections' supplied func (l *kvLedger) GetPvtDataByNum(blockNum uint64, filter ledger.PvtNsCollFilter) ([]*ledger.TxPvtData, error) { - pvtdata, err := l.blockStore.GetPvtDataByNum(blockNum, filter) l.blockAPIsRWLock.RLock() - l.blockAPIsRWLock.RUnlock() - return pvtdata, err + defer l.blockAPIsRWLock.RUnlock() + var pvtdata []*ledger.TxPvtData + var err error + if pvtdata, err = l.pvtdataStore.GetPvtDataByBlockNum(blockNum, filter); err != nil { + return nil, err + } + return pvtdata, nil } // DoesPvtDataInfoExist returns true when // (1) the ledger has pvtdata associated with the given block number (or) // (2) a few or all pvtdata associated with the given block number is missing but the // missing info is recorded in the ledger (or) -// (3) the block is committed does not contain any pvtData. +// (3) the block is committed but it does not contain even a single +// transaction with pvtData. 
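+// In all three cases, the pvtdataStore commits an entry for the block, so the check +// below reduces to comparing blockNum against the pvtdataStore height; e.g., with +// LastCommittedBlockHeight() == 8, blockNum 7 returns true and blockNum 8 returns false.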
func (l *kvLedger) DoesPvtDataInfoExist(blockNum uint64) (bool, error) { - return l.blockStore.DoesPvtDataInfoExist(blockNum) + pvtStoreHt, err := l.pvtdataStore.LastCommittedBlockHeight() + if err != nil { + return false, err + } + return blockNum+1 <= pvtStoreHt, nil } func (l *kvLedger) GetConfigHistoryRetriever() (ledger.ConfigHistoryRetriever, error) { @@ -588,7 +675,8 @@ func (l *kvLedger) CommitPvtDataOfOldBlocks(reconciledPvtdata []*ledger.Reconcil } logger.Debugf("[%s:] Committing pvtData of [%d] old blocks to the pvtdatastore", l.ledgerID, len(reconciledPvtdata)) - err = l.blockStore.CommitPvtDataOfOldBlocks(hashVerifiedPvtData) + + err = l.pvtdataStore.CommitPvtDataOfOldBlocks(hashVerifiedPvtData) if err != nil { return nil, err } @@ -679,13 +767,13 @@ func (a *ccEventListenerAdaptor) ChaincodeDeployDone(succeeded bool) { a.legacyEventListener.ChaincodeDeployDone(succeeded) } -func filterPvtDataOfInvalidTx(hashVerifiedPvtData map[uint64][]*ledger.TxPvtData, blockStore *ledgerstorage.Store) (map[uint64][]*ledger.TxPvtData, error) { +func filterPvtDataOfInvalidTx(hashVerifiedPvtData map[uint64][]*ledger.TxPvtData, blockStore blkstorage.BlockStore) (map[uint64][]*ledger.TxPvtData, error) { committedPvtData := make(map[uint64][]*ledger.TxPvtData) for blkNum, txsPvtData := range hashVerifiedPvtData { // TODO: Instead of retrieving the whole block, we need to retrieve only // the TxValidationFlags from the block metadata. For that, we would need - // to add a new index for the block metadata. FAB- FAB-15808 + // to add a new index for the block metadata - FAB-15808 block, err := blockStore.RetrieveBlockByNumber(blkNum) if err != nil { return nil, err @@ -702,3 +790,37 @@ func filterPvtDataOfInvalidTx(hashVerifiedPvtData map[uint64][]*ledger.TxPvtData } return committedPvtData, nil } + +func constructPvtdataMap(pvtdata []*ledger.TxPvtData) ledger.TxPvtDataMap { + if pvtdata == nil { + return nil + } + m := make(map[uint64]*ledger.TxPvtData) + for _, pvtdatum := range pvtdata { + m[pvtdatum.SeqInBlock] = pvtdatum + } + return m +} + +func constructPvtDataAndMissingData(blockAndPvtData *ledger.BlockAndPvtData) ([]*ledger.TxPvtData, + ledger.TxMissingPvtDataMap) { + + var pvtData []*ledger.TxPvtData + missingPvtData := make(ledger.TxMissingPvtDataMap) + + numTxs := uint64(len(blockAndPvtData.Block.Data.Data)) + + for txNum := uint64(0); txNum < numTxs; txNum++ { + if pvtdata, ok := blockAndPvtData.PvtData[txNum]; ok { + pvtData = append(pvtData, pvtdata) + } + + if missingData, ok := blockAndPvtData.MissingPvtData[txNum]; ok { + for _, missing := range missingData { + missingPvtData.Add(txNum, missing.Namespace, + missing.Collection, missing.IsEligible) + } + } + } + return pvtData, missingPvtData +} diff --git a/core/ledger/kvledger/kv_ledger_provider.go b/core/ledger/kvledger/kv_ledger_provider.go index 40dd35752fc..f5fff6f54ae 100644 --- a/core/ledger/kvledger/kv_ledger_provider.go +++ b/core/ledger/kvledger/kv_ledger_provider.go @@ -12,6 +12,8 @@ import ( "github.com/golang/protobuf/proto" "github.com/hyperledger/fabric-protos-go/common" + "github.com/hyperledger/fabric/common/ledger/blkstorage" + "github.com/hyperledger/fabric/common/ledger/blkstorage/fsblkstorage" "github.com/hyperledger/fabric/common/ledger/dataformat" "github.com/hyperledger/fabric/common/ledger/util/leveldbhelper" "github.com/hyperledger/fabric/core/ledger" @@ -20,7 +22,6 @@ import ( "github.com/hyperledger/fabric/core/ledger/kvledger/history" "github.com/hyperledger/fabric/core/ledger/kvledger/msgs" 
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/privacyenabledstate" - "github.com/hyperledger/fabric/core/ledger/ledgerstorage" "github.com/hyperledger/fabric/core/ledger/pvtdatastorage" "github.com/hyperledger/fabric/protoutil" "github.com/pkg/errors" @@ -49,22 +50,32 @@ var ( // formatKey formatKey = []byte("f") + + attrsToIndex = []blkstorage.IndexableAttr{ + blkstorage.IndexableAttrBlockHash, + blkstorage.IndexableAttrBlockNum, + blkstorage.IndexableAttrTxID, + blkstorage.IndexableAttrBlockNumTranNum, + } ) +const maxBlockFileSize = 64 * 1024 * 1024 + // Provider implements interface ledger.PeerLedgerProvider type Provider struct { - idStore *idStore - ledgerStoreProvider *ledgerstorage.Provider - vdbProvider privacyenabledstate.DBProvider - historydbProvider *history.DBProvider - configHistoryMgr confighistory.Mgr - stateListeners []ledger.StateListener - bookkeepingProvider bookkeeping.Provider - initializer *ledger.Initializer - collElgNotifier *collElgNotifier - stats *stats - fileLock *leveldbhelper.FileLock - hasher ledger.Hasher + idStore *idStore + blkStoreProvider blkstorage.BlockStoreProvider + pvtdataStoreProvider pvtdatastorage.Provider + vdbProvider privacyenabledstate.DBProvider + historydbProvider *history.DBProvider + configHistoryMgr confighistory.Mgr + stateListeners []ledger.StateListener + bookkeepingProvider bookkeeping.Provider + initializer *ledger.Initializer + collElgNotifier *collElgNotifier + stats *stats + fileLock *leveldbhelper.FileLock + hasher ledger.Hasher } // NewProvider instantiates a new Provider. @@ -101,7 +112,11 @@ func NewProvider(initializer *ledger.Initializer) (pr *Provider, e error) { return nil, err } - if err := p.initLedgerStorageProvider(); err != nil { + if err := p.initBlockStoreProvider(); err != nil { + return nil, err + } + + if err := p.initPvtDataStoreProvider(); err != nil { return nil, err } @@ -137,22 +152,33 @@ func (p *Provider) initLedgerIDInventory() error { return nil } -func (p *Provider) initLedgerStorageProvider() error { - // initialize ledger storage - privateData := &pvtdatastorage.PrivateDataConfig{ +func (p *Provider) initBlockStoreProvider() error { + indexConfig := &blkstorage.IndexConfig{AttrsToIndex: attrsToIndex} + blkStoreProvider, err := fsblkstorage.NewProvider( + fsblkstorage.NewConf( + BlockStorePath(p.initializer.Config.RootFSPath), + maxBlockFileSize, + ), + indexConfig, + p.initializer.MetricsProvider, + ) + if err != nil { + return err + } + p.blkStoreProvider = blkStoreProvider + return nil +} + +func (p *Provider) initPvtDataStoreProvider() error { + privateDataConfig := &pvtdatastorage.PrivateDataConfig{ PrivateDataConfig: p.initializer.Config.PrivateDataConfig, StorePath: PvtDataStorePath(p.initializer.Config.RootFSPath), } - - ledgerStoreProvider, err := ledgerstorage.NewProvider( - BlockStorePath(p.initializer.Config.RootFSPath), - privateData, - p.initializer.MetricsProvider, - ) + pvtdataStoreProvider, err := pvtdatastorage.NewProvider(privateDataConfig) if err != nil { return err } - p.ledgerStoreProvider = ledgerStoreProvider + p.pvtdataStoreProvider = pvtdataStoreProvider return nil } @@ -281,11 +307,17 @@ func (p *Provider) Open(ledgerID string) (ledger.PeerLedger, error) { func (p *Provider) openInternal(ledgerID string) (ledger.PeerLedger, error) { // Get the block store for a chain/ledger - blockStore, err := p.ledgerStoreProvider.Open(ledgerID) + blockStore, err := p.blkStoreProvider.OpenBlockStore(ledgerID) if err != nil { return nil, err } - 
p.collElgNotifier.registerListener(ledgerID, blockStore) + + pvtdataStore, err := p.pvtdataStoreProvider.OpenStore(ledgerID) + if err != nil { + return nil, err + } + + p.collElgNotifier.registerListener(ledgerID, pvtdataStore) // Get the versioned database (state database) for a chain/ledger vDB, err := p.vdbProvider.GetDBHandle(ledgerID) @@ -302,11 +334,10 @@ func (p *Provider) openInternal(ledgerID string) (ledger.PeerLedger, error) { } } - // Create a kvLedger for this chain/ledger, which encapsulates the underlying data stores - // (id store, blockstore, state database, history database) l, err := newKVLedger( ledgerID, blockStore, + pvtdataStore, vDB, historyDB, p.configHistoryMgr, @@ -339,8 +370,11 @@ func (p *Provider) Close() { if p.idStore != nil { p.idStore.close() } - if p.ledgerStoreProvider != nil { - p.ledgerStoreProvider.Close() + if p.blkStoreProvider != nil { + p.blkStoreProvider.Close() + } + if p.pvtdataStoreProvider != nil { + p.pvtdataStoreProvider.Close() } if p.vdbProvider != nil { p.vdbProvider.Close() diff --git a/core/ledger/kvledger/kv_ledger_provider_test.go b/core/ledger/kvledger/kv_ledger_provider_test.go index 6760554ed4f..99f92aaf92b 100644 --- a/core/ledger/kvledger/kv_ledger_provider_test.go +++ b/core/ledger/kvledger/kv_ledger_provider_test.go @@ -36,7 +36,7 @@ import ( func TestLedgerProvider(t *testing.T) { conf, cleanup := testConfig(t) defer cleanup() - provider := testutilNewProvider(conf, t) + provider := testutilNewProvider(conf, t, &mock.DeployedChaincodeInfoProvider{}) numLedgers := 10 existingLedgerIDs, err := provider.List() assert.NoError(t, err) @@ -59,7 +59,7 @@ func TestLedgerProvider(t *testing.T) { provider.Close() - provider = testutilNewProvider(conf, t) + provider = testutilNewProvider(conf, t, &mock.DeployedChaincodeInfoProvider{}) defer provider.Close() ledgerIds, _ := provider.List() assert.Len(t, ledgerIds, numLedgers) @@ -107,7 +107,7 @@ func TestLedgerProvider(t *testing.T) { func TestGetActiveLedgerIDsIteratorError(t *testing.T) { conf, cleanup := testConfig(t) defer cleanup() - provider := testutilNewProvider(conf, t) + provider := testutilNewProvider(conf, t, &mock.DeployedChaincodeInfoProvider{}) for i := 0; i < 2; i++ { genesisBlock, _ := configtxtest.MakeGenesisBlock(constructTestLedgerID(i)) @@ -123,7 +123,7 @@ func TestGetActiveLedgerIDsIteratorError(t *testing.T) { func TestLedgerMetataDataUnmarshalError(t *testing.T) { conf, cleanup := testConfig(t) defer cleanup() - provider := testutilNewProvider(conf, t) + provider := testutilNewProvider(conf, t, &mock.DeployedChaincodeInfoProvider{}) defer provider.Close() ledgerID := constructTestLedgerID(0) @@ -160,7 +160,7 @@ func TestNewProviderIdStoreFormatError(t *testing.T) { func TestUpgradeIDStoreFormatDBError(t *testing.T) { conf, cleanup := testConfig(t) defer cleanup() - provider := testutilNewProvider(conf, t) + provider := testutilNewProvider(conf, t, &mock.DeployedChaincodeInfoProvider{}) provider.Close() err := provider.idStore.upgradeFormat() @@ -171,7 +171,7 @@ func TestLedgerProviderHistoryDBDisabled(t *testing.T) { conf, cleanup := testConfig(t) conf.HistoryDBConfig.Enabled = false defer cleanup() - provider := testutilNewProvider(conf, t) + provider := testutilNewProvider(conf, t, &mock.DeployedChaincodeInfoProvider{}) numLedgers := 10 existingLedgerIDs, err := provider.List() assert.NoError(t, err) @@ -188,7 +188,7 @@ func TestLedgerProviderHistoryDBDisabled(t *testing.T) { provider.Close() - provider = testutilNewProvider(conf, t) + provider = 
testutilNewProvider(conf, t, &mock.DeployedChaincodeInfoProvider{}) defer provider.Close() ledgerIds, _ := provider.List() assert.Len(t, ledgerIds, numLedgers) @@ -231,7 +231,7 @@ func TestLedgerProviderHistoryDBDisabled(t *testing.T) { func TestRecovery(t *testing.T) { conf, cleanup := testConfig(t) defer cleanup() - provider1 := testutilNewProvider(conf, t) + provider1 := testutilNewProvider(conf, t, &mock.DeployedChaincodeInfoProvider{}) defer provider1.Close() // now create the genesis block @@ -246,7 +246,7 @@ func TestRecovery(t *testing.T) { provider1.Close() // construct a new provider1 to invoke recovery - provider1 = testutilNewProvider(conf, t) + provider1 = testutilNewProvider(conf, t, &mock.DeployedChaincodeInfoProvider{}) // verify the under-construction flag and open the ledger flag, err := provider1.idStore.getUnderConstructionFlag() assert.NoError(t, err, "Failed to read the underconstruction flag") @@ -261,7 +261,7 @@ func TestRecovery(t *testing.T) { provider1.Close() // construct a new provider to invoke recovery - provider2 := testutilNewProvider(conf, t) + provider2 := testutilNewProvider(conf, t, &mock.DeployedChaincodeInfoProvider{}) defer provider2.Close() assert.NoError(t, err, "Provider failed to recover an underConstructionLedger") flag, err = provider2.idStore.getUnderConstructionFlag() @@ -273,7 +273,7 @@ func TestRecoveryHistoryDBDisabled(t *testing.T) { conf, cleanup := testConfig(t) conf.HistoryDBConfig.Enabled = false defer cleanup() - provider1 := testutilNewProvider(conf, t) + provider1 := testutilNewProvider(conf, t, &mock.DeployedChaincodeInfoProvider{}) defer provider1.Close() // now create the genesis block @@ -289,7 +289,7 @@ func TestRecoveryHistoryDBDisabled(t *testing.T) { provider1.Close() // construct a new provider to invoke recovery - provider2 := testutilNewProvider(conf, t) + provider2 := testutilNewProvider(conf, t, &mock.DeployedChaincodeInfoProvider{}) defer provider2.Close() // verify the under-construction flag and open the ledger flag, err := provider2.idStore.getUnderConstructionFlag() @@ -305,7 +305,7 @@ func TestRecoveryHistoryDBDisabled(t *testing.T) { provider2.Close() // construct a new provider to invoke recovery - provider3 := testutilNewProvider(conf, t) + provider3 := testutilNewProvider(conf, t, &mock.DeployedChaincodeInfoProvider{}) defer provider3.Close() assert.NoError(t, err, "Provider failed to recover an underConstructionLedger") flag, err = provider3.idStore.getUnderConstructionFlag() @@ -316,7 +316,7 @@ func TestRecoveryHistoryDBDisabled(t *testing.T) { func TestMultipleLedgerBasicRW(t *testing.T) { conf, cleanup := testConfig(t) defer cleanup() - provider1 := testutilNewProvider(conf, t) + provider1 := testutilNewProvider(conf, t, &mock.DeployedChaincodeInfoProvider{}) defer provider1.Close() numLedgers := 10 @@ -342,7 +342,7 @@ func TestMultipleLedgerBasicRW(t *testing.T) { provider1.Close() - provider2 := testutilNewProvider(conf, t) + provider2 := testutilNewProvider(conf, t, &mock.DeployedChaincodeInfoProvider{}) defer provider2.Close() ledgers = make([]lgr.PeerLedger, numLedgers) for i := 0; i < numLedgers; i++ { @@ -382,7 +382,7 @@ func TestLedgerBackup(t *testing.T) { Enabled: true, }, } - provider := testutilNewProvider(origConf, t) + provider := testutilNewProvider(origConf, t, &mock.DeployedChaincodeInfoProvider{}) bg, gb := testutil.NewBlockGenerator(t, ledgerid, false) gbHash := protoutil.BlockHeaderHash(gb.Header) ledger, _ := provider.Create(gb) @@ -432,7 +432,7 @@ func TestLedgerBackup(t *testing.T) { Enabled:
true, }, } - provider = testutilNewProvider(restoreConf, t) + provider = testutilNewProvider(restoreConf, t, &mock.DeployedChaincodeInfoProvider{}) defer provider.Close() _, err = provider.Create(gb) @@ -525,13 +525,13 @@ func testConfig(t *testing.T) (conf *lgr.Config, cleanup func()) { return conf, cleanup } -func testutilNewProvider(conf *lgr.Config, t *testing.T) *Provider { +func testutilNewProvider(conf *lgr.Config, t *testing.T, ccInfoProvider *mock.DeployedChaincodeInfoProvider) *Provider { cryptoProvider, err := sw.NewDefaultSecurityLevelWithKeystore(sw.NewDummyKeyStore()) assert.NoError(t, err) provider, err := NewProvider( &lgr.Initializer{ - DeployedChaincodeInfoProvider: &mock.DeployedChaincodeInfoProvider{}, + DeployedChaincodeInfoProvider: ccInfoProvider, MetricsProvider: &disabled.Provider{}, Config: conf, Hasher: cryptoProvider, @@ -547,7 +547,7 @@ func testutilNewProviderWithCollectionConfig( btlConfigs map[string]uint64, conf *lgr.Config, ) *Provider { - provider := testutilNewProvider(conf, t) + provider := testutilNewProvider(conf, t, &mock.DeployedChaincodeInfoProvider{}) mockCCInfoProvider := provider.initializer.DeployedChaincodeInfoProvider.(*mock.DeployedChaincodeInfoProvider) collMap := map[string]*peer.StaticCollectionConfig{} var collConf []*peer.CollectionConfig diff --git a/core/ledger/kvledger/kv_ledger_test.go b/core/ledger/kvledger/kv_ledger_test.go index 6cc98644c7e..a3e7542b3f5 100644 --- a/core/ledger/kvledger/kv_ledger_test.go +++ b/core/ledger/kvledger/kv_ledger_test.go @@ -13,11 +13,18 @@ import ( "github.com/golang/protobuf/proto" "github.com/hyperledger/fabric-protos-go/common" "github.com/hyperledger/fabric-protos-go/ledger/queryresult" + "github.com/hyperledger/fabric-protos-go/ledger/rwset" "github.com/hyperledger/fabric-protos-go/peer" + pb "github.com/hyperledger/fabric-protos-go/peer" "github.com/hyperledger/fabric/common/flogging" "github.com/hyperledger/fabric/common/ledger/testutil" "github.com/hyperledger/fabric/common/util" + "github.com/hyperledger/fabric/core/ledger" lgr "github.com/hyperledger/fabric/core/ledger" + "github.com/hyperledger/fabric/core/ledger/mock" + "github.com/hyperledger/fabric/core/ledger/pvtdatapolicy" + btltestutil "github.com/hyperledger/fabric/core/ledger/pvtdatapolicy/testutil" + "github.com/hyperledger/fabric/internal/pkg/txflags" "github.com/hyperledger/fabric/protoutil" "github.com/stretchr/testify/assert" ) @@ -45,7 +52,7 @@ func TestKVLedgerNilHistoryDBProvider(t *testing.T) { func TestKVLedgerBlockStorage(t *testing.T) { conf, cleanup := testConfig(t) defer cleanup() - provider := testutilNewProvider(conf, t) + provider := testutilNewProvider(conf, t, &mock.DeployedChaincodeInfoProvider{}) defer provider.Close() bg, gb := testutil.NewBlockGenerator(t, "testLedger", false) @@ -131,7 +138,7 @@ func TestKVLedgerBlockStorage(t *testing.T) { func TestAddCommitHash(t *testing.T) { conf, cleanup := testConfig(t) defer cleanup() - provider := testutilNewProvider(conf, t) + provider := testutilNewProvider(conf, t, &mock.DeployedChaincodeInfoProvider{}) defer provider.Close() bg, gb := testutil.NewBlockGenerator(t, "testLedger", false) @@ -185,7 +192,7 @@ func TestKVLedgerBlockStorageWithPvtdata(t *testing.T) { t.Skip() conf, cleanup := testConfig(t) defer cleanup() - provider := testutilNewProvider(conf, t) + provider := testutilNewProvider(conf, t, &mock.DeployedChaincodeInfoProvider{}) defer provider.Close() bg, gb := testutil.NewBlockGenerator(t, "testLedger", false) @@ -295,7 +302,7 @@ func 
TestKVLedgerDBRecovery(t *testing.T) { _, _, err = ledger1.(*kvLedger).txtmgmt.ValidateAndPrepare(blockAndPvtdata2, true) assert.NoError(t, err) - assert.NoError(t, ledger1.(*kvLedger).blockStore.CommitWithPvtData(blockAndPvtdata2)) + assert.NoError(t, ledger1.(*kvLedger).commitToPvtAndBlockStore(blockAndPvtdata2)) // block storage should be as of block-2 but the state and history db should be as of block-1 checkBCSummaryForTest(t, ledger1, @@ -351,7 +358,7 @@ func TestKVLedgerDBRecovery(t *testing.T) { ) _, _, err = ledger2.(*kvLedger).txtmgmt.ValidateAndPrepare(blockAndPvtdata3, true) assert.NoError(t, err) - assert.NoError(t, ledger2.(*kvLedger).blockStore.CommitWithPvtData(blockAndPvtdata3)) + assert.NoError(t, ledger2.(*kvLedger).commitToPvtAndBlockStore(blockAndPvtdata3)) // committing the transaction to state DB assert.NoError(t, ledger2.(*kvLedger).txtmgmt.Commit()) @@ -411,7 +418,7 @@ func TestKVLedgerDBRecovery(t *testing.T) { _, _, err = ledger3.(*kvLedger).txtmgmt.ValidateAndPrepare(blockAndPvtdata4, true) assert.NoError(t, err) - assert.NoError(t, ledger3.(*kvLedger).blockStore.CommitWithPvtData(blockAndPvtdata4)) + assert.NoError(t, ledger3.(*kvLedger).commitToPvtAndBlockStore(blockAndPvtdata4)) assert.NoError(t, ledger3.(*kvLedger).historyDB.Commit(blockAndPvtdata4.Block)) checkBCSummaryForTest(t, ledger3, @@ -460,7 +467,7 @@ func TestKVLedgerDBRecovery(t *testing.T) { func TestLedgerWithCouchDbEnabledWithBinaryAndJSONData(t *testing.T) { conf, cleanup := testConfig(t) defer cleanup() - provider := testutilNewProvider(conf, t) + provider := testutilNewProvider(conf, t, &mock.DeployedChaincodeInfoProvider{}) defer provider.Close() bg, gb := testutil.NewBlockGenerator(t, "testLedger", false) gbHash := protoutil.BlockHeaderHash(gb.Header) @@ -567,6 +574,410 @@ func TestLedgerWithCouchDbEnabledWithBinaryAndJSONData(t *testing.T) { } } +func TestPvtDataAPIs(t *testing.T) { + conf, cleanup := testConfig(t) + defer cleanup() + provider := testutilNewProvider(conf, t, &mock.DeployedChaincodeInfoProvider{}) + defer provider.Close() + + ledgerID := "testLedger" + bg, gb := testutil.NewBlockGenerator(t, ledgerID, false) + gbHash := protoutil.BlockHeaderHash(gb.Header) + lgr, err := provider.Create(gb) + assert.NoError(t, err) + defer lgr.Close() + lgr.(*kvLedger).pvtdataStore.Init(btlPolicyForSampleData()) + + bcInfo, _ := lgr.GetBlockchainInfo() + assert.Equal(t, &common.BlockchainInfo{ + Height: 1, CurrentBlockHash: gbHash, PreviousBlockHash: nil, + }, bcInfo) + + kvlgr := lgr.(*kvLedger) + + sampleData := sampleDataWithPvtdataForSelectiveTx(t, bg) + for _, sampleDatum := range sampleData { + assert.NoError(t, kvlgr.commitToPvtAndBlockStore(sampleDatum)) + } + + // block 2 has no pvt data + pvtdata, err := lgr.GetPvtDataByNum(2, nil) + assert.NoError(t, err) + assert.Nil(t, pvtdata) + + // block 5 has no pvt data + pvtdata, err = lgr.GetPvtDataByNum(5, nil) + assert.NoError(t, err) + assert.Equal(t, 0, len(pvtdata)) + + // block 3 has pvt data for tx 3, 5 and 6. 
Though the tx 6 + // is marked as invalid in the block, the pvtData should + // have been stored + pvtdata, err = lgr.GetPvtDataByNum(3, nil) + assert.NoError(t, err) + assert.Equal(t, 3, len(pvtdata)) + assert.Equal(t, uint64(3), pvtdata[0].SeqInBlock) + assert.Equal(t, uint64(5), pvtdata[1].SeqInBlock) + assert.Equal(t, uint64(6), pvtdata[2].SeqInBlock) + + // block 4 has pvt data for tx 4 and 6 only + pvtdata, err = lgr.GetPvtDataByNum(4, nil) + assert.NoError(t, err) + assert.Equal(t, 2, len(pvtdata)) + assert.Equal(t, uint64(4), pvtdata[0].SeqInBlock) + assert.Equal(t, uint64(6), pvtdata[1].SeqInBlock) + + blockAndPvtdata, err := lgr.GetPvtDataAndBlockByNum(3, nil) + assert.NoError(t, err) + assert.True(t, proto.Equal(sampleData[2].Block, blockAndPvtdata.Block)) + + blockAndPvtdata, err = lgr.GetPvtDataAndBlockByNum(4, nil) + assert.NoError(t, err) + assert.True(t, proto.Equal(sampleData[3].Block, blockAndPvtdata.Block)) + + // pvt data retrieval for block 4 with filter should return filtered pvtdata + filter := ledger.NewPvtNsCollFilter() + filter.Add("ns-1", "coll-1") + blockAndPvtdata, err = lgr.GetPvtDataAndBlockByNum(4, filter) + assert.NoError(t, err) + assert.Equal(t, sampleData[3].Block, blockAndPvtdata.Block) + // two transactions should be present + assert.Equal(t, 2, len(blockAndPvtdata.PvtData)) + // both tx 4 and tx 6 should have only one collection because of the filter + assert.Equal(t, 1, len(blockAndPvtdata.PvtData[4].WriteSet.NsPvtRwset)) + assert.Equal(t, 1, len(blockAndPvtdata.PvtData[6].WriteSet.NsPvtRwset)) + // any other transaction entry should be nil + assert.Nil(t, blockAndPvtdata.PvtData[2]) + + // test missing data retrieval in the presence of invalid tx. Block 6 had + // missing data (for tx4 and tx5). Though tx5 was marked as invalid tx, + // both tx4 and tx5 missing data should be returned + expectedMissingDataInfo := make(ledger.MissingPvtDataInfo) + expectedMissingDataInfo.Add(6, 4, "ns-4", "coll-4") + expectedMissingDataInfo.Add(6, 5, "ns-5", "coll-5") + missingDataInfo, err := lgr.(*kvLedger).GetMissingPvtDataInfoForMostRecentBlocks(1) + assert.NoError(t, err) + assert.Equal(t, expectedMissingDataInfo, missingDataInfo) +} + +func TestCrashAfterPvtdataStoreCommit(t *testing.T) { + conf, cleanup := testConfig(t) + defer cleanup() + ccInfoProvider := &mock.DeployedChaincodeInfoProvider{} + ccInfoProvider.CollectionInfoReturns(&peer.StaticCollectionConfig{BlockToLive: 0}, nil) + provider := testutilNewProvider(conf, t, ccInfoProvider) + defer provider.Close() + + ledgerID := "testLedger" + bg, gb := testutil.NewBlockGenerator(t, ledgerID, false) + gbHash := protoutil.BlockHeaderHash(gb.Header) + lgr, err := provider.Create(gb) + assert.NoError(t, err) + defer lgr.Close() + + bcInfo, _ := lgr.GetBlockchainInfo() + assert.Equal(t, &common.BlockchainInfo{ + Height: 1, CurrentBlockHash: gbHash, PreviousBlockHash: nil, + }, bcInfo) + + sampleData := sampleDataWithPvtdataForAllTxs(t, bg) + dataBeforeCrash := sampleData[0:3] + dataAtCrash := sampleData[3] + + for _, sampleDatum := range dataBeforeCrash { + assert.NoError(t, lgr.(*kvLedger).commitToPvtAndBlockStore(sampleDatum)) + } + blockNumAtCrash := dataAtCrash.Block.Header.Number + var pvtdataAtCrash []*ledger.TxPvtData + for _, p := range dataAtCrash.PvtData { + pvtdataAtCrash = append(pvtdataAtCrash, p) + } + // call Commit on pvt data store and mimic a crash before committing the block to block store + lgr.(*kvLedger).pvtdataStore.Commit(blockNumAtCrash, pvtdataAtCrash, nil) + + // Now, assume that
the peer fails here before committing the block to the blockstore. + lgr.Close() + provider.Close() + + // mimic peer restart + provider1 := testutilNewProvider(conf, t, ccInfoProvider) + defer provider1.Close() + lgr1, err := provider1.Open(ledgerID) + assert.NoError(t, err) + defer lgr1.Close() + + isPvtStoreAhead, err := lgr1.(*kvLedger).isPvtDataStoreAheadOfBlockStore() + assert.NoError(t, err) + assert.True(t, isPvtStoreAhead) + + // When starting the storage after a crash, we should be able to fetch the pvtData from pvtStore + testVerifyPvtData(t, lgr1, blockNumAtCrash, dataAtCrash.PvtData) + bcInfo, err = lgr1.GetBlockchainInfo() + assert.NoError(t, err) + assert.Equal(t, blockNumAtCrash, bcInfo.Height) + + // We should be able to write the last block again. To ensure that the + // pvtdataStore is not updated in the process, we send a different pvtData for + // the same block such that we can retrieve the pvtData and compare. + expectedPvtData := dataAtCrash.PvtData + dataAtCrash.PvtData = make(ledger.TxPvtDataMap) + dataAtCrash.PvtData[0] = &ledger.TxPvtData{ + SeqInBlock: 0, + WriteSet: &rwset.TxPvtReadWriteSet{ + NsPvtRwset: []*rwset.NsPvtReadWriteSet{ + { + Namespace: "ns-1", + CollectionPvtRwset: []*rwset.CollectionPvtReadWriteSet{ + { + CollectionName: "coll-1", + Rwset: []byte("pvtdata"), + }, + }, + }, + }, + }, + } + assert.NoError(t, lgr1.(*kvLedger).commitToPvtAndBlockStore(dataAtCrash)) + testVerifyPvtData(t, lgr1, blockNumAtCrash, expectedPvtData) + bcInfo, err = lgr1.GetBlockchainInfo() + assert.NoError(t, err) + assert.Equal(t, blockNumAtCrash+1, bcInfo.Height) + + isPvtStoreAhead, err = lgr1.(*kvLedger).isPvtDataStoreAheadOfBlockStore() + assert.NoError(t, err) + assert.False(t, isPvtStoreAhead) +} + +func testVerifyPvtData(t *testing.T, ledger ledger.PeerLedger, blockNum uint64, expectedPvtData lgr.TxPvtDataMap) { + pvtdata, err := ledger.GetPvtDataByNum(blockNum, nil) + assert.NoError(t, err) + constructed := constructPvtdataMap(pvtdata) + assert.Equal(t, len(expectedPvtData), len(constructed)) + for k, v := range expectedPvtData { + ov, ok := constructed[k] + assert.True(t, ok) + assert.Equal(t, v.SeqInBlock, ov.SeqInBlock) + assert.True(t, proto.Equal(v.WriteSet, ov.WriteSet)) + } +} + +func TestPvtStoreAheadOfBlockStore(t *testing.T) { + conf, cleanup := testConfig(t) + defer cleanup() + ccInfoProvider := &mock.DeployedChaincodeInfoProvider{} + ccInfoProvider.CollectionInfoReturns(&peer.StaticCollectionConfig{BlockToLive: 0}, nil) + provider := testutilNewProvider(conf, t, ccInfoProvider) + defer provider.Close() + + ledgerID := "testLedger" + bg, gb := testutil.NewBlockGenerator(t, ledgerID, false) + gbHash := protoutil.BlockHeaderHash(gb.Header) + lgr, err := provider.Create(gb) + assert.NoError(t, err) + defer lgr.Close() + + bcInfo, _ := lgr.GetBlockchainInfo() + assert.Equal(t, &common.BlockchainInfo{ + Height: 1, CurrentBlockHash: gbHash, PreviousBlockHash: nil, + }, bcInfo) + + // when both stores contain genesis block only, isPvtstoreAheadOfBlockstore should be false + kvlgr := lgr.(*kvLedger) + isPvtStoreAhead, err := kvlgr.isPvtDataStoreAheadOfBlockStore() + assert.NoError(t, err) + assert.False(t, isPvtStoreAhead) + + sampleData := sampleDataWithPvtdataForSelectiveTx(t, bg) + for _, d := range sampleData[0:9] { // commit block numbers 1 to 9 + assert.NoError(t, kvlgr.commitToPvtAndBlockStore(d)) + } + + isPvtStoreAhead, err = kvlgr.isPvtDataStoreAheadOfBlockStore() + assert.NoError(t, err) + assert.False(t, isPvtStoreAhead) + + // close and reopen.
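+ // (mimics a peer restart: closing both the ledger and the provider forces the + // next open to re-run the initialization checks, including isPvtDataStoreAheadOfBlockStore)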
+ lgr.Close() + provider.Close() + + provider1 := testutilNewProvider(conf, t, ccInfoProvider) + defer provider1.Close() + lgr1, err := provider1.Open(ledgerID) + assert.NoError(t, err) + defer lgr1.Close() + kvlgr = lgr1.(*kvLedger) + + // as both stores are at the same block height, isPvtstoreAheadOfBlockstore should be false + info, err := lgr1.GetBlockchainInfo() + assert.NoError(t, err) + assert.Equal(t, uint64(10), info.Height) + pvtStoreHt, err := kvlgr.pvtdataStore.LastCommittedBlockHeight() + assert.NoError(t, err) + assert.Equal(t, uint64(10), pvtStoreHt) + isPvtStoreAhead, err = kvlgr.isPvtDataStoreAheadOfBlockStore() + assert.NoError(t, err) + assert.False(t, isPvtStoreAhead) + + lastBlkAndPvtData := sampleData[9] + // Add the last block directly to the pvtdataStore but not to blockstore. This would make + // the pvtdatastore height greater than the block store height. + validTxPvtData, validTxMissingPvtData := constructPvtDataAndMissingData(lastBlkAndPvtData) + err = kvlgr.pvtdataStore.Commit(lastBlkAndPvtData.Block.Header.Number, validTxPvtData, validTxMissingPvtData) + assert.NoError(t, err) + + // close and reopen. + lgr1.Close() + provider1.Close() + + provider2 := testutilNewProvider(conf, t, &mock.DeployedChaincodeInfoProvider{}) + defer provider2.Close() + lgr2, err := provider2.Open(ledgerID) + assert.NoError(t, err) + defer lgr2.Close() + kvlgr = lgr2.(*kvLedger) + + // pvtdataStore should be ahead of blockstore + info, err = lgr2.GetBlockchainInfo() + assert.NoError(t, err) + assert.Equal(t, uint64(10), info.Height) + pvtStoreHt, err = kvlgr.pvtdataStore.LastCommittedBlockHeight() + assert.NoError(t, err) + assert.Equal(t, uint64(11), pvtStoreHt) + isPvtStoreAhead, err = kvlgr.isPvtDataStoreAheadOfBlockStore() + assert.NoError(t, err) + assert.True(t, isPvtStoreAhead) + + // bring the height of BlockStore equal to pvtdataStore + assert.NoError(t, kvlgr.commitToPvtAndBlockStore(lastBlkAndPvtData)) + info, err = lgr2.GetBlockchainInfo() + assert.NoError(t, err) + assert.Equal(t, uint64(11), info.Height) + pvtStoreHt, err = kvlgr.pvtdataStore.LastCommittedBlockHeight() + assert.NoError(t, err) + assert.Equal(t, uint64(11), pvtStoreHt) + isPvtStoreAhead, err = kvlgr.isPvtDataStoreAheadOfBlockStore() + assert.NoError(t, err) + assert.False(t, isPvtStoreAhead) +} + +func TestCommitToPvtAndBlockstoreError(t *testing.T) { + conf, cleanup := testConfig(t) + defer cleanup() + ccInfoProvider := &mock.DeployedChaincodeInfoProvider{} + ccInfoProvider.CollectionInfoReturns(&peer.StaticCollectionConfig{BlockToLive: 0}, nil) + provider1 := testutilNewProvider(conf, t, ccInfoProvider) + defer provider1.Close() + + ledgerID := "testLedger" + bg, gb := testutil.NewBlockGenerator(t, ledgerID, false) + gbHash := protoutil.BlockHeaderHash(gb.Header) + lgr1, err := provider1.Create(gb) + assert.NoError(t, err) + defer lgr1.Close() + + bcInfo, _ := lgr1.GetBlockchainInfo() + assert.Equal(t, &common.BlockchainInfo{ + Height: 1, CurrentBlockHash: gbHash, PreviousBlockHash: nil, + }, bcInfo) + + kvlgr := lgr1.(*kvLedger) + sampleData := sampleDataWithPvtdataForSelectiveTx(t, bg) + for _, d := range sampleData[0:9] { // commit block number 1 to 9 + assert.NoError(t, kvlgr.commitToPvtAndBlockStore(d)) + } + + // try to write the last block again. The function should return an + // error from the private data store. 
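+ // (the pvtdata store enforces contiguous block heights: as the asserted + // message below shows, it expects block 10 next and rejects a re-commit of block 9)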
+ err = kvlgr.commitToPvtAndBlockStore(sampleData[8]) // block 9 + assert.EqualError(t, err, "Expected block number=10, received block number=9") + + lastBlkAndPvtData := sampleData[9] // block 10 + // Add the block directly to blockstore + kvlgr.blockStore.AddBlock(lastBlkAndPvtData.Block) + // Committing the same block again should surface the error from the block storage + err = kvlgr.commitToPvtAndBlockStore(lastBlkAndPvtData) + assert.EqualError(t, err, "block number should have been 11 but was 10") + // The pvtdata store commit succeeded before the block store commit failed, + // so the pvt store height should have moved ahead + pvtStoreCommitHt, err := kvlgr.pvtdataStore.LastCommittedBlockHeight() + assert.NoError(t, err) + assert.Equal(t, uint64(11), pvtStoreCommitHt) +} + +func sampleDataWithPvtdataForSelectiveTx(t *testing.T, bg *testutil.BlockGenerator) []*ledger.BlockAndPvtData { + var blockAndpvtdata []*ledger.BlockAndPvtData + blocks := bg.NextTestBlocks(10) + for i := 0; i < 10; i++ { + blockAndpvtdata = append(blockAndpvtdata, &ledger.BlockAndPvtData{Block: blocks[i]}) + } + + // txNum 3, 5, 6 in block 3 have pvtdata but txNum 6 is invalid + blockAndpvtdata[2].PvtData = samplePvtData(t, []uint64{3, 5, 6}) + txFilter := txflags.ValidationFlags(blockAndpvtdata[2].Block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER]) + txFilter.SetFlag(6, pb.TxValidationCode_INVALID_WRITESET) + blockAndpvtdata[2].Block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER] = txFilter + + // txNum 4, 6 in block 4 have pvtdata + blockAndpvtdata[3].PvtData = samplePvtData(t, []uint64{4, 6}) + + // txNum 4, 5 in block 6 have missing pvt data but txNum 5 is invalid + missingData := make(ledger.TxMissingPvtDataMap) + missingData.Add(4, "ns-4", "coll-4", true) + missingData.Add(5, "ns-5", "coll-5", true) + blockAndpvtdata[5].MissingPvtData = missingData + txFilter = txflags.ValidationFlags(blockAndpvtdata[5].Block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER]) + txFilter.SetFlag(5, pb.TxValidationCode_INVALID_WRITESET) + blockAndpvtdata[5].Block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER] = txFilter + + return blockAndpvtdata +} + +func sampleDataWithPvtdataForAllTxs(t *testing.T, bg *testutil.BlockGenerator) []*ledger.BlockAndPvtData { + var blockAndpvtdata []*ledger.BlockAndPvtData + blocks := bg.NextTestBlocks(10) + for i := 0; i < 10; i++ { + blockAndpvtdata = append(blockAndpvtdata, + &ledger.BlockAndPvtData{ + Block: blocks[i], + PvtData: samplePvtData(t, []uint64{uint64(i), uint64(i + 1)}), + }, + ) + } + return blockAndpvtdata +} + +func samplePvtData(t *testing.T, txNums []uint64) map[uint64]*ledger.TxPvtData { + pvtWriteSet := &rwset.TxPvtReadWriteSet{DataModel: rwset.TxReadWriteSet_KV} + pvtWriteSet.NsPvtRwset = []*rwset.NsPvtReadWriteSet{ + { + Namespace: "ns-1", + CollectionPvtRwset: []*rwset.CollectionPvtReadWriteSet{ + { + CollectionName: "coll-1", + Rwset: []byte("RandomBytes-PvtRWSet-ns1-coll1"), + }, + { + CollectionName: "coll-2", + Rwset: []byte("RandomBytes-PvtRWSet-ns1-coll2"), + }, + }, + }, + } + var pvtData []*ledger.TxPvtData + for _, txNum := range txNums { + pvtData = append(pvtData, &ledger.TxPvtData{SeqInBlock: txNum, WriteSet: pvtWriteSet}) + } + return constructPvtdataMap(pvtData) +} + +func btlPolicyForSampleData() pvtdatapolicy.BTLPolicy { + return btltestutil.SampleBTLPolicy( + map[[2]string]uint64{ + {"ns-1", "coll-1"}: 0, + {"ns-1", "coll-2"}: 0, + }, + ) +} + func prepareNextBlockWithMissingPvtDataForTest(t *testing.T, l lgr.PeerLedger, bg *testutil.BlockGenerator,
txid string, pubKVs map[string]string, pvtKVs map[string]string) (*lgr.BlockAndPvtData, *lgr.TxPvtData) { diff --git a/core/ledger/kvledger/pause_resume_test.go b/core/ledger/kvledger/pause_resume_test.go index 24c11702f65..33a0637cf2e 100644 --- a/core/ledger/kvledger/pause_resume_test.go +++ b/core/ledger/kvledger/pause_resume_test.go @@ -12,6 +12,7 @@ import ( "github.com/golang/protobuf/proto" "github.com/hyperledger/fabric-protos-go/common" configtxtest "github.com/hyperledger/fabric/common/configtx/test" + "github.com/hyperledger/fabric/core/ledger/mock" "github.com/stretchr/testify/require" ) @@ -19,7 +20,7 @@ func TestPauseAndResume(t *testing.T) { conf, cleanup := testConfig(t) conf.HistoryDBConfig.Enabled = false defer cleanup() - provider := testutilNewProvider(conf, t) + provider := testutilNewProvider(conf, t, &mock.DeployedChaincodeInfoProvider{}) numLedgers := 10 activeLedgerIDs, err := provider.List() @@ -46,7 +47,7 @@ func TestPauseAndResume(t *testing.T) { err = PauseChannel(conf.RootFSPath, constructTestLedgerID(1)) require.NoError(t, err) // verify ledger status after pause - provider = testutilNewProvider(conf, t) + provider = testutilNewProvider(conf, t, &mock.DeployedChaincodeInfoProvider{}) assertLedgerStatus(t, provider, genesisBlocks, numLedgers, pausedLedgers) provider.Close() @@ -61,7 +62,7 @@ func TestPauseAndResume(t *testing.T) { require.NoError(t, err) // verify ledger status after resume pausedLedgersAfterResume := []int{3} - provider = testutilNewProvider(conf, t) + provider = testutilNewProvider(conf, t, &mock.DeployedChaincodeInfoProvider{}) defer provider.Close() assertLedgerStatus(t, provider, genesisBlocks, numLedgers, pausedLedgersAfterResume) @@ -74,7 +75,7 @@ func TestPauseAndResumeErrors(t *testing.T) { conf, cleanup := testConfig(t) conf.HistoryDBConfig.Enabled = false defer cleanup() - provider := testutilNewProvider(conf, t) + provider := testutilNewProvider(conf, t, &mock.DeployedChaincodeInfoProvider{}) ledgerID := constructTestLedgerID(0) genesisBlock, _ := configtxtest.MakeGenesisBlock(ledgerID) diff --git a/core/ledger/kvledger/rebuild_dbs_test.go b/core/ledger/kvledger/rebuild_dbs_test.go index b482b591fc5..1a85b2ee11a 100644 --- a/core/ledger/kvledger/rebuild_dbs_test.go +++ b/core/ledger/kvledger/rebuild_dbs_test.go @@ -12,13 +12,14 @@ import ( "testing" configtxtest "github.com/hyperledger/fabric/common/configtx/test" + "github.com/hyperledger/fabric/core/ledger/mock" "github.com/stretchr/testify/require" ) func TestRebuildDBs(t *testing.T) { conf, cleanup := testConfig(t) defer cleanup() - provider := testutilNewProvider(conf, t) + provider := testutilNewProvider(conf, t, &mock.DeployedChaincodeInfoProvider{}) numLedgers := 3 for i := 0; i < numLedgers; i++ { diff --git a/core/ledger/kvledger/rollback.go b/core/ledger/kvledger/rollback.go index 546b07da70a..c02c950dc62 100644 --- a/core/ledger/kvledger/rollback.go +++ b/core/ledger/kvledger/rollback.go @@ -7,8 +7,9 @@ SPDX-License-Identifier: Apache-2.0 package kvledger import ( + "github.com/hyperledger/fabric/common/ledger/blkstorage" + "github.com/hyperledger/fabric/common/ledger/blkstorage/fsblkstorage" "github.com/hyperledger/fabric/common/ledger/util/leveldbhelper" - "github.com/hyperledger/fabric/core/ledger/ledgerstorage" "github.com/pkg/errors" ) @@ -23,7 +24,7 @@ func RollbackKVLedger(rootFSPath, ledgerID string, blockNum uint64) error { defer fileLock.Unlock() blockstorePath := BlockStorePath(rootFSPath) - if err := ledgerstorage.ValidateRollbackParams(blockstorePath, 
ledgerID, blockNum); err != nil { + if err := fsblkstorage.ValidateRollbackParams(blockstorePath, ledgerID, blockNum); err != nil { return err } @@ -33,7 +34,8 @@ func RollbackKVLedger(rootFSPath, ledgerID string, blockNum uint64) error { } logger.Info("Rolling back ledger store") - if err := ledgerstorage.Rollback(blockstorePath, ledgerID, blockNum); err != nil { + indexConfig := &blkstorage.IndexConfig{AttrsToIndex: attrsToIndex} + if err := fsblkstorage.Rollback(blockstorePath, ledgerID, blockNum, indexConfig); err != nil { return err } logger.Infof("The channel [%s] has been successfully rolled back to the block number [%d]", ledgerID, blockNum) diff --git a/core/ledger/kvledger/state_listener_test.go b/core/ledger/kvledger/state_listener_test.go index 5a90942147f..8ed8985e033 100644 --- a/core/ledger/kvledger/state_listener_test.go +++ b/core/ledger/kvledger/state_listener_test.go @@ -147,6 +147,10 @@ type mockStateListener struct { queryResultsInInitializeFunc []*queryresult.KV } +func (l *mockStateListener) Name() string { + return "mock state listener" +} + func (l *mockStateListener) Initialize(ledgerID string, qe ledger.SimpleQueryExecutor) error { _, err := qe.GetPrivateDataHash(l.namespace, "random-coll", "random-key") if err != nil { diff --git a/core/ledger/kvledger/txmgmt/txmgr/lockbasedtxmgr/lockbased_txmgr.go b/core/ledger/kvledger/txmgmt/txmgr/lockbasedtxmgr/lockbased_txmgr.go index ab25e3b2edd..50ee4abde4b 100644 --- a/core/ledger/kvledger/txmgmt/txmgr/lockbasedtxmgr/lockbased_txmgr.go +++ b/core/ledger/kvledger/txmgmt/txmgr/lockbasedtxmgr/lockbased_txmgr.go @@ -455,7 +455,7 @@ func (txmgr *LockBasedTxMgr) invokeNamespaceListeners() error { if err := listener.HandleStateUpdates(trigger); err != nil { return err } - logger.Debugf("Invoking listener for state changes:%s", listener) + logger.Debugf("Invoking listener for state changes:%s", listener.Name()) } return nil } diff --git a/core/ledger/kvledger/upgrade_dbs_test.go b/core/ledger/kvledger/upgrade_dbs_test.go index 328e0177e36..58d5e7d18b4 100644 --- a/core/ledger/kvledger/upgrade_dbs_test.go +++ b/core/ledger/kvledger/upgrade_dbs_test.go @@ -15,13 +15,14 @@ import ( "github.com/hyperledger/fabric/common/ledger/dataformat" "github.com/hyperledger/fabric/common/ledger/testutil" "github.com/hyperledger/fabric/common/ledger/util/leveldbhelper" + "github.com/hyperledger/fabric/core/ledger/mock" "github.com/stretchr/testify/require" ) func TestUpgradeDBs(t *testing.T) { conf, cleanup := testConfig(t) defer cleanup() - provider := testutilNewProvider(conf, t) + provider := testutilNewProvider(conf, t, &mock.DeployedChaincodeInfoProvider{}) // upgrade should fail when provider is still open err := UpgradeDBs(conf.RootFSPath) @@ -69,7 +70,7 @@ func TestUpgradeIDStoreWrongFormat(t *testing.T) { conf, cleanup := testConfig(t) conf.HistoryDBConfig.Enabled = false defer cleanup() - provider := testutilNewProvider(conf, t) + provider := testutilNewProvider(conf, t, &mock.DeployedChaincodeInfoProvider{}) // change format to a wrong value err := provider.idStore.db.Put(formatKey, []byte("x.0"), true) diff --git a/core/ledger/ledger_interface.go b/core/ledger/ledger_interface.go index fbfde772b92..15526a5a651 100644 --- a/core/ledger/ledger_interface.go +++ b/core/ledger/ledger_interface.go @@ -463,6 +463,7 @@ func (txSim *TxSimulationResults) ContainsPvtWrites() bool { // and result in a panic. // The function Initialize is invoked only once at the time of opening the ledger. 
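The Debugf change above is the immediate consumer of the new method: listener.Name() gives each state listener a stable, human-readable identity in the logs, where formatting the listener value itself with %s produced an unhelpful struct dump for types without a String method. For reference, a minimal no-op implementation of the extended interface below could look like this sketch (written as if inside the ledger package; the type name and the "lscc" namespace choice are illustrative only):

// noopStateListener is a hypothetical listener satisfying the extended
// interface; only Name is new in this change.
type noopStateListener struct{}

// Name identifies the listener in log lines such as the Debugf call above.
func (l *noopStateListener) Name() string { return "noop state listener" }

// Initialize is invoked once, at the time the ledger is opened.
func (l *noopStateListener) Initialize(ledgerID string, qe SimpleQueryExecutor) error { return nil }

// InterestedInNamespaces restricts which namespaces' writes reach the listener.
func (l *noopStateListener) InterestedInNamespaces() []string { return []string{"lscc"} }

// HandleStateUpdates receives the committed writes for the namespaces above.
func (l *noopStateListener) HandleStateUpdates(trigger *StateUpdateTrigger) error { return nil }

// StateCommitDone signals that the state updates for the block were committed.
func (l *noopStateListener) StateCommitDone(channelName string) {}

Returning a constant keeps Name cheap enough to call unconditionally from debug logging.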
type StateListener interface { + Name() string Initialize(ledgerID string, qe SimpleQueryExecutor) error InterestedInNamespaces() []string HandleStateUpdates(trigger *StateUpdateTrigger) error diff --git a/core/ledger/ledgerstorage/store.go b/core/ledger/ledgerstorage/store.go deleted file mode 100644 index 137b26455c8..00000000000 --- a/core/ledger/ledgerstorage/store.go +++ /dev/null @@ -1,307 +0,0 @@ -/* -Copyright IBM Corp. All Rights Reserved. - -SPDX-License-Identifier: Apache-2.0 -*/ - -package ledgerstorage - -import ( - "sync" - "sync/atomic" - - "github.com/hyperledger/fabric-protos-go/common" - "github.com/hyperledger/fabric/common/flogging" - "github.com/hyperledger/fabric/common/ledger/blkstorage" - "github.com/hyperledger/fabric/common/ledger/blkstorage/fsblkstorage" - "github.com/hyperledger/fabric/common/metrics" - "github.com/hyperledger/fabric/core/ledger" - "github.com/hyperledger/fabric/core/ledger/pvtdatapolicy" - "github.com/hyperledger/fabric/core/ledger/pvtdatastorage" -) - -const maxBlockFileSize = 64 * 1024 * 1024 - -var logger = flogging.MustGetLogger("ledgerstorage") - -// Provider encapsulates two providers 1) block store provider and 2) and pvt data store provider -type Provider struct { - blkStoreProvider blkstorage.BlockStoreProvider - pvtdataStoreProvider pvtdatastorage.Provider -} - -// Store encapsulates two stores 1) block store and pvt data store -type Store struct { - blkstorage.BlockStore - pvtdataStore pvtdatastorage.Store - rwlock sync.RWMutex - isPvtstoreAheadOfBlockstore atomic.Value -} - -var attrsToIndex = []blkstorage.IndexableAttr{ - blkstorage.IndexableAttrBlockHash, - blkstorage.IndexableAttrBlockNum, - blkstorage.IndexableAttrTxID, - blkstorage.IndexableAttrBlockNumTranNum, -} - -// NewProvider returns the handle to the provider -func NewProvider(blockStoreDir string, conf *pvtdatastorage.PrivateDataConfig, metricsProvider metrics.Provider) (*Provider, error) { - // Initialize the block storage - indexConfig := &blkstorage.IndexConfig{AttrsToIndex: attrsToIndex} - blockStoreProvider, err := fsblkstorage.NewProvider( - fsblkstorage.NewConf( - blockStoreDir, - maxBlockFileSize, - ), - indexConfig, - metricsProvider, - ) - if err != nil { - return nil, err - } - pvtStoreProvider, err := pvtdatastorage.NewProvider(conf) - if err != nil { - return nil, err - } - return &Provider{blockStoreProvider, pvtStoreProvider}, nil -} - -// Open opens the store -func (p *Provider) Open(ledgerid string) (*Store, error) { - var blockStore blkstorage.BlockStore - var pvtdataStore pvtdatastorage.Store - var err error - - if blockStore, err = p.blkStoreProvider.OpenBlockStore(ledgerid); err != nil { - return nil, err - } - if pvtdataStore, err = p.pvtdataStoreProvider.OpenStore(ledgerid); err != nil { - return nil, err - } - store := &Store{ - BlockStore: blockStore, - pvtdataStore: pvtdataStore, - } - - info, err := blockStore.GetBlockchainInfo() - if err != nil { - return nil, err - } - pvtstoreHeight, err := pvtdataStore.LastCommittedBlockHeight() - if err != nil { - return nil, err - } - store.isPvtstoreAheadOfBlockstore.Store(pvtstoreHeight > info.Height) - - return store, nil -} - -// Close closes the provider -func (p *Provider) Close() { - p.blkStoreProvider.Close() - p.pvtdataStoreProvider.Close() -} - -// Exists checks whether the ledgerID already presents -func (p *Provider) Exists(ledgerID string) (bool, error) { - return p.blkStoreProvider.Exists(ledgerID) -} - -// Init initializes store with essential configurations -func (s *Store) Init(btlPolicy 
pvtdatapolicy.BTLPolicy) { - s.pvtdataStore.Init(btlPolicy) -} - -// CommitWithPvtData commits the block and the corresponding pvt data in an atomic operation -func (s *Store) CommitWithPvtData(blockAndPvtdata *ledger.BlockAndPvtData) error { - blockNum := blockAndPvtdata.Block.Header.Number - s.rwlock.Lock() - defer s.rwlock.Unlock() - - pvtBlkStoreHt, err := s.pvtdataStore.LastCommittedBlockHeight() - if err != nil { - return err - } - - if pvtBlkStoreHt < blockNum+1 { // The pvt data store sanity check does not allow rewriting the pvt data. - // when re-processing blocks (rejoin the channel or re-fetching last few block), - // skip the pvt data commit to the pvtdata blockstore - logger.Debugf("Writing block [%d] to pvt block store", blockNum) - // If a state fork occurs during a regular block commit, - // we have a mechanism to drop all blocks followed by refetching of blocks - // and re-processing them. In the current way of doing this, we only drop - // the block files (and related artifacts) but we do not drop/overwrite the - // pvtdatastorage as it might leads to data loss. - // During block reprocessing, as there is a possibility of an invalid pvtdata - // transaction to become valid, we store the pvtdata of invalid transactions - // too in the pvtdataStore as we do for the publicdata in the case of blockStore. - pvtData, missingPvtData := constructPvtDataAndMissingData(blockAndPvtdata) - if err := s.pvtdataStore.Commit(blockAndPvtdata.Block.Header.Number, pvtData, missingPvtData); err != nil { - return err - } - } else { - logger.Debugf("Skipping writing block [%d] to pvt block store as the store height is [%d]", blockNum, pvtBlkStoreHt) - } - - if err := s.AddBlock(blockAndPvtdata.Block); err != nil { - return err - } - - if pvtBlkStoreHt == blockNum+1 { - // we reach here only when the pvtdataStore was ahead - // of blockStore during the store opening time (would - // occur after a peer rollback/reset). - s.isPvtstoreAheadOfBlockstore.Store(false) - } - - return nil -} - -func constructPvtDataAndMissingData(blockAndPvtData *ledger.BlockAndPvtData) ([]*ledger.TxPvtData, - ledger.TxMissingPvtDataMap) { - - var pvtData []*ledger.TxPvtData - missingPvtData := make(ledger.TxMissingPvtDataMap) - - numTxs := uint64(len(blockAndPvtData.Block.Data.Data)) - - // for all tx, construct pvtdata and missing pvtdata list - for txNum := uint64(0); txNum < numTxs; txNum++ { - if pvtdata, ok := blockAndPvtData.PvtData[txNum]; ok { - pvtData = append(pvtData, pvtdata) - } - - if missingData, ok := blockAndPvtData.MissingPvtData[txNum]; ok { - for _, missing := range missingData { - missingPvtData.Add(txNum, missing.Namespace, - missing.Collection, missing.IsEligible) - } - } - } - return pvtData, missingPvtData -} - -// CommitPvtDataOfOldBlocks commits the pvtData of old blocks -func (s *Store) CommitPvtDataOfOldBlocks(blocksPvtData map[uint64][]*ledger.TxPvtData) error { - err := s.pvtdataStore.CommitPvtDataOfOldBlocks(blocksPvtData) - if err != nil { - return err - } - return nil -} - -// GetPvtDataAndBlockByNum returns the block and the corresponding pvt data. 
-// The pvt data is filtered by the list of 'collections' supplied -func (s *Store) GetPvtDataAndBlockByNum(blockNum uint64, filter ledger.PvtNsCollFilter) (*ledger.BlockAndPvtData, error) { - s.rwlock.RLock() - defer s.rwlock.RUnlock() - - var block *common.Block - var pvtdata []*ledger.TxPvtData - var err error - if block, err = s.RetrieveBlockByNumber(blockNum); err != nil { - return nil, err - } - if pvtdata, err = s.getPvtDataByNumWithoutLock(blockNum, filter); err != nil { - return nil, err - } - return &ledger.BlockAndPvtData{Block: block, PvtData: constructPvtdataMap(pvtdata)}, nil -} - -// GetPvtDataByNum returns only the pvt data corresponding to the given block number -// The pvt data is filtered by the list of 'ns/collections' supplied in the filter -// A nil filter does not filter any results -func (s *Store) GetPvtDataByNum(blockNum uint64, filter ledger.PvtNsCollFilter) ([]*ledger.TxPvtData, error) { - s.rwlock.RLock() - defer s.rwlock.RUnlock() - return s.getPvtDataByNumWithoutLock(blockNum, filter) -} - -// getPvtDataByNumWithoutLock returns only the pvt data corresponding to the given block number. -// This function does not acquire a readlock and it is expected that in most of the circumstances, the caller -// possesses a read lock on `s.rwlock` -func (s *Store) getPvtDataByNumWithoutLock(blockNum uint64, filter ledger.PvtNsCollFilter) ([]*ledger.TxPvtData, error) { - var pvtdata []*ledger.TxPvtData - var err error - if pvtdata, err = s.pvtdataStore.GetPvtDataByBlockNum(blockNum, filter); err != nil { - return nil, err - } - return pvtdata, nil -} - -// DoesPvtDataInfoExist returns true when -// (1) the ledger has pvtdata associated with the given block number (or) -// (2) a few or all pvtdata associated with the given block number is missing but the -// missing info is recorded in the ledger (or) -// (3) the block is committed does not contain any pvtData. -func (s *Store) DoesPvtDataInfoExist(blockNum uint64) (bool, error) { - pvtStoreHt, err := s.pvtdataStore.LastCommittedBlockHeight() - if err != nil { - return false, err - } - return blockNum+1 <= pvtStoreHt, nil -} - -// GetMissingPvtDataInfoForMostRecentBlocks invokes the function on underlying pvtdata store -func (s *Store) GetMissingPvtDataInfoForMostRecentBlocks(maxBlock int) (ledger.MissingPvtDataInfo, error) { - // it is safe to not acquire a read lock on s.rwlock. Without a lock, the value of - // lastCommittedBlock can change due to a new block commit. As a result, we may not - // be able to fetch the missing data info of truly the most recent blocks. This - // decision was made to ensure that the regular block commit rate is not affected. 
- return s.pvtdataStore.GetMissingPvtDataInfoForMostRecentBlocks(maxBlock) -} - -// ProcessCollsEligibilityEnabled invokes the function on underlying pvtdata store -func (s *Store) ProcessCollsEligibilityEnabled(committingBlk uint64, nsCollMap map[string][]string) error { - return s.pvtdataStore.ProcessCollsEligibilityEnabled(committingBlk, nsCollMap) -} - -// GetLastUpdatedOldBlocksPvtData invokes the function on underlying pvtdata store -func (s *Store) GetLastUpdatedOldBlocksPvtData() (map[uint64][]*ledger.TxPvtData, error) { - return s.pvtdataStore.GetLastUpdatedOldBlocksPvtData() -} - -// ResetLastUpdatedOldBlocksList invokes the function on underlying pvtdata store -func (s *Store) ResetLastUpdatedOldBlocksList() error { - return s.pvtdataStore.ResetLastUpdatedOldBlocksList() -} - -// IsPvtStoreAheadOfBlockStore returns true when the pvtStore height is -// greater than the blockstore height. Otherwise, it returns false. -func (s *Store) IsPvtStoreAheadOfBlockStore() bool { - return s.isPvtstoreAheadOfBlockstore.Load().(bool) -} - -func constructPvtdataMap(pvtdata []*ledger.TxPvtData) ledger.TxPvtDataMap { - if pvtdata == nil { - return nil - } - m := make(map[uint64]*ledger.TxPvtData) - for _, pvtdatum := range pvtdata { - m[pvtdatum.SeqInBlock] = pvtdatum - } - return m -} - -// LoadPreResetHeight returns the pre reset height for the specified ledgers. -func LoadPreResetHeight(blockstorePath string, ledgerIDs []string) (map[string]uint64, error) { - return fsblkstorage.LoadPreResetHeight(blockstorePath, ledgerIDs) -} - -// ResetBlockStore resets all ledgers to the genesis block. -func ResetBlockStore(blockstorePath string) error { - return fsblkstorage.ResetBlockStore(blockstorePath) -} - -// ValidateRollbackParams performs necessary validation on the input given for -// the rollback operation. -func ValidateRollbackParams(blockstorePath, ledgerID string, blockNum uint64) error { - return fsblkstorage.ValidateRollbackParams(blockstorePath, ledgerID, blockNum) -} - -// Rollback reverts changes made to the block store beyond a given block number. -func Rollback(blockstorePath, ledgerID string, blockNum uint64) error { - indexConfig := &blkstorage.IndexConfig{AttrsToIndex: attrsToIndex} - return fsblkstorage.Rollback(blockstorePath, ledgerID, blockNum, indexConfig) -} diff --git a/core/ledger/ledgerstorage/store_test.go b/core/ledger/ledgerstorage/store_test.go deleted file mode 100644 index 912b9612184..00000000000 --- a/core/ledger/ledgerstorage/store_test.go +++ /dev/null @@ -1,436 +0,0 @@ -/* -Copyright IBM Corp. All Rights Reserved. 
- -SPDX-License-Identifier: Apache-2.0 -*/ - -package ledgerstorage - -import ( - "io/ioutil" - "os" - "path/filepath" - "testing" - - "github.com/golang/protobuf/proto" - "github.com/hyperledger/fabric-protos-go/common" - "github.com/hyperledger/fabric-protos-go/ledger/rwset" - pb "github.com/hyperledger/fabric-protos-go/peer" - "github.com/hyperledger/fabric/common/flogging" - "github.com/hyperledger/fabric/common/ledger/testutil" - "github.com/hyperledger/fabric/common/metrics/disabled" - "github.com/hyperledger/fabric/core/ledger" - "github.com/hyperledger/fabric/core/ledger/pvtdatapolicy" - btltestutil "github.com/hyperledger/fabric/core/ledger/pvtdatapolicy/testutil" - "github.com/hyperledger/fabric/core/ledger/pvtdatastorage" - "github.com/hyperledger/fabric/internal/pkg/txflags" - "github.com/stretchr/testify/assert" -) - -var metricsProvider = &disabled.Provider{} - -func TestMain(m *testing.M) { - flogging.ActivateSpec("ledgerstorage,pvtdatastorage=debug") - os.Exit(m.Run()) -} - -func TestStore(t *testing.T) { - storeDir, err := ioutil.TempDir("", "lstore") - if err != nil { - t.Fatalf("Failed to create ledger storage directory: %s", err) - } - defer os.RemoveAll(storeDir) - conf := buildPrivateDataConfig(storeDir) - blockStoreDir := filepath.Join(storeDir, "chains") - provider, err := NewProvider(blockStoreDir, conf, metricsProvider) - assert.NoError(t, err) - defer provider.Close() - store, err := provider.Open("testLedger") - store.Init(btlPolicyForSampleData()) - defer store.Shutdown() - - assert.NoError(t, err) - sampleData := sampleDataWithPvtdataForSelectiveTx(t) - for _, sampleDatum := range sampleData { - assert.NoError(t, store.CommitWithPvtData(sampleDatum)) - } - - // block 1 has no pvt data - pvtdata, err := store.GetPvtDataByNum(1, nil) - assert.NoError(t, err) - assert.Nil(t, pvtdata) - - // block 4 has no pvt data - pvtdata, err = store.GetPvtDataByNum(4, nil) - assert.NoError(t, err) - assert.Nil(t, pvtdata) - - // block 2 has pvt data for tx 3, 5 and 6. 
Though the tx 6 - // is marked as invalid in the block, the pvtData should - // have been stored - pvtdata, err = store.GetPvtDataByNum(2, nil) - assert.NoError(t, err) - assert.Equal(t, 3, len(pvtdata)) - assert.Equal(t, uint64(3), pvtdata[0].SeqInBlock) - assert.Equal(t, uint64(5), pvtdata[1].SeqInBlock) - assert.Equal(t, uint64(6), pvtdata[2].SeqInBlock) - - // block 3 has pvt data for tx 4 and 6 only - pvtdata, err = store.GetPvtDataByNum(3, nil) - assert.NoError(t, err) - assert.Equal(t, 2, len(pvtdata)) - assert.Equal(t, uint64(4), pvtdata[0].SeqInBlock) - assert.Equal(t, uint64(6), pvtdata[1].SeqInBlock) - - blockAndPvtdata, err := store.GetPvtDataAndBlockByNum(2, nil) - assert.NoError(t, err) - assert.True(t, proto.Equal(sampleData[2].Block, blockAndPvtdata.Block)) - - blockAndPvtdata, err = store.GetPvtDataAndBlockByNum(3, nil) - assert.NoError(t, err) - assert.True(t, proto.Equal(sampleData[3].Block, blockAndPvtdata.Block)) - - // pvt data retrieval for block 3 with filter should return filtered pvtdata - filter := ledger.NewPvtNsCollFilter() - filter.Add("ns-1", "coll-1") - blockAndPvtdata, err = store.GetPvtDataAndBlockByNum(3, filter) - assert.NoError(t, err) - assert.Equal(t, sampleData[3].Block, blockAndPvtdata.Block) - // two transactions should be present - assert.Equal(t, 2, len(blockAndPvtdata.PvtData)) - // both tran number 4 and 6 should have only one collection because of filter - assert.Equal(t, 1, len(blockAndPvtdata.PvtData[4].WriteSet.NsPvtRwset)) - assert.Equal(t, 1, len(blockAndPvtdata.PvtData[6].WriteSet.NsPvtRwset)) - // any other transaction entry should be nil - assert.Nil(t, blockAndPvtdata.PvtData[2]) - - // test missing data retrieval in the presence of invalid tx. Block 5 had - // missing data (for tx4 and tx5). Though tx5 was marked as invalid tx, - // both tx4 and tx5 missing data should be returned - expectedMissingDataInfo := make(ledger.MissingPvtDataInfo) - expectedMissingDataInfo.Add(5, 4, "ns-4", "coll-4") - expectedMissingDataInfo.Add(5, 5, "ns-5", "coll-5") - missingDataInfo, err := store.GetMissingPvtDataInfoForMostRecentBlocks(1) - assert.NoError(t, err) - assert.Equal(t, expectedMissingDataInfo, missingDataInfo) -} - -func TestCrashAfterPvtdataStoreCommit(t *testing.T) { - storeDir, err := ioutil.TempDir("", "lstore") - if err != nil { - t.Fatalf("Failed to create ledger storage directory: %s", err) - } - defer os.RemoveAll(storeDir) - conf := buildPrivateDataConfig(storeDir) - blockStoreDir := filepath.Join(storeDir, "chains") - provider, err := NewProvider(blockStoreDir, conf, metricsProvider) - assert.NoError(t, err) - defer provider.Close() - store, err := provider.Open("testLedger") - store.Init(btlPolicyForSampleData()) - defer store.Shutdown() - assert.NoError(t, err) - - sampleData := sampleDataWithPvtdataForAllTxs(t) - dataBeforeCrash := sampleData[0:3] - dataAtCrash := sampleData[3] - - for _, sampleDatum := range dataBeforeCrash { - assert.NoError(t, store.CommitWithPvtData(sampleDatum)) - } - blockNumAtCrash := dataAtCrash.Block.Header.Number - var pvtdataAtCrash []*ledger.TxPvtData - for _, p := range dataAtCrash.PvtData { - pvtdataAtCrash = append(pvtdataAtCrash, p) - } - // call Commit on pvt data store and mimic a crash before committing the block to block store - store.pvtdataStore.Commit(blockNumAtCrash, pvtdataAtCrash, nil) - store.Shutdown() - provider.Close() - provider, err = NewProvider(blockStoreDir, conf, metricsProvider) - assert.NoError(t, err) - store, err = provider.Open("testLedger") - assert.NoError(t, err) - 
store.Init(btlPolicyForSampleData()) - - // When starting the storage after a crash, we should be able to fetch the pvtData from pvtStore - testVerifyPvtData(t, store, blockNumAtCrash, dataAtCrash.PvtData) - bcInfo, err := store.GetBlockchainInfo() - assert.NoError(t, err) - assert.Equal(t, blockNumAtCrash, bcInfo.Height) - - // we should be able to write the last block again - // to ensure that the pvtdataStore is not updated, we send a different pvtData for - // the same block such that we can retrieve the pvtData and compare. - expectedPvtData := dataAtCrash.PvtData - dataAtCrash.PvtData = make(ledger.TxPvtDataMap) - dataAtCrash.PvtData[0] = &ledger.TxPvtData{ - SeqInBlock: 0, - WriteSet: &rwset.TxPvtReadWriteSet{ - NsPvtRwset: []*rwset.NsPvtReadWriteSet{ - { - Namespace: "ns-1", - CollectionPvtRwset: []*rwset.CollectionPvtReadWriteSet{ - { - CollectionName: "coll-1", - Rwset: []byte("pvtdata"), - }, - }, - }, - }, - }, - } - assert.NoError(t, store.CommitWithPvtData(dataAtCrash)) - testVerifyPvtData(t, store, blockNumAtCrash, expectedPvtData) - bcInfo, err = store.GetBlockchainInfo() - assert.NoError(t, err) - assert.Equal(t, blockNumAtCrash+1, bcInfo.Height) - -} - -func testVerifyPvtData(t *testing.T, store *Store, blockNum uint64, expectedPvtData ledger.TxPvtDataMap) { - pvtdata, err := store.GetPvtDataByNum(blockNum, nil) - assert.NoError(t, err) - constructed := constructPvtdataMap(pvtdata) - assert.Equal(t, len(expectedPvtData), len(constructed)) - for k, v := range expectedPvtData { - ov, ok := constructed[k] - assert.True(t, ok) - assert.Equal(t, v.SeqInBlock, ov.SeqInBlock) - assert.True(t, proto.Equal(v.WriteSet, ov.WriteSet)) - } -} - -func TestAddAfterPvtdataStoreError(t *testing.T) { - storeDir, err := ioutil.TempDir("", "lstore") - if err != nil { - t.Fatalf("Failed to create ledger storage directory: %s", err) - } - defer os.RemoveAll(storeDir) - conf := buildPrivateDataConfig(storeDir) - blockStoreDir := filepath.Join(storeDir, "chains") - provider, err := NewProvider(blockStoreDir, conf, metricsProvider) - assert.NoError(t, err) - defer provider.Close() - store, err := provider.Open("testLedger") - store.Init(btlPolicyForSampleData()) - defer store.Shutdown() - assert.NoError(t, err) - - sampleData := sampleDataWithPvtdataForAllTxs(t) - for _, d := range sampleData[0:9] { - assert.NoError(t, store.CommitWithPvtData(d)) - } - // try to write the last block again. 
The function should skip adding block to the private store - // as the pvt store but the block storage should return error - assert.Error(t, store.CommitWithPvtData(sampleData[8])) - - // At the end, the pvt store status should not have changed - pvtStoreCommitHt, err := store.pvtdataStore.LastCommittedBlockHeight() - assert.NoError(t, err) - assert.Equal(t, uint64(9), pvtStoreCommitHt) - - // commit the rightful next block - assert.NoError(t, store.CommitWithPvtData(sampleData[9])) - pvtStoreCommitHt, err = store.pvtdataStore.LastCommittedBlockHeight() - assert.NoError(t, err) - assert.Equal(t, uint64(10), pvtStoreCommitHt) -} - -func TestAddAfterBlkStoreError(t *testing.T) { - storeDir, err := ioutil.TempDir("", "lstore") - if err != nil { - t.Fatalf("Failed to create ledger storage directory: %s", err) - } - defer os.RemoveAll(storeDir) - conf := buildPrivateDataConfig(storeDir) - blockStoreDir := filepath.Join(storeDir, "chains") - provider, err := NewProvider(blockStoreDir, conf, metricsProvider) - assert.NoError(t, err) - defer provider.Close() - store, err := provider.Open("testLedger") - store.Init(btlPolicyForSampleData()) - defer store.Shutdown() - assert.NoError(t, err) - - sampleData := sampleDataWithPvtdataForAllTxs(t) - for _, d := range sampleData[0:9] { - assert.NoError(t, store.CommitWithPvtData(d)) - } - lastBlkAndPvtData := sampleData[9] - // Add the block directly to blockstore - store.BlockStore.AddBlock(lastBlkAndPvtData.Block) - // Adding the same block should cause passing on the error caused by the block storgae - assert.Error(t, store.CommitWithPvtData(lastBlkAndPvtData)) - // At the end, the pvt store status should be changed - pvtStoreCommitHt, err := store.pvtdataStore.LastCommittedBlockHeight() - assert.NoError(t, err) - assert.Equal(t, uint64(10), pvtStoreCommitHt) -} - -func TestPvtStoreAheadOfBlockStore(t *testing.T) { - storeDir, err := ioutil.TempDir("", "lstore") - if err != nil { - t.Fatalf("Failed to create ledger storage directory: %s", err) - } - defer os.RemoveAll(storeDir) - conf := buildPrivateDataConfig(storeDir) - blockStoreDir := filepath.Join(storeDir, "chains") - provider, err := NewProvider(blockStoreDir, conf, metricsProvider) - assert.NoError(t, err) - defer provider.Close() - store, err := provider.Open("testLedger") - store.Init(btlPolicyForSampleData()) - defer store.Shutdown() - assert.NoError(t, err) - - // when both stores are empty, isPvtstoreAheadOfBlockstore should be false - assert.False(t, store.IsPvtStoreAheadOfBlockStore()) - - sampleData := sampleDataWithPvtdataForSelectiveTx(t) - for _, d := range sampleData[0:9] { // commit block number 0 to 8 - assert.NoError(t, store.CommitWithPvtData(d)) - } - assert.False(t, store.IsPvtStoreAheadOfBlockStore()) - - // close and reopen - store.Shutdown() - provider.Close() - provider, err = NewProvider(blockStoreDir, conf, metricsProvider) - assert.NoError(t, err) - store, err = provider.Open("testLedger") - assert.NoError(t, err) - store.Init(btlPolicyForSampleData()) - - // as both stores are at the same block height, isPvtstoreAheadOfBlockstore should be false - info, err := store.GetBlockchainInfo() - assert.NoError(t, err) - assert.Equal(t, uint64(9), info.Height) - pvtStoreHt, err := store.pvtdataStore.LastCommittedBlockHeight() - assert.NoError(t, err) - assert.Equal(t, uint64(9), pvtStoreHt) - assert.False(t, store.IsPvtStoreAheadOfBlockStore()) - - lastBlkAndPvtData := sampleData[9] - // Add the last block directly to the pvtdataStore but not to blockstore. 
This would make - // the pvtdatastore height greater than the block store height. - validTxPvtData, validTxMissingPvtData := constructPvtDataAndMissingData(lastBlkAndPvtData) - err = store.pvtdataStore.Commit(lastBlkAndPvtData.Block.Header.Number, validTxPvtData, validTxMissingPvtData) - assert.NoError(t, err) - - // close and reopen - store.Shutdown() - provider.Close() - provider, err = NewProvider(blockStoreDir, conf, metricsProvider) - assert.NoError(t, err) - store, err = provider.Open("testLedger") - assert.NoError(t, err) - store.Init(btlPolicyForSampleData()) - - // pvtdataStore should be ahead of blockstore - info, err = store.GetBlockchainInfo() - assert.NoError(t, err) - assert.Equal(t, uint64(9), info.Height) - pvtStoreHt, err = store.pvtdataStore.LastCommittedBlockHeight() - assert.NoError(t, err) - assert.Equal(t, uint64(10), pvtStoreHt) - assert.True(t, store.IsPvtStoreAheadOfBlockStore()) - - // bring the height of BlockStore equal to pvtdataStore - assert.NoError(t, store.CommitWithPvtData(lastBlkAndPvtData)) - info, err = store.GetBlockchainInfo() - assert.NoError(t, err) - assert.Equal(t, uint64(10), info.Height) - pvtStoreHt, err = store.pvtdataStore.LastCommittedBlockHeight() - assert.NoError(t, err) - assert.Equal(t, uint64(10), pvtStoreHt) - assert.False(t, store.IsPvtStoreAheadOfBlockStore()) -} - -func TestConstructPvtdataMap(t *testing.T) { - assert.Nil(t, constructPvtdataMap(nil)) -} - -func sampleDataWithPvtdataForSelectiveTx(t *testing.T) []*ledger.BlockAndPvtData { - var blockAndpvtdata []*ledger.BlockAndPvtData - blocks := testutil.ConstructTestBlocks(t, 10) - for i := 0; i < 10; i++ { - blockAndpvtdata = append(blockAndpvtdata, &ledger.BlockAndPvtData{Block: blocks[i]}) - } - - // txNum 3, 5, 6 in block 2 has pvtdata but txNum 6 is invalid - blockAndpvtdata[2].PvtData = samplePvtData(t, []uint64{3, 5, 6}) - txFilter := txflags.ValidationFlags(blockAndpvtdata[2].Block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER]) - txFilter.SetFlag(6, pb.TxValidationCode_INVALID_WRITESET) - blockAndpvtdata[2].Block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER] = txFilter - - // txNum 4, 6 in block 3 has pvtdata - blockAndpvtdata[3].PvtData = samplePvtData(t, []uint64{4, 6}) - - // txNum 4, 5 in block 5 has missing pvt data but txNum 5 is invalid - missingData := make(ledger.TxMissingPvtDataMap) - missingData.Add(4, "ns-4", "coll-4", true) - missingData.Add(5, "ns-5", "coll-5", true) - blockAndpvtdata[5].MissingPvtData = missingData - txFilter = txflags.ValidationFlags(blockAndpvtdata[5].Block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER]) - txFilter.SetFlag(5, pb.TxValidationCode_INVALID_WRITESET) - blockAndpvtdata[5].Block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER] = txFilter - - return blockAndpvtdata -} - -func sampleDataWithPvtdataForAllTxs(t *testing.T) []*ledger.BlockAndPvtData { - var blockAndpvtdata []*ledger.BlockAndPvtData - blocks := testutil.ConstructTestBlocks(t, 10) - for i := 0; i < 10; i++ { - blockAndpvtdata = append(blockAndpvtdata, - &ledger.BlockAndPvtData{ - Block: blocks[i], - PvtData: samplePvtData(t, []uint64{uint64(i), uint64(i + 1)}), - }, - ) - } - return blockAndpvtdata -} - -func samplePvtData(t *testing.T, txNums []uint64) map[uint64]*ledger.TxPvtData { - pvtWriteSet := &rwset.TxPvtReadWriteSet{DataModel: rwset.TxReadWriteSet_KV} - pvtWriteSet.NsPvtRwset = []*rwset.NsPvtReadWriteSet{ - { - Namespace: "ns-1", - CollectionPvtRwset: []*rwset.CollectionPvtReadWriteSet{ - 
{ - CollectionName: "coll-1", - Rwset: []byte("RandomBytes-PvtRWSet-ns1-coll1"), - }, - { - CollectionName: "coll-2", - Rwset: []byte("RandomBytes-PvtRWSet-ns1-coll2"), - }, - }, - }, - } - var pvtData []*ledger.TxPvtData - for _, txNum := range txNums { - pvtData = append(pvtData, &ledger.TxPvtData{SeqInBlock: txNum, WriteSet: pvtWriteSet}) - } - return constructPvtdataMap(pvtData) -} - -func btlPolicyForSampleData() pvtdatapolicy.BTLPolicy { - return btltestutil.SampleBTLPolicy( - map[[2]string]uint64{ - {"ns-1", "coll-1"}: 0, - {"ns-1", "coll-2"}: 0, - }, - ) -} - -func buildPrivateDataConfig(rootFSPath string) *pvtdatastorage.PrivateDataConfig { - return &pvtdatastorage.PrivateDataConfig{ - PrivateDataConfig: &ledger.PrivateDataConfig{ - PurgeInterval: 1, - }, - StorePath: filepath.Join(rootFSPath, "pvtdataStore"), - } -} diff --git a/core/ledger/mock/state_listener.go b/core/ledger/mock/state_listener.go index ea324bde800..d68721c9e08 100644 --- a/core/ledger/mock/state_listener.go +++ b/core/ledger/mock/state_listener.go @@ -41,6 +41,16 @@ type StateListener struct { interestedInNamespacesReturnsOnCall map[int]struct { result1 []string } + NameStub func() string + nameMutex sync.RWMutex + nameArgsForCall []struct { + } + nameReturns struct { + result1 string + } + nameReturnsOnCall map[int]struct { + result1 string + } StateCommitDoneStub func(string) stateCommitDoneMutex sync.RWMutex stateCommitDoneArgsForCall []struct { @@ -223,6 +233,58 @@ func (fake *StateListener) InterestedInNamespacesReturnsOnCall(i int, result1 [] }{result1} } +func (fake *StateListener) Name() string { + fake.nameMutex.Lock() + ret, specificReturn := fake.nameReturnsOnCall[len(fake.nameArgsForCall)] + fake.nameArgsForCall = append(fake.nameArgsForCall, struct { + }{}) + fake.recordInvocation("Name", []interface{}{}) + fake.nameMutex.Unlock() + if fake.NameStub != nil { + return fake.NameStub() + } + if specificReturn { + return ret.result1 + } + fakeReturns := fake.nameReturns + return fakeReturns.result1 +} + +func (fake *StateListener) NameCallCount() int { + fake.nameMutex.RLock() + defer fake.nameMutex.RUnlock() + return len(fake.nameArgsForCall) +} + +func (fake *StateListener) NameCalls(stub func() string) { + fake.nameMutex.Lock() + defer fake.nameMutex.Unlock() + fake.NameStub = stub +} + +func (fake *StateListener) NameReturns(result1 string) { + fake.nameMutex.Lock() + defer fake.nameMutex.Unlock() + fake.NameStub = nil + fake.nameReturns = struct { + result1 string + }{result1} +} + +func (fake *StateListener) NameReturnsOnCall(i int, result1 string) { + fake.nameMutex.Lock() + defer fake.nameMutex.Unlock() + fake.NameStub = nil + if fake.nameReturnsOnCall == nil { + fake.nameReturnsOnCall = make(map[int]struct { + result1 string + }) + } + fake.nameReturnsOnCall[i] = struct { + result1 string + }{result1} +} + func (fake *StateListener) StateCommitDone(arg1 string) { fake.stateCommitDoneMutex.Lock() fake.stateCommitDoneArgsForCall = append(fake.stateCommitDoneArgsForCall, struct { @@ -263,6 +325,8 @@ func (fake *StateListener) Invocations() map[string][][]interface{} { defer fake.initializeMutex.RUnlock() fake.interestedInNamespacesMutex.RLock() defer fake.interestedInNamespacesMutex.RUnlock() + fake.nameMutex.RLock() + defer fake.nameMutex.RUnlock() fake.stateCommitDoneMutex.RLock() defer fake.stateCommitDoneMutex.RUnlock() copiedInvocations := map[string][][]interface{}{}
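The generated fake gains the usual counterfeiter trio for the new method: a stub field, a call counter, and canned returns. A hypothetical test wiring it up might look like the following snippet (the test name and the elided registration step are illustrative, not part of this patch):

package kvledger_test

import (
	"testing"

	"github.com/hyperledger/fabric/core/ledger/mock"
	"github.com/stretchr/testify/require"
)

func TestListenerNameIsReported(t *testing.T) {
	fakeListener := &mock.StateListener{}
	fakeListener.NameReturns("mock state listener")
	fakeListener.InterestedInNamespacesReturns([]string{"ns-1"})

	// ... hand fakeListener to the component under test ...

	// The canned name is returned and every call is counted.
	require.Equal(t, "mock state listener", fakeListener.Name())
	require.Equal(t, 1, fakeListener.NameCallCount())
}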