Skip to content

Commit

Permalink
fix: state listener could observe discarded writes (backport cosmos#1…
Browse files Browse the repository at this point in the history
…3459) (cosmos#13463)

* fix: state listener could observe discarded writes (cosmos#13459)

* fix: state listener could observe uncommitted writes

Closes: cosmos#13457

don't pass listeners to nested cached store,
only the most inner layer's cache writes should be observed.

* Update CHANGELOG.md

* add unit test

* rename

Co-authored-by: Marko <marbar3778@yahoo.com>
(cherry picked from commit bb54c59)

# Conflicts:
#	CHANGELOG.md

* fixes

* gofumpt

* gofumpt

* updates

Co-authored-by: yihuang <huang@crypto.com>
Co-authored-by: Julien Robert <julien@rbrt.fr>
Co-authored-by: marbar3778 <marbar3778@yahoo.com>
  • Loading branch information
4 people authored and JeancarloBarrios committed Sep 28, 2024
1 parent 1d6adca commit 93091e4
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 96 deletions.
5 changes: 3 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,15 @@ Ref: https://keepachangelog.com/en/1.0.0/
* [#13369](https://github.com/cosmos/cosmos-sdk/pull/13369) Improve UX for `keyring.List` by returning all retrieved keys.
* [#13323](https://github.com/cosmos/cosmos-sdk/pull/13323) Ensure `withdraw_rewards` rewards are emitted from all actions that result in rewards being withdrawn.
* [#13321](https://github.com/cosmos/cosmos-sdk/pull/13321) Add flag to disable fast node migration and usage.
* (store) [#13326](https://github.com/cosmos/cosmos-sdk/pull/13326) Implementation of ADR-038 file StreamingService, backport #8664.

### API Breaking Changes

- (cli) [#13089](https://github.com/cosmos/cosmos-sdk/pull/13089) Fix rollback command don't actually delete multistore versions, added method `RollbackToVersion` to interface `CommitMultiStore` and added method `CommitMultiStore` to `Application` interface.

### Improvements
### Bug Fixes

* (store) [\#13326](https://github.com/cosmos/cosmos-sdk/pull/13326) Implementation of ADR-038 file StreamingService, backport #8664
* (store) [#13459](https://github.com/cosmos/cosmos-sdk/pull/13459) Don't let state listener observe the uncommitted writes.

## v0.45.8 - 2022-08-25

Expand Down
6 changes: 5 additions & 1 deletion store/cachemulti/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ func NewFromKVStore(
store types.KVStore, stores map[types.StoreKey]types.CacheWrapper,
keys map[string]types.StoreKey, traceWriter io.Writer, traceContext types.TraceContext,
) Store {
if listeners == nil {
listeners = make(map[types.StoreKey][]types.WriteListener)
}
cms := Store{
db: cachekv.NewStore(store),
stores: make(map[types.StoreKey]types.CacheWrap, len(stores)),
Expand Down Expand Up @@ -78,7 +81,8 @@ func newCacheMultiStoreFromCMS(cms Store) Store {
stores[k] = v
}

return NewFromKVStore(cms.db, stores, nil, cms.traceWriter, cms.traceContext)
// don't pass listeners to nested cache store.
return NewFromKVStore(cms.db, stores, nil, cms.traceWriter, cms.traceContext, nil)
}

// SetTracer sets the tracer for the MultiStore that the underlying
Expand Down
112 changes: 21 additions & 91 deletions store/rootmulti/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -812,7 +812,7 @@ type MockListener struct {
stateCache []types.StoreKVPair
}

func (tl *MockListener) OnWrite(storeKey types.StoreKey, key, value []byte, delete bool) error {
func (tl *MockListener) OnWrite(storeKey types.StoreKey, key []byte, value []byte, delete bool) error {
tl.stateCache = append(tl.stateCache, types.StoreKVPair{
StoreKey: storeKey.Name(),
Key: key,
Expand All @@ -823,106 +823,36 @@ func (tl *MockListener) OnWrite(storeKey types.StoreKey, key, value []byte, dele
}

func TestStateListeners(t *testing.T) {
db := coretesting.NewMemDB()
ms := newMultiStoreWithMounts(db, pruningtypes.NewPruningOptions(pruningtypes.PruningNothing))
require.Empty(t, ms.listeners)
var db dbm.DB = dbm.NewMemDB()
ms := newMultiStoreWithMounts(db, types.NewPruningOptionsFromString(types.PruningOptionNothing))

ms.AddListeners([]types.StoreKey{testStoreKey1})
require.Equal(t, 1, len(ms.listeners))
listener := &MockListener{}
ms.AddListeners(testStoreKey1, []types.WriteListener{listener})

require.NoError(t, ms.LoadLatestVersion())
cacheMulti := ms.CacheMultiStore()

store := cacheMulti.GetKVStore(testStoreKey1)
store.Set([]byte{1}, []byte{1})
require.Empty(t, ms.PopStateCache())
store1 := cacheMulti.GetKVStore(testStoreKey1)
store1.Set([]byte{1}, []byte{1})
require.Empty(t, listener.stateCache)

// writes are observed when cache store commit.
cacheMulti.Write()
require.Equal(t, 1, len(ms.PopStateCache()))

// test no listening on unobserved store
store = cacheMulti.GetKVStore(testStoreKey2)
store.Set([]byte{1}, []byte{1})
require.Empty(t, ms.PopStateCache())
require.Equal(t, 1, len(listener.stateCache))

// writes are not observed when cache store commit
cacheMulti.Write()
require.Empty(t, ms.PopStateCache())
}
// test nested cache store
listener.stateCache = []types.StoreKVPair{}
nested := cacheMulti.CacheMultiStore()

type commitKVStoreStub struct {
types.CommitKVStore
Committed int
}
store1 = nested.GetKVStore(testStoreKey1)
store1.Set([]byte{1}, []byte{1})
require.Empty(t, listener.stateCache)

func (stub *commitKVStoreStub) Commit() types.CommitID {
commitID := stub.CommitKVStore.Commit()
stub.Committed++
return commitID
}

func prepareStoreMap() (map[types.StoreKey]types.CommitKVStore, error) {
db := coretesting.NewMemDB()
store := NewStore(db, log.NewNopLogger(), metrics.NewNoOpMetrics())
store.MountStoreWithDB(types.NewKVStoreKey("iavl1"), types.StoreTypeIAVL, nil)
store.MountStoreWithDB(types.NewKVStoreKey("iavl2"), types.StoreTypeIAVL, nil)
store.MountStoreWithDB(types.NewTransientStoreKey("trans1"), types.StoreTypeTransient, nil)
if err := store.LoadLatestVersion(); err != nil {
return nil, err
}
return map[types.StoreKey]types.CommitKVStore{
testStoreKey1: &commitKVStoreStub{
CommitKVStore: store.GetStoreByName("iavl1").(types.CommitKVStore),
},
testStoreKey2: &commitKVStoreStub{
CommitKVStore: store.GetStoreByName("iavl2").(types.CommitKVStore),
},
testStoreKey3: &commitKVStoreStub{
CommitKVStore: store.GetStoreByName("trans1").(types.CommitKVStore),
},
}, nil
}
// writes are not observed when nested cache store commit
nested.Write()
require.Empty(t, listener.stateCache)

func TestCommitStores(t *testing.T) {
testCases := []struct {
name string
committed int
exptectCommit int
}{
{
"when upgrade not get interrupted",
0,
1,
},
{
"when upgrade get interrupted once",
1,
0,
},
{
"when upgrade get interrupted twice",
2,
0,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
storeMap, err := prepareStoreMap()
require.NoError(t, err)
store := storeMap[testStoreKey1].(*commitKVStoreStub)
for i := tc.committed; i > 0; i-- {
store.Commit()
}
store.Committed = 0
var version int64 = 1
removalMap := map[types.StoreKey]bool{}
res := commitStores(version, storeMap, removalMap)
for _, s := range res.StoreInfos {
require.Equal(t, version, s.CommitId.Version)
}
require.Equal(t, version, res.Version)
require.Equal(t, tc.exptectCommit, store.Committed)
})
}
// writes are observed when inner cache store commit
cacheMulti.Write()
require.Equal(t, 1, len(listener.stateCache))
}
2 changes: 1 addition & 1 deletion store/streaming/file/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func TestFileStreamingService(t *testing.T) {
if os.Getenv("CI") != "" {
t.Skip("Skipping TestFileStreamingService in CI environment")
}
err := os.Mkdir(testDir, 0700)
err := os.Mkdir(testDir, 0o700)
require.Nil(t, err)
defer os.RemoveAll(testDir)

Expand Down
3 changes: 2 additions & 1 deletion types/tx/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ const MaxGasWanted = uint64((1 << 63) - 1)

// Interface implementation checks.
var (
_, _, _, _ gogoprotoany.UnpackInterfacesMessage = &Tx{}, &TxBody{}, &AuthInfo{}, &SignerInfo{}
_, _, _, _ codectypes.UnpackInterfacesMessage = &Tx{}, &TxBody{}, &AuthInfo{}, &SignerInfo{}
_ sdk.Tx = &Tx{}
)

// GetMsgs implements the GetMsgs method on sdk.Tx.
Expand Down

0 comments on commit 93091e4

Please sign in to comment.