diff --git a/store/gaskv/store.go b/store/gaskv/store.go index f36119169c76..ce3e7cde8098 100644 --- a/store/gaskv/store.go +++ b/store/gaskv/store.go @@ -1,9 +1,12 @@ package gaskv import ( + "fmt" "io" "time" + "github.com/streadway/handy/atomic" + "github.com/cosmos/cosmos-sdk/store/types" "github.com/cosmos/cosmos-sdk/telemetry" ) @@ -33,14 +36,38 @@ func (gs *Store) GetStoreType() types.StoreType { return gs.parent.GetStoreType() } +var DebugLogging atomic.Int + +var gasTotal uint64 + +func StartLogging() { + DebugLogging.Add(1) +} + +func StopLogging() { + DebugLogging.Add(-1) + fmt.Printf("GasKVStore DebugLog total=%d\n", gasTotal) + gasTotal = 0 +} + +func debugLog(op string, sub string, amount uint64) { + if DebugLogging.Get() == 1 { + fmt.Printf("GasKVStore DebugLog op=%s sub=%s amount=%d\n", op, sub, amount) + gasTotal += amount + } +} + // Implements KVStore. func (gs *Store) Get(key []byte) (value []byte) { gs.gasMeter.ConsumeGas(gs.gasConfig.ReadCostFlat, types.GasReadCostFlatDesc) value = gs.parent.Get(key) + debugLog("Get", "Flat", gs.gasConfig.ReadCostFlat) // TODO overflow-safe math? gs.gasMeter.ConsumeGas(gs.gasConfig.ReadCostPerByte*types.Gas(len(key)), types.GasReadPerByteDesc) + debugLog("Get", "PerByteKey", gs.gasConfig.ReadCostPerByte*types.Gas(len(key))) gs.gasMeter.ConsumeGas(gs.gasConfig.ReadCostPerByte*types.Gas(len(value)), types.GasReadPerByteDesc) + debugLog("Get", "PerByteValue", gs.gasConfig.ReadCostPerByte*types.Gas(len(value))) return value } @@ -50,9 +77,12 @@ func (gs *Store) Set(key []byte, value []byte) { types.AssertValidKey(key) types.AssertValidValue(value) gs.gasMeter.ConsumeGas(gs.gasConfig.WriteCostFlat, types.GasWriteCostFlatDesc) + debugLog("Set", "Flat", gs.gasConfig.WriteCostFlat) // TODO overflow-safe math? gs.gasMeter.ConsumeGas(gs.gasConfig.WriteCostPerByte*types.Gas(len(key)), types.GasWritePerByteDesc) + debugLog("Set", "PerByteKey", gs.gasConfig.WriteCostPerByte*types.Gas(len(key))) gs.gasMeter.ConsumeGas(gs.gasConfig.WriteCostPerByte*types.Gas(len(value)), types.GasWritePerByteDesc) + debugLog("Set", "PerByteValue", gs.gasConfig.WriteCostPerByte*types.Gas(len(value))) gs.parent.Set(key, value) } @@ -60,6 +90,7 @@ func (gs *Store) Set(key []byte, value []byte) { func (gs *Store) Has(key []byte) bool { defer telemetry.MeasureSince(time.Now(), "store", "gaskv", "has") gs.gasMeter.ConsumeGas(gs.gasConfig.HasCost, types.GasHasDesc) + debugLog("Has", "Flat", gs.gasConfig.HasCost) return gs.parent.Has(key) } @@ -179,8 +210,11 @@ func (gi *gasIterator) consumeSeekGas() { value := gi.Value() gi.gasMeter.ConsumeGas(gi.gasConfig.ReadCostPerByte*types.Gas(len(key)), types.GasValuePerByteDesc) + debugLog("consumeSeekGas", "PerByteKey", gi.gasConfig.ReadCostPerByte*types.Gas(len(key))) gi.gasMeter.ConsumeGas(gi.gasConfig.ReadCostPerByte*types.Gas(len(value)), types.GasValuePerByteDesc) + debugLog("consumeSeekGas", "PerByteValue", gi.gasConfig.ReadCostPerByte*types.Gas(len(value))) } gi.gasMeter.ConsumeGas(gi.gasConfig.IterNextCostFlat, types.GasIterNextCostFlatDesc) + debugLog("consumeSeekGas", "Flat", gi.gasConfig.IterNextCostFlat) } diff --git a/store/iavl_v2/store.go b/store/iavl_v2/store.go index 353cde98aaaf..2ea417ab6dbf 100644 --- a/store/iavl_v2/store.go +++ b/store/iavl_v2/store.go @@ -1,6 +1,7 @@ package iavl_v2 import ( + "fmt" io "io" "path/filepath" "time" @@ -34,13 +35,15 @@ func LoadStoreWithInitialVersion(v2RootPath string, key types.StoreKey, id types // i.e. not the happy path. path := filepath.Join(v2RootPath, key.Name()) pool := iavl.NewNodePool() + fmt.Println("LoadStoreWithInitialVersion path:", path) sql, err := iavl.NewSqliteDb(pool, iavl.SqliteDbOptions{Path: path}) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to open sqlite db path=%s: %w", path, err) } - tree := iavl.NewTree(sql, pool, iavl.TreeOptions{}) - err = tree.LoadSnapshot(id.Version) + tree := iavl.NewTree(sql, pool, iavl.TreeOptions{StateStorage: true}) + //err = tree.LoadSnapshot(id.Version) + err = tree.LoadVersion(id.Version) if err != nil { return nil, err } @@ -73,6 +76,7 @@ func (s *Store) Commit() types.CommitID { func (s *Store) LastCommitID() types.CommitID { hash := s.Tree.Hash() + fmt.Printf("IAVLV2 Get LastCommitID: %x\n", hash) return types.CommitID{ Version: s.Tree.Version(), diff --git a/store/rootmulti/store.go b/store/rootmulti/store.go index 2162620297cf..186d0faa68c4 100644 --- a/store/rootmulti/store.go +++ b/store/rootmulti/store.go @@ -26,6 +26,7 @@ import ( "github.com/cosmos/cosmos-sdk/store/cachemulti" "github.com/cosmos/cosmos-sdk/store/dbadapter" "github.com/cosmos/cosmos-sdk/store/iavl" + "github.com/cosmos/cosmos-sdk/store/iavl_v2" "github.com/cosmos/cosmos-sdk/store/listenkv" "github.com/cosmos/cosmos-sdk/store/mem" "github.com/cosmos/cosmos-sdk/store/tracekv" @@ -65,7 +66,9 @@ type Store struct { keysByName map[string]types.StoreKey lazyLoading bool initialVersion int64 - iavlV2Path string + + iavlV2Path string + asyncLoad bool traceWriter io.Writer traceContext types.TraceContext @@ -106,6 +109,10 @@ func NewStore(db dbm.DB, logger log.Logger) *Store { func NewStoreWithOptions(db dbm.DB, logger log.Logger, options StoreOptions) *Store { store := NewStore(db, logger) store.iavlV2Path = options.IavlV2RootPath + if store.iavlV2Path != "" { + fmt.Println("NewStoreWithOptions iavlV2Path:", store.iavlV2Path) + store.asyncLoad = true + } return store } @@ -249,49 +256,97 @@ func (rs *Store) loadVersion(ver int64, upgrades *types.StoreUpgrades) error { }) } + errCh := make(chan error) + storeCh := make(chan struct { + key types.StoreKey + store types.CommitKVStore + }) + maxAsync := 7 + asyncLoadCount := 0 + loadCh := make(chan struct{}, maxAsync) + for i := 0; i < maxAsync; i++ { + loadCh <- struct{}{} + } + for _, key := range storesKeys { - storeParams := rs.storesParams[key] + sParams := rs.storesParams[key] commitID := rs.getCommitID(infos, key.Name()) // If it has been added, set the initial version if upgrades.IsAdded(key.Name()) { - storeParams.initialVersion = uint64(ver) + 1 - } - - // TODO async load - - store, err := rs.loadCommitStoreFromParams(key, commitID, storeParams) - if err != nil { - return errors.Wrap(err, "failed to load store") + sParams.initialVersion = uint64(ver) + 1 } - newStores[key] = store - - // If it was deleted, remove all data - if upgrades.IsDeleted(key.Name()) { - if err := deleteKVStore(store.(types.KVStore)); err != nil { - return errors.Wrapf(err, "failed to delete store %s", key.Name()) + if rs.asyncLoad && sParams.typ == types.StoreTypeIAVL { + asyncLoadCount++ + if upgrades.IsDeleted(key.Name()) { + return fmt.Errorf("async load does not support deleted stores") + } + if oldName := upgrades.RenamedFrom(key.Name()); oldName != "" { + return fmt.Errorf("async load does not support renamed stores") } - } else if oldName := upgrades.RenamedFrom(key.Name()); oldName != "" { - // handle renames specially - // make an unregistered key to satify loadCommitStore params - oldKey := types.NewKVStoreKey(oldName) - oldParams := storeParams - oldParams.key = oldKey - - // load from the old name - oldStore, err := rs.loadCommitStoreFromParams(oldKey, rs.getCommitID(infos, oldName), oldParams) + fmt.Println("async load", key.Name()) + go func(k types.StoreKey, cid types.CommitID, sp storeParams) { + <-loadCh + store, err := rs.loadCommitStoreFromParams(k, cid, sp) + if err != nil { + errCh <- err + return + } + storeCh <- struct { + key types.StoreKey + store types.CommitKVStore + }{ + key: k, + store: store, + } + loadCh <- struct{}{} + }(key, commitID, sParams) + } else { + fmt.Println("sync load", key.Name()) + store, err := rs.loadCommitStoreFromParams(key, commitID, sParams) if err != nil { - return errors.Wrapf(err, "failed to load old store %s", oldName) + return errors.Wrap(err, "failed to load store") } - // move all data - if err := moveKVStoreData(oldStore.(types.KVStore), store.(types.KVStore)); err != nil { - return errors.Wrapf(err, "failed to move store %s -> %s", oldName, key.Name()) + newStores[key] = store + + // If it was deleted, remove all data + if upgrades.IsDeleted(key.Name()) { + if err := deleteKVStore(store.(types.KVStore)); err != nil { + return errors.Wrapf(err, "failed to delete store %s", key.Name()) + } + } else if oldName := upgrades.RenamedFrom(key.Name()); oldName != "" { + // handle renames specially + // make an unregistered key to satify loadCommitStore params + oldKey := types.NewKVStoreKey(oldName) + oldParams := sParams + oldParams.key = oldKey + + // load from the old name + oldStore, err := rs.loadCommitStoreFromParams(oldKey, rs.getCommitID(infos, oldName), oldParams) + if err != nil { + return errors.Wrapf(err, "failed to load old store %s", oldName) + } + + // move all data + if err := moveKVStoreData(oldStore.(types.KVStore), store.(types.KVStore)); err != nil { + return errors.Wrapf(err, "failed to move store %s -> %s", oldName, key.Name()) + } } } } + for i := 0; i < asyncLoadCount; i++ { + select { + case err := <-errCh: + return err + case store := <-storeCh: + fmt.Println("async load done", store.key.Name()) + newStores[store.key] = store.store + } + } + rs.lastCommitInfo = cInfo rs.stores = newStores @@ -445,7 +500,7 @@ func (rs *Store) Commit() types.CommitID { rs.mx.RLock() defer rs.mx.RUnlock() hash, keys := rs.lastCommitInfo.Hash() - rs.logger.Debug("calculated commit hash", "height", version, "commit_hash", fmt.Sprintf("%X", hash), "keys", keys) + rs.logger.Info("calculated commit hash", "height", version, "commit_hash", fmt.Sprintf("%X", hash), "keys", keys) return types.CommitID{ Version: version, @@ -455,6 +510,8 @@ func (rs *Store) Commit() types.CommitID { // SetCommitHeader sets the commit block header of the store. func (rs *Store) SetCommitHeader(h tmproto.Header) { + rs.logger.Info("SetCommitHeader", "header", h) + rs.logger.Info("SetCommitHeader", "lastResultsHash", fmt.Sprintf("%x", h.LastResultsHash)) rs.commitHeader = h } @@ -902,18 +959,20 @@ func (rs *Store) loadCommitStoreFromParams(key types.StoreKey, id types.CommitID var store types.CommitKVStore var err error - // IAVL v1 - if params.initialVersion == 0 { - store, err = iavl.LoadStore(db, rs.logger, key, id, rs.lazyLoading, rs.iavlCacheSize) - } else { - store, err = iavl.LoadStoreWithInitialVersion(db, rs.logger, key, id, rs.lazyLoading, params.initialVersion, rs.iavlCacheSize) - } + fmt.Println("load store", key.Name(), "initialVersion", params.initialVersion) + + // IAVL v0 + //if params.initialVersion == 0 { + // store, err = iavl.LoadStore(db, rs.logger, key, id, rs.lazyLoading, rs.iavlCacheSize) + //} else { + // store, err = iavl.LoadStoreWithInitialVersion(db, rs.logger, key, id, rs.lazyLoading, params.initialVersion, rs.iavlCacheSize) + //} // IAVL V2 - //store, err = iavl_v2.LoadStoreWithInitialVersion(rs.iavlV2Path, key, id, params.initialVersion) + store, err = iavl_v2.LoadStoreWithInitialVersion(rs.iavlV2Path, key, id, params.initialVersion) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to load store %s: %w", key.Name(), err) } if rs.interBlockCache != nil { @@ -971,7 +1030,7 @@ func (rs *Store) commitStores(version int64, storeMap map[types.StoreKey]types.C for key, store := range storeMap { commitID := store.Commit() - rs.logger.Debug("committed KVStore", "height", commitID.Version, "key", key.Name(), "commit_store_hash", fmt.Sprintf("%X", commitID.Hash)) + rs.logger.Info("committed KVStore", "height", commitID.Version, "key", key.Name(), "commit_store_hash", fmt.Sprintf("%X", commitID.Hash)) if store.GetStoreType() == types.StoreTypeTransient { continue diff --git a/x/bank/keeper/view.go b/x/bank/keeper/view.go index fe46ec9b6ec1..f311dcc8a66c 100644 --- a/x/bank/keeper/view.go +++ b/x/bank/keeper/view.go @@ -122,6 +122,8 @@ func (k BaseViewKeeper) IterateAccountBalances(ctx sdk.Context, addr sdk.AccAddr for ; iterator.Valid(); iterator.Next() { var balance sdk.Coin k.cdc.MustUnmarshal(iterator.Value(), &balance) + fmt.Printf("IterateAccountBalances key=%s len(value)=%d balance=%v\n", + iterator.Key(), len(iterator.Value()), balance) if cb(balance) { break