Skip to content

Commit

Permalink
Merge pull request etcd-io#16048 from kkkkun/cp-14457-to-3.5
Browse files Browse the repository at this point in the history
[3.5] etcdserver: fix corruption check when server has just been compacted
  • Loading branch information
ahrtr authored Jun 19, 2023
2 parents 2c04d51 + 8cffdba commit 306c60a
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 10 deletions.
2 changes: 1 addition & 1 deletion server/mvcc/kvstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ func (s *store) hashByRev(rev int64) (hash KeyValueHash, currentRev int64, err e
compactRev, currentRev = s.compactMainRev, s.currentRev
s.revMu.RUnlock()

if rev > 0 && rev <= compactRev {
if rev > 0 && rev < compactRev {
s.mu.RUnlock()
return KeyValueHash{}, 0, ErrCompacted
} else if rev > 0 && rev > currentRev {
Expand Down
64 changes: 55 additions & 9 deletions server/mvcc/kvstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"go.etcd.io/etcd/server/v3/mvcc/buckets"

"go.uber.org/zap"
"go.uber.org/zap/zaptest"
)

func TestStoreRev(t *testing.T) {
Expand Down Expand Up @@ -536,17 +537,18 @@ type hashKVResult struct {
func TestHashKVWhenCompacting(t *testing.T) {
b, tmpPath := betesting.NewDefaultTmpBackend(t)
s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{})
defer os.Remove(tmpPath)
defer cleanup(s, b, tmpPath)

rev := 10000
for i := 2; i <= rev; i++ {
s.Put([]byte("foo"), []byte(fmt.Sprintf("bar%d", i)), lease.NoLease)
}

hashCompactc := make(chan hashKVResult, 1)

donec := make(chan struct{})
var wg sync.WaitGroup
donec := make(chan struct{})

// Call HashByRev(10000) in multiple goroutines until donec is closed
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
Expand All @@ -565,28 +567,39 @@ func TestHashKVWhenCompacting(t *testing.T) {
}()
}

// Check computed hashes by HashByRev are correct in a goroutine, until donec is closed
wg.Add(1)
go func() {
defer close(donec)
defer wg.Done()
revHash := make(map[int64]uint32)
for round := 0; round < 1000; round++ {
for {
r := <-hashCompactc
if revHash[r.compactRev] == 0 {
revHash[r.compactRev] = r.hash
}
if r.hash != revHash[r.compactRev] {
t.Errorf("Hashes differ (current %v) != (saved %v)", r.hash, revHash[r.compactRev])
}

select {
case <-donec:
return
default:
}
}
}()

wg.Add(1)
// Compact the store in a goroutine, using revision 9900 to 10000 and close donec when finished
go func() {
defer wg.Done()
defer close(donec)
for i := 100; i >= 0; i-- {
_, err := s.Compact(traceutil.TODO(), int64(rev-1-i))
_, err := s.Compact(traceutil.TODO(), int64(rev-i))
if err != nil {
t.Error(err)
}
// Wait for the compaction job to finish
s.fifoSched.WaitFinish(1)
// Leave time for calls to HashByRev to take place after each compaction
time.Sleep(10 * time.Millisecond)
}
}()
Expand All @@ -599,12 +612,45 @@ func TestHashKVWhenCompacting(t *testing.T) {
}
}

// TestHashKVWithCompactedAndFutureRevisions ensures that HashKV returns a correct hash when called
// with a past revision (lower than compacted), a future revision, and the exact compacted revision
func TestHashKVWithCompactedAndFutureRevisions(t *testing.T) {
b, tmpPath := betesting.NewDefaultTmpBackend(t)
s := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
defer cleanup(s, b, tmpPath)

rev := 10000
compactRev := rev / 2

for i := 2; i <= rev; i++ {
s.Put([]byte("foo"), []byte(fmt.Sprintf("bar%d", i)), lease.NoLease)
}
if _, err := s.Compact(traceutil.TODO(), int64(compactRev)); err != nil {
t.Fatal(err)
}

_, _, errFutureRev := s.HashStorage().HashByRev(int64(rev + 1))
if errFutureRev != ErrFutureRev {
t.Error(errFutureRev)
}

_, _, errPastRev := s.HashStorage().HashByRev(int64(compactRev - 1))
if errPastRev != ErrCompacted {
t.Error(errPastRev)
}

_, _, errCompactRev := s.HashStorage().HashByRev(int64(compactRev))
if errCompactRev != nil {
t.Error(errCompactRev)
}
}

// TestHashKVZeroRevision ensures that "HashByRev(0)" computes
// correct hash value with latest revision.
func TestHashKVZeroRevision(t *testing.T) {
b, tmpPath := betesting.NewDefaultTmpBackend(t)
s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{})
defer os.Remove(tmpPath)
defer cleanup(s, b, tmpPath)

rev := 10000
for i := 2; i <= rev; i++ {
Expand Down

0 comments on commit 306c60a

Please sign in to comment.