Skip to content

Commit

Permalink
etcdserver/*, wal/*: find valid snapshots by cross checking snap file…
Browse files Browse the repository at this point in the history
…s and wal snap entries
  • Loading branch information
jpbetz committed May 14, 2020
1 parent 5051703 commit b5630ae
Show file tree
Hide file tree
Showing 7 changed files with 208 additions and 86 deletions.
47 changes: 23 additions & 24 deletions etcdserver/api/snap/snapshotter.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"go.etcd.io/etcd/v3/pkg/pbutil"
"go.etcd.io/etcd/v3/raft"
"go.etcd.io/etcd/v3/raft/raftpb"
"go.etcd.io/etcd/v3/wal/walpb"

"go.uber.org/zap"
)
Expand All @@ -38,7 +39,6 @@ const snapSuffix = ".snap"

var (
ErrNoSnapshot = errors.New("snap: no available snapshot")
ErrSnapshotIndex = errors.New("snap: no available snapshot index")
ErrEmptySnapshot = errors.New("snap: empty snapshot")
ErrCRCMismatch = errors.New("snap: crc mismatch")
crcTable = crc32.MakeTable(crc32.Castagnoli)
Expand Down Expand Up @@ -103,38 +103,37 @@ func (s *Snapshotter) save(snapshot *raftpb.Snapshot) error {
return nil
}

// Load returns the newest snapshot.
func (s *Snapshotter) Load() (*raftpb.Snapshot, error) {
names, err := s.snapNames()
if err != nil {
return nil, err
}
var snap *raftpb.Snapshot
for _, name := range names {
if snap, err = loadSnap(s.lg, s.dir, name); err == nil {
break
return s.loadMatching(func(*raftpb.Snapshot) bool { return true })
}

// LoadNewestAvailable loads the newest snapshot available that is in walSnaps.
func (s *Snapshotter) LoadNewestAvailable(walSnaps []walpb.Snapshot) (*raftpb.Snapshot, error) {
return s.loadMatching(func(snapshot *raftpb.Snapshot) bool {
m := snapshot.Metadata
for i := len(walSnaps) - 1; i >= 0; i-- {
if m.Term == walSnaps[i].Term && m.Index == walSnaps[i].Index {
return true
}
}
}
if err != nil {
return nil, ErrNoSnapshot
}
return snap, nil
return false
})
}

func (s *Snapshotter) LoadIndex(i uint64) (*raftpb.Snapshot, error) {
// loadMatching returns the newest snapshot where matchFn returns true.
func (s *Snapshotter) loadMatching(matchFn func(*raftpb.Snapshot) bool) (*raftpb.Snapshot, error) {
names, err := s.snapNames()
if err != nil {
return nil, err
}

if len(names) == 0 {
return nil, ErrNoSnapshot
}

if i >= uint64(len(names)) {
return nil, ErrSnapshotIndex
var snap *raftpb.Snapshot
for _, name := range names {
if snap, err = loadSnap(s.lg, s.dir, name); err == nil && matchFn(snap) {
return snap, nil
}
}

return loadSnap(s.lg, s.dir, names[i])
return nil, ErrNoSnapshot
}

func loadSnap(lg *zap.Logger, dir, name string) (*raftpb.Snapshot, error) {
Expand Down
47 changes: 42 additions & 5 deletions etcdserver/api/snap/snapshotter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"testing"

"go.etcd.io/etcd/v3/raft/raftpb"
"go.etcd.io/etcd/v3/wal/walpb"

"go.uber.org/zap"
)

Expand Down Expand Up @@ -166,12 +168,47 @@ func TestLoadNewestSnap(t *testing.T) {
t.Fatal(err)
}

g, err := ss.Load()
if err != nil {
t.Errorf("err = %v, want nil", err)
cases := []struct {
name string
availableWalSnaps []walpb.Snapshot
expected *raftpb.Snapshot
}{
{
name: "load-newest",
expected: &newSnap,
},
{
name: "loadnewestavailable-newest",
availableWalSnaps: []walpb.Snapshot{{Index: 0, Term: 0}, {Index: 1, Term: 1}, {Index: 5, Term: 1}},
expected: &newSnap,
},
{
name: "loadnewestavailable-newest-unsorted",
availableWalSnaps: []walpb.Snapshot{{Index: 5, Term: 1}, {Index: 1, Term: 1}, {Index: 0, Term: 0}},
expected: &newSnap,
},
{
name: "loadnewestavailable-previous",
availableWalSnaps: []walpb.Snapshot{{Index: 0, Term: 0}, {Index: 1, Term: 1}},
expected: testSnap,
},
}
if !reflect.DeepEqual(g, &newSnap) {
t.Errorf("snap = %#v, want %#v", g, &newSnap)
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
var err error
var g *raftpb.Snapshot
if tc.availableWalSnaps != nil {
g, err = ss.LoadNewestAvailable(tc.availableWalSnaps)
} else {
g, err = ss.Load()
}
if err != nil {
t.Errorf("err = %v, want nil", err)
}
if !reflect.DeepEqual(g, tc.expected) {
t.Errorf("snap = %#v, want %#v", g, tc.expected)
}
})
}
}

Expand Down
2 changes: 2 additions & 0 deletions etcdserver/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,8 @@ func (r *raftNode) start(rh *raftReadyHandler) {
r.transport.Send(r.processMessages(rd.Messages))
}

// Must save the snapshot file and WAL snapshot entry before saving any other entries or hardstate to
// ensure that recovery after a snapshot restore is possible.
if !raft.IsEmptySnap(rd.Snapshot) {
// gofail: var raftBeforeSaveSnap struct{}
if err := r.storage.SaveSnap(rd.Snapshot); err != nil {
Expand Down
41 changes: 15 additions & 26 deletions etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ import (
"sync/atomic"
"time"

"github.com/coreos/go-semver/semver"
humanize "github.com/dustin/go-humanize"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"

"go.etcd.io/etcd/v3/auth"
"go.etcd.io/etcd/v3/etcdserver/api"
"go.etcd.io/etcd/v3/etcdserver/api/membership"
Expand Down Expand Up @@ -59,11 +64,6 @@ import (
"go.etcd.io/etcd/v3/raft/raftpb"
"go.etcd.io/etcd/v3/version"
"go.etcd.io/etcd/v3/wal"

"github.com/coreos/go-semver/semver"
humanize "github.com/dustin/go-humanize"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
)

const (
Expand Down Expand Up @@ -416,24 +416,15 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) {
}

// Find a snapshot to start/restart a raft node
for i := uint64(0); ; i++ {
snapshot, err = ss.LoadIndex(i)
if err != nil && err != snap.ErrNoSnapshot {
return nil, err
}

if err == snap.ErrNoSnapshot {
break
}

if checkWALSnap(cfg.Logger, cfg.WALDir(), snapshot) {
break
}

cfg.Logger.Info(
"skip snapshot",
zap.Uint64("index", i),
)
walSnaps, err := wal.ValidSnapshotEntries(cfg.Logger, cfg.WALDir())
if err != nil {
return nil, err
}
// snapshot files can be orphaned if etcd crashes after writing them but before writing the corresponding
// wal log entries
snapshot, err := ss.LoadNewestAvailable(walSnaps)
if err != nil && err != snap.ErrNoSnapshot {
return nil, err
}

if snapshot != nil {
Expand Down Expand Up @@ -2168,12 +2159,10 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
}
lg.Panic("failed to create snapshot", zap.Error(err))
}
// SaveSnap saves the snapshot and releases the locked wal files
// to the snapshot index.
// SaveSnap saves the snapshot to file and appends the corresponding WAL entry.
if err = s.r.storage.SaveSnap(snap); err != nil {
lg.Panic("failed to save snapshot", zap.Error(err))
}

if err = s.r.storage.Release(snap); err != nil {
lg.Panic("failed to release wal", zap.Error(err))
}
Expand Down
43 changes: 12 additions & 31 deletions etcdserver/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ type Storage interface {
SaveSnap(snap raftpb.Snapshot) error
// Close closes the Storage and performs finalization.
Close() error
// Release release release the locked wal files since they will not be used.
// Release releases the locked wal files older than the provided snapshot.
Release(snap raftpb.Snapshot) error
// Sync WAL
Sync() error
Expand All @@ -51,51 +51,32 @@ func NewStorage(w *wal.WAL, s *snap.Snapshotter) Storage {
return &storage{w, s}
}

// SaveSnap saves the snapshot to disk and release the locked
// wal files since they will not be used.
// SaveSnap saves the snapshot file to disk and writes the WAL snapshot entry.
func (st *storage) SaveSnap(snap raftpb.Snapshot) error {
walsnap := walpb.Snapshot{
Index: snap.Metadata.Index,
Term: snap.Metadata.Term,
}
err := st.WAL.SaveSnapshot(walsnap)
// save the snapshot file before writing the snapshot to the wal.
// This makes it possible for the snapshot file to become orphaned, but prevents
// a WAL snapshot entry from having no corresponding snapshot file.
err := st.Snapshotter.SaveSnap(snap)
if err != nil {
return err
}
// gofail: var raftBeforeWALSaveSnaphot struct{}

return st.Snapshotter.SaveSnap(snap)
return st.WAL.SaveSnapshot(walsnap)
}

// Release release the locks to the wal files that are older than the provided wal for the given snap.
func (st *storage) Release(snap raftpb.Snapshot) error {
return st.WAL.ReleaseLockTo(snap.Metadata.Index)
}

func checkWALSnap(lg *zap.Logger, waldir string, snapshot *raftpb.Snapshot) bool {
if snapshot == nil {
lg.Fatal("checkWALSnap: snapshot is empty")
}

walsnap := walpb.Snapshot{
Index: snapshot.Metadata.Index,
Term: snapshot.Metadata.Term,
}

w, _, _, st, _ := readWAL(lg, waldir, walsnap)
defer w.Close()

lg.Info(
"checkWALSnap: snapshot and hardstate data",
zap.Uint64("snapshot-index", snapshot.Metadata.Index),
zap.Uint64("st-commit", st.Commit),
)

if snapshot.Metadata.Index > st.Commit {
return false
}

return true
}

// readWAL reads the WAL at the given snap and returns the wal, its latest HardState and cluster ID, and all entries that appear
// after the position of the given snap in the WAL.
// The snap must have been previously saved to the WAL, or this call will panic.
func readWAL(lg *zap.Logger, waldir string, snap walpb.Snapshot) (w *wal.WAL, id, cid types.ID, st raftpb.HardState, ents []raftpb.Entry) {
var (
err error
Expand Down
57 changes: 57 additions & 0 deletions wal/wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,63 @@ func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb.
return metadata, state, ents, err
}

// ValidSnapshotEntries returns all the valid snapshot entries in the wal logs in the given directory.
// Snapshot entries are valid if their index is less than or equal to the most recent committed hardstate.
func ValidSnapshotEntries(lg *zap.Logger, walDir string) ([]walpb.Snapshot, error) {
var snaps []walpb.Snapshot
var state raftpb.HardState
var err error

rec := &walpb.Record{}
names, err := readWALNames(lg, walDir)
if err != nil {
return nil, err
}

// open wal files in read mode, so that there is no conflict
// when the same WAL is opened elsewhere in write mode
rs, _, closer, err := openWALFiles(lg, walDir, names, 0, false)
if err != nil {
return nil, err
}
defer func() {
if closer != nil {
closer()
}
}()

// create a new decoder from the readers on the WAL files
decoder := newDecoder(rs...)

for err = decoder.decode(rec); err == nil; err = decoder.decode(rec) {
switch rec.Type {
case snapshotType:
var loadedSnap walpb.Snapshot
pbutil.MustUnmarshal(&loadedSnap, rec.Data)
snaps = append(snaps, loadedSnap)
case stateType:
state = mustUnmarshalState(rec.Data)
}
}
// We do not have to read out all the WAL entries
// as the decoder is opened in read mode.
if err != io.EOF && err != io.ErrUnexpectedEOF {
return nil, err
}

// filter out any snaps that have a higher term and index than the newest hardstate
n := 0
for _, s := range snaps {
if s.Index <= state.Commit {
snaps[n] = s
n++
}
}
snaps = snaps[:n]

return snaps, nil
}

// Verify reads through the given WAL and verifies that it is not corrupted.
// It creates a new decoder to read through the records of the given WAL.
// It does not conflict with any open WAL, but it is recommended not to
Expand Down
Loading

0 comments on commit b5630ae

Please sign in to comment.