Skip to content

Commit

Permalink
Optimize iteration on nested cache context
Browse files Browse the repository at this point in the history
Closes: cosmos#10310
Solution:
- cache the valid status
  • Loading branch information
yihuang committed Nov 16, 2022
1 parent 3423442 commit 1874a29
Show file tree
Hide file tree
Showing 2 changed files with 184 additions and 24 deletions.
162 changes: 162 additions & 0 deletions store/cachekv/benchmark_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
package cachekv_test

import (
fmt "fmt"
"testing"

"github.com/cosmos/cosmos-sdk/store"
storetypes "github.com/cosmos/cosmos-sdk/store/types"
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/tendermint/tendermint/libs/log"
tmproto "github.com/tendermint/tendermint/proto/tendermint/types"
dbm "github.com/tendermint/tm-db"
)

func DoBenchmarkDeepContextStack(b *testing.B, depth int) {
begin := []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}
end := []byte{0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff}
key := storetypes.NewKVStoreKey("test")

db := dbm.NewMemDB()
cms := store.NewCommitMultiStore(db)
cms.MountStoreWithDB(key, storetypes.StoreTypeIAVL, db)
cms.LoadLatestVersion()
ctx := sdk.NewContext(cms, tmproto.Header{}, false, log.NewNopLogger())

var stack ContextStack
stack.Reset(ctx)

for i := 0; i < depth; i++ {
stack.Snapshot()

store := stack.CurrentContext().KVStore(key)
store.Set(begin, []byte("value"))
}

store := stack.CurrentContext().KVStore(key)
for i := 0; i < b.N; i++ {
it := store.Iterator(begin, end)
it.Valid()
it.Key()
it.Value()
it.Next()
it.Close()
}
}

func BenchmarkDeepContextStack1(b *testing.B) {
DoBenchmarkDeepContextStack(b, 1)
}

func BenchmarkDeepContextStack3(b *testing.B) {
DoBenchmarkDeepContextStack(b, 3)
}
func BenchmarkDeepContextStack10(b *testing.B) {
DoBenchmarkDeepContextStack(b, 10)
}

func BenchmarkDeepContextStack13(b *testing.B) {
DoBenchmarkDeepContextStack(b, 13)
}

// cachedContext is a pair of cache context and its corresponding commit method.
// They are obtained from the return value of `context.CacheContext()`.
type cachedContext struct {
ctx sdk.Context
commit func()
}

// ContextStack manages the initial context and a stack of cached contexts,
// to support the `StateDB.Snapshot` and `StateDB.RevertToSnapshot` methods.
type ContextStack struct {
// Context of the initial state before transaction execution.
// It's the context used by `StateDB.CommitedState`.
initialCtx sdk.Context
cachedContexts []cachedContext
}

// CurrentContext returns the top context of cached stack,
// if the stack is empty, returns the initial context.
func (cs *ContextStack) CurrentContext() sdk.Context {
l := len(cs.cachedContexts)
if l == 0 {
return cs.initialCtx
}
return cs.cachedContexts[l-1].ctx
}

// Reset sets the initial context and clear the cache context stack.
func (cs *ContextStack) Reset(ctx sdk.Context) {
cs.initialCtx = ctx
if len(cs.cachedContexts) > 0 {
cs.cachedContexts = []cachedContext{}
}
}

// IsEmpty returns true if the cache context stack is empty.
func (cs *ContextStack) IsEmpty() bool {
return len(cs.cachedContexts) == 0
}

// Commit commits all the cached contexts from top to bottom in order and clears the stack by setting an empty slice of cache contexts.
func (cs *ContextStack) Commit() {
// commit in order from top to bottom
for i := len(cs.cachedContexts) - 1; i >= 0; i-- {
// keep all the cosmos events
cs.initialCtx.EventManager().EmitEvents(cs.cachedContexts[i].ctx.EventManager().Events())
if cs.cachedContexts[i].commit == nil {
panic(fmt.Sprintf("commit function at index %d should not be nil", i))
} else {
cs.cachedContexts[i].commit()
}
}
cs.cachedContexts = []cachedContext{}
}

// CommitToRevision commit the cache after the target revision,
// to improve efficiency of db operations.
func (cs *ContextStack) CommitToRevision(target int) error {
if target < 0 || target >= len(cs.cachedContexts) {
return fmt.Errorf("snapshot index %d out of bound [%d..%d)", target, 0, len(cs.cachedContexts))
}

targetCtx := cs.cachedContexts[target].ctx
// commit in order from top to bottom
for i := len(cs.cachedContexts) - 1; i > target; i-- {
// keep all the cosmos events
targetCtx.EventManager().EmitEvents(cs.cachedContexts[i].ctx.EventManager().Events())
if cs.cachedContexts[i].commit == nil {
return fmt.Errorf("commit function at index %d should not be nil", i)
}
cs.cachedContexts[i].commit()
}
cs.cachedContexts = cs.cachedContexts[0 : target+1]

return nil
}

// Snapshot pushes a new cached context to the stack,
// and returns the index of it.
func (cs *ContextStack) Snapshot() int {
i := len(cs.cachedContexts)
ctx, commit := cs.CurrentContext().CacheContext()
cs.cachedContexts = append(cs.cachedContexts, cachedContext{ctx: ctx, commit: commit})
return i
}

// RevertToSnapshot pops all the cached contexts after the target index (inclusive).
// the target should be snapshot index returned by `Snapshot`.
// This function panics if the index is out of bounds.
func (cs *ContextStack) RevertToSnapshot(target int) {
if target < 0 || target >= len(cs.cachedContexts) {
panic(fmt.Errorf("snapshot index %d out of bound [%d..%d)", target, 0, len(cs.cachedContexts)))
}
cs.cachedContexts = cs.cachedContexts[:target]
}

// RevertAll discards all the cache contexts.
func (cs *ContextStack) RevertAll() {
if len(cs.cachedContexts) > 0 {
cs.RevertToSnapshot(0)
}
}
46 changes: 22 additions & 24 deletions store/cachekv/mergeiterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ type cacheMergeIterator struct {
parent types.Iterator
cache types.Iterator
ascending bool

valid bool
}

var _ types.Iterator = (*cacheMergeIterator)(nil)
Expand All @@ -29,6 +31,7 @@ func newCacheMergeIterator(parent, cache types.Iterator, ascending bool) *cacheM
ascending: ascending,
}

iter.valid = iter.skipUntilExistsOrInvalid()
return iter
}

Expand All @@ -40,42 +43,38 @@ func (iter *cacheMergeIterator) Domain() (start, end []byte) {

// Valid implements Iterator.
func (iter *cacheMergeIterator) Valid() bool {
return iter.skipUntilExistsOrInvalid()
return iter.valid
}

// Next implements Iterator
func (iter *cacheMergeIterator) Next() {
iter.skipUntilExistsOrInvalid()
iter.assertValid()

// If parent is invalid, get the next cache item.
if !iter.parent.Valid() {
switch {
case !iter.parent.Valid():
// If parent is invalid, get the next cache item.
iter.cache.Next()
return
}

// If cache is invalid, get the next parent item.
if !iter.cache.Valid() {
case !iter.cache.Valid():
// If cache is invalid, get the next parent item.
iter.parent.Next()
return
}

// Both are valid. Compare keys.
keyP, keyC := iter.parent.Key(), iter.cache.Key()
switch iter.compare(keyP, keyC) {
case -1: // parent < cache
iter.parent.Next()
case 0: // parent == cache
iter.parent.Next()
iter.cache.Next()
case 1: // parent > cache
iter.cache.Next()
default:
// Both are valid. Compare keys.
keyP, keyC := iter.parent.Key(), iter.cache.Key()
switch iter.compare(keyP, keyC) {
case -1: // parent < cache
iter.parent.Next()
case 0: // parent == cache
iter.parent.Next()
iter.cache.Next()
case 1: // parent > cache
iter.cache.Next()
}
}
iter.valid = iter.skipUntilExistsOrInvalid()
}

// Key implements Iterator
func (iter *cacheMergeIterator) Key() []byte {
iter.skipUntilExistsOrInvalid()
iter.assertValid()

// If parent is invalid, get the cache key.
Expand Down Expand Up @@ -106,7 +105,6 @@ func (iter *cacheMergeIterator) Key() []byte {

// Value implements Iterator
func (iter *cacheMergeIterator) Value() []byte {
iter.skipUntilExistsOrInvalid()
iter.assertValid()

// If parent is invalid, get the cache value.
Expand Down

0 comments on commit 1874a29

Please sign in to comment.