From 97e54b270a241634de0096d400d9823beeb192c2 Mon Sep 17 00:00:00 2001
From: Benjamin Wang
Date: Wed, 31 Aug 2022 09:25:47 +0800
Subject: [PATCH] fix the potential data loss for clusters with only one member

For a cluster with only one member, raft always sends identical
unstable entries and committed entries to etcdserver, and etcd
responds to the client once it finishes (actually only partially)
the applying workflow. When the client receives the response, it
doesn't mean etcd has already successfully saved the data, including
to BoltDB and the WAL, because:
  1. etcd commits the BoltDB transaction periodically instead of on
     each request;
  2. etcd saves WAL entries in parallel with applying the committed
     entries.
Accordingly, it may run into a situation of data loss when etcd
crashes immediately after responding to the client and before BoltDB
and the WAL have successfully saved the data to disk. Note that this
issue can only happen for clusters with only one member.

For clusters with multiple members, this isn't an issue, because etcd
will not commit & apply the data before it has been replicated to a
majority of members. When the client receives the response, it means
the data must have been applied, which in turn means the data must
have been committed. Note: for clusters with multiple members, raft
will never send identical unstable entries and committed entries to
etcdserver.

Signed-off-by: Benjamin Wang
---
 raft/node.go              | 45 ++++++++++++++++++++++
 raft/node_test.go         | 78 +++++++++++++++++++++++++++++++++++++++
 raft/rawnode_test.go      |  3 +-
 server/etcdserver/raft.go | 15 ++++++--
 4 files changed, 137 insertions(+), 4 deletions(-)

diff --git a/raft/node.go b/raft/node.go
index d374b6c0c21f..aed58979c24d 100644
--- a/raft/node.go
+++ b/raft/node.go
@@ -87,6 +87,12 @@ type Ready struct {
 	// MustSync indicates whether the HardState and Entries must be synchronously
 	// written to disk or if an asynchronous write is permissible.
 	MustSync bool
+
+	// MustSaveEntriesBeforeApply indicates whether the Entries must be
+	// synchronously saved to disk before applying the CommittedEntries.
+	// Note that Entries and CommittedEntries may contain overlapping
+	// entries for a one-member cluster.
+	MustSaveEntriesBeforeApply bool
 }
 
 func isHardStateEqual(a, b pb.HardState) bool {
@@ -580,6 +586,7 @@ func newReady(r *raft, prevSoftSt *SoftState, prevHardSt pb.HardState) Ready {
 		rd.ReadStates = r.readStates
 	}
 	rd.MustSync = MustSync(r.hardState(), prevHardSt, len(rd.Entries))
+	rd.MustSaveEntriesBeforeApply = mustSaveEntriesBeforeApply(rd)
 	return rd
 }
 
@@ -593,3 +600,41 @@ func MustSync(st, prevst pb.HardState, entsnum int) bool {
 	// log entries[]
 	return entsnum != 0 || st.Vote != prevst.Vote || st.Term != prevst.Term
 }
+
+// For a cluster with only one member, the raft may send both the
+// unstable entries and committed entries to users (e.g. etcdserver),
+// and there may be overlapping log entries between them. The
+// user (e.g. etcd) should wait for the `Entries` to be successfully
+// saved before applying the `CommittedEntries` in this case.
+//
+// etcd responds to the client once it finishes (actually only partially)
+// the applying workflow. But when the client receives the response,
+// it doesn't mean etcd has already successfully saved the data,
+// including to BoltDB and the WAL, because:
+//  1. etcd commits the BoltDB transaction periodically instead of on each request;
+//  2. etcd saves WAL entries in parallel with applying the committed entries.
+// Accordingly, it might run into a situation of data loss when etcd crashes
+// immediately after responding to the client and before BoltDB and the WAL
+// have successfully saved the data to disk.
+// Note that this issue can only happen for clusters with only one member.
+//
+// For clusters with multiple members, this isn't an issue, because etcd will
+// not commit & apply the data before it has been replicated to a majority of
+// members. When the client receives the response, it means the data must have
+// been applied, which in turn means the data must have been committed.
+// Note: for clusters with multiple members, the raft will never send
+// overlapping unstable entries and committed entries to etcdserver.
+//
+// Refer to https://github.com/etcd-io/etcd/issues/14370.
+func mustSaveEntriesBeforeApply(rd Ready) bool {
+	if len(rd.CommittedEntries) == 0 || len(rd.Entries) == 0 {
+		return false
+	}
+
+	// Check whether the unstable and committed entries overlap, assuming
+	// that their index and term are only ever incrementing.
+	lastCommittedEntry := rd.CommittedEntries[len(rd.CommittedEntries)-1]
+	firstUnstableEntry := rd.Entries[0]
+	return lastCommittedEntry.Term > firstUnstableEntry.Term ||
+		(lastCommittedEntry.Term == firstUnstableEntry.Term && lastCommittedEntry.Index >= firstUnstableEntry.Index)
+}
diff --git a/raft/node_test.go b/raft/node_test.go
index 77895a0eed41..571a02577377 100644
--- a/raft/node_test.go
+++ b/raft/node_test.go
@@ -1020,3 +1020,81 @@ func TestNodeCommitPaginationAfterRestart(t *testing.T) {
 		)
 	}
 }
+
+func TestMustSaveEntriesBeforeApply(t *testing.T) {
+	testcases := []struct {
+		name             string
+		unstableEntries  []raftpb.Entry
+		committedEntries []raftpb.Entry
+		expectedResult   bool
+	}{
+		{
+			name:             "both entries are nil",
+			unstableEntries:  nil,
+			committedEntries: nil,
+			expectedResult:   false,
+		},
+		{
+			name:             "both entries are empty slices",
+			unstableEntries:  []raftpb.Entry{},
+			committedEntries: []raftpb.Entry{},
+			expectedResult:   false,
+		},
+		{
+			name:             "one nil and the other empty",
+			unstableEntries:  nil,
+			committedEntries: []raftpb.Entry{},
+			expectedResult:   false,
+		},
+		{
+			name:             "one nil and the other has data",
+			unstableEntries:  nil,
+			committedEntries: []raftpb.Entry{{Term: 4, Index: 10, Type: raftpb.EntryNormal, Data: []byte{0x11, 0x22, 0x33}}},
+			expectedResult:   false,
+		},
+		{
+			name:             "one empty and the other has data",
+			unstableEntries:  []raftpb.Entry{},
+			committedEntries: []raftpb.Entry{{Term: 4, Index: 10, Type: raftpb.EntryNormal, Data: []byte{0x11, 0x22, 0x33}}},
+			expectedResult:   false,
+		},
+		{
+			name:             "has different term and index",
+			unstableEntries:  []raftpb.Entry{{Term: 5, Index: 11, Type: raftpb.EntryNormal, Data: []byte{0x11, 0x22, 0x33}}},
+			committedEntries: []raftpb.Entry{{Term: 4, Index: 10, Type: raftpb.EntryNormal, Data: []byte{0x11, 0x22, 0x33}}},
+			expectedResult:   false,
+		},
+		{
+			name:             "has identical data",
+			unstableEntries:  []raftpb.Entry{{Term: 4, Index: 10, Type: raftpb.EntryNormal, Data: []byte{0x11, 0x22, 0x33}}},
+			committedEntries: []raftpb.Entry{{Term: 4, Index: 10, Type: raftpb.EntryNormal, Data: []byte{0x11, 0x22, 0x33}}},
+			expectedResult:   true,
+		},
+		{
+			name: "has overlapped entry",
+			unstableEntries: []raftpb.Entry{
+				{Term: 4, Index: 10, Type: raftpb.EntryNormal, Data: []byte{0x11, 0x22, 0x33}},
+				{Term: 4, Index: 11, Type: raftpb.EntryNormal, Data: []byte{0x44, 0x55, 0x66}},
+				{Term: 4, Index: 12, Type: raftpb.EntryNormal, Data: []byte{0x77, 0x88, 0x99}},
+			},
+			committedEntries: []raftpb.Entry{
+				{Term: 4, Index: 8, Type: raftpb.EntryNormal, Data: []byte{0x07, 0x08, 0x09}},
+				{Term: 4, Index: 9, Type: raftpb.EntryNormal, Data: []byte{0x10, 0x11, 0x12}},
+				{Term: 4, Index: 10, Type: raftpb.EntryNormal, Data: []byte{0x11, 0x22, 0x33}},
+			},
+			expectedResult: true,
+		},
+	}
+
+	for _, tc := range testcases {
+		t.Run(tc.name, func(t *testing.T) {
+			shouldWait := mustSaveEntriesBeforeApply(Ready{
+				Entries:          tc.unstableEntries,
+				CommittedEntries: tc.committedEntries,
+			})
+			if tc.expectedResult != shouldWait {
+				t.Errorf("Unexpected result, expected %t, got %t", tc.expectedResult, shouldWait)
+			}
+		})
+	}
+}
diff --git a/raft/rawnode_test.go b/raft/rawnode_test.go
index 898b0f12c3e2..8c682789a7dc 100644
--- a/raft/rawnode_test.go
+++ b/raft/rawnode_test.go
@@ -667,7 +667,8 @@ func TestRawNodeStart(t *testing.T) {
 			{Term: 1, Index: 2, Data: nil},           // empty entry
 			{Term: 1, Index: 3, Data: []byte("foo")}, // empty entry
 		},
-		MustSync: true,
+		MustSync:                   true,
+		MustSaveEntriesBeforeApply: true,
 	}
 
 	storage := NewMemoryStorage()
diff --git a/server/etcdserver/raft.go b/server/etcdserver/raft.go
index 704e45a7aac3..d15d6d78b6a6 100644
--- a/server/etcdserver/raft.go
+++ b/server/etcdserver/raft.go
@@ -209,6 +209,13 @@ func (r *raftNode) start(rh *raftReadyHandler) {
 
 				updateCommittedIndex(&ap, rh)
 
+				if rd.MustSaveEntriesBeforeApply {
+					// gofail: var raftBeforeSaveWaitWalSave struct{}
+					if err := r.storage.Save(rd.HardState, rd.Entries); err != nil {
+						r.lg.Fatal("failed to save Raft hard state and entries before apply", zap.Error(err))
+					}
+				}
+
 				select {
 				case r.applyc <- ap:
 				case <-r.stopped:
@@ -233,9 +240,11 @@ func (r *raftNode) start(rh *raftReadyHandler) {
 				// gofail: var raftAfterSaveSnap struct{}
 			}
 
-			// gofail: var raftBeforeSave struct{}
-			if err := r.storage.Save(rd.HardState, rd.Entries); err != nil {
-				r.lg.Fatal("failed to save Raft hard state and entries", zap.Error(err))
+			if !rd.MustSaveEntriesBeforeApply {
+				// gofail: var raftBeforeSave struct{}
+				if err := r.storage.Save(rd.HardState, rd.Entries); err != nil {
+					r.lg.Fatal("failed to save Raft hard state and entries", zap.Error(err))
+				}
 			}
 
 			if !raft.IsEmptyHardState(rd.HardState) {
 				proposalsCommitted.Set(float64(rd.HardState.Commit))
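
For reviewers, the sketch below shows the contract the new flag imposes on any
consumer of Ready, not just etcdserver. It is illustrative only and not part of
the patch: the durableStorage interface and the apply callback are hypothetical
stand-ins for an application's WAL layer and state machine, and the real
etcdserver hands committed entries to a separate apply goroutine via applyc
rather than applying them inline (snapshot and message handling are omitted).

package raftexample

import (
	"log"

	"go.etcd.io/etcd/raft/v3"
	pb "go.etcd.io/etcd/raft/v3/raftpb"
)

// durableStorage is a hypothetical stand-in for the application's WAL;
// Save is assumed to fsync the hard state and entries before returning.
type durableStorage interface {
	Save(st pb.HardState, ents []pb.Entry) error
}

// processReady drives a simplified Ready loop for a raft.Node, honoring
// MustSaveEntriesBeforeApply as introduced by this patch.
func processReady(n raft.Node, s durableStorage, apply func([]pb.Entry)) {
	for rd := range n.Ready() {
		if rd.MustSaveEntriesBeforeApply {
			// Single-member case: Entries and CommittedEntries overlap, so
			// an entry could otherwise be applied (and acknowledged to the
			// client) before it is durable. Persist the entries first.
			if err := s.Save(rd.HardState, rd.Entries); err != nil {
				log.Fatalf("save before apply failed: %v", err)
			}
		}

		// Apply the committed entries. etcd does this on a separate
		// goroutine, normally in parallel with the save below.
		apply(rd.CommittedEntries)

		if !rd.MustSaveEntriesBeforeApply {
			// Multi-member case: the committed entries were already
			// replicated to a majority, so saving may overlap with apply.
			if err := s.Save(rd.HardState, rd.Entries); err != nil {
				log.Fatalf("save failed: %v", err)
			}
		}

		n.Advance()
	}
}

Note the design choice: the fsync sits on the critical path before apply only
when the overlap actually exists, so multi-member clusters keep the existing
save/apply parallelism and pay no extra latency.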