From eb745454a8f36a3c6c59f8c175b77a5fd2fe4fba Mon Sep 17 00:00:00 2001 From: jocalvert Date: Thu, 29 Mar 2018 21:03:30 +0000 Subject: [PATCH] mvcc: Clone the key index BTree to traverse for compaction and lock the original tree on each KeyItem operation, so as to not hold the lock for a long time. This allows read/write throughput by not blocking when the index tree is large (greater than 1M entries). --- mvcc/index.go | 43 +++++++++++++++++----------------------- mvcc/index_bench_test.go | 25 +++++++++++++++++++++++ mvcc/kvstore.go | 4 ++-- 3 files changed, 45 insertions(+), 27 deletions(-) create mode 100644 mvcc/index_bench_test.go diff --git a/mvcc/index.go b/mvcc/index.go index 626de3825fcc..dbdc216e6673 100644 --- a/mvcc/index.go +++ b/mvcc/index.go @@ -17,7 +17,6 @@ package mvcc import ( "sort" "sync" - "github.com/google/btree" "go.uber.org/zap" ) @@ -185,27 +184,32 @@ func (ti *treeIndex) RangeSince(key, end []byte, rev int64) []revision { func (ti *treeIndex) Compact(rev int64) map[revision]struct{} { available := make(map[revision]struct{}) - var emptyki []*keyIndex if ti.lg != nil { ti.lg.Info("compact tree index", zap.Int64("revision", rev)) } else { plog.Printf("store.index: compact %d", rev) } - // TODO: do not hold the lock for long time? - // This is probably OK. Compacting 10M keys takes O(10ms). ti.Lock() - defer ti.Unlock() - ti.tree.Ascend(compactIndex(rev, available, &emptyki)) - for _, ki := range emptyki { - item := ti.tree.Delete(ki) - if item == nil { - if ti.lg != nil { - ti.lg.Panic("failed to delete during compaction") - } else { - plog.Panic("store.index: unexpected delete failure during compaction") + clone := ti.tree.Clone() + ti.Unlock() + + clone.Ascend(func(item btree.Item) bool { + keyi := item.(*keyIndex) + ti.Lock() + keyi.compact(rev, available) + if keyi.isEmpty() { + item := ti.tree.Delete(keyi) + if item == nil { + if ti.lg != nil { + ti.lg.Panic("failed to delete during compaction") + } else { + plog.Panic("store.index: unexpected delete failure during compaction") + } } } - } + ti.Unlock() + return true + }) return available } @@ -222,17 +226,6 @@ func (ti *treeIndex) Keep(rev int64) map[revision]struct{} { return available } -func compactIndex(rev int64, available map[revision]struct{}, emptyki *[]*keyIndex) func(i btree.Item) bool { - return func(i btree.Item) bool { - keyi := i.(*keyIndex) - keyi.compact(rev, available) - if keyi.isEmpty() { - *emptyki = append(*emptyki, keyi) - } - return true - } -} - func (ti *treeIndex) Equal(bi index) bool { b := bi.(*treeIndex) diff --git a/mvcc/index_bench_test.go b/mvcc/index_bench_test.go new file mode 100644 index 000000000000..b6c088ee729c --- /dev/null +++ b/mvcc/index_bench_test.go @@ -0,0 +1,25 @@ +package mvcc + +import ( + "testing" +) + +func BenchmarkIndexCompact1(b *testing.B) { benchmarkIndexCompact(b, 1) } +func BenchmarkIndexCompact100(b *testing.B) { benchmarkIndexCompact(b, 100) } +func BenchmarkIndexCompact10000(b *testing.B) { benchmarkIndexCompact(b, 10000) } +func BenchmarkIndexCompact100000(b *testing.B) { benchmarkIndexCompact(b, 100000) } +func BenchmarkIndexCompact1000000(b *testing.B) { benchmarkIndexCompact(b, 1000000) } + +func benchmarkIndexCompact(b *testing.B, size int) { + kvindex := newTreeIndex() + + bytesN := 64 + keys := createBytesSlice(bytesN, size) + for i := 1; i < size; i++ { + kvindex.Put(keys[i], revision {main: int64(i), sub: int64(i)}) + } + b.ResetTimer() + for i := 1; i < b.N; i++ { + kvindex.Compact(int64(i)) + } +} diff --git a/mvcc/kvstore.go b/mvcc/kvstore.go index f03b6311e4f4..3d941335ea7a 100644 --- a/mvcc/kvstore.go +++ b/mvcc/kvstore.go @@ -245,9 +245,10 @@ func (s *store) Compact(rev int64) (<-chan struct{}, error) { // ensure that desired compaction is persisted s.b.ForceCommit() - keep := s.kvindex.Compact(rev) ch := make(chan struct{}) var j = func(ctx context.Context) { + keep := s.kvindex.Compact(rev) + indexCompactionPauseDurations.Observe(float64(time.Since(start) / time.Millisecond)) if ctx.Err() != nil { s.compactBarrier(ctx, ch) return @@ -261,7 +262,6 @@ func (s *store) Compact(rev int64) (<-chan struct{}, error) { s.fifoSched.Schedule(j) - indexCompactionPauseDurations.Observe(float64(time.Since(start) / time.Millisecond)) return ch, nil }