diff --git a/core/ledger/kvledger/txmgmt/privacyenabledstate/common_storage_db.go b/core/ledger/kvledger/txmgmt/privacyenabledstate/common_storage_db.go index 18d6dd15f85..d3dfea19f6e 100644 --- a/core/ledger/kvledger/txmgmt/privacyenabledstate/common_storage_db.go +++ b/core/ledger/kvledger/txmgmt/privacyenabledstate/common_storage_db.go @@ -20,7 +20,7 @@ import ( ) const ( - nsJoiner = "$" + nsJoiner = "$$" pvtDataPrefix = "p" hashDataPrefix = "h" ) diff --git a/core/ledger/kvledger/txmgmt/privacyenabledstate/db_test.go b/core/ledger/kvledger/txmgmt/privacyenabledstate/db_test.go index fec91d87c85..54eee5481ab 100644 --- a/core/ledger/kvledger/txmgmt/privacyenabledstate/db_test.go +++ b/core/ledger/kvledger/txmgmt/privacyenabledstate/db_test.go @@ -312,6 +312,47 @@ func testQueryOnCouchDB(t *testing.T, env TestEnv) { testQueryItr(t, itr, []string{testKey(10)}, []string{"joe", "1000007"}) } +func TestLongDBNameOnCouchDB(t *testing.T) { + for _, env := range testEnvs { + _, ok := env.(*CouchDBCommonStorageTestEnv) + if !ok { + continue + } + t.Run(env.GetName(), func(t *testing.T) { + testLongDBNameOnCouchDB(t, env) + }) + } +} + +func testLongDBNameOnCouchDB(t *testing.T, env TestEnv) { + env.Init(t) + defer env.Cleanup() + + // Creates metadataDB (i.e., chainDB) + // Allowed pattern for chainName: [a-z][a-z0-9.-] + db := env.GetDBHandle("w1coaii9ck3l8red6a5cf3rwbe1b4wvbzcrrfl7samu7px8b9gf-4hft7wrgdmzzjj9ure4cbffucaj78nbj9ej.kvl3bus1iq1qir9xlhb8a1wipuksgs3g621elzy1prr658087exwrhp-y4j55o9cld242v--oeh3br1g7m8d6l8jobn.y42cgjt1.u1ik8qxnv4ohh9kr2w2zc8hqir5u4ev23s7jygrg....s7.ohp-5bcxari8nji") + + updates := NewUpdateBatch() + + // Allowed pattern for namespace and collection: [a-zA-Z0-9_-] + ns := "wMCnSXiV9YoIqNQyNvFVTdM8XnUtvrOFFIWsKelmP5NEszmNLl8YhtOKbFu3P_NgwgsYF8PsfwjYCD8f1XRpANQLoErDHwLlweryqXeJ6vzT2x0pS_GwSx0m6tBI0zOmHQOq_2De8A87x6zUOPwufC2T6dkidFxiuq8Sey2-5vUo_iNKCij3WTeCnKx78PUIg_U1gp4_0KTvYVtRBRvH0kz5usizBxPaiFu3TPhB9XLviScvdUVSbSYJ0Z" + coll := "vWjtfSTXVK8WJus5s6zWoMIciXd7qHRZIusF9SkOS6m8XuHCiJDE9cCRuVerq22Na8qBL2ywDGFpVMIuzfyEXLjeJb0mMuH4cwewT6r1INOTOSYwrikwOLlT_fl0V1L7IQEwUBB8WCvRqSdj6j5-E5aGul_pv_0UeCdwWiyA_GrZmP7ocLzfj2vP8btigrajqdH-irLO2ydEjQUAvf8fiuxru9la402KmKRy457GgI98UHoUdqV3f3FCdR" + + updates.PubUpdates.Put(ns, "key1", []byte("value1"), version.NewHeight(1, 1)) + updates.PvtUpdates.Put(ns, coll, "key1", []byte("pvt_value"), version.NewHeight(1, 2)) + updates.HashUpdates.Put(ns, coll, util.ComputeStringHash("key1"), util.ComputeHash([]byte("pvt_value")), version.NewHeight(1, 2)) + + db.ApplyPrivacyAwareUpdates(updates, version.NewHeight(2, 6)) + + vv, err := db.GetState(ns, "key1") + assert.NoError(t, err) + assert.Equal(t, &statedb.VersionedValue{Value: []byte("value1"), Version: version.NewHeight(1, 1)}, vv) + + vv, err = db.GetPrivateData(ns, coll, "key1") + assert.NoError(t, err) + assert.Equal(t, &statedb.VersionedValue{Value: []byte("pvt_value"), Version: version.NewHeight(1, 2)}, vv) +} + func testItr(t *testing.T, itr statedb.ResultsIterator, expectedKeys []string) { defer itr.Close() for _, expectedKey := range expectedKeys { diff --git a/core/ledger/kvledger/txmgmt/statedb/statecouchdb/statecouchdb.go b/core/ledger/kvledger/txmgmt/statedb/statecouchdb/statecouchdb.go index fc164c02ac1..ccb1ea1c92a 100644 --- a/core/ledger/kvledger/txmgmt/statedb/statecouchdb/statecouchdb.go +++ b/core/ledger/kvledger/txmgmt/statedb/statecouchdb/statecouchdb.go @@ -161,7 +161,7 @@ func (provider *VersionedDBProvider) Close() { type VersionedDB struct { couchInstance *couchdb.CouchInstance metadataDB *couchdb.CouchDatabase // A database per channel to store metadata such as savepoint. - dbName string // The name of the channel database. + chainName string // The name of the chain/channel. namespaceDBs map[string]*couchdb.CouchDatabase // One database per deployed chaincode. //TODO: Decide whether to split committedDataCache into multiple cahces, i.e., one per namespace. committedDataCache *CommittedVersions // Used as a local cache during bulk processing of a block. @@ -171,8 +171,10 @@ type VersionedDB struct { // newVersionedDB constructs an instance of VersionedDB func newVersionedDB(couchInstance *couchdb.CouchInstance, dbName string) (*VersionedDB, error) { // CreateCouchDatabase creates a CouchDB database object, as well as the underlying database if it does not exist + chainName := dbName + + dbName = couchdb.ConstructMetadataDBName(dbName) - dbName = dbName + "_" metadataDB, err := couchdb.CreateCouchDatabase(*couchInstance, dbName) if err != nil { return nil, err @@ -183,33 +185,32 @@ func newVersionedDB(couchInstance *couchdb.CouchInstance, dbName string) (*Versi committedDataCache := &CommittedVersions{committedVersions: versionMap, revisionNumbers: revMap} - return &VersionedDB{couchInstance, metadataDB, dbName, namespaceDBMap, committedDataCache, sync.RWMutex{}}, nil + return &VersionedDB{couchInstance, metadataDB, chainName, namespaceDBMap, committedDataCache, sync.RWMutex{}}, nil } // getNamespaceDBHandle gets the handle to a named chaincode database func (vdb *VersionedDB) getNamespaceDBHandle(namespace string) (*couchdb.CouchDatabase, error) { - // TODO: lower casing the namespace will be handled more appropriately when - // we address the additional name mapping logic specified in FAB-7130. - namespaceDBName := vdb.dbName + strings.ToLower(namespace) vdb.mux.RLock() - db := vdb.namespaceDBs[namespaceDBName] + db := vdb.namespaceDBs[namespace] vdb.mux.RUnlock() if db != nil { return db, nil } + namespaceDBName := couchdb.ConstructNamespaceDBName(vdb.chainName, namespace) + vdb.mux.Lock() defer vdb.mux.Unlock() - db = vdb.namespaceDBs[namespaceDBName] + db = vdb.namespaceDBs[namespace] if db == nil { var err error db, err = couchdb.CreateCouchDatabase(*vdb.couchInstance, namespaceDBName) if err != nil { return nil, err } - vdb.namespaceDBs[namespaceDBName] = db + vdb.namespaceDBs[namespace] = db } return db, nil } @@ -560,7 +561,7 @@ func (vdb *VersionedDB) processUpdateBatch(updateBatch *statedb.UpdateBatch, mis } logger.Debugf("Channel [%s]: namespace=[%s] key=[%#v], prior revision=[%s], isDelete=[%t]", - vdb.dbName, ns, key, revision, isDelete) + vdb.chainName, ns, key, revision, isDelete) if isDelete { // this is a deleted record. Set the _deleted property to true @@ -922,6 +923,7 @@ func (vdb *VersionedDB) recordSavepoint(height *version.Height, namespaces []str var err error var savepointDoc couchSavepointData // ensure full commit to flush all changes on updated namespaces until now to disk + // namespace also includes empty namespace which is nothing but metadataDB for _, ns := range namespaces { // TODO: Ensure full commit can be parallelized to improve performance db, err := vdb.getNamespaceDBHandle(ns) @@ -935,6 +937,7 @@ func (vdb *VersionedDB) recordSavepoint(height *version.Height, namespaces []str return errors.New("Failed to perform full commit") } } + // construct savepoint document savepointDoc.BlockNum = height.BlockNum savepointDoc.TxNum = height.TxNum @@ -952,12 +955,10 @@ func (vdb *VersionedDB) recordSavepoint(height *version.Height, namespaces []str return err } - dbResponse, err := vdb.metadataDB.EnsureFullCommit() - - if err != nil || dbResponse.Ok != true { - logger.Errorf("Failed to perform full commit\n") - return errors.New("Failed to perform full commit") - } + // Note: Ensure full commit on metadataDB after storing the savepoint is not necessary + // as CouchDB syncs states to disk periodically (every 1 second). If peer fails before + // syncing the savepoint to disk, ledger recovery process kicks in to ensure consistency + // between CouchDB and block store on peer restart return nil } diff --git a/core/ledger/util/couchdb/couchdbutil.go b/core/ledger/util/couchdb/couchdbutil.go index ab1d044743a..02d193253ee 100644 --- a/core/ledger/util/couchdb/couchdbutil.go +++ b/core/ledger/util/couchdb/couchdbutil.go @@ -17,16 +17,28 @@ limitations under the License. package couchdb import ( + "encoding/hex" "fmt" "net/http" "regexp" "strconv" "strings" "time" + + "github.com/hyperledger/fabric/common/util" ) -var expectedDatabaseNamePattern = `[a-z][a-z0-9.$_-]*` -var maxLength = 249 +var expectedDatabaseNamePattern = `[a-z][a-z0-9.$_()-]*` +var maxLength = 238 + +// To restrict the length of couchDB database name to the +// allowed length of 249 chars, the string length limit +// for chain/channel name, namespace/chaincode name, and +// collection name, which constitutes the database name, +// is defined. +var chainNameAllowedLength = 50 +var namespaceNameAllowedLength = 50 +var collectionNameAllowedLength = 50 //CreateCouchInstance creates a CouchDB instance func CreateCouchInstance(couchDBConnectURL, id, pw string, maxRetries, @@ -137,6 +149,81 @@ func CreateSystemDatabasesIfNotExist(couchInstance CouchInstance) error { } +// ConstructMetadataDBName truncates the db name to couchdb allowed length to +// construct the metadataDBName +func ConstructMetadataDBName(dbName string) string { + if len(dbName) > maxLength { + untruncatedDBName := dbName + // Truncate the name if the length violates the allowed limit + // As the passed dbName is same as chain/channel name, truncate using chainNameAllowedLength + dbName = dbName[:chainNameAllowedLength] + // For metadataDB (i.e., chain/channel DB), the dbName contains + (SHA256 hash of actual chainName) + dbName = dbName + "(" + hex.EncodeToString(util.ComputeSHA256([]byte(untruncatedDBName))) + ")" + // 50 chars for dbName + 1 char for ( + 64 chars for sha256 + 1 char for ) = 116 chars + } + return dbName + "_" +} + +// ConstructNamespaceDBName truncates db name to couchdb allowed length to +// construct the namespaceDBName +func ConstructNamespaceDBName(chainName, namespace string) string { + // replace upper-case in namespace with a escape sequence '$' and the respective lower-case letter + escapedNamespace := escapeUpperCase(namespace) + namespaceDBName := chainName + "_" + escapedNamespace + + // For namespaceDBName of form 'chainName_namespace', on length limit violation, the truncated + // namespaceDBName would contain + "_" + + // + + // () + // + // For namespaceDBName of form 'chainName_namespace$$collection', on length limit violation, the truncated + // namespaceDBName would contain + "_" + + // + "$$" + + () + + if len(namespaceDBName) > maxLength { + // Compute the hash of untruncated namespaceDBName that needs to be appended to + // truncated namespaceDBName for maintaining uniqueness + hashOfNamespaceDBName := hex.EncodeToString(util.ComputeSHA256([]byte(chainName + "_" + namespace))) + + // As truncated namespaceDBName is of form 'chainName_escapedNamespace', both chainName + // and escapedNamespace need to be truncated to defined allowed length. + if len(chainName) > chainNameAllowedLength { + // Truncate chainName to chainNameAllowedLength + chainName = chainName[0:chainNameAllowedLength] + } + // As escapedNamespace can be of either 'namespace' or 'namespace$$collectionName', + // both 'namespace' and 'collectionName' need to be truncated to defined allowed length. + // '$$' is used as joiner between namespace and collection name. + // Split the escapedNamespace into escaped namespace and escaped collection name if exist. + names := strings.Split(escapedNamespace, "$$") + namespace := names[0] + if len(namespace) > namespaceNameAllowedLength { + // Truncate the namespace + namespace = namespace[0:namespaceNameAllowedLength] + } + + escapedNamespace = namespace + + // Check and truncate the length of collection name if exist + if len(names) == 2 { + collection := names[1] + if len(collection) > collectionNameAllowedLength { + // Truncate the escaped collection name + collection = collection[0:collectionNameAllowedLength] + } + // Append truncated collection name to escapedNamespace + escapedNamespace = escapedNamespace + "$$" + collection + } + // Construct and return the namespaceDBName + // 50 chars for chainName + 1 char for '_' + 102 chars for escaped namespace + 1 char for '(' + 64 chars + // for sha256 hash + 1 char for ')' = 219 chars + return chainName + "_" + escapedNamespace + "(" + hashOfNamespaceDBName + ")" + } + return namespaceDBName +} + //mapAndValidateDatabaseName checks to see if the database name contains illegal characters //CouchDB Rules: Only lowercase characters (a-z), digits (0-9), and any of the characters //_, $, (, ), +, -, and / are allowed. Must begin with a letter. @@ -168,3 +255,11 @@ func mapAndValidateDatabaseName(databaseName string) (string, error) { databaseName = strings.Replace(databaseName, ".", "$", -1) return databaseName, nil } + +// escapeUpperCase replaces every upper case letter with a '$' and the respective +// lower-case letter +func escapeUpperCase(dbName string) string { + re := regexp.MustCompile(`([A-Z])`) + dbName = re.ReplaceAllString(dbName, "$$"+"$1") + return strings.ToLower(dbName) +} diff --git a/core/ledger/util/couchdb/couchdbutil_test.go b/core/ledger/util/couchdb/couchdbutil_test.go index c4b682161ac..66cca76d232 100644 --- a/core/ledger/util/couchdb/couchdbutil_test.go +++ b/core/ledger/util/couchdb/couchdbutil_test.go @@ -17,10 +17,12 @@ limitations under the License. package couchdb import ( + "encoding/hex" "fmt" "testing" "github.com/hyperledger/fabric/common/ledger/testutil" + "github.com/hyperledger/fabric/common/util" "github.com/hyperledger/fabric/core/ledger/ledgerconfig" ) @@ -110,3 +112,70 @@ func TestDatabaseMapping(t *testing.T) { testutil.AssertNoError(t, err, "") testutil.AssertEquals(t, transformedName, "test$my$db-1") } + +func TestConstructMetadataDBName(t *testing.T) { + // Allowed pattern for chainName: [a-z][a-z0-9.-] + chainName := "tob2g.y-z0f.qwp-rq5g4-ogid5g6oucyryg9sc16mz0t4vuake5q557esz7sn493nf0ghch0xih6dwuirokyoi4jvs67gh6r5v6mhz3-292un2-9egdcs88cstg3f7xa9m1i8v4gj0t3jedsm-woh3kgiqehwej6h93hdy5tr4v.1qmmqjzz0ox62k.507sh3fkw3-mfqh.ukfvxlm5szfbwtpfkd1r4j.cy8oft5obvwqpzjxb27xuw6" + + truncatedChainName := "tob2g.y-z0f.qwp-rq5g4-ogid5g6oucyryg9sc16mz0t4vuak" + testutil.AssertEquals(t, len(truncatedChainName), chainNameAllowedLength) + + // + 1 char for '(' + <64 chars for SHA256 hash + // (hex encoding) of untruncated chainName> + 1 char for ')' + 1 char for '_' = 117 chars + hash := hex.EncodeToString(util.ComputeSHA256([]byte(chainName))) + expectedDBName := truncatedChainName + "(" + hash + ")" + "_" + expectedDBNameLength := 117 + + constructedDBName := ConstructMetadataDBName(chainName) + testutil.AssertEquals(t, len(constructedDBName), expectedDBNameLength) + testutil.AssertEquals(t, constructedDBName, expectedDBName) +} + +func TestConstructedNamespaceDBName(t *testing.T) { + // === SCENARIO 1: chainName_ns$$coll === + + // Allowed pattern for chainName: [a-z][a-z0-9.-] + chainName := "tob2g.y-z0f.qwp-rq5g4-ogid5g6oucyryg9sc16mz0t4vuake5q557esz7sn493nf0ghch0xih6dwuirokyoi4jvs67gh6r5v6mhz3-292un2-9egdcs88cstg3f7xa9m1i8v4gj0t3jedsm-woh3kgiqehwej6h93hdy5tr4v.1qmmqjzz0ox62k.507sh3fkw3-mfqh.ukfvxlm5szfbwtpfkd1r4j.cy8oft5obvwqpzjxb27xuw6" + + // Allowed pattern for namespace and collection: [a-zA-Z0-9_-] + ns := "wMCnSXiV9YoIqNQyNvFVTdM8XnUtvrOFFIWsKelmP5NEszmNLl8YhtOKbFu3P_NgwgsYF8PsfwjYCD8f1XRpANQLoErDHwLlweryqXeJ6vzT2x0pS_GwSx0m6tBI0zOmHQOq_2De8A87x6zUOPwufC2T6dkidFxiuq8Sey2-5vUo_iNKCij3WTeCnKx78PUIg_U1gp4_0KTvYVtRBRvH0kz5usizBxPaiFu3TPhB9XLviScvdUVSbSYJ0Z" + // first letter 'p' denotes private data namespace. We can use 'h' to denote hashed data namespace as defined in + // privacyenabledstate/common_storage_db.go + coll := "pvWjtfSTXVK8WJus5s6zWoMIciXd7qHRZIusF9SkOS6m8XuHCiJDE9cCRuVerq22Na8qBL2ywDGFpVMIuzfyEXLjeJb0mMuH4cwewT6r1INOTOSYwrikwOLlT_fl0V1L7IQEwUBB8WCvRqSdj6j5-E5aGul_pv_0UeCdwWiyA_GrZmP7ocLzfj2vP8btigrajqdH-irLO2ydEjQUAvf8fiuxru9la402KmKRy457GgI98UHoUdqV3f3FCdR" + + truncatedChainName := "tob2g.y-z0f.qwp-rq5g4-ogid5g6oucyryg9sc16mz0t4vuak" + truncatedEscapedNs := "w$m$cn$s$xi$v9$yo$iq$n$qy$nv$f$v$td$m8$xn$utvr$o$f" + truncatedEscapedColl := "pv$wjtf$s$t$x$v$k8$w$jus5s6z$wo$m$ici$xd7q$h$r$z$i" + testutil.AssertEquals(t, len(truncatedChainName), chainNameAllowedLength) + testutil.AssertEquals(t, len(truncatedEscapedNs), namespaceNameAllowedLength) + testutil.AssertEquals(t, len(truncatedEscapedColl), collectionNameAllowedLength) + + untruncatedDBName := chainName + "_" + ns + "$$" + coll + hash := hex.EncodeToString(util.ComputeSHA256([]byte(untruncatedDBName))) + expectedDBName := truncatedChainName + "_" + truncatedEscapedNs + "$$" + truncatedEscapedColl + "(" + hash + ")" + // + 1 char for '_' + + 2 chars for '$$' + + 1 char for '(' + <64 chars for SHA256 hash + // (hex encoding) of untruncated chainName_ns$$coll> + 1 char for ')' = 219 chars + expectedDBNameLength := 219 + + namespace := ns + "$$" + coll + constructedDBName := ConstructNamespaceDBName(chainName, namespace) + testutil.AssertEquals(t, len(constructedDBName), expectedDBNameLength) + testutil.AssertEquals(t, constructedDBName, expectedDBName) + + // === SCENARIO 2: chainName_ns === + + untruncatedDBName = chainName + "_" + ns + hash = hex.EncodeToString(util.ComputeSHA256([]byte(untruncatedDBName))) + expectedDBName = truncatedChainName + "_" + truncatedEscapedNs + "(" + hash + ")" + // + 1 char for '_' + + 1 char for '(' + <64 chars for SHA256 hash + // (hex encoding) of untruncated chainName_ns> + 1 char for ')' = 167 chars + expectedDBNameLength = 167 + + namespace = ns + constructedDBName = ConstructNamespaceDBName(chainName, namespace) + testutil.AssertEquals(t, len(constructedDBName), expectedDBNameLength) + testutil.AssertEquals(t, constructedDBName, expectedDBName) +}