Skip to content

Commit

Permalink
mvcc/backend: optimize compaction batch interval and write directly t…
Browse files Browse the repository at this point in the history
…o boltdb
  • Loading branch information
jpbetz committed Aug 12, 2019
1 parent 8d823e7 commit 54c340b
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 11 deletions.
17 changes: 17 additions & 0 deletions mvcc/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ type Backend interface {
ConcurrentReadTx() ReadTx

Snapshot() Snapshot

// Compact deletes all the provided keys directly from boltdb.
Compact(bucket []byte, keys [][]byte) error

Hash(ignores map[IgnoreKey]struct{}) (uint32, error)
// Size returns the current size of the backend physically allocated.
// The backend can hold DB space that is not utilized at the moment,
Expand Down Expand Up @@ -275,6 +279,19 @@ type IgnoreKey struct {
Key string
}

func (b *backend) Compact(bucket []byte, keys [][]byte) error {
return b.db.Update(func(tx *bolt.Tx) error {
b := tx.Bucket(bucket)
for _, key := range keys {
err := b.Delete(key)
if err != nil {
return err
}
}
return nil
})
}

func (b *backend) Hash(ignores map[IgnoreKey]struct{}) (uint32, error) {
h := crc32.New(crc32.MakeTable(crc32.Castagnoli))

Expand Down
57 changes: 46 additions & 11 deletions mvcc/kvstore_compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,25 +30,62 @@ func (s *store) scheduleCompaction(compactMainRev int64, keep map[revision]struc
end := make([]byte, 8)
binary.BigEndian.PutUint64(end, uint64(compactMainRev+1))

batchsize := int64(10000)
batchsize := int64(1000)
last := make([]byte, 8+1+8)

compactKeys := make([][]byte, 0, batchsize)
for {
var rev revision

start := time.Now()
tx := s.b.BatchTx()
tx.Lock()
rtx := s.b.ConcurrentReadTx()
rtx.RLock()

keys, _ := tx.UnsafeRange(keyBucketName, last, end, batchsize)
compactKeys = compactKeys[:0]
batchCompactions := 0
// Find keys to remove in read transaction to maximize concurrency.
keys, _ := rtx.UnsafeRange(keyBucketName, last, end, batchsize)
for _, key := range keys {
rev = bytesToRev(key)
if _, ok := keep[rev]; !ok {
tx.UnsafeDelete(keyBucketName, key)
keyCompactions++
ckey := make([]byte, len(key))
copy(ckey, key)
compactKeys = append(compactKeys, ckey)
batchCompactions++
}
}
keyCompactions += batchCompactions
revToBytes(revision{main: rev.main, sub: rev.sub + 1}, last)
rtx.RUnlock()

if len(compactKeys) > 0 {
batchStart := time.Now()
err := s.b.Compact(keyBucketName, compactKeys)
if err != nil {
if s.lg != nil {
s.lg.Error(
"compaction failed",
zap.Int64("compact-revision", compactMainRev),
)
} else {
plog.Errorf("compaction failed for %d: %v", compactMainRev, err)
}
}
if s.lg != nil {
s.lg.Info(
"finished scheduled compaction batch",
zap.Int64("compact-revision", compactMainRev),
zap.Int("key-count", batchCompactions),
zap.Duration("took", time.Since(totalStart)),
)
} else {
plog.Printf("finished scheduled compaction batch of %d keys at %d (took %v)", batchCompactions, compactMainRev, time.Since(batchStart))
}
}

if len(keys) < int(batchsize) {
tx := s.b.BatchTx()
tx.Lock()
rbytes := make([]byte, 8+1+8)
revToBytes(revision{main: compactMainRev}, rbytes)
tx.UnsafePut(metaBucketName, finishedCompactKeyName, rbytes)
Expand All @@ -57,21 +94,19 @@ func (s *store) scheduleCompaction(compactMainRev int64, keep map[revision]struc
s.lg.Info(
"finished scheduled compaction",
zap.Int64("compact-revision", compactMainRev),
zap.Int("key-count", keyCompactions),
zap.Duration("took", time.Since(totalStart)),
)
} else {
plog.Printf("finished scheduled compaction at %d (took %v)", compactMainRev, time.Since(totalStart))
plog.Printf("finished scheduled compaction of %d keys at %d (took %v)", keyCompactions, compactMainRev, time.Since(totalStart))
}
return true
}

// update last
revToBytes(revision{main: rev.main, sub: rev.sub + 1}, last)
tx.Unlock()
dbCompactionPauseMs.Observe(float64(time.Since(start) / time.Millisecond))

select {
case <-time.After(100 * time.Millisecond):
case <-time.After(10 * time.Millisecond):
case <-s.stopc:
return false
}
Expand Down
1 change: 1 addition & 0 deletions mvcc/kvstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -909,6 +909,7 @@ func (b *fakeBackend) Snapshot() backend.Snapshot
func (b *fakeBackend) ForceCommit() {}
func (b *fakeBackend) Defrag() error { return nil }
func (b *fakeBackend) Close() error { return nil }
func (b *fakeBackend) Compact(bucket []byte, keys [][]byte) error { return nil }

type indexGetResp struct {
rev revision
Expand Down

0 comments on commit 54c340b

Please sign in to comment.