Skip to content

Commit

Permalink
storage: build SSTs from KV_BATCH snapshot
Browse files Browse the repository at this point in the history
Incrementally build SSTs from the batches sent in a KV_BATCH snapshot.
This logic is only on the receiver side for ease of testing and
compatibility.

The complications of subsumed replicas that are not fully contained by
the current replica are also handled. The following is an example of
this case happening.

a       b       c       d
|---1---|-------2-------|  S1
|---1-------------------|  S2
|---1-----------|---3---|  S3

Since the merge is the first operation to happen, a follower could be
down before it completes. It is reasonable for r1-snapshot from S3 to
subsume both r1 and r2 in S1. Note that it's impossible for a replica to
subsume anything to its left.

The maximum number of SSTs created using the strategy is 4 + SR + 2
where SR is the number of subsumed replicas.

- Three SSTs get created when the snapshot is being received (range
  local keys, replicated range-id local keys, and user keys).
- One SST is constructed for the unreplicated range-id local keys when
  the snapshot is being applied.
- One SST is constructed for every subsumed replica to clear the
  range-id local keys. These SSTs consist of one range deletion
  tombstone and one RaftTombstoneKey.
- A maximum of two SSTs for all subsumed replicas are constructed to
  account the case of not fully contained subsumed replicas. We need to
  delete the key space of the subsumed replicas that we did not delete
  in the previous SSTs. We need one for the range-local keys and one for
  the user keys. These SSTs consist of normal tombstones, one range
  deletion tombstone, or they could be empty.

This commit also introduced a cluster setting
"kv.snapshot_sst.sync_size" which defines the maximum SST chunk size
before fsync-ing. Fsync-ing is necessary to prevent the OS from
accumulating such a large buffer that it blocks unrelated small/fast
writes for a long time when it flushes.

Release note (performance improvement): Snapshots sent between replicas
are now applied more performantly and use less memory.
  • Loading branch information
jeffrey-xiao committed Aug 9, 2019
1 parent 56c7a56 commit b320ff5
Show file tree
Hide file tree
Showing 14 changed files with 835 additions and 266 deletions.
1 change: 1 addition & 0 deletions docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
<tr><td><code>kv.rangefeed.enabled</code></td><td>boolean</td><td><code>false</code></td><td>if set, rangefeed registration is enabled</td></tr>
<tr><td><code>kv.snapshot_rebalance.max_rate</code></td><td>byte size</td><td><code>8.0 MiB</code></td><td>the rate limit (bytes/sec) to use for rebalance and upreplication snapshots</td></tr>
<tr><td><code>kv.snapshot_recovery.max_rate</code></td><td>byte size</td><td><code>8.0 MiB</code></td><td>the rate limit (bytes/sec) to use for recovery snapshots</td></tr>
<tr><td><code>kv.snapshot_sst.sync_size</code></td><td>byte size</td><td><code>2.0 MiB</code></td><td>threshold after which snapshot SST writes must fsync</td></tr>
<tr><td><code>kv.transaction.max_intents_bytes</code></td><td>integer</td><td><code>262144</code></td><td>maximum number of bytes used to track write intents in transactions</td></tr>
<tr><td><code>kv.transaction.max_refresh_spans_bytes</code></td><td>integer</td><td><code>256000</code></td><td>maximum number of bytes used to track refresh spans in serializable transactions</td></tr>
<tr><td><code>kv.transaction.parallel_commits_enabled</code></td><td>boolean</td><td><code>true</code></td><td>if enabled, transactional commits will be parallelized with transactional writes</td></tr>
Expand Down
110 changes: 110 additions & 0 deletions pkg/storage/client_merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"math/rand"
"reflect"
"regexp"
"strconv"
"strings"
"sync"
"sync/atomic"
Expand All @@ -37,6 +38,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/engine"
"github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb"
"github.com/cockroachdb/cockroach/pkg/storage/rditer"
"github.com/cockroachdb/cockroach/pkg/storage/stateloader"
"github.com/cockroachdb/cockroach/pkg/storage/storagebase"
"github.com/cockroachdb/cockroach/pkg/storage/storagepb"
Expand Down Expand Up @@ -3046,10 +3048,105 @@ func (h *unreliableRaftHandler) HandleRaftResponse(
func TestStoreRangeMergeRaftSnapshot(t *testing.T) {
defer leaktest.AfterTest(t)()

// We will be testing the SSTs written on store2's engine.
var eng engine.Engine
ctx := context.Background()
storeCfg := storage.TestStoreConfig(nil)
storeCfg.TestingKnobs.DisableReplicateQueue = true
storeCfg.TestingKnobs.DisableReplicaGCQueue = true
storeCfg.TestingKnobs.BeforeSnapshotSSTIngestion = func(
inSnap storage.IncomingSnapshot,
snapType storage.SnapshotRequest_Type,
sstNames []string,
) error {
// Only verify snapshots of type RAFT and on the range under exercise
// (range 2). Note that the keys of range 2 aren't verified in this
// functions. Unreplicated range-id local keys are not verified because
// there are too many keys and the other replicated keys are verified later
// on in the test. This function verifies that the subsumed replicas have
// been handled properly.
if snapType != storage.SnapshotRequest_RAFT || inSnap.State.Desc.RangeID != roachpb.RangeID(2) {
return nil
}
// The seven SSTs we are expecting to ingest are in the following order:
// 1. Replicated range-id local keys of the range in the snapshot.
// 2. Range-local keys of the range in the snapshot.
// 3. User keys of the range in the snapshot.
// 4. Unreplicated range-id local keys of the range in the snapshot.
// 5. SST to clear range-id local keys of the subsumed replica with
// RangeID 3.
// 6. SST to clear range-id local keys of the subsumed replica with
// RangeID 4.
// 7. SST to clear the user keys of the subsumed replicas.
//
// NOTE: There are no range-local keys in [d, /Max) in the store we're
// sending a snapshot to, so we aren't expecting an SST to clear those
// keys.
if len(sstNames) != 7 {
return errors.Errorf("expected to ingest 7 SSTs, got %d SSTs", len(sstNames))
}

// Only verify the SSTs of the subsumed replicas (the last three SSTs) by
// constructing the expected SST and ensuring that they are byte-by-byte
// equal. This verification ensures that the SSTs have the same tombstones
// and range deletion tombstones.
var expectedSSTs [][]byte
sstNames = sstNames[4:]

// Range-id local range of subsumed replicas.
for _, rangeID := range []roachpb.RangeID{roachpb.RangeID(3), roachpb.RangeID(4)} {
sst, err := engine.MakeRocksDBSstFileWriter()
if err != nil {
return err
}
defer sst.Close()
r := rditer.MakeRangeIDLocalKeyRange(rangeID, false)
if err := sst.ClearRange(r.Start, r.End); err != nil {
return err
}
tombstoneKey := keys.RaftTombstoneKey(rangeID)
tombstoneValue := &roachpb.RaftTombstone{NextReplicaID: math.MaxInt32}
if err := engine.MVCCBlindPutProto(context.TODO(), &sst, nil, tombstoneKey, hlc.Timestamp{}, tombstoneValue, nil); err != nil {
return err
}
expectedSST, err := sst.Finish()
if err != nil {
return err
}
expectedSSTs = append(expectedSSTs, expectedSST)
}

// User key range of subsumed replicas.
sst, err := engine.MakeRocksDBSstFileWriter()
if err != nil {
return err
}
defer sst.Close()
desc := roachpb.RangeDescriptor{
StartKey: roachpb.RKey("d"),
EndKey: roachpb.RKeyMax,
}
r := rditer.MakeUserKeyRange(&desc)
if err := engine.ClearRangeWithHeuristic(eng, &sst, r.Start, r.End); err != nil {
return err
}
expectedSST, err := sst.Finish()
if err != nil {
return err
}
expectedSSTs = append(expectedSSTs, expectedSST)

for i := range sstNames {
actualSST, err := eng.ReadFile(sstNames[i])
if err != nil {
return err
}
if !bytes.Equal(actualSST, expectedSSTs[i]) {
return errors.Errorf("contents of %s were unexpected", sstNames[i])
}
}
return nil
}
mtc := &multiTestContext{
storeConfig: &storeCfg,
// This test was written before the multiTestContext started creating many
Expand All @@ -3060,6 +3157,7 @@ func TestStoreRangeMergeRaftSnapshot(t *testing.T) {
mtc.Start(t, 3)
defer mtc.Stop()
store0, store2 := mtc.Store(0), mtc.Store(2)
eng = store2.Engine()
distSender := mtc.distSenders[0]

// Create three fully-caught-up, adjacent ranges on all three stores.
Expand All @@ -3074,6 +3172,18 @@ func TestStoreRangeMergeRaftSnapshot(t *testing.T) {
mtc.waitForValues(key, []int64{1, 1, 1})
}

// Put some keys in [d, /Max) so the subsumed replica of [c, /Max) with range
// ID 4 has tombstones. We will clear uncontained key range of subsumed
// replicas, so when we are receiving a snapshot for [a, d), we expect to
// clear the keys in [d, /Max).
for i := 0; i < 10; i++ {
key := roachpb.Key("d" + strconv.Itoa(i))
if _, pErr := client.SendWrapped(ctx, distSender, incrementArgs(key, 1)); pErr != nil {
t.Fatal(pErr)
}
mtc.waitForValues(key, []int64{1, 1, 1})
}

aRepl0 := store0.LookupReplica(roachpb.RKey("a"))

// Start dropping all Raft traffic to the first range on store1.
Expand Down
2 changes: 2 additions & 0 deletions pkg/storage/client_raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -1072,6 +1073,7 @@ func TestFailedSnapshotFillsReservation(t *testing.T) {
RangeSize: 100,
State: storagepb.ReplicaState{Desc: rep.Desc()},
}
header.RaftMessageRequest.Message.Snapshot.Data = uuid.UUID{}.GetBytes()
// Cause this stream to return an error as soon as we ask it for something.
// This injects an error into HandleSnapshotStream when we try to send the
// "snapshot accepted" message.
Expand Down
50 changes: 50 additions & 0 deletions pkg/storage/engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -504,3 +504,53 @@ func WriteSyncNoop(ctx context.Context, eng Engine) error {
}
return nil
}

// ClearRangeWithHeuristic clears the keys from start (inclusive) to end
// (exclusive). Depending on the number of keys, it will either use ClearRange
// or ClearRangeIter.
func ClearRangeWithHeuristic(eng Reader, writer Writer, start, end MVCCKey) error {
iter := eng.NewIterator(IterOptions{UpperBound: end.Key})
defer iter.Close()

// It is expensive for there to be many range deletion tombstones in the same
// sstable because all of the tombstones in an sstable are loaded whenever the
// sstable is accessed. So we avoid using range deletion unless there is some
// minimum number of keys. The value here was pulled out of thin air. It might
// be better to make this dependent on the size of the data being deleted. Or
// perhaps we should fix RocksDB to handle large numbers of tombstones in an
// sstable better.
const clearRangeMinKeys = 64
// Peek into the range to see whether it's large enough to justify
// ClearRange. Note that the work done here is bounded by
// clearRangeMinKeys, so it will be fairly cheap even for large
// ranges.
//
// TODO(bdarnell): Move this into ClearIterRange so we don't have
// to do this scan twice.
count := 0
iter.Seek(start)
for {
valid, err := iter.Valid()
if err != nil {
return err
}
if !valid || !iter.Key().Less(end) {
break
}
count++
if count > clearRangeMinKeys {
break
}
iter.Next()
}
var err error
if count > clearRangeMinKeys {
err = writer.ClearRange(start, end)
} else {
err = writer.ClearIterRange(iter, start, end)
}
if err != nil {
return err
}
return nil
}
20 changes: 20 additions & 0 deletions pkg/storage/engine/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -621,6 +621,26 @@ func MVCCPutProto(
return MVCCPut(ctx, engine, ms, key, timestamp, value, txn)
}

// MVCCBlindPutProto sets the given key to the protobuf-serialized byte string
// of msg and the provided timestamp. See MVCCBlindPut for a discussion on this
// fast-path and when it is appropriate to use.
func MVCCBlindPutProto(
ctx context.Context,
engine Writer,
ms *enginepb.MVCCStats,
key roachpb.Key,
timestamp hlc.Timestamp,
msg protoutil.Message,
txn *roachpb.Transaction,
) error {
value := roachpb.Value{}
if err := value.SetProto(msg); err != nil {
return err
}
value.InitChecksum(key)
return MVCCBlindPut(ctx, engine, ms, key, timestamp, value, txn)
}

type getBuffer struct {
meta enginepb.MVCCMetadata
value roachpb.Value
Expand Down
76 changes: 52 additions & 24 deletions pkg/storage/rditer/replica_data_iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,42 +43,70 @@ type ReplicaDataIterator struct {

// MakeAllKeyRanges returns all key ranges for the given Range.
func MakeAllKeyRanges(d *roachpb.RangeDescriptor) []KeyRange {
return makeReplicaKeyRanges(d, keys.MakeRangeIDPrefix)
return []KeyRange{
MakeRangeIDLocalKeyRange(d.RangeID, false /* replicatedOnly */),
MakeRangeLocalKeyRange(d),
MakeUserKeyRange(d),
}
}

// MakeReplicatedKeyRanges returns all key ranges that are fully Raft replicated
// for the given Range.
// MakeReplicatedKeyRanges returns all key ranges that are fully Raft
// replicated for the given Range.
//
// NOTE: The logic for receiving snapshot relies on this function returning the
// ranges in the following sorted order:
//
// 1. Replicated range-id local key range
// 2. Range-local key range
// 3. User key range
func MakeReplicatedKeyRanges(d *roachpb.RangeDescriptor) []KeyRange {
return makeReplicaKeyRanges(d, keys.MakeRangeIDReplicatedPrefix)
return []KeyRange{
MakeRangeIDLocalKeyRange(d.RangeID, true /* replicatedOnly */),
MakeRangeLocalKeyRange(d),
MakeUserKeyRange(d),
}
}

// makeReplicaKeyRanges returns a slice of 3 key ranges. The last key range in
// the returned slice corresponds to the actual range data (i.e. not the range
// metadata).
func makeReplicaKeyRanges(
d *roachpb.RangeDescriptor, metaFunc func(roachpb.RangeID) roachpb.Key,
) []KeyRange {
// MakeRangeIDLocalKeyRange returns the range-id local key range. If
// replicatedOnly is true, then it returns only the replicated keys, otherwise,
// it only returns both the replicated and unreplicated keys.
func MakeRangeIDLocalKeyRange(rangeID roachpb.RangeID, replicatedOnly bool) KeyRange {
var prefixFn func(roachpb.RangeID) roachpb.Key
if replicatedOnly {
prefixFn = keys.MakeRangeIDReplicatedPrefix
} else {
prefixFn = keys.MakeRangeIDPrefix
}
sysRangeIDKey := prefixFn(rangeID)
return KeyRange{
Start: engine.MakeMVCCMetadataKey(sysRangeIDKey),
End: engine.MakeMVCCMetadataKey(sysRangeIDKey.PrefixEnd()),
}
}

// MakeRangeLocalKeyRange returns the range local key range. Range-local keys
// are replicated keys that do not belong to the range they would naturally
// sort into. For example, /Local/Range/Table/1 would sort into [/Min,
// /System), but it actually belongs to [/Table/1, /Table/2).
func MakeRangeLocalKeyRange(d *roachpb.RangeDescriptor) KeyRange {
return KeyRange{
Start: engine.MakeMVCCMetadataKey(keys.MakeRangeKeyPrefix(d.StartKey)),
End: engine.MakeMVCCMetadataKey(keys.MakeRangeKeyPrefix(d.EndKey)),
}
}

// MakeUserKeyRange returns the user key range.
func MakeUserKeyRange(d *roachpb.RangeDescriptor) KeyRange {
// The first range in the keyspace starts at KeyMin, which includes the
// node-local space. We need the original StartKey to find the range
// metadata, but the actual data starts at LocalMax.
dataStartKey := d.StartKey.AsRawKey()
if d.StartKey.Equal(roachpb.RKeyMin) {
dataStartKey = keys.LocalMax
}
sysRangeIDKey := metaFunc(d.RangeID)
return []KeyRange{
{
Start: engine.MakeMVCCMetadataKey(sysRangeIDKey),
End: engine.MakeMVCCMetadataKey(sysRangeIDKey.PrefixEnd()),
},
{
Start: engine.MakeMVCCMetadataKey(keys.MakeRangeKeyPrefix(d.StartKey)),
End: engine.MakeMVCCMetadataKey(keys.MakeRangeKeyPrefix(d.EndKey)),
},
{
Start: engine.MakeMVCCMetadataKey(dataStartKey),
End: engine.MakeMVCCMetadataKey(d.EndKey.AsRawKey()),
},
return KeyRange{
Start: engine.MakeMVCCMetadataKey(dataStartKey),
End: engine.MakeMVCCMetadataKey(d.EndKey.AsRawKey()),
}
}

Expand Down
5 changes: 3 additions & 2 deletions pkg/storage/replica_application_state_machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -550,9 +550,10 @@ func (b *replicaAppBatch) runPreApplyTriggers(ctx context.Context, cmd *replicat
if err != nil {
return wrapWithNonDeterministicFailure(err, "unable to get replica for merge")
}
const destroyData = false
const rangeIDLocalOnly = true
const mustClearRange = false
if err := rhsRepl.preDestroyRaftMuLocked(
ctx, b.batch, b.batch, merge.RightDesc.NextReplicaID, destroyData,
ctx, b.batch, b.batch, merge.RightDesc.NextReplicaID, rangeIDLocalOnly, mustClearRange,
); err != nil {
return wrapWithNonDeterministicFailure(err, "unable to destroy range before merge")
}
Expand Down
Loading

0 comments on commit b320ff5

Please sign in to comment.