forked from cosmos/cosmos-sdk
-
Notifications
You must be signed in to change notification settings - Fork 37
/
Copy pathstore.go
1061 lines (905 loc) · 32 KB
/
store.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
package rootmulti
import (
"bufio"
"compress/zlib"
"fmt"
"io"
"math"
"sort"
"strings"
"github.com/cosmos/cosmos-sdk/pruning"
pruningTypes "github.com/cosmos/cosmos-sdk/pruning/types"
"github.com/cosmos/cosmos-sdk/snapshots"
snapshottypes "github.com/cosmos/cosmos-sdk/snapshots/types"
"github.com/cosmos/cosmos-sdk/store/cachemulti"
"github.com/cosmos/cosmos-sdk/store/dbadapter"
"github.com/cosmos/cosmos-sdk/store/iavl"
"github.com/cosmos/cosmos-sdk/store/listenkv"
"github.com/cosmos/cosmos-sdk/store/mem"
"github.com/cosmos/cosmos-sdk/store/tracekv"
"github.com/cosmos/cosmos-sdk/store/transient"
"github.com/cosmos/cosmos-sdk/store/types"
sdkerrors "github.com/cosmos/cosmos-sdk/types/errors"
iavltree "github.com/cosmos/iavl"
protoio "github.com/gogo/protobuf/io"
gogotypes "github.com/gogo/protobuf/types"
"github.com/pkg/errors"
abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/libs/log"
dbm "github.com/tendermint/tm-db"
)
const (
latestVersionKey = "s/latest"
commitInfoKeyFmt = "s/%d" // s/<version>
// Do not change chunk size without new snapshot format (must be uniform across nodes)
snapshotChunkSize = uint64(10e6)
snapshotBufferSize = int(snapshotChunkSize)
snapshotMaxItemSize = int(64e6) // SDK has no key/value size limit, so we set an arbitrary limit
)
// Store is composed of many CommitStores. Name contrasts with
// cacheMultiStore which is used for branching other MultiStores. It implements
// the CommitMultiStore interface.
type Store struct {
db dbm.DB
logger log.Logger
lastCommitInfo *types.CommitInfo
pruningManager *pruning.Manager
iavlCacheSize int
storesParams map[types.StoreKey]storeParams
stores map[types.StoreKey]types.CommitKVStore
keysByName map[string]types.StoreKey
lazyLoading bool
initialVersion int64
traceWriter io.Writer
traceContext types.TraceContext
interBlockCache types.MultiStorePersistentCache
listeners map[types.StoreKey][]types.WriteListener
}
var (
_ types.CommitMultiStore = (*Store)(nil)
_ types.Queryable = (*Store)(nil)
)
// NewStore returns a reference to a new Store object with the provided DB. The
// store will be created with a PruneNothing pruning strategy by default. After
// a store is created, KVStores must be mounted and finally LoadLatestVersion or
// LoadVersion must be called.
func NewStore(db dbm.DB, logger log.Logger) *Store {
return &Store{
db: db,
logger: logger,
iavlCacheSize: iavl.DefaultIAVLCacheSize,
storesParams: make(map[types.StoreKey]storeParams),
stores: make(map[types.StoreKey]types.CommitKVStore),
keysByName: make(map[string]types.StoreKey),
listeners: make(map[types.StoreKey][]types.WriteListener),
pruningManager: pruning.NewManager(logger),
}
}
// GetPruning fetches the pruning strategy from the root store.
func (rs *Store) GetPruning() *pruningTypes.PruningOptions {
return rs.pruningManager.GetOptions()
}
// SetPruning sets the pruning strategy on the root store and all the sub-stores.
// Note, calling SetPruning on the root store prior to LoadVersion or
// LoadLatestVersion performs a no-op as the stores aren't mounted yet.
func (rs *Store) SetPruning(pruningOpts *pruningTypes.PruningOptions) {
rs.pruningManager.SetOptions(pruningOpts)
}
// SetSnapshotInterval sets the interval at which the snapshots are taken.
// It is used by the store to determine which heights to retain until after the snapshot is complete.
func (rs *Store) SetSnapshotInterval(snapshotInterval uint64) {
rs.pruningManager.SetSnapshotInterval(snapshotInterval)
}
func (rs *Store) SetIAVLCacheSize(cacheSize int) {
rs.iavlCacheSize = cacheSize
}
// SetLazyLoading sets if the iavl store should be loaded lazily or not
func (rs *Store) SetLazyLoading(lazyLoading bool) {
rs.lazyLoading = lazyLoading
}
// GetStoreType implements Store.
func (rs *Store) GetStoreType() types.StoreType {
return types.StoreTypeMulti
}
// MountStoreWithDB implements CommitMultiStore.
func (rs *Store) MountStoreWithDB(key types.StoreKey, typ types.StoreType, db dbm.DB) {
if key == nil {
panic("MountIAVLStore() key cannot be nil")
}
if _, ok := rs.storesParams[key]; ok {
panic(fmt.Sprintf("store duplicate store key %v", key))
}
if _, ok := rs.keysByName[key.Name()]; ok {
panic(fmt.Sprintf("store duplicate store key name %v", key))
}
rs.storesParams[key] = storeParams{
key: key,
typ: typ,
db: db,
}
rs.keysByName[key.Name()] = key
}
// GetCommitStore returns a mounted CommitStore for a given StoreKey. If the
// store is wrapped in an inter-block cache, it will be unwrapped before returning.
func (rs *Store) GetCommitStore(key types.StoreKey) types.CommitStore {
return rs.GetCommitKVStore(key)
}
// GetCommitKVStore returns a mounted CommitKVStore for a given StoreKey. If the
// store is wrapped in an inter-block cache, it will be unwrapped before returning.
func (rs *Store) GetCommitKVStore(key types.StoreKey) types.CommitKVStore {
// If the Store has an inter-block cache, first attempt to lookup and unwrap
// the underlying CommitKVStore by StoreKey. If it does not exist, fallback to
// the main mapping of CommitKVStores.
if rs.interBlockCache != nil {
if store := rs.interBlockCache.Unwrap(key); store != nil {
return store
}
}
return rs.stores[key]
}
// GetCommitKVStores get all kv stores associated wit the multistore.
func (rs *Store) GetCommitKVStores() map[types.StoreKey]types.CommitKVStore {
return rs.stores
}
// LoadLatestVersionAndUpgrade implements CommitMultiStore
func (rs *Store) LoadLatestVersionAndUpgrade(upgrades *types.StoreUpgrades) error {
ver := getLatestVersion(rs.db)
return rs.loadVersion(ver, upgrades)
}
// LoadVersionAndUpgrade allows us to rename substores while loading an older version
func (rs *Store) LoadVersionAndUpgrade(ver int64, upgrades *types.StoreUpgrades) error {
return rs.loadVersion(ver, upgrades)
}
// LoadLatestVersion implements CommitMultiStore.
func (rs *Store) LoadLatestVersion() error {
ver := getLatestVersion(rs.db)
return rs.loadVersion(ver, nil)
}
// LoadVersion implements CommitMultiStore.
func (rs *Store) LoadVersion(ver int64) error {
return rs.loadVersion(ver, nil)
}
func (rs *Store) loadVersion(ver int64, upgrades *types.StoreUpgrades) error {
infos := make(map[string]types.StoreInfo)
cInfo := &types.CommitInfo{}
// load old data if we are not version 0
if ver != 0 {
var err error
cInfo, err = getCommitInfo(rs.db, ver)
if err != nil {
return err
}
// convert StoreInfos slice to map
for _, storeInfo := range cInfo.StoreInfos {
infos[storeInfo.Name] = storeInfo
}
}
// load each Store (note this doesn't panic on unmounted keys now)
var newStores = make(map[types.StoreKey]types.CommitKVStore)
storesKeys := make([]types.StoreKey, 0, len(rs.storesParams))
for key := range rs.storesParams {
storesKeys = append(storesKeys, key)
}
if upgrades != nil {
// deterministic iteration order for upgrades
// (as the underlying store may change and
// upgrades make store changes where the execution order may matter)
sort.Slice(storesKeys, func(i, j int) bool {
return storesKeys[i].Name() < storesKeys[j].Name()
})
}
for _, key := range storesKeys {
storeParams := rs.storesParams[key]
commitID := rs.getCommitID(infos, key.Name())
// If it has been added, set the initial version
if upgrades.IsAdded(key.Name()) {
storeParams.initialVersion = uint64(ver) + 1
}
store, err := rs.loadCommitStoreFromParams(key, commitID, storeParams)
if err != nil {
return errors.Wrap(err, "failed to load store")
}
newStores[key] = store
// If it was deleted, remove all data
if upgrades.IsDeleted(key.Name()) {
if err := deleteKVStore(store.(types.KVStore)); err != nil {
return errors.Wrapf(err, "failed to delete store %s", key.Name())
}
} else if oldName := upgrades.RenamedFrom(key.Name()); oldName != "" {
// handle renames specially
// make an unregistered key to satify loadCommitStore params
oldKey := types.NewKVStoreKey(oldName)
oldParams := storeParams
oldParams.key = oldKey
// load from the old name
oldStore, err := rs.loadCommitStoreFromParams(oldKey, rs.getCommitID(infos, oldName), oldParams)
if err != nil {
return errors.Wrapf(err, "failed to load old store %s", oldName)
}
// move all data
if err := moveKVStoreData(oldStore.(types.KVStore), store.(types.KVStore)); err != nil {
return errors.Wrapf(err, "failed to move store %s -> %s", oldName, key.Name())
}
}
}
rs.lastCommitInfo = cInfo
rs.stores = newStores
// load any pruned heights we missed from disk to be pruned on the next run
if err := rs.pruningManager.LoadPruningHeights(rs.db); err != nil {
return err
}
return nil
}
func (rs *Store) getCommitID(infos map[string]types.StoreInfo, name string) types.CommitID {
info, ok := infos[name]
if !ok {
return types.CommitID{}
}
return info.CommitId
}
func deleteKVStore(kv types.KVStore) error {
// Note that we cannot write while iterating, so load all keys here, delete below
var keys [][]byte
itr := kv.Iterator(nil, nil)
for itr.Valid() {
keys = append(keys, itr.Key())
itr.Next()
}
itr.Close()
for _, k := range keys {
kv.Delete(k)
}
return nil
}
// we simulate move by a copy and delete
func moveKVStoreData(oldDB types.KVStore, newDB types.KVStore) error {
// we read from one and write to another
itr := oldDB.Iterator(nil, nil)
for itr.Valid() {
newDB.Set(itr.Key(), itr.Value())
itr.Next()
}
itr.Close()
// then delete the old store
return deleteKVStore(oldDB)
}
// PruneSnapshotHeight prunes the given height according to the prune strategy.
// If PruneNothing, this is a no-op.
// If other strategy, this height is persisted until it is
// less than <current height> - KeepRecent and <current height> % Interval == 0
func (rs *Store) PruneSnapshotHeight(height int64) {
rs.pruningManager.HandleHeightSnapshot(height)
}
// SetInterBlockCache sets the Store's internal inter-block (persistent) cache.
// When this is defined, all CommitKVStores will be wrapped with their respective
// inter-block cache.
func (rs *Store) SetInterBlockCache(c types.MultiStorePersistentCache) {
rs.interBlockCache = c
}
// SetTracer sets the tracer for the MultiStore that the underlying
// stores will utilize to trace operations. A MultiStore is returned.
func (rs *Store) SetTracer(w io.Writer) types.MultiStore {
rs.traceWriter = w
return rs
}
// SetTracingContext updates the tracing context for the MultiStore by merging
// the given context with the existing context by key. Any existing keys will
// be overwritten. It is implied that the caller should update the context when
// necessary between tracing operations. It returns a modified MultiStore.
func (rs *Store) SetTracingContext(tc types.TraceContext) types.MultiStore {
if rs.traceContext != nil {
for k, v := range tc {
rs.traceContext[k] = v
}
} else {
rs.traceContext = tc
}
return rs
}
// TracingEnabled returns if tracing is enabled for the MultiStore.
func (rs *Store) TracingEnabled() bool {
return rs.traceWriter != nil
}
// AddListeners adds listeners for a specific KVStore
func (rs *Store) AddListeners(key types.StoreKey, listeners []types.WriteListener) {
if ls, ok := rs.listeners[key]; ok {
rs.listeners[key] = append(ls, listeners...)
} else {
rs.listeners[key] = listeners
}
}
// ListeningEnabled returns if listening is enabled for a specific KVStore
func (rs *Store) ListeningEnabled(key types.StoreKey) bool {
if ls, ok := rs.listeners[key]; ok {
return len(ls) != 0
}
return false
}
// LastCommitID implements Committer/CommitStore.
func (rs *Store) LastCommitID() types.CommitID {
if rs.lastCommitInfo == nil {
return types.CommitID{
Version: getLatestVersion(rs.db),
}
}
return rs.lastCommitInfo.CommitID()
}
// Commit implements Committer/CommitStore.
func (rs *Store) Commit() types.CommitID {
var previousHeight, version int64
if rs.lastCommitInfo.GetVersion() == 0 && rs.initialVersion > 1 {
// This case means that no commit has been made in the store, we
// start from initialVersion.
version = rs.initialVersion
} else {
// This case can means two things:
// - either there was already a previous commit in the store, in which
// case we increment the version from there,
// - or there was no previous commit, and initial version was not set,
// in which case we start at version 1.
previousHeight = rs.lastCommitInfo.GetVersion()
version = previousHeight + 1
}
rs.lastCommitInfo = rs.commitStores(version, rs.stores)
var pruneErr error
defer func() {
rs.flushMetadata(rs.db, version, rs.lastCommitInfo)
if pruneErr != nil {
panic(pruneErr)
}
}()
pruneErr = rs.handlePruning(version)
hash, keys := rs.lastCommitInfo.Hash()
rs.logger.Info("calculated commit hash", "height", version, "commit_hash", hash, "keys", keys)
return types.CommitID{
Version: version,
Hash: hash,
}
}
// CacheWrap implements CacheWrapper/Store/CommitStore.
func (rs *Store) CacheWrap() types.CacheWrap {
return rs.CacheMultiStore().(types.CacheWrap)
}
// CacheWrapWithTrace implements the CacheWrapper interface.
func (rs *Store) CacheWrapWithTrace(_ io.Writer, _ types.TraceContext) types.CacheWrap {
return rs.CacheWrap()
}
// CacheWrapWithListeners implements the CacheWrapper interface.
func (rs *Store) CacheWrapWithListeners(_ types.StoreKey, _ []types.WriteListener) types.CacheWrap {
return rs.CacheWrap()
}
// CacheMultiStore creates ephemeral branch of the multi-store and returns a CacheMultiStore.
// It implements the MultiStore interface.
func (rs *Store) CacheMultiStore() types.CacheMultiStore {
stores := make(map[types.StoreKey]types.CacheWrapper)
for k, v := range rs.stores {
stores[k] = v
}
return cachemulti.NewStore(rs.db, stores, rs.keysByName, rs.traceWriter, rs.traceContext, rs.listeners)
}
// CacheMultiStoreWithVersion is analogous to CacheMultiStore except that it
// attempts to load stores at a given version (height). An error is returned if
// any store cannot be loaded. This should only be used for querying and
// iterating at past heights.
func (rs *Store) CacheMultiStoreWithVersion(version int64) (types.CacheMultiStore, error) {
cachedStores := make(map[types.StoreKey]types.CacheWrapper)
for key, store := range rs.stores {
switch store.GetStoreType() {
case types.StoreTypeIAVL:
// If the store is wrapped with an inter-block cache, we must first unwrap
// it to get the underlying IAVL store.
store = rs.GetCommitKVStore(key)
// Attempt to lazy-load an already saved IAVL store version. If the
// version does not exist or is pruned, an error should be returned.
iavlStore, err := store.(*iavl.Store).GetImmutable(version)
if err != nil {
return nil, err
}
cachedStores[key] = iavlStore
default:
cachedStores[key] = store
}
}
return cachemulti.NewStore(rs.db, cachedStores, rs.keysByName, rs.traceWriter, rs.traceContext, rs.listeners), nil
}
// GetStore returns a mounted Store for a given StoreKey. If the StoreKey does
// not exist, it will panic. If the Store is wrapped in an inter-block cache, it
// will be unwrapped prior to being returned.
//
// TODO: This isn't used directly upstream. Consider returning the Store as-is
// instead of unwrapping.
func (rs *Store) GetStore(key types.StoreKey) types.Store {
store := rs.GetCommitKVStore(key)
if store == nil {
panic(fmt.Sprintf("store does not exist for key: %s", key.Name()))
}
return store
}
// GetKVStore returns a mounted KVStore for a given StoreKey. If tracing is
// enabled on the KVStore, a wrapped TraceKVStore will be returned with the root
// store's tracer, otherwise, the original KVStore will be returned.
//
// NOTE: The returned KVStore may be wrapped in an inter-block cache if it is
// set on the root store.
func (rs *Store) GetKVStore(key types.StoreKey) types.KVStore {
s := rs.stores[key]
if s == nil {
panic(fmt.Sprintf("store does not exist for key: %s", key.Name()))
}
store := s.(types.KVStore)
if rs.TracingEnabled() {
store = tracekv.NewStore(store, rs.traceWriter, rs.traceContext)
}
if rs.ListeningEnabled(key) {
store = listenkv.NewStore(store, key, rs.listeners[key])
}
return store
}
func (rs *Store) handlePruning(version int64) error {
rs.pruningManager.HandleHeight(version - 1) // we should never prune the current version.
if rs.pruningManager.ShouldPruneAtHeight(version) {
rs.logger.Info("prune start", "height", version)
pruningHeights := rs.pruningManager.GetPruningHeights()
rs.logger.Debug(fmt.Sprintf("pruning the following heights: %v\n", pruningHeights))
if len(pruningHeights) == 0 {
return nil
}
for key, store := range rs.stores {
if store.GetStoreType() == types.StoreTypeIAVL {
// If the store is wrapped with an inter-block cache, we must first unwrap
// it to get the underlying IAVL store.
store = rs.GetCommitKVStore(key)
if err := store.(*iavl.Store).DeleteVersions(pruningHeights...); err != nil {
if errCause := errors.Cause(err); errCause != nil && errCause != iavltree.ErrVersionDoesNotExist {
return err
}
}
}
}
rs.pruningManager.ResetPruningHeights()
rs.logger.Info("prune end", "height", version)
return nil
}
return nil
}
// getStoreByName performs a lookup of a StoreKey given a store name typically
// provided in a path. The StoreKey is then used to perform a lookup and return
// a Store. If the Store is wrapped in an inter-block cache, it will be unwrapped
// prior to being returned. If the StoreKey does not exist, nil is returned.
func (rs *Store) getStoreByName(name string) types.Store {
key := rs.keysByName[name]
if key == nil {
return nil
}
return rs.GetCommitKVStore(key)
}
// Query calls substore.Query with the same `req` where `req.Path` is
// modified to remove the substore prefix.
// Ie. `req.Path` here is `/<substore>/<path>`, and trimmed to `/<path>` for the substore.
// TODO: add proof for `multistore -> substore`.
func (rs *Store) Query(req abci.RequestQuery) abci.ResponseQuery {
path := req.Path
storeName, subpath, err := parsePath(path)
if err != nil {
return sdkerrors.QueryResult(err)
}
store := rs.getStoreByName(storeName)
if store == nil {
return sdkerrors.QueryResult(sdkerrors.Wrapf(sdkerrors.ErrUnknownRequest, "no such store: %s", storeName))
}
queryable, ok := store.(types.Queryable)
if !ok {
return sdkerrors.QueryResult(sdkerrors.Wrapf(sdkerrors.ErrUnknownRequest, "store %s (type %T) doesn't support queries", storeName, store))
}
// trim the path and make the query
req.Path = subpath
res := queryable.Query(req)
if !req.Prove || !RequireProof(subpath) {
return res
}
if res.ProofOps == nil || len(res.ProofOps.Ops) == 0 {
return sdkerrors.QueryResult(sdkerrors.Wrap(sdkerrors.ErrInvalidRequest, "proof is unexpectedly empty; ensure height has not been pruned"))
}
// If the request's height is the latest height we've committed, then utilize
// the store's lastCommitInfo as this commit info may not be flushed to disk.
// Otherwise, we query for the commit info from disk.
var commitInfo *types.CommitInfo
if res.Height == rs.lastCommitInfo.Version {
commitInfo = rs.lastCommitInfo
} else {
commitInfo, err = getCommitInfo(rs.db, res.Height)
if err != nil {
return sdkerrors.QueryResult(err)
}
}
// Restore origin path and append proof op.
res.ProofOps.Ops = append(res.ProofOps.Ops, commitInfo.ProofOp(storeName))
return res
}
// SetInitialVersion sets the initial version of the IAVL tree. It is used when
// starting a new chain at an arbitrary height.
func (rs *Store) SetInitialVersion(version int64) error {
rs.initialVersion = version
// Loop through all the stores, if it's an IAVL store, then set initial
// version on it.
for key, store := range rs.stores {
if store.GetStoreType() == types.StoreTypeIAVL {
// If the store is wrapped with an inter-block cache, we must first unwrap
// it to get the underlying IAVL store.
store = rs.GetCommitKVStore(key)
store.(*iavl.Store).SetInitialVersion(version)
}
}
return nil
}
// parsePath expects a format like /<storeName>[/<subpath>]
// Must start with /, subpath may be empty
// Returns error if it doesn't start with /
func parsePath(path string) (storeName string, subpath string, err error) {
if !strings.HasPrefix(path, "/") {
return storeName, subpath, sdkerrors.Wrapf(sdkerrors.ErrUnknownRequest, "invalid path: %s", path)
}
paths := strings.SplitN(path[1:], "/", 2)
storeName = paths[0]
if len(paths) == 2 {
subpath = "/" + paths[1]
}
return storeName, subpath, nil
}
//---------------------- Snapshotting ------------------
// Snapshot implements snapshottypes.Snapshotter. The snapshot output for a given format must be
// identical across nodes such that chunks from different sources fit together. If the output for a
// given format changes (at the byte level), the snapshot format must be bumped - see
// TestMultistoreSnapshot_Checksum test.
func (rs *Store) Snapshot(height uint64, format uint32) (<-chan io.ReadCloser, error) {
if format != snapshottypes.CurrentFormat {
return nil, sdkerrors.Wrapf(snapshottypes.ErrUnknownFormat, "format %v", format)
}
if height == 0 {
return nil, sdkerrors.Wrap(sdkerrors.ErrLogic, "cannot snapshot height 0")
}
if height > uint64(getLatestVersion(rs.db)) {
return nil, sdkerrors.Wrapf(sdkerrors.ErrLogic, "cannot snapshot future height %v", height)
}
// Collect stores to snapshot (only IAVL stores are supported)
type namedStore struct {
*iavl.Store
name string
}
stores := []namedStore{}
for key := range rs.stores {
switch store := rs.GetCommitKVStore(key).(type) {
case *iavl.Store:
stores = append(stores, namedStore{name: key.Name(), Store: store})
case *transient.Store, *mem.Store:
// Non-persisted stores shouldn't be snapshotted
continue
default:
return nil, sdkerrors.Wrapf(sdkerrors.ErrLogic,
"don't know how to snapshot store %q of type %T", key.Name(), store)
}
}
sort.Slice(stores, func(i, j int) bool {
return strings.Compare(stores[i].name, stores[j].name) == -1
})
// Spawn goroutine to generate snapshot chunks and pass their io.ReadClosers through a channel
ch := make(chan io.ReadCloser)
go func() {
// Set up a stream pipeline to serialize snapshot nodes:
// ExportNode -> delimited Protobuf -> zlib -> buffer -> chunkWriter -> chan io.ReadCloser
chunkWriter := snapshots.NewChunkWriter(ch, snapshotChunkSize)
defer chunkWriter.Close()
bufWriter := bufio.NewWriterSize(chunkWriter, snapshotBufferSize)
defer func() {
if err := bufWriter.Flush(); err != nil {
chunkWriter.CloseWithError(err)
}
}()
zWriter, err := zlib.NewWriterLevel(bufWriter, 7)
if err != nil {
chunkWriter.CloseWithError(sdkerrors.Wrap(err, "zlib failure"))
return
}
defer func() {
if err := zWriter.Close(); err != nil {
chunkWriter.CloseWithError(err)
}
}()
protoWriter := protoio.NewDelimitedWriter(zWriter)
defer func() {
if err := protoWriter.Close(); err != nil {
chunkWriter.CloseWithError(err)
}
}()
// Export each IAVL store. Stores are serialized as a stream of SnapshotItem Protobuf
// messages. The first item contains a SnapshotStore with store metadata (i.e. name),
// and the following messages contain a SnapshotNode (i.e. an ExportNode). Store changes
// are demarcated by new SnapshotStore items.
for _, store := range stores {
exporter, err := store.Export(int64(height))
if err != nil {
chunkWriter.CloseWithError(err)
return
}
defer exporter.Close()
err = protoWriter.WriteMsg(&types.SnapshotItem{
Item: &types.SnapshotItem_Store{
Store: &types.SnapshotStoreItem{
Name: store.name,
},
},
})
if err != nil {
chunkWriter.CloseWithError(err)
return
}
for {
node, err := exporter.Next()
if err == iavltree.ExportDone {
break
} else if err != nil {
chunkWriter.CloseWithError(err)
return
}
err = protoWriter.WriteMsg(&types.SnapshotItem{
Item: &types.SnapshotItem_IAVL{
IAVL: &types.SnapshotIAVLItem{
Key: node.Key,
Value: node.Value,
Height: int32(node.Height),
Version: node.Version,
},
},
})
if err != nil {
chunkWriter.CloseWithError(err)
return
}
}
exporter.Close()
}
}()
return ch, nil
}
// Restore implements snapshottypes.Snapshotter.
func (rs *Store) Restore(
height uint64, format uint32, chunks <-chan io.ReadCloser, ready chan<- struct{},
) error {
if format != snapshottypes.CurrentFormat {
return sdkerrors.Wrapf(snapshottypes.ErrUnknownFormat, "format %v", format)
}
if height == 0 {
return sdkerrors.Wrap(sdkerrors.ErrLogic, "cannot restore snapshot at height 0")
}
if height > uint64(math.MaxInt64) {
return sdkerrors.Wrapf(snapshottypes.ErrInvalidMetadata,
"snapshot height %v cannot exceed %v", height, int64(math.MaxInt64))
}
// Signal readiness. Must be done before the readers below are set up, since the zlib
// reader reads from the stream on initialization, potentially causing deadlocks.
if ready != nil {
close(ready)
}
// Set up a restore stream pipeline
// chan io.ReadCloser -> chunkReader -> zlib -> delimited Protobuf -> ExportNode
chunkReader := snapshots.NewChunkReader(chunks)
defer chunkReader.Close()
zReader, err := zlib.NewReader(chunkReader)
if err != nil {
return sdkerrors.Wrap(err, "zlib failure")
}
defer zReader.Close()
protoReader := protoio.NewDelimitedReader(zReader, snapshotMaxItemSize)
defer protoReader.Close()
// Import nodes into stores. The first item is expected to be a SnapshotItem containing
// a SnapshotStoreItem, telling us which store to import into. The following items will contain
// SnapshotNodeItem (i.e. ExportNode) until we reach the next SnapshotStoreItem or EOF.
var importer *iavltree.Importer
for {
item := &types.SnapshotItem{}
err := protoReader.ReadMsg(item)
if err == io.EOF {
break
} else if err != nil {
return sdkerrors.Wrap(err, "invalid protobuf message")
}
switch item := item.Item.(type) {
case *types.SnapshotItem_Store:
if importer != nil {
err = importer.Commit()
if err != nil {
return sdkerrors.Wrap(err, "IAVL commit failed")
}
importer.Close()
}
store, ok := rs.getStoreByName(item.Store.Name).(*iavl.Store)
if !ok || store == nil {
return sdkerrors.Wrapf(sdkerrors.ErrLogic, "cannot import into non-IAVL store %q", item.Store.Name)
}
importer, err = store.Import(int64(height))
if err != nil {
return sdkerrors.Wrap(err, "import failed")
}
defer importer.Close()
case *types.SnapshotItem_IAVL:
if importer == nil {
return sdkerrors.Wrap(sdkerrors.ErrLogic, "received IAVL node item before store item")
}
if item.IAVL.Height > math.MaxInt8 {
return sdkerrors.Wrapf(sdkerrors.ErrLogic, "node height %v cannot exceed %v",
item.IAVL.Height, math.MaxInt8)
}
node := &iavltree.ExportNode{
Key: item.IAVL.Key,
Value: item.IAVL.Value,
Height: int8(item.IAVL.Height),
Version: item.IAVL.Version,
}
// Protobuf does not differentiate between []byte{} as nil, but fortunately IAVL does
// not allow nil keys nor nil values for leaf nodes, so we can always set them to empty.
if node.Key == nil {
node.Key = []byte{}
}
if node.Height == 0 && node.Value == nil {
node.Value = []byte{}
}
err := importer.Add(node)
if err != nil {
return sdkerrors.Wrap(err, "IAVL node import failed")
}
default:
return sdkerrors.Wrapf(sdkerrors.ErrLogic, "unknown snapshot item %T", item)
}
}
if importer != nil {
err := importer.Commit()
if err != nil {
return sdkerrors.Wrap(err, "IAVL commit failed")
}
importer.Close()
}
rs.flushMetadata(rs.db, int64(height), rs.buildCommitInfo(int64(height)))
return rs.LoadLatestVersion()
}
func (rs *Store) loadCommitStoreFromParams(key types.StoreKey, id types.CommitID, params storeParams) (types.CommitKVStore, error) {
var db dbm.DB
if params.db != nil {
db = dbm.NewPrefixDB(params.db, []byte("s/_/"))
} else {
prefix := "s/k:" + params.key.Name() + "/"
db = dbm.NewPrefixDB(rs.db, []byte(prefix))
}
switch params.typ {
case types.StoreTypeMulti:
panic("recursive MultiStores not yet supported")
case types.StoreTypeIAVL:
var store types.CommitKVStore
var err error
if params.initialVersion == 0 {
store, err = iavl.LoadStore(db, rs.logger, key, id, rs.lazyLoading, rs.iavlCacheSize)
} else {
store, err = iavl.LoadStoreWithInitialVersion(db, rs.logger, key, id, rs.lazyLoading, params.initialVersion, rs.iavlCacheSize)
}
if err != nil {
return nil, err
}
if rs.interBlockCache != nil {
// Wrap and get a CommitKVStore with inter-block caching. Note, this should
// only wrap the primary CommitKVStore, not any store that is already
// branched as that will create unexpected behavior.
store = rs.interBlockCache.GetStoreCache(key, store)
}
return store, err
case types.StoreTypeDB:
return commitDBStoreAdapter{Store: dbadapter.Store{DB: db}}, nil
case types.StoreTypeTransient:
_, ok := key.(*types.TransientStoreKey)
if !ok {
return nil, fmt.Errorf("invalid StoreKey for StoreTypeTransient: %s", key.String())
}
return transient.NewStore(), nil
case types.StoreTypeMemory:
if _, ok := key.(*types.MemoryStoreKey); !ok {
return nil, fmt.Errorf("unexpected key type for a MemoryStoreKey; got: %s", key.String())
}
return mem.NewStore(), nil
default:
panic(fmt.Sprintf("unrecognized store type %v", params.typ))
}
}
func (rs *Store) buildCommitInfo(version int64) *types.CommitInfo {
storeInfos := []types.StoreInfo{}
for key, store := range rs.stores {
if store.GetStoreType() == types.StoreTypeTransient {
continue
}
storeInfos = append(storeInfos, types.StoreInfo{
Name: key.Name(),
CommitId: store.LastCommitID(),
})
}
return &types.CommitInfo{
Version: version,
StoreInfos: storeInfos,
}
}
// Commits each store and returns a new commitInfo.
func (rs *Store) commitStores(version int64, storeMap map[types.StoreKey]types.CommitKVStore) *types.CommitInfo {
storeInfos := make([]types.StoreInfo, 0, len(storeMap))
for key, store := range storeMap {
commitID := store.Commit()
rs.logger.Info("commit kvstore", "height", commitID.Version, "key", key, "commit_store_hash", commitID.Hash)
if store.GetStoreType() == types.StoreTypeTransient {
continue
}
si := types.StoreInfo{}
si.Name = key.Name()
si.CommitId = commitID
storeInfos = append(storeInfos, si)
}
return &types.CommitInfo{
Version: version,
StoreInfos: storeInfos,
}
}
func (rs *Store) flushMetadata(db dbm.DB, version int64, cInfo *types.CommitInfo) {
rs.logger.Debug("flushing metadata", "height", version)
batch := db.NewBatch()
defer batch.Close()
flushCommitInfo(batch, version, cInfo)
flushLatestVersion(batch, version)
rs.pruningManager.FlushPruningHeights(batch)
if err := batch.Write(); err != nil {
panic(fmt.Errorf("error on batch write %w", err))
}
rs.logger.Debug("flushing metadata finished", "height", version)
}