Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[3.5] etcdserver: fix corruption check when server has just been compacted #16048

Merged
merged 1 commit into from
Jun 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion server/mvcc/kvstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ func (s *store) hashByRev(rev int64) (hash KeyValueHash, currentRev int64, err e
compactRev, currentRev = s.compactMainRev, s.currentRev
s.revMu.RUnlock()

if rev > 0 && rev <= compactRev {
if rev > 0 && rev < compactRev {
s.mu.RUnlock()
return KeyValueHash{}, 0, ErrCompacted
} else if rev > 0 && rev > currentRev {
Expand Down
64 changes: 55 additions & 9 deletions server/mvcc/kvstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"go.etcd.io/etcd/server/v3/mvcc/buckets"

"go.uber.org/zap"
"go.uber.org/zap/zaptest"
)

func TestStoreRev(t *testing.T) {
Expand Down Expand Up @@ -536,17 +537,18 @@ type hashKVResult struct {
func TestHashKVWhenCompacting(t *testing.T) {
b, tmpPath := betesting.NewDefaultTmpBackend(t)
s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{})
defer os.Remove(tmpPath)
defer cleanup(s, b, tmpPath)

rev := 10000
for i := 2; i <= rev; i++ {
s.Put([]byte("foo"), []byte(fmt.Sprintf("bar%d", i)), lease.NoLease)
}

hashCompactc := make(chan hashKVResult, 1)

donec := make(chan struct{})
var wg sync.WaitGroup
donec := make(chan struct{})

// Call HashByRev(10000) in multiple goroutines until donec is closed
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
Expand All @@ -565,28 +567,39 @@ func TestHashKVWhenCompacting(t *testing.T) {
}()
}

// Check computed hashes by HashByRev are correct in a goroutine, until donec is closed
wg.Add(1)
go func() {
defer close(donec)
defer wg.Done()
revHash := make(map[int64]uint32)
for round := 0; round < 1000; round++ {
for {
r := <-hashCompactc
if revHash[r.compactRev] == 0 {
revHash[r.compactRev] = r.hash
}
if r.hash != revHash[r.compactRev] {
t.Errorf("Hashes differ (current %v) != (saved %v)", r.hash, revHash[r.compactRev])
}

select {
case <-donec:
return
default:
}
}
}()

wg.Add(1)
// Compact the store in a goroutine, using revision 9900 to 10000 and close donec when finished
go func() {
defer wg.Done()
defer close(donec)
for i := 100; i >= 0; i-- {
_, err := s.Compact(traceutil.TODO(), int64(rev-1-i))
_, err := s.Compact(traceutil.TODO(), int64(rev-i))
if err != nil {
t.Error(err)
}
// Wait for the compaction job to finish
s.fifoSched.WaitFinish(1)
// Leave time for calls to HashByRev to take place after each compaction
time.Sleep(10 * time.Millisecond)
}
}()
Expand All @@ -599,12 +612,45 @@ func TestHashKVWhenCompacting(t *testing.T) {
}
}

// TestHashKVWithCompactedAndFutureRevisions ensures that HashKV returns a correct hash when called
// with a past revision (lower than compacted), a future revision, and the exact compacted revision
func TestHashKVWithCompactedAndFutureRevisions(t *testing.T) {
b, tmpPath := betesting.NewDefaultTmpBackend(t)
s := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
defer cleanup(s, b, tmpPath)

rev := 10000
compactRev := rev / 2

for i := 2; i <= rev; i++ {
s.Put([]byte("foo"), []byte(fmt.Sprintf("bar%d", i)), lease.NoLease)
}
if _, err := s.Compact(traceutil.TODO(), int64(compactRev)); err != nil {
t.Fatal(err)
}

_, _, errFutureRev := s.HashStorage().HashByRev(int64(rev + 1))
if errFutureRev != ErrFutureRev {
t.Error(errFutureRev)
}

_, _, errPastRev := s.HashStorage().HashByRev(int64(compactRev - 1))
if errPastRev != ErrCompacted {
t.Error(errPastRev)
}

_, _, errCompactRev := s.HashStorage().HashByRev(int64(compactRev))
if errCompactRev != nil {
t.Error(errCompactRev)
}
}

// TestHashKVZeroRevision ensures that "HashByRev(0)" computes
// correct hash value with latest revision.
func TestHashKVZeroRevision(t *testing.T) {
b, tmpPath := betesting.NewDefaultTmpBackend(t)
s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{})
defer os.Remove(tmpPath)
defer cleanup(s, b, tmpPath)

rev := 10000
for i := 2; i <= rev; i++ {
Expand Down