From b5630ae3f7d76df40543941ef41a11e12c1f69d2 Mon Sep 17 00:00:00 2001 From: Joe Betz Date: Wed, 13 May 2020 21:39:55 -0700 Subject: [PATCH] etcdserver/*, wal/*: find valid snapshots by cross checking snap files and wal snap entries --- etcdserver/api/snap/snapshotter.go | 47 ++++++++++---------- etcdserver/api/snap/snapshotter_test.go | 47 +++++++++++++++++--- etcdserver/raft.go | 2 + etcdserver/server.go | 41 +++++++----------- etcdserver/storage.go | 43 ++++++------------- wal/wal.go | 57 +++++++++++++++++++++++++ wal/wal_test.go | 57 +++++++++++++++++++++++++ 7 files changed, 208 insertions(+), 86 deletions(-) diff --git a/etcdserver/api/snap/snapshotter.go b/etcdserver/api/snap/snapshotter.go index 227c92f481e0..7a577dd84af8 100644 --- a/etcdserver/api/snap/snapshotter.go +++ b/etcdserver/api/snap/snapshotter.go @@ -30,6 +30,7 @@ import ( "go.etcd.io/etcd/v3/pkg/pbutil" "go.etcd.io/etcd/v3/raft" "go.etcd.io/etcd/v3/raft/raftpb" + "go.etcd.io/etcd/v3/wal/walpb" "go.uber.org/zap" ) @@ -38,7 +39,6 @@ const snapSuffix = ".snap" var ( ErrNoSnapshot = errors.New("snap: no available snapshot") - ErrSnapshotIndex = errors.New("snap: no available snapshot index") ErrEmptySnapshot = errors.New("snap: empty snapshot") ErrCRCMismatch = errors.New("snap: crc mismatch") crcTable = crc32.MakeTable(crc32.Castagnoli) @@ -103,38 +103,37 @@ func (s *Snapshotter) save(snapshot *raftpb.Snapshot) error { return nil } +// Load returns the newest snapshot. func (s *Snapshotter) Load() (*raftpb.Snapshot, error) { - names, err := s.snapNames() - if err != nil { - return nil, err - } - var snap *raftpb.Snapshot - for _, name := range names { - if snap, err = loadSnap(s.lg, s.dir, name); err == nil { - break + return s.loadMatching(func(*raftpb.Snapshot) bool { return true }) +} + +// LoadNewestAvailable loads the newest snapshot available that is in walSnaps. +func (s *Snapshotter) LoadNewestAvailable(walSnaps []walpb.Snapshot) (*raftpb.Snapshot, error) { + return s.loadMatching(func(snapshot *raftpb.Snapshot) bool { + m := snapshot.Metadata + for i := len(walSnaps) - 1; i >= 0; i-- { + if m.Term == walSnaps[i].Term && m.Index == walSnaps[i].Index { + return true + } } - } - if err != nil { - return nil, ErrNoSnapshot - } - return snap, nil + return false + }) } -func (s *Snapshotter) LoadIndex(i uint64) (*raftpb.Snapshot, error) { +// loadMatching returns the newest snapshot where matchFn returns true. +func (s *Snapshotter) loadMatching(matchFn func(*raftpb.Snapshot) bool) (*raftpb.Snapshot, error) { names, err := s.snapNames() if err != nil { return nil, err } - - if len(names) == 0 { - return nil, ErrNoSnapshot - } - - if i >= uint64(len(names)) { - return nil, ErrSnapshotIndex + var snap *raftpb.Snapshot + for _, name := range names { + if snap, err = loadSnap(s.lg, s.dir, name); err == nil && matchFn(snap) { + return snap, nil + } } - - return loadSnap(s.lg, s.dir, names[i]) + return nil, ErrNoSnapshot } func loadSnap(lg *zap.Logger, dir, name string) (*raftpb.Snapshot, error) { diff --git a/etcdserver/api/snap/snapshotter_test.go b/etcdserver/api/snap/snapshotter_test.go index 7f7ac7c7e312..648fdadfd9fb 100644 --- a/etcdserver/api/snap/snapshotter_test.go +++ b/etcdserver/api/snap/snapshotter_test.go @@ -24,6 +24,8 @@ import ( "testing" "go.etcd.io/etcd/v3/raft/raftpb" + "go.etcd.io/etcd/v3/wal/walpb" + "go.uber.org/zap" ) @@ -166,12 +168,47 @@ func TestLoadNewestSnap(t *testing.T) { t.Fatal(err) } - g, err := ss.Load() - if err != nil { - t.Errorf("err = %v, want nil", err) + cases := []struct { + name string + availableWalSnaps []walpb.Snapshot + expected *raftpb.Snapshot + }{ + { + name: "load-newest", + expected: &newSnap, + }, + { + name: "loadnewestavailable-newest", + availableWalSnaps: []walpb.Snapshot{{Index: 0, Term: 0}, {Index: 1, Term: 1}, {Index: 5, Term: 1}}, + expected: &newSnap, + }, + { + name: "loadnewestavailable-newest-unsorted", + availableWalSnaps: []walpb.Snapshot{{Index: 5, Term: 1}, {Index: 1, Term: 1}, {Index: 0, Term: 0}}, + expected: &newSnap, + }, + { + name: "loadnewestavailable-previous", + availableWalSnaps: []walpb.Snapshot{{Index: 0, Term: 0}, {Index: 1, Term: 1}}, + expected: testSnap, + }, } - if !reflect.DeepEqual(g, &newSnap) { - t.Errorf("snap = %#v, want %#v", g, &newSnap) + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + var err error + var g *raftpb.Snapshot + if tc.availableWalSnaps != nil { + g, err = ss.LoadNewestAvailable(tc.availableWalSnaps) + } else { + g, err = ss.Load() + } + if err != nil { + t.Errorf("err = %v, want nil", err) + } + if !reflect.DeepEqual(g, tc.expected) { + t.Errorf("snap = %#v, want %#v", g, tc.expected) + } + }) } } diff --git a/etcdserver/raft.go b/etcdserver/raft.go index 1361f59c469b..e28b033b76c5 100644 --- a/etcdserver/raft.go +++ b/etcdserver/raft.go @@ -227,6 +227,8 @@ func (r *raftNode) start(rh *raftReadyHandler) { r.transport.Send(r.processMessages(rd.Messages)) } + // Must save the snapshot file and WAL snapshot entry before saving any other entries or hardstate to + // ensure that recovery after a snapshot restore is possible. if !raft.IsEmptySnap(rd.Snapshot) { // gofail: var raftBeforeSaveSnap struct{} if err := r.storage.SaveSnap(rd.Snapshot); err != nil { diff --git a/etcdserver/server.go b/etcdserver/server.go index 26419d0106b4..5e1925ce090d 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -29,6 +29,11 @@ import ( "sync/atomic" "time" + "github.com/coreos/go-semver/semver" + humanize "github.com/dustin/go-humanize" + "github.com/prometheus/client_golang/prometheus" + "go.uber.org/zap" + "go.etcd.io/etcd/v3/auth" "go.etcd.io/etcd/v3/etcdserver/api" "go.etcd.io/etcd/v3/etcdserver/api/membership" @@ -59,11 +64,6 @@ import ( "go.etcd.io/etcd/v3/raft/raftpb" "go.etcd.io/etcd/v3/version" "go.etcd.io/etcd/v3/wal" - - "github.com/coreos/go-semver/semver" - humanize "github.com/dustin/go-humanize" - "github.com/prometheus/client_golang/prometheus" - "go.uber.org/zap" ) const ( @@ -416,24 +416,15 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) { } // Find a snapshot to start/restart a raft node - for i := uint64(0); ; i++ { - snapshot, err = ss.LoadIndex(i) - if err != nil && err != snap.ErrNoSnapshot { - return nil, err - } - - if err == snap.ErrNoSnapshot { - break - } - - if checkWALSnap(cfg.Logger, cfg.WALDir(), snapshot) { - break - } - - cfg.Logger.Info( - "skip snapshot", - zap.Uint64("index", i), - ) + walSnaps, err := wal.ValidSnapshotEntries(cfg.Logger, cfg.WALDir()) + if err != nil { + return nil, err + } + // snapshot files can be orphaned if etcd crashes after writing them but before writing the corresponding + // wal log entries + snapshot, err := ss.LoadNewestAvailable(walSnaps) + if err != nil && err != snap.ErrNoSnapshot { + return nil, err } if snapshot != nil { @@ -2168,12 +2159,10 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) { } lg.Panic("failed to create snapshot", zap.Error(err)) } - // SaveSnap saves the snapshot and releases the locked wal files - // to the snapshot index. + // SaveSnap saves the snapshot to file and appends the corresponding WAL entry. if err = s.r.storage.SaveSnap(snap); err != nil { lg.Panic("failed to save snapshot", zap.Error(err)) } - if err = s.r.storage.Release(snap); err != nil { lg.Panic("failed to release wal", zap.Error(err)) } diff --git a/etcdserver/storage.go b/etcdserver/storage.go index 3376a825a95f..15367566df3a 100644 --- a/etcdserver/storage.go +++ b/etcdserver/storage.go @@ -36,7 +36,7 @@ type Storage interface { SaveSnap(snap raftpb.Snapshot) error // Close closes the Storage and performs finalization. Close() error - // Release release release the locked wal files since they will not be used. + // Release releases the locked wal files older than the provided snapshot. Release(snap raftpb.Snapshot) error // Sync WAL Sync() error @@ -51,51 +51,32 @@ func NewStorage(w *wal.WAL, s *snap.Snapshotter) Storage { return &storage{w, s} } -// SaveSnap saves the snapshot to disk and release the locked -// wal files since they will not be used. +// SaveSnap saves the snapshot file to disk and writes the WAL snapshot entry. func (st *storage) SaveSnap(snap raftpb.Snapshot) error { walsnap := walpb.Snapshot{ Index: snap.Metadata.Index, Term: snap.Metadata.Term, } - err := st.WAL.SaveSnapshot(walsnap) + // save the snapshot file before writing the snapshot to the wal. + // This makes it possible for the snapshot file to become orphaned, but prevents + // a WAL snapshot entry from having no corresponding snapshot file. + err := st.Snapshotter.SaveSnap(snap) if err != nil { return err } + // gofail: var raftBeforeWALSaveSnaphot struct{} - return st.Snapshotter.SaveSnap(snap) + return st.WAL.SaveSnapshot(walsnap) } +// Release release the locks to the wal files that are older than the provided wal for the given snap. func (st *storage) Release(snap raftpb.Snapshot) error { return st.WAL.ReleaseLockTo(snap.Metadata.Index) } -func checkWALSnap(lg *zap.Logger, waldir string, snapshot *raftpb.Snapshot) bool { - if snapshot == nil { - lg.Fatal("checkWALSnap: snapshot is empty") - } - - walsnap := walpb.Snapshot{ - Index: snapshot.Metadata.Index, - Term: snapshot.Metadata.Term, - } - - w, _, _, st, _ := readWAL(lg, waldir, walsnap) - defer w.Close() - - lg.Info( - "checkWALSnap: snapshot and hardstate data", - zap.Uint64("snapshot-index", snapshot.Metadata.Index), - zap.Uint64("st-commit", st.Commit), - ) - - if snapshot.Metadata.Index > st.Commit { - return false - } - - return true -} - +// readWAL reads the WAL at the given snap and returns the wal, its latest HardState and cluster ID, and all entries that appear +// after the position of the given snap in the WAL. +// The snap must have been previously saved to the WAL, or this call will panic. func readWAL(lg *zap.Logger, waldir string, snap walpb.Snapshot) (w *wal.WAL, id, cid types.ID, st raftpb.HardState, ents []raftpb.Entry) { var ( err error diff --git a/wal/wal.go b/wal/wal.go index 17df41e26652..f106da4aa9c5 100644 --- a/wal/wal.go +++ b/wal/wal.go @@ -531,6 +531,63 @@ func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb. return metadata, state, ents, err } +// ValidSnapshotEntries returns all the valid snapshot entries in the wal logs in the given directory. +// Snapshot entries are valid if their index is less than or equal to the most recent committed hardstate. +func ValidSnapshotEntries(lg *zap.Logger, walDir string) ([]walpb.Snapshot, error) { + var snaps []walpb.Snapshot + var state raftpb.HardState + var err error + + rec := &walpb.Record{} + names, err := readWALNames(lg, walDir) + if err != nil { + return nil, err + } + + // open wal files in read mode, so that there is no conflict + // when the same WAL is opened elsewhere in write mode + rs, _, closer, err := openWALFiles(lg, walDir, names, 0, false) + if err != nil { + return nil, err + } + defer func() { + if closer != nil { + closer() + } + }() + + // create a new decoder from the readers on the WAL files + decoder := newDecoder(rs...) + + for err = decoder.decode(rec); err == nil; err = decoder.decode(rec) { + switch rec.Type { + case snapshotType: + var loadedSnap walpb.Snapshot + pbutil.MustUnmarshal(&loadedSnap, rec.Data) + snaps = append(snaps, loadedSnap) + case stateType: + state = mustUnmarshalState(rec.Data) + } + } + // We do not have to read out all the WAL entries + // as the decoder is opened in read mode. + if err != io.EOF && err != io.ErrUnexpectedEOF { + return nil, err + } + + // filter out any snaps that have a higher term and index than the newest hardstate + n := 0 + for _, s := range snaps { + if s.Index <= state.Commit { + snaps[n] = s + n++ + } + } + snaps = snaps[:n] + + return snaps, nil +} + // Verify reads through the given WAL and verifies that it is not corrupted. // It creates a new decoder to read through the records of the given WAL. // It does not conflict with any open WAL, but it is recommended not to diff --git a/wal/wal_test.go b/wal/wal_test.go index bc9fa1e6344a..07fb49feac13 100644 --- a/wal/wal_test.go +++ b/wal/wal_test.go @@ -1000,3 +1000,60 @@ func TestReadAllFail(t *testing.T) { t.Fatalf("err = %v, want ErrDecoderNotFound", err) } } + +// TestValidSnapshotEntries ensures ValidSnapshotEntries returns all valid wal snapshot entries, accounting +// for hardstate +func TestValidSnapshotEntries(t *testing.T) { + p, err := ioutil.TempDir(os.TempDir(), "waltest") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(p) + snap0 := walpb.Snapshot{Index: 0, Term: 0} + snap1 := walpb.Snapshot{Index: 1, Term: 1} + snap2 := walpb.Snapshot{Index: 2, Term: 1} + snap3 := walpb.Snapshot{Index: 3, Term: 2} + snap4 := walpb.Snapshot{Index: 4, Term: 2} + func() { + w, err := Create(zap.NewExample(), p, nil) + if err != nil { + t.Fatal(err) + } + defer w.Close() + + // snap0 is implicitly created at index 0, term 0 + if err = w.SaveSnapshot(snap1); err != nil { + t.Fatal(err) + } + state := raftpb.HardState{Commit: 1, Term: 1} + if err = w.Save(state, nil); err != nil { + t.Fatal(err) + } + if err = w.SaveSnapshot(snap2); err != nil { + t.Fatal(err) + } + if err = w.SaveSnapshot(snap3); err != nil { + t.Fatal(err) + } + state2 := raftpb.HardState{Commit: 3, Term: 2} + if err = w.Save(state2, nil); err != nil { + t.Fatal(err) + } + if err = w.SaveSnapshot(snap4); err != nil { + t.Fatal(err) + } + }() + walSnaps, err := ValidSnapshotEntries(zap.NewExample(), p) + if err != nil { + t.Fatal(err) + } + expected := []walpb.Snapshot{snap0, snap1, snap2, snap3} + if len(walSnaps) != len(expected) { + t.Fatalf("expected 4 walSnaps, got %d", len(expected)) + } + for i := 0; i < len(expected); i++ { + if walSnaps[i].Index != expected[i].Index || walSnaps[i].Term != expected[i].Term { + t.Errorf("expected walSnaps %+v at index %d, got %+v", expected[i], i, walSnaps[i]) + } + } +}