Skip to content

Commit

Permalink
async load and some logging
Browse files Browse the repository at this point in the history
  • Loading branch information
kocubinski committed Nov 12, 2023
1 parent 070d4c0 commit 54d461d
Show file tree
Hide file tree
Showing 4 changed files with 141 additions and 42 deletions.
34 changes: 34 additions & 0 deletions store/gaskv/store.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package gaskv

import (
"fmt"
"io"
"time"

"github.com/streadway/handy/atomic"

"github.com/cosmos/cosmos-sdk/store/types"
"github.com/cosmos/cosmos-sdk/telemetry"
)
Expand Down Expand Up @@ -33,14 +36,38 @@ func (gs *Store) GetStoreType() types.StoreType {
return gs.parent.GetStoreType()
}

var DebugLogging atomic.Int

var gasTotal uint64

func StartLogging() {
DebugLogging.Add(1)
}

func StopLogging() {
DebugLogging.Add(-1)
fmt.Printf("GasKVStore DebugLog total=%d\n", gasTotal)
gasTotal = 0
}

func debugLog(op string, sub string, amount uint64) {
if DebugLogging.Get() == 1 {
fmt.Printf("GasKVStore DebugLog op=%s sub=%s amount=%d\n", op, sub, amount)
gasTotal += amount
}
}

// Implements KVStore.
func (gs *Store) Get(key []byte) (value []byte) {
gs.gasMeter.ConsumeGas(gs.gasConfig.ReadCostFlat, types.GasReadCostFlatDesc)
value = gs.parent.Get(key)
debugLog("Get", "Flat", gs.gasConfig.ReadCostFlat)

// TODO overflow-safe math?
gs.gasMeter.ConsumeGas(gs.gasConfig.ReadCostPerByte*types.Gas(len(key)), types.GasReadPerByteDesc)
debugLog("Get", "PerByteKey", gs.gasConfig.ReadCostPerByte*types.Gas(len(key)))
gs.gasMeter.ConsumeGas(gs.gasConfig.ReadCostPerByte*types.Gas(len(value)), types.GasReadPerByteDesc)
debugLog("Get", "PerByteValue", gs.gasConfig.ReadCostPerByte*types.Gas(len(value)))

return value
}
Expand All @@ -50,16 +77,20 @@ func (gs *Store) Set(key []byte, value []byte) {
types.AssertValidKey(key)
types.AssertValidValue(value)
gs.gasMeter.ConsumeGas(gs.gasConfig.WriteCostFlat, types.GasWriteCostFlatDesc)
debugLog("Set", "Flat", gs.gasConfig.WriteCostFlat)
// TODO overflow-safe math?
gs.gasMeter.ConsumeGas(gs.gasConfig.WriteCostPerByte*types.Gas(len(key)), types.GasWritePerByteDesc)
debugLog("Set", "PerByteKey", gs.gasConfig.WriteCostPerByte*types.Gas(len(key)))
gs.gasMeter.ConsumeGas(gs.gasConfig.WriteCostPerByte*types.Gas(len(value)), types.GasWritePerByteDesc)
debugLog("Set", "PerByteValue", gs.gasConfig.WriteCostPerByte*types.Gas(len(value)))
gs.parent.Set(key, value)
}

// Implements KVStore.
func (gs *Store) Has(key []byte) bool {
defer telemetry.MeasureSince(time.Now(), "store", "gaskv", "has")
gs.gasMeter.ConsumeGas(gs.gasConfig.HasCost, types.GasHasDesc)
debugLog("Has", "Flat", gs.gasConfig.HasCost)
return gs.parent.Has(key)
}

Expand Down Expand Up @@ -179,8 +210,11 @@ func (gi *gasIterator) consumeSeekGas() {
value := gi.Value()

gi.gasMeter.ConsumeGas(gi.gasConfig.ReadCostPerByte*types.Gas(len(key)), types.GasValuePerByteDesc)
debugLog("consumeSeekGas", "PerByteKey", gi.gasConfig.ReadCostPerByte*types.Gas(len(key)))
gi.gasMeter.ConsumeGas(gi.gasConfig.ReadCostPerByte*types.Gas(len(value)), types.GasValuePerByteDesc)
debugLog("consumeSeekGas", "PerByteValue", gi.gasConfig.ReadCostPerByte*types.Gas(len(value)))
}

gi.gasMeter.ConsumeGas(gi.gasConfig.IterNextCostFlat, types.GasIterNextCostFlatDesc)
debugLog("consumeSeekGas", "Flat", gi.gasConfig.IterNextCostFlat)
}
10 changes: 7 additions & 3 deletions store/iavl_v2/store.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package iavl_v2

import (
"fmt"
io "io"
"path/filepath"
"time"
Expand Down Expand Up @@ -34,13 +35,15 @@ func LoadStoreWithInitialVersion(v2RootPath string, key types.StoreKey, id types
// i.e. not the happy path.
path := filepath.Join(v2RootPath, key.Name())
pool := iavl.NewNodePool()
fmt.Println("LoadStoreWithInitialVersion path:", path)
sql, err := iavl.NewSqliteDb(pool, iavl.SqliteDbOptions{Path: path})
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to open sqlite db path=%s: %w", path, err)
}

tree := iavl.NewTree(sql, pool, iavl.TreeOptions{})
err = tree.LoadSnapshot(id.Version)
tree := iavl.NewTree(sql, pool, iavl.TreeOptions{StateStorage: true})
//err = tree.LoadSnapshot(id.Version)
err = tree.LoadVersion(id.Version)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -73,6 +76,7 @@ func (s *Store) Commit() types.CommitID {

func (s *Store) LastCommitID() types.CommitID {
hash := s.Tree.Hash()
fmt.Printf("IAVLV2 Get LastCommitID: %x\n", hash)

return types.CommitID{
Version: s.Tree.Version(),
Expand Down
137 changes: 98 additions & 39 deletions store/rootmulti/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/cosmos/cosmos-sdk/store/cachemulti"
"github.com/cosmos/cosmos-sdk/store/dbadapter"
"github.com/cosmos/cosmos-sdk/store/iavl"
"github.com/cosmos/cosmos-sdk/store/iavl_v2"
"github.com/cosmos/cosmos-sdk/store/listenkv"
"github.com/cosmos/cosmos-sdk/store/mem"
"github.com/cosmos/cosmos-sdk/store/tracekv"
Expand Down Expand Up @@ -65,7 +66,9 @@ type Store struct {
keysByName map[string]types.StoreKey
lazyLoading bool
initialVersion int64
iavlV2Path string

iavlV2Path string
asyncLoad bool

traceWriter io.Writer
traceContext types.TraceContext
Expand Down Expand Up @@ -106,6 +109,10 @@ func NewStore(db dbm.DB, logger log.Logger) *Store {
func NewStoreWithOptions(db dbm.DB, logger log.Logger, options StoreOptions) *Store {
store := NewStore(db, logger)
store.iavlV2Path = options.IavlV2RootPath
if store.iavlV2Path != "" {
fmt.Println("NewStoreWithOptions iavlV2Path:", store.iavlV2Path)
store.asyncLoad = true
}
return store
}

Expand Down Expand Up @@ -249,49 +256,97 @@ func (rs *Store) loadVersion(ver int64, upgrades *types.StoreUpgrades) error {
})
}

errCh := make(chan error)
storeCh := make(chan struct {
key types.StoreKey
store types.CommitKVStore
})
maxAsync := 7
asyncLoadCount := 0
loadCh := make(chan struct{}, maxAsync)
for i := 0; i < maxAsync; i++ {
loadCh <- struct{}{}
}

for _, key := range storesKeys {
storeParams := rs.storesParams[key]
sParams := rs.storesParams[key]
commitID := rs.getCommitID(infos, key.Name())

// If it has been added, set the initial version
if upgrades.IsAdded(key.Name()) {
storeParams.initialVersion = uint64(ver) + 1
}

// TODO async load

store, err := rs.loadCommitStoreFromParams(key, commitID, storeParams)
if err != nil {
return errors.Wrap(err, "failed to load store")
sParams.initialVersion = uint64(ver) + 1
}

newStores[key] = store

// If it was deleted, remove all data
if upgrades.IsDeleted(key.Name()) {
if err := deleteKVStore(store.(types.KVStore)); err != nil {
return errors.Wrapf(err, "failed to delete store %s", key.Name())
if rs.asyncLoad && sParams.typ == types.StoreTypeIAVL {
asyncLoadCount++
if upgrades.IsDeleted(key.Name()) {
return fmt.Errorf("async load does not support deleted stores")
}
if oldName := upgrades.RenamedFrom(key.Name()); oldName != "" {
return fmt.Errorf("async load does not support renamed stores")
}
} else if oldName := upgrades.RenamedFrom(key.Name()); oldName != "" {
// handle renames specially
// make an unregistered key to satify loadCommitStore params
oldKey := types.NewKVStoreKey(oldName)
oldParams := storeParams
oldParams.key = oldKey

// load from the old name
oldStore, err := rs.loadCommitStoreFromParams(oldKey, rs.getCommitID(infos, oldName), oldParams)
fmt.Println("async load", key.Name())
go func(k types.StoreKey, cid types.CommitID, sp storeParams) {
<-loadCh
store, err := rs.loadCommitStoreFromParams(k, cid, sp)
if err != nil {
errCh <- err
return
}
storeCh <- struct {
key types.StoreKey
store types.CommitKVStore
}{
key: k,
store: store,
}
loadCh <- struct{}{}
}(key, commitID, sParams)
} else {
fmt.Println("sync load", key.Name())
store, err := rs.loadCommitStoreFromParams(key, commitID, sParams)
if err != nil {
return errors.Wrapf(err, "failed to load old store %s", oldName)
return errors.Wrap(err, "failed to load store")
}

// move all data
if err := moveKVStoreData(oldStore.(types.KVStore), store.(types.KVStore)); err != nil {
return errors.Wrapf(err, "failed to move store %s -> %s", oldName, key.Name())
newStores[key] = store

// If it was deleted, remove all data
if upgrades.IsDeleted(key.Name()) {
if err := deleteKVStore(store.(types.KVStore)); err != nil {
return errors.Wrapf(err, "failed to delete store %s", key.Name())
}
} else if oldName := upgrades.RenamedFrom(key.Name()); oldName != "" {
// handle renames specially
// make an unregistered key to satify loadCommitStore params
oldKey := types.NewKVStoreKey(oldName)
oldParams := sParams
oldParams.key = oldKey

// load from the old name
oldStore, err := rs.loadCommitStoreFromParams(oldKey, rs.getCommitID(infos, oldName), oldParams)
if err != nil {
return errors.Wrapf(err, "failed to load old store %s", oldName)
}

// move all data
if err := moveKVStoreData(oldStore.(types.KVStore), store.(types.KVStore)); err != nil {
return errors.Wrapf(err, "failed to move store %s -> %s", oldName, key.Name())
}
}
}
}

for i := 0; i < asyncLoadCount; i++ {
select {
case err := <-errCh:
return err
case store := <-storeCh:
fmt.Println("async load done", store.key.Name())
newStores[store.key] = store.store
}
}

rs.lastCommitInfo = cInfo
rs.stores = newStores

Expand Down Expand Up @@ -445,7 +500,7 @@ func (rs *Store) Commit() types.CommitID {
rs.mx.RLock()
defer rs.mx.RUnlock()
hash, keys := rs.lastCommitInfo.Hash()
rs.logger.Debug("calculated commit hash", "height", version, "commit_hash", fmt.Sprintf("%X", hash), "keys", keys)
rs.logger.Info("calculated commit hash", "height", version, "commit_hash", fmt.Sprintf("%X", hash), "keys", keys)

return types.CommitID{
Version: version,
Expand All @@ -455,6 +510,8 @@ func (rs *Store) Commit() types.CommitID {

// SetCommitHeader sets the commit block header of the store.
func (rs *Store) SetCommitHeader(h tmproto.Header) {
rs.logger.Info("SetCommitHeader", "header", h)
rs.logger.Info("SetCommitHeader", "lastResultsHash", fmt.Sprintf("%x", h.LastResultsHash))
rs.commitHeader = h
}

Expand Down Expand Up @@ -902,18 +959,20 @@ func (rs *Store) loadCommitStoreFromParams(key types.StoreKey, id types.CommitID
var store types.CommitKVStore
var err error

// IAVL v1
if params.initialVersion == 0 {
store, err = iavl.LoadStore(db, rs.logger, key, id, rs.lazyLoading, rs.iavlCacheSize)
} else {
store, err = iavl.LoadStoreWithInitialVersion(db, rs.logger, key, id, rs.lazyLoading, params.initialVersion, rs.iavlCacheSize)
}
fmt.Println("load store", key.Name(), "initialVersion", params.initialVersion)

// IAVL v0
//if params.initialVersion == 0 {
// store, err = iavl.LoadStore(db, rs.logger, key, id, rs.lazyLoading, rs.iavlCacheSize)
//} else {
// store, err = iavl.LoadStoreWithInitialVersion(db, rs.logger, key, id, rs.lazyLoading, params.initialVersion, rs.iavlCacheSize)
//}

// IAVL V2
//store, err = iavl_v2.LoadStoreWithInitialVersion(rs.iavlV2Path, key, id, params.initialVersion)
store, err = iavl_v2.LoadStoreWithInitialVersion(rs.iavlV2Path, key, id, params.initialVersion)

if err != nil {
return nil, err
return nil, fmt.Errorf("failed to load store %s: %w", key.Name(), err)
}

if rs.interBlockCache != nil {
Expand Down Expand Up @@ -971,7 +1030,7 @@ func (rs *Store) commitStores(version int64, storeMap map[types.StoreKey]types.C

for key, store := range storeMap {
commitID := store.Commit()
rs.logger.Debug("committed KVStore", "height", commitID.Version, "key", key.Name(), "commit_store_hash", fmt.Sprintf("%X", commitID.Hash))
rs.logger.Info("committed KVStore", "height", commitID.Version, "key", key.Name(), "commit_store_hash", fmt.Sprintf("%X", commitID.Hash))

if store.GetStoreType() == types.StoreTypeTransient {
continue
Expand Down
2 changes: 2 additions & 0 deletions x/bank/keeper/view.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,8 @@ func (k BaseViewKeeper) IterateAccountBalances(ctx sdk.Context, addr sdk.AccAddr
for ; iterator.Valid(); iterator.Next() {
var balance sdk.Coin
k.cdc.MustUnmarshal(iterator.Value(), &balance)
fmt.Printf("IterateAccountBalances key=%s len(value)=%d balance=%v\n",
iterator.Key(), len(iterator.Value()), balance)

if cb(balance) {
break
Expand Down

0 comments on commit 54d461d

Please sign in to comment.