diff --git a/store/v2/commitment/store_test_suite.go b/store/v2/commitment/store_test_suite.go index edbce27116c9..18b4ef4e2cc0 100644 --- a/store/v2/commitment/store_test_suite.go +++ b/store/v2/commitment/store_test_suite.go @@ -221,6 +221,83 @@ func (s *CommitStoreTestSuite) TestStore_Pruning() { } } +func (s *CommitStoreTestSuite) TestStore_GetProof() { + storeKeys := []string{storeKey1, storeKey2} + commitStore, err := s.NewStore(dbm.NewMemDB(), storeKeys, nil, coretesting.NewNopLogger()) + s.Require().NoError(err) + + toVersion := uint64(10) + keyCount := 5 + + // commit some changes + for version := uint64(1); version <= toVersion; version++ { + cs := corestore.NewChangeset() + for _, storeKey := range storeKeys { + for i := 0; i < keyCount; i++ { + cs.Add([]byte(storeKey), []byte(fmt.Sprintf("key-%d-%d", version, i)), []byte(fmt.Sprintf("value-%d-%d", version, i)), false) + } + } + err := commitStore.WriteChangeset(cs) + s.Require().NoError(err) + _, err = commitStore.Commit(version) + s.Require().NoError(err) + } + + // get proof + for version := uint64(1); version <= toVersion; version++ { + for _, storeKey := range storeKeys { + for i := 0; i < keyCount; i++ { + _, err := commitStore.GetProof([]byte(storeKey), version, []byte(fmt.Sprintf("key-%d-%d", version, i))) + s.Require().NoError(err) + } + } + } + + // prune version 1 + s.Require().NoError(commitStore.Prune(1)) + + // check if proof for version 1 is pruned + _, err = commitStore.GetProof([]byte(storeKeys[0]), 1, []byte(fmt.Sprintf("key-%d-%d", 1, 0))) + s.Require().Error(err) + // check the commit info + commit, _ := commitStore.GetCommitInfo(1) + s.Require().Nil(commit) +} + +func (s *CommitStoreTestSuite) TestStore_Get() { + storeKeys := []string{storeKey1, storeKey2} + commitStore, err := s.NewStore(dbm.NewMemDB(), storeKeys, nil, coretesting.NewNopLogger()) + s.Require().NoError(err) + + toVersion := uint64(10) + keyCount := 5 + + // commit some changes + for version := uint64(1); version <= toVersion; version++ { + cs := corestore.NewChangeset() + for _, storeKey := range storeKeys { + for i := 0; i < keyCount; i++ { + cs.Add([]byte(storeKey), []byte(fmt.Sprintf("key-%d-%d", version, i)), []byte(fmt.Sprintf("value-%d-%d", version, i)), false) + } + } + err := commitStore.WriteChangeset(cs) + s.Require().NoError(err) + _, err = commitStore.Commit(version) + s.Require().NoError(err) + } + + // get proof + for version := uint64(1); version <= toVersion; version++ { + for _, storeKey := range storeKeys { + for i := 0; i < keyCount; i++ { + val, err := commitStore.Get([]byte(storeKey), version, []byte(fmt.Sprintf("key-%d-%d", version, i))) + s.Require().NoError(err) + s.Require().Equal([]byte(fmt.Sprintf("value-%d-%d", version, i)), val) + } + } + } +} + func (s *CommitStoreTestSuite) TestStore_Upgrades() { storeKeys := []string{storeKey1, storeKey2, storeKey3} commitDB := dbm.NewMemDB() diff --git a/store/v2/migration/manager_test.go b/store/v2/migration/manager_test.go index ae00d5c08bbb..4e04f6c40514 100644 --- a/store/v2/migration/manager_test.go +++ b/store/v2/migration/manager_test.go @@ -1,6 +1,7 @@ package migration import ( + "encoding/binary" "fmt" "testing" @@ -117,3 +118,103 @@ func TestMigrateState(t *testing.T) { }) } } + +func TestStartMigrateState(t *testing.T) { + for _, noCommitStore := range []bool{false, true} { + t.Run(fmt.Sprintf("Migrate noCommitStore=%v", noCommitStore), func(t *testing.T) { + m, orgCommitStore := setupMigrationManager(t, noCommitStore) + + chDone := make(chan struct{}) + chChangeset := make(chan *VersionedChangeset, 1) + + // apply changeset + toVersion := uint64(10) + keyCount := 5 + changesets := []corestore.Changeset{} + + for version := uint64(1); version <= toVersion; version++ { + cs := corestore.NewChangeset() + for _, storeKey := range storeKeys { + for i := 0; i < keyCount; i++ { + cs.Add([]byte(storeKey), []byte(fmt.Sprintf("key-%d-%d", version, i)), []byte(fmt.Sprintf("value-%d-%d", version, i)), false) + } + } + changesets = append(changesets, *cs) + require.NoError(t, orgCommitStore.WriteChangeset(cs)) + _, err := orgCommitStore.Commit(version) + require.NoError(t, err) + } + + // feed changesets to channel + go func() { + for version := uint64(1); version <= toVersion; version++ { + chChangeset <- &VersionedChangeset{ + Version: version, + Changeset: &changesets[version-1], + } + } + }() + + // check if migrate process complete + go func() { + for { + migrateVersion := m.GetMigratedVersion() + if migrateVersion == toVersion-1 { + break + } + } + + chDone <- struct{}{} + }() + + err := m.Start(toVersion-1, chChangeset, chDone) + require.NoError(t, err) + + // expecting error for conflicting process, since Migrate trigger snapshotter create migration, + // which start a snapshot process already. + _, err = m.snapshotsManager.Create(toVersion - 1) + require.Error(t, err) + + if m.stateCommitment != nil { + // check the migrated state + for version := uint64(1); version < toVersion; version++ { + for _, storeKey := range storeKeys { + for i := 0; i < keyCount; i++ { + val, err := m.stateCommitment.Get([]byte(storeKey), toVersion-1, []byte(fmt.Sprintf("key-%d-%d", version, i))) + require.NoError(t, err) + require.Equal(t, []byte(fmt.Sprintf("value-%d-%d", version, i)), val) + } + } + } + // check the latest state + val, err := m.stateCommitment.Get([]byte("store1"), toVersion-1, []byte("key-100-1")) + require.NoError(t, err) + require.Nil(t, val) + val, err = m.stateCommitment.Get([]byte("store2"), toVersion-1, []byte("key-100-0")) + require.NoError(t, err) + require.Nil(t, val) + } + + // check the storage + for version := uint64(1); version < toVersion; version++ { + for _, storeKey := range storeKeys { + for i := 0; i < keyCount; i++ { + val, err := m.stateStorage.Get([]byte(storeKey), toVersion-1, []byte(fmt.Sprintf("key-%d-%d", version, i))) + require.NoError(t, err) + require.Equal(t, []byte(fmt.Sprintf("value-%d-%d", version, i)), val) + } + } + } + + // check if migration db write change set to storage + for version := uint64(1); version < toVersion; version++ { + buf := make([]byte, 8) + binary.BigEndian.PutUint64(buf, version) + csKey := []byte(fmt.Sprintf(migrateChangesetKeyFmt, buf)) + csVal, err := m.db.Get(csKey) + require.NoError(t, err) + require.NotEmpty(t, csVal) + } + }) + } +} diff --git a/store/v2/pruning/manager_test.go b/store/v2/pruning/manager_test.go index 9f96b2a48360..c5d137f1d09e 100644 --- a/store/v2/pruning/manager_test.go +++ b/store/v2/pruning/manager_test.go @@ -152,3 +152,107 @@ func TestPruningOption(t *testing.T) { }) } } + +func (s *PruningManagerTestSuite) TestSignalCommit() { + // commit version 1 + cs := corestore.NewChangeset() + for _, storeKey := range storeKeys { + cs.Add([]byte(storeKey), []byte(fmt.Sprintf("key-%d-%d", 1, 0)), []byte(fmt.Sprintf("value-%d-%d", 1, 0)), false) + } + + s.Require().NoError(s.sc.WriteChangeset(cs)) + _, err := s.sc.Commit(1) + s.Require().NoError(err) + + s.Require().NoError(s.ss.ApplyChangeset(1, cs)) + + // commit version 2 + for _, storeKey := range storeKeys { + cs.Add([]byte(storeKey), []byte(fmt.Sprintf("key-%d-%d", 2, 0)), []byte(fmt.Sprintf("value-%d-%d", 2, 0)), false) + } + + // signaling commit has started + s.Require().NoError(s.manager.SignalCommit(true, 2)) + + s.Require().NoError(s.sc.WriteChangeset(cs)) + _, err = s.sc.Commit(2) + s.Require().NoError(err) + + s.Require().NoError(s.ss.ApplyChangeset(2, cs)) + + // try prune before signaling commit has finished + s.Require().NoError(s.manager.Prune(2)) + + // proof is removed no matter SignalCommit has not yet inform that commit process has finish + // since commitInfo is remove async with tree data + checkSCPrune := func() bool { + count := 0 + for _, storeKey := range storeKeys { + _, err := s.sc.GetProof([]byte(storeKey), 1, []byte(fmt.Sprintf("key-%d-%d", 1, 0))) + if err != nil { + count++ + } + } + + return count == len(storeKeys) + } + s.Require().Eventually(checkSCPrune, 10*time.Second, 1*time.Second) + + // data from state commitment should not be pruned since we haven't signal the commit process has finished + val, err := s.sc.Get([]byte(storeKeys[0]), 1, []byte(fmt.Sprintf("key-%d-%d", 1, 0))) + s.Require().NoError(err) + s.Require().Equal(val, []byte(fmt.Sprintf("value-%d-%d", 1, 0))) + + // signaling commit has finished, version 1 should be pruned + s.Require().NoError(s.manager.SignalCommit(false, 2)) + + checkSCPrune = func() bool { + count := 0 + for _, storeKey := range storeKeys { + _, err := s.sc.GetProof([]byte(storeKey), 1, []byte(fmt.Sprintf("key-%d-%d", 1, 0))) + if err != nil { + count++ + } + } + + return count == len(storeKeys) + } + s.Require().Eventually(checkSCPrune, 10*time.Second, 1*time.Second) + + // try with signal commit start and finish accordingly + // commit changesets with pruning + toVersion := uint64(100) + keyCount := 10 + for version := uint64(3); version <= toVersion; version++ { + cs := corestore.NewChangeset() + for _, storeKey := range storeKeys { + for i := 0; i < keyCount; i++ { + cs.Add([]byte(storeKey), []byte(fmt.Sprintf("key-%d-%d", version, i)), []byte(fmt.Sprintf("value-%d-%d", version, i)), false) + } + } + s.Require().NoError(s.manager.SignalCommit(true, version)) + + s.Require().NoError(s.sc.WriteChangeset(cs)) + _, err := s.sc.Commit(version) + s.Require().NoError(err) + + s.Require().NoError(s.ss.ApplyChangeset(version, cs)) + + s.Require().NoError(s.manager.SignalCommit(false, version)) + + } + + // wait for the pruning to finish in the commitment store + checkSCPrune = func() bool { + count := 0 + for _, storeKey := range storeKeys { + _, err := s.sc.GetProof([]byte(storeKey), toVersion-1, []byte(fmt.Sprintf("key-%d-%d", toVersion-1, 0))) + if err != nil { + count++ + } + } + + return count == len(storeKeys) + } + s.Require().Eventually(checkSCPrune, 10*time.Second, 1*time.Second) +} diff --git a/store/v2/root/store_test.go b/store/v2/root/store_test.go index fa4d25648a2a..eddaea730652 100644 --- a/store/v2/root/store_test.go +++ b/store/v2/root/store_test.go @@ -643,7 +643,7 @@ func (s *RootStoreTestSuite) TestMultiStore_PruningRestart() { return false } // wait for async pruning process to finish - s.Require().Eventually(checkErr, 5*time.Second, 100*time.Millisecond, "expected error when loading height: %d", v) + s.Require().Eventually(checkErr, 10*time.Second, 1*time.Second, "expected error when loading height: %d", v) } } diff --git a/store/v2/snapshots/helpers_test.go b/store/v2/snapshots/helpers_test.go index 129704325c6a..657fd5c6f1ba 100644 --- a/store/v2/snapshots/helpers_test.go +++ b/store/v2/snapshots/helpers_test.go @@ -120,6 +120,7 @@ func (m *mockCommitSnapshotter) Restore( var item snapshotstypes.SnapshotItem m.items = [][]byte{} + keyCount := 0 for { item.Reset() err := protoReader.ReadMsg(&item) @@ -133,6 +134,19 @@ func (m *mockCommitSnapshotter) Restore( break } m.items = append(m.items, payload.Payload) + // mock feeding chStorage to check if the loop closed properly + // + // ref: https://github.com/cosmos/cosmos-sdk/pull/21106 + chStorage <- &corestore.StateChanges{ + Actor: []byte("actor"), + StateChanges: []corestore.KVPair{ + { + Key: []byte(fmt.Sprintf("key-%d", keyCount)), + Value: payload.Payload, + }, + }, + } + keyCount++ } return item, nil @@ -155,9 +169,19 @@ func (m *mockCommitSnapshotter) SupportedFormats() []uint32 { return []uint32{snapshotstypes.CurrentFormat} } -type mockStorageSnapshotter struct{} +type mockStorageSnapshotter struct { + items map[string][]byte +} func (m *mockStorageSnapshotter) Restore(version uint64, chStorage <-chan *corestore.StateChanges) error { + // mock consuming chStorage to check if the loop closed properly + // + // ref: https://github.com/cosmos/cosmos-sdk/pull/21106 + for change := range chStorage { + for _, kv := range change.StateChanges { + m.items[string(kv.Key)] = kv.Value + } + } return nil } diff --git a/store/v2/snapshots/manager_test.go b/store/v2/snapshots/manager_test.go index 4fd33e1b36fa..2ecec5660066 100644 --- a/store/v2/snapshots/manager_test.go +++ b/store/v2/snapshots/manager_test.go @@ -2,7 +2,9 @@ package snapshots_test import ( "errors" + "fmt" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -129,8 +131,9 @@ func TestManager_Prune(t *testing.T) { func TestManager_Restore(t *testing.T) { store := setupStore(t) target := &mockCommitSnapshotter{} + storageSnapshotter := &mockStorageSnapshotter{items: map[string][]byte{}} extSnapshotter := newExtSnapshotter(0) - manager := snapshots.NewManager(store, opts, target, &mockStorageSnapshotter{}, nil, coretesting.NewNopLogger()) + manager := snapshots.NewManager(store, opts, target, storageSnapshotter, nil, coretesting.NewNopLogger()) err := manager.RegisterExtensions(extSnapshotter) require.NoError(t, err) @@ -203,6 +206,14 @@ func TestManager_Restore(t *testing.T) { assert.Equal(t, expectItems, target.items) assert.Equal(t, 10, len(extSnapshotter.state)) + // make sure storageSnapshotter items are properly stored + for i, item := range target.items { + key := fmt.Sprintf("key-%d", i) + chunk := storageSnapshotter.items[key] + require.NotNil(t, chunk) + require.Equal(t, item, chunk) + } + // The snapshot is saved in local snapshot store snapshots, err := store.List() require.NoError(t, err) @@ -265,10 +276,12 @@ func TestSnapshot_Take_Restore(t *testing.T) { commitSnapshotter := &mockCommitSnapshotter{ items: items, } + storageSnapshotter := &mockStorageSnapshotter{items: map[string][]byte{}} + extSnapshotter := newExtSnapshotter(10) expectChunks := snapshotItems(items, extSnapshotter) - manager := snapshots.NewManager(store, opts, commitSnapshotter, &mockStorageSnapshotter{}, nil, coretesting.NewNopLogger()) + manager := snapshots.NewManager(store, opts, commitSnapshotter, storageSnapshotter, nil, coretesting.NewNopLogger()) err := manager.RegisterExtensions(extSnapshotter) require.NoError(t, err) @@ -418,3 +431,107 @@ func TestSnapshot_Take_Prune(t *testing.T) { _, err = manager.Prune(2) require.Error(t, err) } + +func TestSnapshot_Pruning_Take_Snapshot_Parallel(t *testing.T) { + store := setupStore(t) + + items := [][]byte{ + {1, 2, 3}, + {4, 5, 6}, + {7, 8, 9}, + } + commitSnapshotter := &mockCommitSnapshotter{ + items: items, + } + extSnapshotter := newExtSnapshotter(10) + + expectChunks := snapshotItems(items, extSnapshotter) + manager := snapshots.NewManager(store, opts, commitSnapshotter, &mockStorageSnapshotter{}, nil, coretesting.NewNopLogger()) + err := manager.RegisterExtensions(extSnapshotter) + require.NoError(t, err) + + var prunedCount uint64 + // try take snapshot and pruning parallel while prune operation begins first + go func() { + checkError := func() bool { + _, err := manager.Create(4) + return err != nil + } + + require.Eventually(t, checkError, time.Millisecond*200, time.Millisecond) + }() + + prunedCount, err = manager.Prune(1) + require.NoError(t, err) + assert.EqualValues(t, 3, prunedCount) + + // creating a snapshot at a same height 4, should be true since we prune has finished + snapshot, err := manager.Create(4) + require.NoError(t, err) + + assert.Equal(t, &types.Snapshot{ + Height: 4, + Format: commitSnapshotter.SnapshotFormat(), + Chunks: 1, + Hash: []uint8{0xc5, 0xf7, 0xfe, 0xea, 0xd3, 0x4d, 0x3e, 0x87, 0xff, 0x41, 0xa2, 0x27, 0xfa, 0xcb, 0x38, 0x17, 0xa, 0x5, 0xeb, 0x27, 0x4e, 0x16, 0x5e, 0xf3, 0xb2, 0x8b, 0x47, 0xd1, 0xe6, 0x94, 0x7e, 0x8b}, + Metadata: types.Metadata{ + ChunkHashes: checksums(expectChunks), + }, + }, snapshot) + + // try take snapshot and pruning parallel while snapshot operation begins first + go func() { + checkError := func() bool { + _, err = manager.Prune(1) + return err != nil + } + + require.Eventually(t, checkError, time.Millisecond*200, time.Millisecond) + }() + + snapshot, err = manager.Create(5) + require.NoError(t, err) + + assert.Equal(t, &types.Snapshot{ + Height: 5, + Format: commitSnapshotter.SnapshotFormat(), + Chunks: 1, + Hash: []uint8{0xc5, 0xf7, 0xfe, 0xea, 0xd3, 0x4d, 0x3e, 0x87, 0xff, 0x41, 0xa2, 0x27, 0xfa, 0xcb, 0x38, 0x17, 0xa, 0x5, 0xeb, 0x27, 0x4e, 0x16, 0x5e, 0xf3, 0xb2, 0x8b, 0x47, 0xd1, 0xe6, 0x94, 0x7e, 0x8b}, + Metadata: types.Metadata{ + ChunkHashes: checksums(expectChunks), + }, + }, snapshot) +} + +func TestSnapshot_SnapshotIfApplicable(t *testing.T) { + store := setupStore(t) + + items := [][]byte{ + {1, 2, 3}, + {4, 5, 6}, + {7, 8, 9}, + } + commitSnapshotter := &mockCommitSnapshotter{ + items: items, + } + extSnapshotter := newExtSnapshotter(10) + + snapshotOpts := snapshots.NewSnapshotOptions(1, 1) + + manager := snapshots.NewManager(store, snapshotOpts, commitSnapshotter, &mockStorageSnapshotter{}, nil, coretesting.NewNopLogger()) + err := manager.RegisterExtensions(extSnapshotter) + require.NoError(t, err) + + manager.SnapshotIfApplicable(4) + + checkLatestHeight := func() bool { + latestSnapshot, _ := store.GetLatest() + return latestSnapshot.Height == 4 + } + + require.Eventually(t, checkLatestHeight, time.Second*10, time.Second) + + pruned, err := manager.Prune(1) + require.NoError(t, err) + require.Equal(t, uint64(0), pruned) +} diff --git a/store/v2/storage/storage_test_suite.go b/store/v2/storage/storage_test_suite.go index 2bc745221d75..6b4247fffc8d 100644 --- a/store/v2/storage/storage_test_suite.go +++ b/store/v2/storage/storage_test_suite.go @@ -640,6 +640,72 @@ func (s *StorageTestSuite) TestDatabase_Prune_KeepRecent() { s.Require().Equal([]byte("val200"), bz) } +func (s *StorageTestSuite) TestDatabase_Restore() { + db, err := s.NewDB(s.T().TempDir()) + s.Require().NoError(err) + defer db.Close() + + toVersion := uint64(10) + keyCount := 10 + + // for versions 1-10, set 10 keys + for v := uint64(1); v <= toVersion; v++ { + cs := corestore.NewChangesetWithPairs(map[string]corestore.KVPairs{storeKey1: {}}) + for i := 0; i < keyCount; i++ { + key := fmt.Sprintf("key%03d", i) + val := fmt.Sprintf("val%03d-%03d", i, v) + + cs.AddKVPair(storeKey1Bytes, corestore.KVPair{Key: []byte(key), Value: []byte(val)}) + } + + s.Require().NoError(db.ApplyChangeset(v, cs)) + } + + latestVersion, err := db.GetLatestVersion() + s.Require().NoError(err) + s.Require().Equal(uint64(10), latestVersion) + + chStorage := make(chan *corestore.StateChanges, 5) + + go func() { + for i := uint64(11); i <= 15; i++ { + kvPairs := []corestore.KVPair{} + for j := 0; j < keyCount; j++ { + key := fmt.Sprintf("key%03d-%03d", j, i) + val := fmt.Sprintf("val%03d-%03d", j, i) + + kvPairs = append(kvPairs, corestore.KVPair{Key: []byte(key), Value: []byte(val)}) + } + chStorage <- &corestore.StateChanges{ + Actor: storeKey1Bytes, + StateChanges: kvPairs, + } + } + close(chStorage) + }() + + // restore with snapshot version smaller than latest version + // should return an error + err = db.Restore(9, chStorage) + s.Require().Error(err) + + // restore + err = db.Restore(11, chStorage) + s.Require().NoError(err) + + // check the storage + for i := uint64(11); i <= 15; i++ { + for j := 0; j < keyCount; j++ { + key := fmt.Sprintf("key%03d-%03d", j, i) + val := fmt.Sprintf("val%03d-%03d", j, i) + + v, err := db.Get(storeKey1Bytes, 11, []byte(key)) + s.Require().NoError(err) + s.Require().Equal([]byte(val), v) + } + } +} + func (s *StorageTestSuite) TestUpgradable() { ss, err := s.NewDB(s.T().TempDir()) s.Require().NoError(err) diff --git a/tests/e2e/accounts/lockup/periodic_lockup_test_suite.go b/tests/e2e/accounts/lockup/periodic_lockup_test_suite.go index 8e36befcc091..948f5eb8a30b 100644 --- a/tests/e2e/accounts/lockup/periodic_lockup_test_suite.go +++ b/tests/e2e/accounts/lockup/periodic_lockup_test_suite.go @@ -1,7 +1,6 @@ package lockup import ( - "fmt" "testing" "time" @@ -155,7 +154,6 @@ func (s *E2ETestSuite) TestPeriodicLockingAccount() { // check if tracking is updated accordingly lockupAccountInfoResponse := s.queryLockupAccInfo(ctx, app, accountAddr) - fmt.Println(lockupAccountInfoResponse) delLocking := lockupAccountInfoResponse.DelegatedLocking require.True(t, delLocking.AmountOf("stake").Equal(math.NewInt(100))) })