diff --git a/CHANGELOG.md b/CHANGELOG.md index 627045e2f2f4..ae68bfc1653f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -43,6 +43,7 @@ Ref: https://keepachangelog.com/en/1.0.0/ * (deps) Bump Tendermint version to [v0.34.23](https://github.com/tendermint/tendermint/releases/tag/v0.34.23). * (deps) Bump IAVL version to [v0.19.4](https://github.com/cosmos/iavl/releases/tag/v0.19.4). +* (snapshot) [#13400](https://github.com/cosmos/cosmos-sdk/pull/13400) Fix snapshot checksum issue in golang 1.19. ### Bug Fixes @@ -55,6 +56,7 @@ Ref: https://keepachangelog.com/en/1.0.0/ ### API Breaking Changes * [#13673](https://github.com/cosmos/cosmos-sdk/pull/13673) The `GetFromFields` function now takes `Context` as an argument and removes `genOnly`. +* (store) [#11825](https://github.com/cosmos/cosmos-sdk/pull/11825) Make extension snapshotter interface safer to use, renamed the util function `WriteExtensionItem` to `WriteExtensionPayload`. ## v0.45.10 - 2022-10-24 diff --git a/baseapp/abci.go b/baseapp/abci.go index ec37ec65f20f..b9c401e0b7a7 100644 --- a/baseapp/abci.go +++ b/baseapp/abci.go @@ -308,6 +308,19 @@ func (app *BaseApp) DeliverTx(req abci.RequestDeliverTx) (res abci.ResponseDeliv // against that height and gracefully halt if it matches the latest committed // height. func (app *BaseApp) Commit() (res abci.ResponseCommit) { + res, snapshotHeight := app.CommitWithoutSnapshot() + + if snapshotHeight > 0 { + go app.Snapshot(snapshotHeight) + } + + return res +} + +// CommitWithoutSnapshot is like Commit but returns the snapshot information instead +// of starting the snapshot goroutine +// It can be used by apps to synchronize snapshots according to their requirements. +func (app *BaseApp) CommitWithoutSnapshot() (res abci.ResponseCommit, snapshotHeight int64) { defer telemetry.MeasureSince(time.Now(), "abci", "commit") header := app.deliverState.ctx.BlockHeader() @@ -348,13 +361,15 @@ func (app *BaseApp) Commit() (res abci.ResponseCommit) { } if app.snapshotInterval > 0 && uint64(header.Height)%app.snapshotInterval == 0 { - go app.snapshot(header.Height) + snapshotHeight = header.Height } - return abci.ResponseCommit{ + res = abci.ResponseCommit{ Data: commitID.Hash, RetainHeight: retainHeight, } + + return res, snapshotHeight } // halt attempts to gracefully shutdown the node via SIGINT and SIGTERM falling @@ -379,8 +394,9 @@ func (app *BaseApp) halt() { os.Exit(0) } -// snapshot takes a snapshot of the current state and prunes any old snapshottypes. -func (app *BaseApp) snapshot(height int64) { +// Snapshot takes a snapshot of the current state and prunes any old snapshottypes. +// It should be started as a goroutine +func (app *BaseApp) Snapshot(height int64) { if app.snapshotManager == nil { app.logger.Info("snapshot manager not configured") return diff --git a/baseapp/baseapp_test.go b/baseapp/baseapp_test.go index f9cdfcb71438..bf962439c096 100644 --- a/baseapp/baseapp_test.go +++ b/baseapp/baseapp_test.go @@ -167,7 +167,7 @@ func TestListSnapshots(t *testing.T) { app, _ := setupBaseAppWithSnapshots(t, 2, 5) expected := abci.ResponseListSnapshots{Snapshots: []*abci.Snapshot{ - {Height: 2, Format: 1, Chunks: 2}, + {Height: 2, Format: 2, Chunks: 2}, }} resp := app.ListSnapshots(abci.RequestListSnapshots{}) diff --git a/baseapp/deliver_tx_test.go b/baseapp/deliver_tx_test.go index c0126f9964f4..df2dea33a8a7 100644 --- a/baseapp/deliver_tx_test.go +++ b/baseapp/deliver_tx_test.go @@ -39,13 +39,13 @@ func TestLoadSnapshotChunk(t *testing.T) { chunk uint32 expectEmpty bool }{ - "Existing snapshot": {2, 1, 1, false}, - "Missing height": {100, 1, 1, true}, - "Missing format": {2, 2, 1, true}, - "Missing chunk": {2, 1, 9, true}, - "Zero height": {0, 1, 1, true}, + "Existing snapshot": {2, 2, 1, false}, + "Missing height": {100, 2, 1, true}, + "Missing format": {2, 3, 1, true}, + "Missing chunk": {2, 2, 9, true}, + "Zero height": {0, 2, 1, true}, "Zero format": {2, 0, 1, true}, - "Zero chunk": {2, 1, 0, false}, + "Zero chunk": {2, 2, 0, false}, } for name, tc := range testcases { tc := tc diff --git a/docs/architecture/adr-049-state-sync-hooks.md b/docs/architecture/adr-049-state-sync-hooks.md new file mode 100644 index 000000000000..5cc2b684c4df --- /dev/null +++ b/docs/architecture/adr-049-state-sync-hooks.md @@ -0,0 +1,174 @@ +# ADR 049: State Sync Hooks + +## Changelog + +- Jan 19, 2022: Initial Draft +- Apr 29, 2022: Safer extension snapshotter interface + +## Status + +Implemented + +## Abstract + +This ADR outlines a hooks-based mechanism for application modules to provide additional state (outside of the IAVL tree) to be used +during state sync. + +## Context + +New clients use state-sync to download snapshots of module state from peers. Currently, the snapshot consists of a +stream of `SnapshotStoreItem` and `SnapshotIAVLItem`, which means that application modules that define their state outside of the IAVL +tree cannot include their state as part of the state-sync process. + +Note, Even though the module state data is outside of the tree, for determinism we require that the hash of the external data should +be posted in the IAVL tree. + +## Decision + +A simple proposal based on our existing implementation is that, we can add two new message types: `SnapshotExtensionMeta` +and `SnapshotExtensionPayload`, and they are appended to the existing multi-store stream with `SnapshotExtensionMeta` +acting as a delimiter between extensions. As the chunk hashes should be able to ensure data integrity, we don't need +a delimiter to mark the end of the snapshot stream. + +Besides, we provide `Snapshotter` and `ExtensionSnapshotter` interface for modules to implement snapshotters, which will handle both taking +snapshot and the restoration. Each module could have mutiple snapshotters, and for modules with additional state, they should +implement `ExtensionSnapshotter` as extension snapshotters. When setting up the application, the snapshot `Manager` should call +`RegisterExtensions([]ExtensionSnapshotter…)` to register all the extension snapshotters. + +```proto +// SnapshotItem is an item contained in a rootmulti.Store snapshot. +// On top of the exsiting SnapshotStoreItem and SnapshotIAVLItem, we add two new options for the item. +message SnapshotItem { + // item is the specific type of snapshot item. + oneof item { + SnapshotStoreItem store = 1; + SnapshotIAVLItem iavl = 2 [(gogoproto.customname) = "IAVL"]; + SnapshotExtensionMeta extension = 3; + SnapshotExtensionPayload extension_payload = 4; + } +} + +// SnapshotExtensionMeta contains metadata about an external snapshotter. +// One module may need multiple snapshotters, so each module may have multiple SnapshotExtensionMeta. +message SnapshotExtensionMeta { + // the name of the ExtensionSnapshotter, and it is registered to snapshotter manager when setting up the application + // name should be unique for each ExtensionSnapshotter as we need to alphabetically order their snapshots to get + // deterministic snapshot stream. + string name = 1; + // this is used by each ExtensionSnapshotter to decide the format of payloads included in SnapshotExtensionPayload message + // it is used within the snapshotter/namespace, not global one for all modules + uint32 format = 2; +} + +// SnapshotExtensionPayload contains payloads of an external snapshotter. +message SnapshotExtensionPayload { + bytes payload = 1; +} +``` + +When we create a snapshot stream, the `multistore` snapshot is always placed at the beginning of the binary stream, and other extension snapshots are alphabetically ordered by the name of the corresponding `ExtensionSnapshotter`. + +The snapshot stream would look like as follows: + +```go +// multi-store snapshot +{SnapshotStoreItem | SnapshotIAVLItem, ...} +// extension1 snapshot +SnapshotExtensionMeta +{SnapshotExtensionPayload, ...} +// extension2 snapshot +SnapshotExtensionMeta +{SnapshotExtensionPayload, ...} +``` + +We add an `extensions` field to snapshot `Manager` for extension snapshotters. The `multistore` snapshotter is a special one and it doesn't need a name because it is always placed at the beginning of the binary stream. + +```go +type Manager struct { + store *Store + multistore types.Snapshotter + extensions map[string]types.ExtensionSnapshotter + mtx sync.Mutex + operation operation + chRestore chan<- io.ReadCloser + chRestoreDone <-chan restoreDone + restoreChunkHashes [][]byte + restoreChunkIndex uint32 +} +``` + +For extension snapshotters that implement the `ExtensionSnapshotter` interface, their names should be registered to the snapshot `Manager` by +calling `RegisterExtensions` when setting up the application. The snapshotters will handle both taking snapshot and restoration. + +```go +// RegisterExtensions register extension snapshotters to manager +func (m *Manager) RegisterExtensions(extensions ...types.ExtensionSnapshotter) error +``` + +On top of the existing `Snapshotter` interface for the `multistore`, we add `ExtensionSnapshotter` interface for the extension snapshotters. Three more function signatures: `SnapshotFormat()`, `SupportedFormats()` and `SnapshotName()` are added to `ExtensionSnapshotter`. + +```go +// ExtensionPayloadReader read extension payloads, +// it returns io.EOF when reached either end of stream or the extension boundaries. +type ExtensionPayloadReader = func() ([]byte, error) + +// ExtensionPayloadWriter is a helper to write extension payloads to underlying stream. +type ExtensionPayloadWriter = func([]byte) error + +// ExtensionSnapshotter is an extension Snapshotter that is appended to the snapshot stream. +// ExtensionSnapshotter has an unique name and manages it's own internal formats. +type ExtensionSnapshotter interface { + // SnapshotName returns the name of snapshotter, it should be unique in the manager. + SnapshotName() string + + // SnapshotFormat returns the default format used to take a snapshot. + SnapshotFormat() uint32 + + // SupportedFormats returns a list of formats it can restore from. + SupportedFormats() []uint32 + + // SnapshotExtension writes extension payloads into the underlying protobuf stream. + SnapshotExtension(height uint64, payloadWriter ExtensionPayloadWriter) error + + // RestoreExtension restores an extension state snapshot, + // the payload reader returns `io.EOF` when reached the extension boundaries. + RestoreExtension(height uint64, format uint32, payloadReader ExtensionPayloadReader) error + +} +``` + +## Consequences + +As a result of this implementation, we are able to create snapshots of binary chunk stream for the state that we maintain outside of the IAVL Tree, CosmWasm blobs for example. And new clients are able to fetch sanpshots of state for all modules that have implemented the corresponding interface from peer nodes. + + +### Backwards Compatibility + +This ADR introduces new proto message types, add an `extensions` field in snapshot `Manager`, and add new `ExtensionSnapshotter` interface, so this is not backwards compatible if we have extensions. + +But for applications that does not have the state data outside of the IAVL tree for any module, the snapshot stream is backwards-compatible. + +### Positive + +- State maintained outside of IAVL tree like CosmWasm blobs can create snapshots by implementing extension snapshotters, and being fetched by new clients via state-sync. + +### Negative + +### Neutral + +- All modules that maintain state outside of IAVL tree need to implement `ExtensionSnapshotter` and the snapshot `Manager` need to call `RegisterExtensions` when setting up the application. + +## Further Discussions + +While an ADR is in the DRAFT or PROPOSED stage, this section should contain a summary of issues to be solved in future iterations (usually referencing comments from a pull-request discussion). +Later, this section can optionally list ideas or improvements the author or reviewers found during the analysis of this ADR. + +## Test Cases [optional] + +Test cases for an implementation are mandatory for ADRs that are affecting consensus changes. Other ADRs can choose to include links to test cases if applicable. + +## References + +- https://github.com/cosmos/cosmos-sdk/pull/10961 +- https://github.com/cosmos/cosmos-sdk/issues/7340 +- https://hackmd.io/gJoyev6DSmqqkO667WQlGw diff --git a/snapshots/helpers_test.go b/snapshots/helpers_test.go index 219dfd93e9ce..d84ae45c7ee2 100644 --- a/snapshots/helpers_test.go +++ b/snapshots/helpers_test.go @@ -18,6 +18,7 @@ import ( "github.com/cosmos/cosmos-sdk/snapshots" "github.com/cosmos/cosmos-sdk/snapshots/types" snapshottypes "github.com/cosmos/cosmos-sdk/snapshots/types" + sdk "github.com/cosmos/cosmos-sdk/types" sdkerrors "github.com/cosmos/cosmos-sdk/types/errors" ) @@ -62,7 +63,7 @@ func readChunks(chunks <-chan io.ReadCloser) [][]byte { } // snapshotItems serialize a array of bytes as SnapshotItem_ExtensionPayload, and return the chunks. -func snapshotItems(items [][]byte) [][]byte { +func snapshotItems(items [][]byte, ext snapshottypes.ExtensionSnapshotter) [][]byte { // copy the same parameters from the code snapshotChunkSize := uint64(10e6) snapshotBufferSize := int(snapshotChunkSize) @@ -74,10 +75,21 @@ func snapshotItems(items [][]byte) [][]byte { zWriter, _ := zlib.NewWriterLevel(bufWriter, 7) protoWriter := protoio.NewDelimitedWriter(zWriter) for _, item := range items { - types.WriteExtensionItem(protoWriter, item) + types.WriteExtensionPayload(protoWriter, item) } + // write extension metadata + _ = protoWriter.WriteMsg(&snapshottypes.SnapshotItem{ + Item: &snapshottypes.SnapshotItem_Extension{ + Extension: &snapshottypes.SnapshotExtensionMeta{ + Name: ext.SnapshotName(), + Format: ext.SnapshotFormat(), + }, + }, + }) + _ = ext.SnapshotExtension(0, func(payload []byte) error { + return snapshottypes.WriteExtensionPayload(protoWriter, payload) + }) protoWriter.Close() - zWriter.Close() bufWriter.Flush() chunkWriter.Close() }() @@ -107,10 +119,11 @@ func (m *mockSnapshotter) Restore( return snapshottypes.SnapshotItem{}, errors.New("already has contents") } + var item snapshottypes.SnapshotItem m.items = [][]byte{} for { - item := &snapshottypes.SnapshotItem{} - err := protoReader.ReadMsg(item) + item.Reset() + err := protoReader.ReadMsg(&item) if err == io.EOF { break } else if err != nil { @@ -118,17 +131,17 @@ func (m *mockSnapshotter) Restore( } payload := item.GetExtensionPayload() if payload == nil { - return snapshottypes.SnapshotItem{}, sdkerrors.Wrap(err, "invalid protobuf message") + break } m.items = append(m.items, payload.Payload) } - return snapshottypes.SnapshotItem{}, nil + return item, nil } func (m *mockSnapshotter) Snapshot(height uint64, protoWriter protoio.Writer) error { for _, item := range m.items { - if err := types.WriteExtensionItem(protoWriter, item); err != nil { + if err := types.WriteExtensionPayload(protoWriter, item); err != nil { return err } } @@ -136,11 +149,11 @@ func (m *mockSnapshotter) Snapshot(height uint64, protoWriter protoio.Writer) er } func (m *mockSnapshotter) SnapshotFormat() uint32 { - return 1 + return 2 } func (m *mockSnapshotter) SupportedFormats() []uint32 { - return []uint32{1} + return []uint32{2} } // setupBusyManager creates a manager with an empty store that is busy creating a snapshot at height 1. @@ -193,3 +206,52 @@ func (m *hungSnapshotter) Restore( ) (snapshottypes.SnapshotItem, error) { panic("not implemented") } + +type extSnapshotter struct { + state []uint64 +} + +func newExtSnapshotter(count int) *extSnapshotter { + state := make([]uint64, 0, count) + for i := 0; i < count; i++ { + state = append(state, uint64(i)) + } + return &extSnapshotter{ + state, + } +} + +func (s *extSnapshotter) SnapshotName() string { + return "mock" +} + +func (s *extSnapshotter) SnapshotFormat() uint32 { + return 1 +} + +func (s *extSnapshotter) SupportedFormats() []uint32 { + return []uint32{1} +} + +func (s *extSnapshotter) SnapshotExtension(height uint64, payloadWriter snapshottypes.ExtensionPayloadWriter) error { + for _, i := range s.state { + if err := payloadWriter(sdk.Uint64ToBigEndian(uint64(i))); err != nil { + return err + } + } + return nil +} + +func (s *extSnapshotter) RestoreExtension(height uint64, format uint32, payloadReader snapshottypes.ExtensionPayloadReader) error { + for { + payload, err := payloadReader() + if err == io.EOF { + break + } else if err != nil { + return err + } + s.state = append(s.state, sdk.BigEndianToUint64(payload)) + } + // finalize restoration + return nil +} diff --git a/snapshots/manager.go b/snapshots/manager.go index 74ea4f4d8676..1ec632718ecf 100644 --- a/snapshots/manager.go +++ b/snapshots/manager.go @@ -79,6 +79,9 @@ func NewManagerWithExtensions(store *Store, multistore types.Snapshotter, extens // RegisterExtensions register extension snapshotters to manager func (m *Manager) RegisterExtensions(extensions ...types.ExtensionSnapshotter) error { + if m.extensions == nil { + m.extensions = make(map[string]types.ExtensionSnapshotter, len(extensions)) + } for _, extension := range extensions { name := extension.SnapshotName() if _, ok := m.extensions[name]; ok { @@ -195,7 +198,10 @@ func (m *Manager) createSnapshot(height uint64, ch chan<- io.ReadCloser) { streamWriter.CloseWithError(err) return } - if err := extension.Snapshot(height, streamWriter); err != nil { + payloadWriter := func(payload []byte) error { + return types.WriteExtensionPayload(streamWriter, payload) + } + if err := extension.SnapshotExtension(height, payloadWriter); err != nil { streamWriter.CloseWithError(err) return } @@ -285,24 +291,40 @@ func (m *Manager) Restore(snapshot types.Snapshot) error { // restoreSnapshot do the heavy work of snapshot restoration after preliminary checks on request have passed. func (m *Manager) restoreSnapshot(snapshot types.Snapshot, chChunks <-chan io.ReadCloser) error { + var nextItem types.SnapshotItem + streamReader, err := NewStreamReader(chChunks) if err != nil { return err } defer streamReader.Close() - next, err := m.multistore.Restore(snapshot.Height, snapshot.Format, streamReader) + // payloadReader reads an extension payload for extension snapshotter, it returns `io.EOF` at extension boundaries. + payloadReader := func() ([]byte, error) { + nextItem.Reset() + if err := streamReader.ReadMsg(&nextItem); err != nil { + return nil, err + } + payload := nextItem.GetExtensionPayload() + if payload == nil { + return nil, io.EOF + } + return payload.Payload, nil + } + + nextItem, err = m.multistore.Restore(snapshot.Height, snapshot.Format, streamReader) if err != nil { return sdkerrors.Wrap(err, "multistore restore") } + for { - if next.Item == nil { + if nextItem.Item == nil { // end of stream break } - metadata := next.GetExtension() + metadata := nextItem.GetExtension() if metadata == nil { - return sdkerrors.Wrapf(sdkerrors.ErrLogic, "unknown snapshot item %T", next.Item) + return sdkerrors.Wrapf(sdkerrors.ErrLogic, "unknown snapshot item %T", nextItem.Item) } extension, ok := m.extensions[metadata.Name] if !ok { @@ -311,10 +333,14 @@ func (m *Manager) restoreSnapshot(snapshot types.Snapshot, chChunks <-chan io.Re if !IsFormatSupported(extension, metadata.Format) { return sdkerrors.Wrapf(types.ErrUnknownFormat, "format %v for extension %s", metadata.Format, metadata.Name) } - next, err = extension.Restore(snapshot.Height, metadata.Format, streamReader) - if err != nil { + + if err := extension.RestoreExtension(snapshot.Height, metadata.Format, payloadReader); err != nil { return sdkerrors.Wrapf(err, "extension %s restore", metadata.Name) } + + if nextItem.GetExtensionPayload() != nil { + return sdkerrors.Wrapf(err, "extension %s don't exhausted payload stream", metadata.Name) + } } return nil } diff --git a/snapshots/manager_test.go b/snapshots/manager_test.go index 256b5b2cc8fe..efd3a7f78dfd 100644 --- a/snapshots/manager_test.go +++ b/snapshots/manager_test.go @@ -61,11 +61,15 @@ func TestManager_Take(t *testing.T) { snapshotter := &mockSnapshotter{ items: items, } - expectChunks := snapshotItems(items) + extSnapshotter := newExtSnapshotter(10) + + expectChunks := snapshotItems(items, extSnapshotter) manager := snapshots.NewManager(store, snapshotter) + err := manager.RegisterExtensions(extSnapshotter) + require.NoError(t, err) // nil manager should return error - _, err := (*snapshots.Manager)(nil).Create(1) + _, err = (*snapshots.Manager)(nil).Create(1) require.Error(t, err) // creating a snapshot at a lower height than the latest should error @@ -79,7 +83,7 @@ func TestManager_Take(t *testing.T) { Height: 5, Format: snapshotter.SnapshotFormat(), Chunks: 1, - Hash: []uint8{0xcd, 0x17, 0x9e, 0x7f, 0x28, 0xb6, 0x82, 0x90, 0xc7, 0x25, 0xf3, 0x42, 0xac, 0x65, 0x73, 0x50, 0xaa, 0xa0, 0x10, 0x5c, 0x40, 0x8c, 0xd5, 0x1, 0xed, 0x82, 0xb5, 0xca, 0x8b, 0xe0, 0x83, 0xa2}, + Hash: []uint8{0xc5, 0xf7, 0xfe, 0xea, 0xd3, 0x4d, 0x3e, 0x87, 0xff, 0x41, 0xa2, 0x27, 0xfa, 0xcb, 0x38, 0x17, 0xa, 0x5, 0xeb, 0x27, 0x4e, 0x16, 0x5e, 0xf3, 0xb2, 0x8b, 0x47, 0xd1, 0xe6, 0x94, 0x7e, 0x8b}, Metadata: types.Metadata{ ChunkHashes: checksums(expectChunks), }, @@ -117,7 +121,10 @@ func TestManager_Prune(t *testing.T) { func TestManager_Restore(t *testing.T) { store := setupStore(t) target := &mockSnapshotter{} + extSnapshotter := newExtSnapshotter(0) manager := snapshots.NewManager(store, target) + err := manager.RegisterExtensions(extSnapshotter) + require.NoError(t, err) expectItems := [][]byte{ {1, 2, 3}, @@ -125,10 +132,10 @@ func TestManager_Restore(t *testing.T) { {7, 8, 9}, } - chunks := snapshotItems(expectItems) + chunks := snapshotItems(expectItems, newExtSnapshotter(10)) // Restore errors on invalid format - err := manager.Restore(types.Snapshot{ + err = manager.Restore(types.Snapshot{ Height: 3, Format: 0, Hash: []byte{1, 2, 3}, @@ -155,7 +162,7 @@ func TestManager_Restore(t *testing.T) { // Starting a restore works err = manager.Restore(types.Snapshot{ Height: 3, - Format: 1, + Format: 2, Hash: []byte{1, 2, 3}, Chunks: 1, Metadata: types.Metadata{ChunkHashes: checksums(chunks)}, @@ -186,6 +193,7 @@ func TestManager_Restore(t *testing.T) { } assert.Equal(t, expectItems, target.items) + assert.Equal(t, 10, len(extSnapshotter.state)) // Starting a new restore should fail now, because the target already has contents. err = manager.Restore(types.Snapshot{ @@ -203,7 +211,7 @@ func TestManager_Restore(t *testing.T) { target.items = nil err = manager.Restore(types.Snapshot{ Height: 3, - Format: 1, + Format: 2, Hash: []byte{1, 2, 3}, Chunks: 1, Metadata: types.Metadata{ChunkHashes: checksums(chunks)}, diff --git a/snapshots/stream.go b/snapshots/stream.go index 80cd5c3dfdcb..ba622ebfb5ae 100644 --- a/snapshots/stream.go +++ b/snapshots/stream.go @@ -57,10 +57,6 @@ func (sw *StreamWriter) Close() error { sw.chunkWriter.CloseWithError(err) return err } - if err := sw.zWriter.Close(); err != nil { - sw.chunkWriter.CloseWithError(err) - return err - } if err := sw.bufWriter.Flush(); err != nil { sw.chunkWriter.CloseWithError(err) return err diff --git a/snapshots/types/format.go b/snapshots/types/format.go index edfdb36d7bfc..d5e960660ac9 100644 --- a/snapshots/types/format.go +++ b/snapshots/types/format.go @@ -3,4 +3,4 @@ package types // CurrentFormat is the currently used format for snapshots. Snapshots using the same format // must be identical across all nodes for a given height, so this must be bumped when the binary // snapshot output changes. -const CurrentFormat uint32 = 1 +const CurrentFormat uint32 = 2 diff --git a/snapshots/types/snapshotter.go b/snapshots/types/snapshotter.go index f747920d13ad..1044d57b679d 100644 --- a/snapshots/types/snapshotter.go +++ b/snapshots/types/snapshotter.go @@ -11,17 +11,20 @@ type Snapshotter interface { // Snapshot writes snapshot items into the protobuf writer. Snapshot(height uint64, protoWriter protoio.Writer) error - // Restore restores a state snapshot from the protobuf items read from the reader. - // If the ready channel is non-nil, it returns a ready signal (by being closed) once the - // restorer is ready to accept chunks. + // Restore restores a state snapshot, taking the reader of protobuf message stream as input. Restore(height uint64, format uint32, protoReader protoio.Reader) (SnapshotItem, error) } +// ExtensionPayloadReader read extension payloads, +// it returns io.EOF when reached either end of stream or the extension boundaries. +type ExtensionPayloadReader = func() ([]byte, error) + +// ExtensionPayloadWriter is a helper to write extension payloads to underlying stream. +type ExtensionPayloadWriter = func([]byte) error + // ExtensionSnapshotter is an extension Snapshotter that is appended to the snapshot stream. // ExtensionSnapshotter has an unique name and manages it's own internal formats. type ExtensionSnapshotter interface { - Snapshotter - // SnapshotName returns the name of snapshotter, it should be unique in the manager. SnapshotName() string @@ -32,4 +35,11 @@ type ExtensionSnapshotter interface { // SupportedFormats returns a list of formats it can restore from. SupportedFormats() []uint32 + + // SnapshotExtension writes extension payloads into the underlying protobuf stream. + SnapshotExtension(height uint64, payloadWriter ExtensionPayloadWriter) error + + // RestoreExtension restores an extension state snapshot, + // the payload reader returns `io.EOF` when reached the extension boundaries. + RestoreExtension(height uint64, format uint32, payloadReader ExtensionPayloadReader) error } diff --git a/snapshots/types/util.go b/snapshots/types/util.go index 348b5057682d..e2d4949bf5c2 100644 --- a/snapshots/types/util.go +++ b/snapshots/types/util.go @@ -4,12 +4,12 @@ import ( protoio "github.com/gogo/protobuf/io" ) -// WriteExtensionItem writes an item payload for current extension snapshotter. -func WriteExtensionItem(protoWriter protoio.Writer, item []byte) error { +// WriteExtensionPayload writes an extension payload for current extension snapshotter. +func WriteExtensionPayload(protoWriter protoio.Writer, payload []byte) error { return protoWriter.WriteMsg(&SnapshotItem{ Item: &SnapshotItem_ExtensionPayload{ ExtensionPayload: &SnapshotExtensionPayload{ - Payload: item, + Payload: payload, }, }, }) diff --git a/store/rootmulti/snapshot_test.go b/store/rootmulti/snapshot_test.go index 7a3071940e99..48cd4ff8f4f1 100644 --- a/store/rootmulti/snapshot_test.go +++ b/store/rootmulti/snapshot_test.go @@ -128,7 +128,7 @@ func TestMultistoreSnapshot_Checksum(t *testing.T) { "aa048b4ee0f484965d7b3b06822cf0772cdcaad02f3b1b9055e69f2cb365ef3c", "7921eaa3ed4921341e504d9308a9877986a879fe216a099c86e8db66fcba4c63", "a4a864e6c02c9fca5837ec80dc84f650b25276ed7e4820cf7516ced9f9901b86", - "ca2879ac6e7205d257440131ba7e72bef784cd61642e32b847729e543c1928b9", + "980925390cc50f14998ecb1e87de719ca9dd7e72f5fefbe445397bf670f36c31", }}, } for _, tc := range testcases {