Skip to content

Commit

Permalink
fix the potential data loss for clusters with only one member
Browse files Browse the repository at this point in the history
For a cluster with only one member, the raft always sends identical
unstable entries and committed entries to etcdserver, and etcd
responds to the client once it finishes (actually only partially
completes) the applying workflow.

When the client receives the response, it doesn't mean etcd has already
successfully saved the data, including BoltDB and WAL, because:
   1. etcd commits the boltDB transaction periodically instead of on each request;
   2. etcd saves WAL entries in parallel with applying the committed entries.
Accordingly, data loss may occur if etcd crashes immediately after
responding to the client but before boltDB and the WAL have
successfully persisted the data to disk.
Note that this issue can only happen for clusters with only one member.

For clusters with multiple members, this isn't an issue, because etcd will
not commit & apply the data before it has been replicated to a majority of the members.
When the client receives the response, it means the data must have been applied.
It further means the data must have been committed.
Note: for clusters with multiple members, the raft will never send identical
unstable entries and committed entries to etcdserver.

Signed-off-by: Benjamin Wang <wachao@vmware.com>
  • Loading branch information
ahrtr committed Aug 31, 2022
1 parent cdd2b73 commit 97e54b2
Show file tree
Hide file tree
Showing 4 changed files with 137 additions and 4 deletions.
45 changes: 45 additions & 0 deletions raft/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,12 @@ type Ready struct {
// MustSync indicates whether the HardState and Entries must be synchronously
// written to disk or if an asynchronous write is permissible.
MustSync bool

// MustSaveEntriesBeforeApply indicates whether the Entries must be
// synchronously saved to disk before applying the CommittedEntries.
// Note that Entries and CommittedEntries may have overlapped entries
// for one-member cluster.
MustSaveEntriesBeforeApply bool
}

func isHardStateEqual(a, b pb.HardState) bool {
Expand Down Expand Up @@ -580,6 +586,7 @@ func newReady(r *raft, prevSoftSt *SoftState, prevHardSt pb.HardState) Ready {
rd.ReadStates = r.readStates
}
rd.MustSync = MustSync(r.hardState(), prevHardSt, len(rd.Entries))
rd.MustSaveEntriesBeforeApply = mustSaveEntriesBeforeApply(rd)
return rd
}

Expand All @@ -593,3 +600,41 @@ func MustSync(st, prevst pb.HardState, entsnum int) bool {
// log entries[]
return entsnum != 0 || st.Vote != prevst.Vote || st.Term != prevst.Term
}

// mustSaveEntriesBeforeApply reports whether the unstable Entries in rd
// must be durably saved to disk before the CommittedEntries are applied.
//
// For a cluster with only one member, the raft may hand the user (e.g.
// etcdserver) both unstable entries and committed entries in the same
// Ready, and the two sets may contain overlapping log entries. In that
// case the user has to persist `Entries` before applying
// `CommittedEntries`; otherwise a crash right after responding to the
// client — but before boltDB and the WAL have flushed — can lose data,
// because:
//  1. etcd commits the boltDB transaction periodically instead of on each request;
//  2. etcd saves WAL entries in parallel with applying the committed entries.
//
// Note that this issue can only happen for clusters with only one member.
// Clusters with multiple members are not affected: data is never
// committed & applied before being replicated to a majority of members,
// so by the time the client receives a response the data is both applied
// and committed, and the raft never sends overlapping unstable entries
// and committed entries to etcdserver.
//
// Refer to https://github.com/etcd-io/etcd/issues/14370.
func mustSaveEntriesBeforeApply(rd Ready) bool {
	if len(rd.Entries) == 0 || len(rd.CommittedEntries) == 0 {
		return false
	}

	// Terms and indexes only ever increase, so the two ranges overlap
	// exactly when the newest committed entry is not older than the
	// oldest unstable entry.
	firstUnstable := rd.Entries[0]
	lastCommitted := rd.CommittedEntries[len(rd.CommittedEntries)-1]
	if lastCommitted.Term != firstUnstable.Term {
		return lastCommitted.Term > firstUnstable.Term
	}
	return lastCommitted.Index >= firstUnstable.Index
}
78 changes: 78 additions & 0 deletions raft/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1020,3 +1020,81 @@ func TestNodeCommitPaginationAfterRestart(t *testing.T) {
)
}
}

// TestMustSaveEntriesBeforeApply verifies that mustSaveEntriesBeforeApply
// reports true only when the unstable Entries and the CommittedEntries of
// a Ready overlap, which can only happen in a one-member cluster.
//
// Fix: the table field was misspelled `commitedEntries`; renamed to
// `committedEntries`.
func TestMustSaveEntriesBeforeApply(t *testing.T) {
	testcases := []struct {
		name             string
		unstableEntries  []raftpb.Entry
		committedEntries []raftpb.Entry
		expectedResult   bool
	}{
		{
			name:             "both entries are nil",
			unstableEntries:  nil,
			committedEntries: nil,
			expectedResult:   false,
		},
		{
			name:             "both entries are empty slices",
			unstableEntries:  []raftpb.Entry{},
			committedEntries: []raftpb.Entry{},
			expectedResult:   false,
		},
		{
			name:             "one nil and the other empty",
			unstableEntries:  nil,
			committedEntries: []raftpb.Entry{},
			expectedResult:   false,
		},
		{
			name:             "one nil and the other has data",
			unstableEntries:  nil,
			committedEntries: []raftpb.Entry{{Term: 4, Index: 10, Type: raftpb.EntryNormal, Data: []byte{0x11, 0x22, 0x33}}},
			expectedResult:   false,
		},
		{
			name:             "one empty and the other has data",
			unstableEntries:  []raftpb.Entry{},
			committedEntries: []raftpb.Entry{{Term: 4, Index: 10, Type: raftpb.EntryNormal, Data: []byte{0x11, 0x22, 0x33}}},
			expectedResult:   false,
		},
		{
			name:             "has different term and index",
			unstableEntries:  []raftpb.Entry{{Term: 5, Index: 11, Type: raftpb.EntryNormal, Data: []byte{0x11, 0x22, 0x33}}},
			committedEntries: []raftpb.Entry{{Term: 4, Index: 10, Type: raftpb.EntryNormal, Data: []byte{0x11, 0x22, 0x33}}},
			expectedResult:   false,
		},
		{
			name:             "has identical data",
			unstableEntries:  []raftpb.Entry{{Term: 4, Index: 10, Type: raftpb.EntryNormal, Data: []byte{0x11, 0x22, 0x33}}},
			committedEntries: []raftpb.Entry{{Term: 4, Index: 10, Type: raftpb.EntryNormal, Data: []byte{0x11, 0x22, 0x33}}},
			expectedResult:   true,
		},
		{
			name: "has overlapped entry",
			unstableEntries: []raftpb.Entry{
				{Term: 4, Index: 10, Type: raftpb.EntryNormal, Data: []byte{0x11, 0x22, 0x33}},
				{Term: 4, Index: 11, Type: raftpb.EntryNormal, Data: []byte{0x44, 0x55, 0x66}},
				{Term: 4, Index: 12, Type: raftpb.EntryNormal, Data: []byte{0x77, 0x88, 0x99}},
			},
			committedEntries: []raftpb.Entry{
				{Term: 4, Index: 8, Type: raftpb.EntryNormal, Data: []byte{0x07, 0x08, 0x09}},
				{Term: 4, Index: 9, Type: raftpb.EntryNormal, Data: []byte{0x10, 0x11, 0x12}},
				{Term: 4, Index: 10, Type: raftpb.EntryNormal, Data: []byte{0x11, 0x22, 0x33}},
			},
			expectedResult: true,
		},
	}

	for _, tc := range testcases {
		t.Run(tc.name, func(t *testing.T) {
			shouldWait := mustSaveEntriesBeforeApply(Ready{
				Entries:          tc.unstableEntries,
				CommittedEntries: tc.committedEntries,
			})
			if tc.expectedResult != shouldWait {
				t.Errorf("Unexpected result, expected %t, got %t", tc.expectedResult, shouldWait)
			}
		})
	}
}
3 changes: 2 additions & 1 deletion raft/rawnode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -667,7 +667,8 @@ func TestRawNodeStart(t *testing.T) {
{Term: 1, Index: 2, Data: nil}, // empty entry
{Term: 1, Index: 3, Data: []byte("foo")}, // empty entry
},
MustSync: true,
MustSync: true,
MustSaveEntriesBeforeApply: true,
}

storage := NewMemoryStorage()
Expand Down
15 changes: 12 additions & 3 deletions server/etcdserver/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,13 @@ func (r *raftNode) start(rh *raftReadyHandler) {

updateCommittedIndex(&ap, rh)

if rd.MustSaveEntriesBeforeApply {
// gofail: var raftBeforeSaveWaitWalSave struct{}
if err := r.storage.Save(rd.HardState, rd.Entries); err != nil {
r.lg.Fatal("failed to save Raft hard state and entries before apply", zap.Error(err))
}
}

select {
case r.applyc <- ap:
case <-r.stopped:
Expand All @@ -233,9 +240,11 @@ func (r *raftNode) start(rh *raftReadyHandler) {
// gofail: var raftAfterSaveSnap struct{}
}

// gofail: var raftBeforeSave struct{}
if err := r.storage.Save(rd.HardState, rd.Entries); err != nil {
r.lg.Fatal("failed to save Raft hard state and entries", zap.Error(err))
if !rd.MustSaveEntriesBeforeApply {
// gofail: var raftBeforeSave struct{}
if err := r.storage.Save(rd.HardState, rd.Entries); err != nil {
r.lg.Fatal("failed to save Raft hard state and entries", zap.Error(err))
}
}
if !raft.IsEmptyHardState(rd.HardState) {
proposalsCommitted.Set(float64(rd.HardState.Commit))
Expand Down

0 comments on commit 97e54b2

Please sign in to comment.