Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[LEDGER] move couchdb util package to statecouchdb package #926

Merged
merged 1 commit into from
Apr 15, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions core/ledger/kvledger/benchmark/chainmgmt/chains.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"github.com/hyperledger/fabric/core/ledger"
"github.com/hyperledger/fabric/core/ledger/ledgermgmt"
"github.com/hyperledger/fabric/core/ledger/ledgermgmt/ledgermgmttest"
"github.com/hyperledger/fabric/core/ledger/util/couchdb"
)

// ChainID is a type used for the ids for the chains for experiments
Expand Down Expand Up @@ -50,7 +49,7 @@ func newChainsMgr(mgrConf *ChainMgrConf, batchConf *BatchConf, initOp chainInitO
panic("environment variable 'useCouchDB' is set to true but 'COUCHDB_ADDR' is not set")
}
ledgermgmtInitializer.Config.StateDBConfig.StateDatabase = "CouchDB"
ledgermgmtInitializer.Config.StateDBConfig.CouchDB = &couchdb.Config{
ledgermgmtInitializer.Config.StateDBConfig.CouchDB = &ledger.CouchDBConfig{
Address: couchdbAddr,
RedoLogPath: filepath.Join(dataDir, "couchdbRedologs"),
UserCacheSizeMBs: 500,
Expand Down
21 changes: 4 additions & 17 deletions core/ledger/kvledger/tests/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,12 @@ import (
configtxtest "github.com/hyperledger/fabric/common/configtx/test"
"github.com/hyperledger/fabric/common/crypto"
"github.com/hyperledger/fabric/common/flogging"
"github.com/hyperledger/fabric/common/metrics/disabled"
"github.com/hyperledger/fabric/common/policydsl"
"github.com/hyperledger/fabric/core/ledger"
"github.com/hyperledger/fabric/core/ledger/kvledger/tests/fakes"
"github.com/hyperledger/fabric/core/ledger/util/couchdb"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/statedb/statecouchdb"
"github.com/hyperledger/fabric/internal/pkg/txflags"
"github.com/hyperledger/fabric/protoutil"
"github.com/stretchr/testify/require"
)

var logger = flogging.MustGetLogger("test2")
Expand Down Expand Up @@ -231,18 +230,6 @@ func setBlockFlagsToValid(block *common.Block) {
txflags.NewWithValues(len(block.Data.Data), protopeer.TxValidationCode_VALID)
}

func dropCouchDBs(t *testing.T, couchdbConfig *couchdb.Config) {
couchInstance, err := couchdb.CreateCouchInstance(couchdbConfig, &disabled.Provider{})
require.NoError(t, err)
dbNames, err := couchInstance.RetrieveApplicationDBNames()
require.NoError(t, err)
for _, dbName := range dbNames {
db := &couchdb.CouchDatabase{
CouchInstance: couchInstance,
DBName: dbName,
}
response, err := db.DropDatabase()
require.NoError(t, err)
require.True(t, response.Ok)
}
func dropCouchDBs(t *testing.T, couchdbConfig *ledger.CouchDBConfig) {
statecouchdb.DeleteApplicationDBs(t, couchdbConfig)
}
14 changes: 9 additions & 5 deletions core/ledger/kvledger/tests/v1x_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ import (

protopeer "github.com/hyperledger/fabric-protos-go/peer"
"github.com/hyperledger/fabric/common/ledger/testutil"
"github.com/hyperledger/fabric/core/ledger"
"github.com/hyperledger/fabric/core/ledger/kvledger"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/statedb/statecouchdb"
"github.com/hyperledger/fabric/core/ledger/mock"
"github.com/hyperledger/fabric/core/ledger/util/couchdb"
"github.com/hyperledger/fabric/core/ledger/util/couchdbtest"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -76,11 +76,11 @@ func TestV13WithStateCouchdb(t *testing.T) {
fmt.Sprintf("%s:%s", couchdbDataUnzipDir, "/opt/couchdb/data"),
fmt.Sprintf("%s:%s", localdHostDir, "/opt/couchdb/etc/local.d"),
}
couchAddress, cleanup := couchdbtest.CouchDBSetup(couchdbBinds)
couchAddress, cleanup := couchDBSetup(t, couchdbBinds)
defer cleanup()

// set required config data to use state couchdb
couchdbConfig := &couchdb.Config{
couchdbConfig := &ledger.CouchDBConfig{
Address: couchAddress,
Username: "",
Password: "",
Expand Down Expand Up @@ -114,7 +114,11 @@ func TestV13WithStateCouchdb(t *testing.T) {
dataHelper.verify(h2)
}

func checkInitLedgerPanicAndDropDBs(t *testing.T, env *env, ledgerFSRoot string, couchdbConfig *couchdb.Config) {
func couchDBSetup(t *testing.T, binds []string) (addr string, cleanup func()) {
return statecouchdb.StartCouchDB(t, binds)
}

func checkInitLedgerPanicAndDropDBs(t *testing.T, env *env, ledgerFSRoot string, couchdbConfig *ledger.CouchDBConfig) {
t.Logf("verifying that a panic occurs because idStore has old format and then reformat the idstore to proceed")
idStorePath := kvledger.LedgerProviderPath(ledgerFSRoot)
require.PanicsWithValue(
Expand Down
15 changes: 8 additions & 7 deletions core/ledger/kvledger/txmgmt/privacyenabledstate/test_exports.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ import (
"github.com/hyperledger/fabric/core/ledger/kvledger/bookkeeping"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/statedb/statecouchdb"
"github.com/hyperledger/fabric/core/ledger/mock"
"github.com/hyperledger/fabric/core/ledger/util/couchdb"
"github.com/hyperledger/fabric/core/ledger/util/couchdbtest"
"github.com/stretchr/testify/assert"
)

Expand Down Expand Up @@ -109,13 +107,15 @@ type CouchDBCommonStorageTestEnv struct {
bookkeeperTestEnv *bookkeeping.TestEnv
redoPath string
couchCleanup func()
couchDBConfig *ledger.CouchDBConfig
}

// StartExternalResource starts external couchDB resources.
func (env *CouchDBCommonStorageTestEnv) StartExternalResource() {
if env.couchAddress == "" {
env.couchAddress, env.couchCleanup = couchdbtest.CouchDBSetup(nil)
if env.couchAddress != "" {
return
}
env.couchAddress, env.couchCleanup = statecouchdb.StartCouchDB(env.t.(*testing.T), nil)
}

// StopExternalResource stops external couchDB resources.
Expand All @@ -132,12 +132,13 @@ func (env *CouchDBCommonStorageTestEnv) Init(t testing.TB) {
t.Fatalf("Failed to create redo log directory: %s", err)
}

env.t = t
env.StartExternalResource()

stateDBConfig := &StateDBConfig{
StateDBConfig: &ledger.StateDBConfig{
StateDatabase: "CouchDB",
CouchDB: &couchdb.Config{
CouchDB: &ledger.CouchDBConfig{
Address: env.couchAddress,
Username: "",
Password: "",
Expand All @@ -161,9 +162,9 @@ func (env *CouchDBCommonStorageTestEnv) Init(t testing.TB) {
[]string{"lscc", "_lifecycle"},
)
assert.NoError(t, err)
env.t = t
env.provider = dbProvider
env.redoPath = redoPath
env.couchDBConfig = stateDBConfig.CouchDB
}

// GetDBHandle implements corresponding function from interface TestEnv
Expand All @@ -182,7 +183,7 @@ func (env *CouchDBCommonStorageTestEnv) GetName() string {
func (env *CouchDBCommonStorageTestEnv) Cleanup() {
csdbProvider := env.provider.(*CommonStorageDBProvider)
if csdbProvider != nil {
statecouchdb.CleanupDB(env.t, csdbProvider.VersionedDBProvider)
statecouchdb.DeleteApplicationDBs(env.t, env.couchDBConfig)
}
os.RemoveAll(env.redoPath)
env.bookkeeperTestEnv.Cleanup()
Expand Down
21 changes: 10 additions & 11 deletions core/ledger/kvledger/txmgmt/statedb/statecouchdb/commit_handling.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,11 @@ import (
"sync"

"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/statedb"
"github.com/hyperledger/fabric/core/ledger/util/couchdb"
"github.com/pkg/errors"
)

type committer struct {
db *couchdb.CouchDatabase
db *couchDatabase
batchUpdateMap map[string]*batchableDocument
namespace string
cacheKVs cacheKVs
Expand Down Expand Up @@ -105,7 +104,7 @@ func (vdb *VersionedDB) buildCommittersForNs(ns string, nsUpdates map[string]*st
return nil, err
}
// for each namespace, build mutiple committers based on the maxBatchSize
maxBatchSize := db.CouchInstance.MaxBatchUpdateSize()
maxBatchSize := db.couchInstance.maxBatchUpdateSize()
numCommitters := 1
if maxBatchSize > 0 {
numCommitters = int(math.Ceil(float64(len(nsUpdates)) / float64(maxBatchSize)))
Expand Down Expand Up @@ -173,13 +172,13 @@ func (vdb *VersionedDB) executeCommitter(committers []*committer) error {

// commitUpdates commits the given updates to couchdb
func (c *committer) commitUpdates() error {
docs := []*couchdb.CouchDoc{}
docs := []*couchDoc{}
for _, update := range c.batchUpdateMap {
docs = append(docs, &update.CouchDoc)
}

// Do the bulk update into couchdb. Note that this will do retries if the entire bulk update fails or times out
responses, err := c.db.BatchUpdateDocuments(docs)
responses, err := c.db.batchUpdateDocuments(docs)
if err != nil {
return err
}
Expand All @@ -198,8 +197,8 @@ func (c *committer) commitUpdates() error {
//Remove the "_rev" from the JSON before saving
//this will allow the CouchDB retry logic to retry revisions without encountering
//a mismatch between the "If-Match" and the "_rev" tag in the JSON
if doc.CouchDoc.JSONValue != nil {
err = removeJSONRevision(&doc.CouchDoc.JSONValue)
if doc.CouchDoc.jsonValue != nil {
err = removeJSONRevision(&doc.CouchDoc.jsonValue)
if err != nil {
return err
}
Expand All @@ -210,13 +209,13 @@ func (c *committer) commitUpdates() error {
// If this is a deleted document, then retry the delete
// If the delete fails due to a document not being found (404 error),
// the document has already been deleted and the DeleteDoc will not return an error
err = c.db.DeleteDoc(resp.ID, "")
err = c.db.deleteDoc(resp.ID, "")
} else {
logger.Warningf("CouchDB batch document update encountered an problem. Reason:%s, Retrying update for document ID:%s", resp.Reason, resp.ID)
// Save the individual document to couchdb
// Note that this will do retries as needed
var revision string
revision, err = c.db.SaveDoc(resp.ID, "", &doc.CouchDoc)
revision, err = c.db.saveDoc(resp.ID, "", &doc.CouchDoc)
c.updateRevisionInCacheUpdate(resp.ID, revision)
}

Expand Down Expand Up @@ -293,7 +292,7 @@ func (vdb *VersionedDB) addMissingRevisionsFromDB(ns string, missingKeys []strin
return err
}

logger.Debugf("Pulling revisions for the [%d] keys for namsespace [%s] that were not part of the readset", len(missingKeys), db.DBName)
logger.Debugf("Pulling revisions for the [%d] keys for namsespace [%s] that were not part of the readset", len(missingKeys), db.dbName)
retrievedMetadata, err := retrieveNsMetadata(db, missingKeys)
if err != nil {
return err
Expand All @@ -307,6 +306,6 @@ func (vdb *VersionedDB) addMissingRevisionsFromDB(ns string, missingKeys []strin

//batchableDocument defines a document for a batch
type batchableDocument struct {
CouchDoc couchdb.CouchDoc
CouchDoc couchDoc
Deleted bool
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,10 @@ import (
)

func TestGetRevision(t *testing.T) {
env := testEnv
env.init(t, nil)
defer env.cleanup()
vdbEnv.init(t, nil)
defer vdbEnv.cleanup()

versionedDB, err := testEnv.DBProvider.GetDBHandle("test-get-revisions")
versionedDB, err := vdbEnv.DBProvider.GetDBHandle("test-get-revisions")
assert.NoError(t, err)
db := versionedDB.(*VersionedDB)

Expand Down Expand Up @@ -83,11 +82,10 @@ func TestGetRevision(t *testing.T) {
}

func TestBuildCommittersForNs(t *testing.T) {
env := testEnv
env.init(t, nil)
defer env.cleanup()
vdbEnv.init(t, nil)
defer vdbEnv.cleanup()

versionedDB, err := testEnv.DBProvider.GetDBHandle("test-build-committers-for-ns")
versionedDB, err := vdbEnv.DBProvider.GetDBHandle("test-build-committers-for-ns")
assert.NoError(t, err)
db := versionedDB.(*VersionedDB)

Expand All @@ -101,7 +99,7 @@ func TestBuildCommittersForNs(t *testing.T) {
nsUpdates = make(map[string]*statedb.VersionedValue)
// populate updates with maxBatchSize + 1.
dummyHeight := version.NewHeight(1, 1)
for i := 0; i <= env.config.MaxBatchUpdateSize; i++ {
for i := 0; i <= vdbEnv.config.MaxBatchUpdateSize; i++ {
nsUpdates[strconv.Itoa(i)] = &statedb.VersionedValue{
Value: nil,
Metadata: nil,
Expand All @@ -118,19 +116,18 @@ func TestBuildCommittersForNs(t *testing.T) {
}

func TestBuildCommitters(t *testing.T) {
env := testEnv
env.init(t, nil)
defer env.cleanup()
vdbEnv.init(t, nil)
defer vdbEnv.cleanup()

versionedDB, err := testEnv.DBProvider.GetDBHandle("test-build-committers")
versionedDB, err := vdbEnv.DBProvider.GetDBHandle("test-build-committers")
assert.NoError(t, err)
db := versionedDB.(*VersionedDB)

dummyHeight := version.NewHeight(1, 1)
batch := statedb.NewUpdateBatch()
batch.Put("ns-1", "key1", []byte("value1"), dummyHeight)
batch.Put("ns-2", "key1", []byte("value2"), dummyHeight)
for i := 0; i <= env.config.MaxBatchUpdateSize; i++ {
for i := 0; i <= vdbEnv.config.MaxBatchUpdateSize; i++ {
batch.Put("maxBatch", "key1", []byte("value3"), dummyHeight)
}
namespaceSet := map[string]bool{
Expand All @@ -152,11 +149,10 @@ func TestBuildCommitters(t *testing.T) {
}

func TestExecuteCommitter(t *testing.T) {
env := testEnv
env.init(t, nil)
defer env.cleanup()
vdbEnv.init(t, nil)
defer vdbEnv.cleanup()

versionedDB, err := testEnv.DBProvider.GetDBHandle("test-execute-committer")
versionedDB, err := vdbEnv.DBProvider.GetDBHandle("test-execute-committer")
assert.NoError(t, err)
db := versionedDB.(*VersionedDB)

Expand Down Expand Up @@ -213,11 +209,10 @@ func TestExecuteCommitter(t *testing.T) {
}

func TestCommitUpdates(t *testing.T) {
env := testEnv
env.init(t, nil)
defer env.cleanup()
vdbEnv.init(t, nil)
defer vdbEnv.cleanup()

versionedDB, err := testEnv.DBProvider.GetDBHandle("test-commitupdates")
versionedDB, err := vdbEnv.DBProvider.GetDBHandle("test-commitupdates")
assert.NoError(t, err)
db := versionedDB.(*VersionedDB)

Expand Down
Loading