Skip to content

Commit

Permalink
[FAB-17667] move couchdb util package to statecouchdb package (hyperl…
Browse files Browse the repository at this point in the history
…edger#926)

We have a CouchDB util package that exposes API to read from and
write to CouchDB. This package is used only by the statecouchdb and
the tests package (i.e., ledger level integration test).

In general, a util package is justified only when an API needs to be
used by so many other packages. For e.g., if we have 10 packages and
each of them need a common API, it is not good to duplicate the API
code and instead create a separate util package.

Hence, we move all files in the CouchDB util package to the statecouchdb
package itself. Note that there is a lot of opportunities to refactor
couchdb package but we will do that in a separate PR.

Given that we have moved couchdb util to the statecouchdb package,
we can have only one TestMain. It would be hacky to reuse
existing testVDBEnv for testing couchDB too. Hence, we introduce
another testEnv called testCouchDBEnv to test couchDB utils.

Signed-off-by: senthil <cendhu@gmail.com>
  • Loading branch information
cendhu authored Apr 15, 2020
1 parent 252795a commit c41f9eb
Show file tree
Hide file tree
Showing 24 changed files with 1,461 additions and 1,533 deletions.
3 changes: 1 addition & 2 deletions core/ledger/kvledger/benchmark/chainmgmt/chains.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"github.com/hyperledger/fabric/core/ledger"
"github.com/hyperledger/fabric/core/ledger/ledgermgmt"
"github.com/hyperledger/fabric/core/ledger/ledgermgmt/ledgermgmttest"
"github.com/hyperledger/fabric/core/ledger/util/couchdb"
)

// ChainID is a type used for the ids for the chains for experiments
Expand Down Expand Up @@ -50,7 +49,7 @@ func newChainsMgr(mgrConf *ChainMgrConf, batchConf *BatchConf, initOp chainInitO
panic("environment variable 'useCouchDB' is set to true but 'COUCHDB_ADDR' is not set")
}
ledgermgmtInitializer.Config.StateDBConfig.StateDatabase = "CouchDB"
ledgermgmtInitializer.Config.StateDBConfig.CouchDB = &couchdb.Config{
ledgermgmtInitializer.Config.StateDBConfig.CouchDB = &ledger.CouchDBConfig{
Address: couchdbAddr,
RedoLogPath: filepath.Join(dataDir, "couchdbRedologs"),
UserCacheSizeMBs: 500,
Expand Down
21 changes: 4 additions & 17 deletions core/ledger/kvledger/tests/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,12 @@ import (
configtxtest "github.com/hyperledger/fabric/common/configtx/test"
"github.com/hyperledger/fabric/common/crypto"
"github.com/hyperledger/fabric/common/flogging"
"github.com/hyperledger/fabric/common/metrics/disabled"
"github.com/hyperledger/fabric/common/policydsl"
"github.com/hyperledger/fabric/core/ledger"
"github.com/hyperledger/fabric/core/ledger/kvledger/tests/fakes"
"github.com/hyperledger/fabric/core/ledger/util/couchdb"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/statedb/statecouchdb"
"github.com/hyperledger/fabric/internal/pkg/txflags"
"github.com/hyperledger/fabric/protoutil"
"github.com/stretchr/testify/require"
)

var logger = flogging.MustGetLogger("test2")
Expand Down Expand Up @@ -231,18 +230,6 @@ func setBlockFlagsToValid(block *common.Block) {
txflags.NewWithValues(len(block.Data.Data), protopeer.TxValidationCode_VALID)
}

func dropCouchDBs(t *testing.T, couchdbConfig *couchdb.Config) {
couchInstance, err := couchdb.CreateCouchInstance(couchdbConfig, &disabled.Provider{})
require.NoError(t, err)
dbNames, err := couchInstance.RetrieveApplicationDBNames()
require.NoError(t, err)
for _, dbName := range dbNames {
db := &couchdb.CouchDatabase{
CouchInstance: couchInstance,
DBName: dbName,
}
response, err := db.DropDatabase()
require.NoError(t, err)
require.True(t, response.Ok)
}
func dropCouchDBs(t *testing.T, couchdbConfig *ledger.CouchDBConfig) {
statecouchdb.DeleteApplicationDBs(t, couchdbConfig)
}
14 changes: 9 additions & 5 deletions core/ledger/kvledger/tests/v1x_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ import (

protopeer "github.com/hyperledger/fabric-protos-go/peer"
"github.com/hyperledger/fabric/common/ledger/testutil"
"github.com/hyperledger/fabric/core/ledger"
"github.com/hyperledger/fabric/core/ledger/kvledger"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/statedb/statecouchdb"
"github.com/hyperledger/fabric/core/ledger/mock"
"github.com/hyperledger/fabric/core/ledger/util/couchdb"
"github.com/hyperledger/fabric/core/ledger/util/couchdbtest"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -76,11 +76,11 @@ func TestV13WithStateCouchdb(t *testing.T) {
fmt.Sprintf("%s:%s", couchdbDataUnzipDir, "/opt/couchdb/data"),
fmt.Sprintf("%s:%s", localdHostDir, "/opt/couchdb/etc/local.d"),
}
couchAddress, cleanup := couchdbtest.CouchDBSetup(couchdbBinds)
couchAddress, cleanup := couchDBSetup(t, couchdbBinds)
defer cleanup()

// set required config data to use state couchdb
couchdbConfig := &couchdb.Config{
couchdbConfig := &ledger.CouchDBConfig{
Address: couchAddress,
Username: "",
Password: "",
Expand Down Expand Up @@ -114,7 +114,11 @@ func TestV13WithStateCouchdb(t *testing.T) {
dataHelper.verify(h2)
}

func checkInitLedgerPanicAndDropDBs(t *testing.T, env *env, ledgerFSRoot string, couchdbConfig *couchdb.Config) {
func couchDBSetup(t *testing.T, binds []string) (addr string, cleanup func()) {
return statecouchdb.StartCouchDB(t, binds)
}

func checkInitLedgerPanicAndDropDBs(t *testing.T, env *env, ledgerFSRoot string, couchdbConfig *ledger.CouchDBConfig) {
t.Logf("verifying that a panic occurs because idStore has old format and then reformat the idstore to proceed")
idStorePath := kvledger.LedgerProviderPath(ledgerFSRoot)
require.PanicsWithValue(
Expand Down
15 changes: 8 additions & 7 deletions core/ledger/kvledger/txmgmt/privacyenabledstate/test_exports.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ import (
"github.com/hyperledger/fabric/core/ledger/kvledger/bookkeeping"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/statedb/statecouchdb"
"github.com/hyperledger/fabric/core/ledger/mock"
"github.com/hyperledger/fabric/core/ledger/util/couchdb"
"github.com/hyperledger/fabric/core/ledger/util/couchdbtest"
"github.com/stretchr/testify/assert"
)

Expand Down Expand Up @@ -109,13 +107,15 @@ type CouchDBCommonStorageTestEnv struct {
bookkeeperTestEnv *bookkeeping.TestEnv
redoPath string
couchCleanup func()
couchDBConfig *ledger.CouchDBConfig
}

// StartExternalResource starts external couchDB resources.
func (env *CouchDBCommonStorageTestEnv) StartExternalResource() {
if env.couchAddress == "" {
env.couchAddress, env.couchCleanup = couchdbtest.CouchDBSetup(nil)
if env.couchAddress != "" {
return
}
env.couchAddress, env.couchCleanup = statecouchdb.StartCouchDB(env.t.(*testing.T), nil)
}

// StopExternalResource stops external couchDB resources.
Expand All @@ -132,12 +132,13 @@ func (env *CouchDBCommonStorageTestEnv) Init(t testing.TB) {
t.Fatalf("Failed to create redo log directory: %s", err)
}

env.t = t
env.StartExternalResource()

stateDBConfig := &StateDBConfig{
StateDBConfig: &ledger.StateDBConfig{
StateDatabase: "CouchDB",
CouchDB: &couchdb.Config{
CouchDB: &ledger.CouchDBConfig{
Address: env.couchAddress,
Username: "",
Password: "",
Expand All @@ -161,9 +162,9 @@ func (env *CouchDBCommonStorageTestEnv) Init(t testing.TB) {
[]string{"lscc", "_lifecycle"},
)
assert.NoError(t, err)
env.t = t
env.provider = dbProvider
env.redoPath = redoPath
env.couchDBConfig = stateDBConfig.CouchDB
}

// GetDBHandle implements corresponding function from interface TestEnv
Expand All @@ -182,7 +183,7 @@ func (env *CouchDBCommonStorageTestEnv) GetName() string {
func (env *CouchDBCommonStorageTestEnv) Cleanup() {
csdbProvider := env.provider.(*CommonStorageDBProvider)
if csdbProvider != nil {
statecouchdb.CleanupDB(env.t, csdbProvider.VersionedDBProvider)
statecouchdb.DeleteApplicationDBs(env.t, env.couchDBConfig)
}
os.RemoveAll(env.redoPath)
env.bookkeeperTestEnv.Cleanup()
Expand Down
21 changes: 10 additions & 11 deletions core/ledger/kvledger/txmgmt/statedb/statecouchdb/commit_handling.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,11 @@ import (
"sync"

"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/statedb"
"github.com/hyperledger/fabric/core/ledger/util/couchdb"
"github.com/pkg/errors"
)

type committer struct {
db *couchdb.CouchDatabase
db *couchDatabase
batchUpdateMap map[string]*batchableDocument
namespace string
cacheKVs cacheKVs
Expand Down Expand Up @@ -105,7 +104,7 @@ func (vdb *VersionedDB) buildCommittersForNs(ns string, nsUpdates map[string]*st
return nil, err
}
// for each namespace, build mutiple committers based on the maxBatchSize
maxBatchSize := db.CouchInstance.MaxBatchUpdateSize()
maxBatchSize := db.couchInstance.maxBatchUpdateSize()
numCommitters := 1
if maxBatchSize > 0 {
numCommitters = int(math.Ceil(float64(len(nsUpdates)) / float64(maxBatchSize)))
Expand Down Expand Up @@ -173,13 +172,13 @@ func (vdb *VersionedDB) executeCommitter(committers []*committer) error {

// commitUpdates commits the given updates to couchdb
func (c *committer) commitUpdates() error {
docs := []*couchdb.CouchDoc{}
docs := []*couchDoc{}
for _, update := range c.batchUpdateMap {
docs = append(docs, &update.CouchDoc)
}

// Do the bulk update into couchdb. Note that this will do retries if the entire bulk update fails or times out
responses, err := c.db.BatchUpdateDocuments(docs)
responses, err := c.db.batchUpdateDocuments(docs)
if err != nil {
return err
}
Expand All @@ -198,8 +197,8 @@ func (c *committer) commitUpdates() error {
//Remove the "_rev" from the JSON before saving
//this will allow the CouchDB retry logic to retry revisions without encountering
//a mismatch between the "If-Match" and the "_rev" tag in the JSON
if doc.CouchDoc.JSONValue != nil {
err = removeJSONRevision(&doc.CouchDoc.JSONValue)
if doc.CouchDoc.jsonValue != nil {
err = removeJSONRevision(&doc.CouchDoc.jsonValue)
if err != nil {
return err
}
Expand All @@ -210,13 +209,13 @@ func (c *committer) commitUpdates() error {
// If this is a deleted document, then retry the delete
// If the delete fails due to a document not being found (404 error),
// the document has already been deleted and the DeleteDoc will not return an error
err = c.db.DeleteDoc(resp.ID, "")
err = c.db.deleteDoc(resp.ID, "")
} else {
logger.Warningf("CouchDB batch document update encountered an problem. Reason:%s, Retrying update for document ID:%s", resp.Reason, resp.ID)
// Save the individual document to couchdb
// Note that this will do retries as needed
var revision string
revision, err = c.db.SaveDoc(resp.ID, "", &doc.CouchDoc)
revision, err = c.db.saveDoc(resp.ID, "", &doc.CouchDoc)
c.updateRevisionInCacheUpdate(resp.ID, revision)
}

Expand Down Expand Up @@ -293,7 +292,7 @@ func (vdb *VersionedDB) addMissingRevisionsFromDB(ns string, missingKeys []strin
return err
}

logger.Debugf("Pulling revisions for the [%d] keys for namsespace [%s] that were not part of the readset", len(missingKeys), db.DBName)
logger.Debugf("Pulling revisions for the [%d] keys for namsespace [%s] that were not part of the readset", len(missingKeys), db.dbName)
retrievedMetadata, err := retrieveNsMetadata(db, missingKeys)
if err != nil {
return err
Expand All @@ -307,6 +306,6 @@ func (vdb *VersionedDB) addMissingRevisionsFromDB(ns string, missingKeys []strin

//batchableDocument defines a document for a batch
type batchableDocument struct {
CouchDoc couchdb.CouchDoc
CouchDoc couchDoc
Deleted bool
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,10 @@ import (
)

func TestGetRevision(t *testing.T) {
env := testEnv
env.init(t, nil)
defer env.cleanup()
vdbEnv.init(t, nil)
defer vdbEnv.cleanup()

versionedDB, err := testEnv.DBProvider.GetDBHandle("test-get-revisions")
versionedDB, err := vdbEnv.DBProvider.GetDBHandle("test-get-revisions")
assert.NoError(t, err)
db := versionedDB.(*VersionedDB)

Expand Down Expand Up @@ -83,11 +82,10 @@ func TestGetRevision(t *testing.T) {
}

func TestBuildCommittersForNs(t *testing.T) {
env := testEnv
env.init(t, nil)
defer env.cleanup()
vdbEnv.init(t, nil)
defer vdbEnv.cleanup()

versionedDB, err := testEnv.DBProvider.GetDBHandle("test-build-committers-for-ns")
versionedDB, err := vdbEnv.DBProvider.GetDBHandle("test-build-committers-for-ns")
assert.NoError(t, err)
db := versionedDB.(*VersionedDB)

Expand All @@ -101,7 +99,7 @@ func TestBuildCommittersForNs(t *testing.T) {
nsUpdates = make(map[string]*statedb.VersionedValue)
// populate updates with maxBatchSize + 1.
dummyHeight := version.NewHeight(1, 1)
for i := 0; i <= env.config.MaxBatchUpdateSize; i++ {
for i := 0; i <= vdbEnv.config.MaxBatchUpdateSize; i++ {
nsUpdates[strconv.Itoa(i)] = &statedb.VersionedValue{
Value: nil,
Metadata: nil,
Expand All @@ -118,19 +116,18 @@ func TestBuildCommittersForNs(t *testing.T) {
}

func TestBuildCommitters(t *testing.T) {
env := testEnv
env.init(t, nil)
defer env.cleanup()
vdbEnv.init(t, nil)
defer vdbEnv.cleanup()

versionedDB, err := testEnv.DBProvider.GetDBHandle("test-build-committers")
versionedDB, err := vdbEnv.DBProvider.GetDBHandle("test-build-committers")
assert.NoError(t, err)
db := versionedDB.(*VersionedDB)

dummyHeight := version.NewHeight(1, 1)
batch := statedb.NewUpdateBatch()
batch.Put("ns-1", "key1", []byte("value1"), dummyHeight)
batch.Put("ns-2", "key1", []byte("value2"), dummyHeight)
for i := 0; i <= env.config.MaxBatchUpdateSize; i++ {
for i := 0; i <= vdbEnv.config.MaxBatchUpdateSize; i++ {
batch.Put("maxBatch", "key1", []byte("value3"), dummyHeight)
}
namespaceSet := map[string]bool{
Expand All @@ -152,11 +149,10 @@ func TestBuildCommitters(t *testing.T) {
}

func TestExecuteCommitter(t *testing.T) {
env := testEnv
env.init(t, nil)
defer env.cleanup()
vdbEnv.init(t, nil)
defer vdbEnv.cleanup()

versionedDB, err := testEnv.DBProvider.GetDBHandle("test-execute-committer")
versionedDB, err := vdbEnv.DBProvider.GetDBHandle("test-execute-committer")
assert.NoError(t, err)
db := versionedDB.(*VersionedDB)

Expand Down Expand Up @@ -213,11 +209,10 @@ func TestExecuteCommitter(t *testing.T) {
}

func TestCommitUpdates(t *testing.T) {
env := testEnv
env.init(t, nil)
defer env.cleanup()
vdbEnv.init(t, nil)
defer vdbEnv.cleanup()

versionedDB, err := testEnv.DBProvider.GetDBHandle("test-commitupdates")
versionedDB, err := vdbEnv.DBProvider.GetDBHandle("test-commitupdates")
assert.NoError(t, err)
db := versionedDB.(*VersionedDB)

Expand Down
Loading

0 comments on commit c41f9eb

Please sign in to comment.