From b320ff5c5a3338e2207496c88228f781e4b711b1 Mon Sep 17 00:00:00 2001 From: Jeffrey Xiao Date: Thu, 8 Aug 2019 21:16:37 -0400 Subject: [PATCH] storage: build SSTs from KV_BATCH snapshot Incrementally build SSTs from the batches sent in a KV_BATCH snapshot. This logic is only on the receiver side for ease of testing and compatibility. The complications of subsumed replicas that are not fully contained by the current replica are also handled. The following is an example of this case happening. a b c d |---1---|-------2-------| S1 |---1-------------------| S2 |---1-----------|---3---| S3 Since the merge is the first operation to happen, a follower could be down before it completes. It is reasonable for r1-snapshot from S3 to subsume both r1 and r2 in S1. Note that it's impossible for a replica to subsume anything to its left. The maximum number of SSTs created using the strategy is 4 + SR + 2 where SR is the number of subsumed replicas. - Three SSTs get created when the snapshot is being received (range local keys, replicated range-id local keys, and user keys). - One SST is constructed for the unreplicated range-id local keys when the snapshot is being applied. - One SST is constructed for every subsumed replica to clear the range-id local keys. These SSTs consist of one range deletion tombstone and one RaftTombstoneKey. - A maximum of two SSTs for all subsumed replicas are constructed to account the case of not fully contained subsumed replicas. We need to delete the key space of the subsumed replicas that we did not delete in the previous SSTs. We need one for the range-local keys and one for the user keys. These SSTs consist of normal tombstones, one range deletion tombstone, or they could be empty. This commit also introduced a cluster setting "kv.snapshot_sst.sync_size" which defines the maximum SST chunk size before fsync-ing. Fsync-ing is necessary to prevent the OS from accumulating such a large buffer that it blocks unrelated small/fast writes for a long time when it flushes. Release note (performance improvement): Snapshots sent between replicas are now applied more performantly and use less memory. --- docs/generated/settings/settings.html | 1 + pkg/storage/client_merge_test.go | 110 ++++ pkg/storage/client_raft_test.go | 2 + pkg/storage/engine/engine.go | 50 ++ pkg/storage/engine/mvcc.go | 20 + pkg/storage/rditer/replica_data_iter.go | 76 ++- .../replica_application_state_machine.go | 5 +- pkg/storage/replica_command.go | 30 +- pkg/storage/replica_destroy.go | 31 +- pkg/storage/replica_raftstorage.go | 483 +++++++++++------- pkg/storage/stateloader/stateloader.go | 28 +- pkg/storage/store.go | 11 + pkg/storage/store_snapshot.go | 251 ++++++++- pkg/storage/testing_knobs.go | 3 + 14 files changed, 835 insertions(+), 266 deletions(-) diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html index 5cc133424d0a..7018b07a4586 100644 --- a/docs/generated/settings/settings.html +++ b/docs/generated/settings/settings.html @@ -51,6 +51,7 @@ kv.rangefeed.enabledbooleanfalseif set, rangefeed registration is enabled kv.snapshot_rebalance.max_ratebyte size8.0 MiBthe rate limit (bytes/sec) to use for rebalance and upreplication snapshots kv.snapshot_recovery.max_ratebyte size8.0 MiBthe rate limit (bytes/sec) to use for recovery snapshots +kv.snapshot_sst.sync_sizebyte size2.0 MiBthreshold after which snapshot SST writes must fsync kv.transaction.max_intents_bytesinteger262144maximum number of bytes used to track write intents in transactions kv.transaction.max_refresh_spans_bytesinteger256000maximum number of bytes used to track refresh spans in serializable transactions kv.transaction.parallel_commits_enabledbooleantrueif enabled, transactional commits will be parallelized with transactional writes diff --git a/pkg/storage/client_merge_test.go b/pkg/storage/client_merge_test.go index dff83d9866a0..98ddd75e011e 100644 --- a/pkg/storage/client_merge_test.go +++ b/pkg/storage/client_merge_test.go @@ -18,6 +18,7 @@ import ( "math/rand" "reflect" "regexp" + "strconv" "strings" "sync" "sync/atomic" @@ -37,6 +38,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/storage/engine" "github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb" + "github.com/cockroachdb/cockroach/pkg/storage/rditer" "github.com/cockroachdb/cockroach/pkg/storage/stateloader" "github.com/cockroachdb/cockroach/pkg/storage/storagebase" "github.com/cockroachdb/cockroach/pkg/storage/storagepb" @@ -3046,10 +3048,105 @@ func (h *unreliableRaftHandler) HandleRaftResponse( func TestStoreRangeMergeRaftSnapshot(t *testing.T) { defer leaktest.AfterTest(t)() + // We will be testing the SSTs written on store2's engine. + var eng engine.Engine ctx := context.Background() storeCfg := storage.TestStoreConfig(nil) storeCfg.TestingKnobs.DisableReplicateQueue = true storeCfg.TestingKnobs.DisableReplicaGCQueue = true + storeCfg.TestingKnobs.BeforeSnapshotSSTIngestion = func( + inSnap storage.IncomingSnapshot, + snapType storage.SnapshotRequest_Type, + sstNames []string, + ) error { + // Only verify snapshots of type RAFT and on the range under exercise + // (range 2). Note that the keys of range 2 aren't verified in this + // functions. Unreplicated range-id local keys are not verified because + // there are too many keys and the other replicated keys are verified later + // on in the test. This function verifies that the subsumed replicas have + // been handled properly. + if snapType != storage.SnapshotRequest_RAFT || inSnap.State.Desc.RangeID != roachpb.RangeID(2) { + return nil + } + // The seven SSTs we are expecting to ingest are in the following order: + // 1. Replicated range-id local keys of the range in the snapshot. + // 2. Range-local keys of the range in the snapshot. + // 3. User keys of the range in the snapshot. + // 4. Unreplicated range-id local keys of the range in the snapshot. + // 5. SST to clear range-id local keys of the subsumed replica with + // RangeID 3. + // 6. SST to clear range-id local keys of the subsumed replica with + // RangeID 4. + // 7. SST to clear the user keys of the subsumed replicas. + // + // NOTE: There are no range-local keys in [d, /Max) in the store we're + // sending a snapshot to, so we aren't expecting an SST to clear those + // keys. + if len(sstNames) != 7 { + return errors.Errorf("expected to ingest 7 SSTs, got %d SSTs", len(sstNames)) + } + + // Only verify the SSTs of the subsumed replicas (the last three SSTs) by + // constructing the expected SST and ensuring that they are byte-by-byte + // equal. This verification ensures that the SSTs have the same tombstones + // and range deletion tombstones. + var expectedSSTs [][]byte + sstNames = sstNames[4:] + + // Range-id local range of subsumed replicas. + for _, rangeID := range []roachpb.RangeID{roachpb.RangeID(3), roachpb.RangeID(4)} { + sst, err := engine.MakeRocksDBSstFileWriter() + if err != nil { + return err + } + defer sst.Close() + r := rditer.MakeRangeIDLocalKeyRange(rangeID, false) + if err := sst.ClearRange(r.Start, r.End); err != nil { + return err + } + tombstoneKey := keys.RaftTombstoneKey(rangeID) + tombstoneValue := &roachpb.RaftTombstone{NextReplicaID: math.MaxInt32} + if err := engine.MVCCBlindPutProto(context.TODO(), &sst, nil, tombstoneKey, hlc.Timestamp{}, tombstoneValue, nil); err != nil { + return err + } + expectedSST, err := sst.Finish() + if err != nil { + return err + } + expectedSSTs = append(expectedSSTs, expectedSST) + } + + // User key range of subsumed replicas. + sst, err := engine.MakeRocksDBSstFileWriter() + if err != nil { + return err + } + defer sst.Close() + desc := roachpb.RangeDescriptor{ + StartKey: roachpb.RKey("d"), + EndKey: roachpb.RKeyMax, + } + r := rditer.MakeUserKeyRange(&desc) + if err := engine.ClearRangeWithHeuristic(eng, &sst, r.Start, r.End); err != nil { + return err + } + expectedSST, err := sst.Finish() + if err != nil { + return err + } + expectedSSTs = append(expectedSSTs, expectedSST) + + for i := range sstNames { + actualSST, err := eng.ReadFile(sstNames[i]) + if err != nil { + return err + } + if !bytes.Equal(actualSST, expectedSSTs[i]) { + return errors.Errorf("contents of %s were unexpected", sstNames[i]) + } + } + return nil + } mtc := &multiTestContext{ storeConfig: &storeCfg, // This test was written before the multiTestContext started creating many @@ -3060,6 +3157,7 @@ func TestStoreRangeMergeRaftSnapshot(t *testing.T) { mtc.Start(t, 3) defer mtc.Stop() store0, store2 := mtc.Store(0), mtc.Store(2) + eng = store2.Engine() distSender := mtc.distSenders[0] // Create three fully-caught-up, adjacent ranges on all three stores. @@ -3074,6 +3172,18 @@ func TestStoreRangeMergeRaftSnapshot(t *testing.T) { mtc.waitForValues(key, []int64{1, 1, 1}) } + // Put some keys in [d, /Max) so the subsumed replica of [c, /Max) with range + // ID 4 has tombstones. We will clear uncontained key range of subsumed + // replicas, so when we are receiving a snapshot for [a, d), we expect to + // clear the keys in [d, /Max). + for i := 0; i < 10; i++ { + key := roachpb.Key("d" + strconv.Itoa(i)) + if _, pErr := client.SendWrapped(ctx, distSender, incrementArgs(key, 1)); pErr != nil { + t.Fatal(pErr) + } + mtc.waitForValues(key, []int64{1, 1, 1}) + } + aRepl0 := store0.LookupReplica(roachpb.RKey("a")) // Start dropping all Raft traffic to the first range on store1. diff --git a/pkg/storage/client_raft_test.go b/pkg/storage/client_raft_test.go index e8a7b42816d8..15f487ca7fbb 100644 --- a/pkg/storage/client_raft_test.go +++ b/pkg/storage/client_raft_test.go @@ -45,6 +45,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/protoutil" "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/pkg/errors" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -1072,6 +1073,7 @@ func TestFailedSnapshotFillsReservation(t *testing.T) { RangeSize: 100, State: storagepb.ReplicaState{Desc: rep.Desc()}, } + header.RaftMessageRequest.Message.Snapshot.Data = uuid.UUID{}.GetBytes() // Cause this stream to return an error as soon as we ask it for something. // This injects an error into HandleSnapshotStream when we try to send the // "snapshot accepted" message. diff --git a/pkg/storage/engine/engine.go b/pkg/storage/engine/engine.go index 0ed91a63c105..a8addb28cea1 100644 --- a/pkg/storage/engine/engine.go +++ b/pkg/storage/engine/engine.go @@ -504,3 +504,53 @@ func WriteSyncNoop(ctx context.Context, eng Engine) error { } return nil } + +// ClearRangeWithHeuristic clears the keys from start (inclusive) to end +// (exclusive). Depending on the number of keys, it will either use ClearRange +// or ClearRangeIter. +func ClearRangeWithHeuristic(eng Reader, writer Writer, start, end MVCCKey) error { + iter := eng.NewIterator(IterOptions{UpperBound: end.Key}) + defer iter.Close() + + // It is expensive for there to be many range deletion tombstones in the same + // sstable because all of the tombstones in an sstable are loaded whenever the + // sstable is accessed. So we avoid using range deletion unless there is some + // minimum number of keys. The value here was pulled out of thin air. It might + // be better to make this dependent on the size of the data being deleted. Or + // perhaps we should fix RocksDB to handle large numbers of tombstones in an + // sstable better. + const clearRangeMinKeys = 64 + // Peek into the range to see whether it's large enough to justify + // ClearRange. Note that the work done here is bounded by + // clearRangeMinKeys, so it will be fairly cheap even for large + // ranges. + // + // TODO(bdarnell): Move this into ClearIterRange so we don't have + // to do this scan twice. + count := 0 + iter.Seek(start) + for { + valid, err := iter.Valid() + if err != nil { + return err + } + if !valid || !iter.Key().Less(end) { + break + } + count++ + if count > clearRangeMinKeys { + break + } + iter.Next() + } + var err error + if count > clearRangeMinKeys { + err = writer.ClearRange(start, end) + } else { + err = writer.ClearIterRange(iter, start, end) + } + if err != nil { + return err + } + return nil +} diff --git a/pkg/storage/engine/mvcc.go b/pkg/storage/engine/mvcc.go index 937375f81298..8d401ad0d053 100644 --- a/pkg/storage/engine/mvcc.go +++ b/pkg/storage/engine/mvcc.go @@ -621,6 +621,26 @@ func MVCCPutProto( return MVCCPut(ctx, engine, ms, key, timestamp, value, txn) } +// MVCCBlindPutProto sets the given key to the protobuf-serialized byte string +// of msg and the provided timestamp. See MVCCBlindPut for a discussion on this +// fast-path and when it is appropriate to use. +func MVCCBlindPutProto( + ctx context.Context, + engine Writer, + ms *enginepb.MVCCStats, + key roachpb.Key, + timestamp hlc.Timestamp, + msg protoutil.Message, + txn *roachpb.Transaction, +) error { + value := roachpb.Value{} + if err := value.SetProto(msg); err != nil { + return err + } + value.InitChecksum(key) + return MVCCBlindPut(ctx, engine, ms, key, timestamp, value, txn) +} + type getBuffer struct { meta enginepb.MVCCMetadata value roachpb.Value diff --git a/pkg/storage/rditer/replica_data_iter.go b/pkg/storage/rditer/replica_data_iter.go index f8a998250bc6..f63a56e61090 100644 --- a/pkg/storage/rditer/replica_data_iter.go +++ b/pkg/storage/rditer/replica_data_iter.go @@ -43,21 +43,60 @@ type ReplicaDataIterator struct { // MakeAllKeyRanges returns all key ranges for the given Range. func MakeAllKeyRanges(d *roachpb.RangeDescriptor) []KeyRange { - return makeReplicaKeyRanges(d, keys.MakeRangeIDPrefix) + return []KeyRange{ + MakeRangeIDLocalKeyRange(d.RangeID, false /* replicatedOnly */), + MakeRangeLocalKeyRange(d), + MakeUserKeyRange(d), + } } -// MakeReplicatedKeyRanges returns all key ranges that are fully Raft replicated -// for the given Range. +// MakeReplicatedKeyRanges returns all key ranges that are fully Raft +// replicated for the given Range. +// +// NOTE: The logic for receiving snapshot relies on this function returning the +// ranges in the following sorted order: +// +// 1. Replicated range-id local key range +// 2. Range-local key range +// 3. User key range func MakeReplicatedKeyRanges(d *roachpb.RangeDescriptor) []KeyRange { - return makeReplicaKeyRanges(d, keys.MakeRangeIDReplicatedPrefix) + return []KeyRange{ + MakeRangeIDLocalKeyRange(d.RangeID, true /* replicatedOnly */), + MakeRangeLocalKeyRange(d), + MakeUserKeyRange(d), + } } -// makeReplicaKeyRanges returns a slice of 3 key ranges. The last key range in -// the returned slice corresponds to the actual range data (i.e. not the range -// metadata). -func makeReplicaKeyRanges( - d *roachpb.RangeDescriptor, metaFunc func(roachpb.RangeID) roachpb.Key, -) []KeyRange { +// MakeRangeIDLocalKeyRange returns the range-id local key range. If +// replicatedOnly is true, then it returns only the replicated keys, otherwise, +// it only returns both the replicated and unreplicated keys. +func MakeRangeIDLocalKeyRange(rangeID roachpb.RangeID, replicatedOnly bool) KeyRange { + var prefixFn func(roachpb.RangeID) roachpb.Key + if replicatedOnly { + prefixFn = keys.MakeRangeIDReplicatedPrefix + } else { + prefixFn = keys.MakeRangeIDPrefix + } + sysRangeIDKey := prefixFn(rangeID) + return KeyRange{ + Start: engine.MakeMVCCMetadataKey(sysRangeIDKey), + End: engine.MakeMVCCMetadataKey(sysRangeIDKey.PrefixEnd()), + } +} + +// MakeRangeLocalKeyRange returns the range local key range. Range-local keys +// are replicated keys that do not belong to the range they would naturally +// sort into. For example, /Local/Range/Table/1 would sort into [/Min, +// /System), but it actually belongs to [/Table/1, /Table/2). +func MakeRangeLocalKeyRange(d *roachpb.RangeDescriptor) KeyRange { + return KeyRange{ + Start: engine.MakeMVCCMetadataKey(keys.MakeRangeKeyPrefix(d.StartKey)), + End: engine.MakeMVCCMetadataKey(keys.MakeRangeKeyPrefix(d.EndKey)), + } +} + +// MakeUserKeyRange returns the user key range. +func MakeUserKeyRange(d *roachpb.RangeDescriptor) KeyRange { // The first range in the keyspace starts at KeyMin, which includes the // node-local space. We need the original StartKey to find the range // metadata, but the actual data starts at LocalMax. @@ -65,20 +104,9 @@ func makeReplicaKeyRanges( if d.StartKey.Equal(roachpb.RKeyMin) { dataStartKey = keys.LocalMax } - sysRangeIDKey := metaFunc(d.RangeID) - return []KeyRange{ - { - Start: engine.MakeMVCCMetadataKey(sysRangeIDKey), - End: engine.MakeMVCCMetadataKey(sysRangeIDKey.PrefixEnd()), - }, - { - Start: engine.MakeMVCCMetadataKey(keys.MakeRangeKeyPrefix(d.StartKey)), - End: engine.MakeMVCCMetadataKey(keys.MakeRangeKeyPrefix(d.EndKey)), - }, - { - Start: engine.MakeMVCCMetadataKey(dataStartKey), - End: engine.MakeMVCCMetadataKey(d.EndKey.AsRawKey()), - }, + return KeyRange{ + Start: engine.MakeMVCCMetadataKey(dataStartKey), + End: engine.MakeMVCCMetadataKey(d.EndKey.AsRawKey()), } } diff --git a/pkg/storage/replica_application_state_machine.go b/pkg/storage/replica_application_state_machine.go index fc644d74320a..75829de30021 100644 --- a/pkg/storage/replica_application_state_machine.go +++ b/pkg/storage/replica_application_state_machine.go @@ -550,9 +550,10 @@ func (b *replicaAppBatch) runPreApplyTriggers(ctx context.Context, cmd *replicat if err != nil { return wrapWithNonDeterministicFailure(err, "unable to get replica for merge") } - const destroyData = false + const rangeIDLocalOnly = true + const mustClearRange = false if err := rhsRepl.preDestroyRaftMuLocked( - ctx, b.batch, b.batch, merge.RightDesc.NextReplicaID, destroyData, + ctx, b.batch, b.batch, merge.RightDesc.NextReplicaID, rangeIDLocalOnly, mustClearRange, ); err != nil { return wrapWithNonDeterministicFailure(err, "unable to destroy range before merge") } diff --git a/pkg/storage/replica_command.go b/pkg/storage/replica_command.go index f1668813a64c..fb4ec63eb805 100644 --- a/pkg/storage/replica_command.go +++ b/pkg/storage/replica_command.go @@ -1313,16 +1313,30 @@ func execChangeReplicasTxn( // waiting for a second and final response from the recipient which indicates if // the snapshot was a success. // +// `receiveSnapshot` takes the key-value pairs sent and creates three SSTs from +// them for direct ingestion: one for the replicated range-ID local keys, one +// for the range local keys, and one for the user keys. The reason it creates +// three separate SSTs is to prevent overlaps with the memtable and existing +// SSTs in RocksDB. Each of the SSTs also has a range deletion tombstone to +// delete the existing data in the range. +// // Applying the snapshot: After the recipient has received the message // indicating it has all the data, it hands it all to -// `(Store).processRaftSnapshotRequest` to be applied. First, this re-checks the -// same things as `shouldAcceptSnapshotData` to make sure nothing has changed -// while the snapshot was being transferred. It then guarantees that there is -// either an initialized[3] replica or a `ReplicaPlaceholder`[4] to accept the -// snapshot by creating a placeholder if necessary. Finally, a *Raft snapshot* -// message is manually handed to the replica's Raft node (by calling -// `stepRaftGroup` + `handleRaftReadyRaftMuLocked`), at which point the snapshot -// has been applied. +// `(Store).processRaftSnapshotRequest` to be applied. First, this re-checks +// the same things as `shouldAcceptSnapshotData` to make sure nothing has +// changed while the snapshot was being transferred. It then guarantees that +// there is either an initialized[3] replica or a `ReplicaPlaceholder`[4] to +// accept the snapshot by creating a placeholder if necessary. Finally, a *Raft +// snapshot* message is manually handed to the replica's Raft node (by calling +// `stepRaftGroup` + `handleRaftReadyRaftMuLocked`). During the application +// process, several other SSTs may be created for direct ingestion. An SST for +// the unreplicated range-ID local keys is created for the Raft entries, hard +// state, and truncated state. An SST is created for deleting each subsumed +// replica's range-ID local keys and at most two SSTs are created for deleting +// the user keys and range local keys of all subsumed replicas. All in all, a +// maximum of 6 + SR SSTs will be created for direct ingestion where SR is the +// number of subsumed replicas. In the case where there are no subsumed +// replicas, 4 SSTs will be created. // // [1]: There is a third kind of snapshot, called "preemptive", which is how we // avoided the above fragility before learner replicas were introduced in the diff --git a/pkg/storage/replica_destroy.go b/pkg/storage/replica_destroy.go index 5dac458b4311..b2b5b253275d 100644 --- a/pkg/storage/replica_destroy.go +++ b/pkg/storage/replica_destroy.go @@ -65,12 +65,13 @@ func (s destroyStatus) Removed() bool { func (r *Replica) preDestroyRaftMuLocked( ctx context.Context, reader engine.Reader, - batch engine.Batch, + writer engine.Writer, nextReplicaID roachpb.ReplicaID, - destroyData bool, + rangeIDLocalOnly bool, + mustClearRange bool, ) error { desc := r.Desc() - err := clearRangeData(ctx, desc, reader, batch, destroyData) + err := clearRangeData(desc, reader, writer, rangeIDLocalOnly, mustClearRange) if err != nil { return err } @@ -80,7 +81,7 @@ func (r *Replica) preDestroyRaftMuLocked( // NB: Legacy tombstones (which are in the replicated key space) are wiped // in clearRangeData, but that's OK since we're writing a new one in the same // batch (and in particular, sequenced *after* the wipe). - return r.setTombstoneKey(ctx, batch, nextReplicaID) + return r.setTombstoneKey(ctx, writer, nextReplicaID) } func (r *Replica) postDestroyRaftMuLocked(ctx context.Context, ms enginepb.MVCCStats) error { @@ -109,22 +110,27 @@ func (r *Replica) postDestroyRaftMuLocked(ctx context.Context, ms enginepb.MVCCS if r.raftMu.sideloaded != nil { return r.raftMu.sideloaded.Clear(ctx) } + return nil } // destroyRaftMuLocked deletes data associated with a replica, leaving a -// tombstone. If `destroyData` is true, data in all of the range's keyspaces -// will be deleted. Otherwise, only data in the range-ID local keyspace will be -// deleted. Requires that Replica.raftMu is held. +// tombstone. func (r *Replica) destroyRaftMuLocked(ctx context.Context, nextReplicaID roachpb.ReplicaID) error { startTime := timeutil.Now() ms := r.GetMVCCStats() - const destroyData = true batch := r.Engine().NewWriteOnlyBatch() defer batch.Close() - if err := r.preDestroyRaftMuLocked(ctx, r.Engine(), batch, nextReplicaID, destroyData); err != nil { + if err := r.preDestroyRaftMuLocked( + ctx, + r.Engine(), + batch, + nextReplicaID, + false, /* rangeIDLocalOnly */ + false, /* mustClearRange */ + ); err != nil { return err } preTime := timeutil.Now() @@ -172,7 +178,7 @@ func (r *Replica) cancelPendingCommandsLocked() { // ID that it hasn't yet received a RangeDescriptor for if it receives raft // requests for that replica ID (as seen in #14231). func (r *Replica) setTombstoneKey( - ctx context.Context, eng engine.ReadWriter, externalNextReplicaID roachpb.ReplicaID, + ctx context.Context, eng engine.Writer, externalNextReplicaID roachpb.ReplicaID, ) error { r.mu.Lock() nextReplicaID := r.mu.state.Desc.NextReplicaID @@ -188,6 +194,7 @@ func (r *Replica) setTombstoneKey( tombstone := &roachpb.RaftTombstone{ NextReplicaID: nextReplicaID, } - return engine.MVCCPutProto(ctx, eng, nil, tombstoneKey, - hlc.Timestamp{}, nil, tombstone) + // "Blind" because ms == nil and timestamp == hlc.Timestamp{}. + return engine.MVCCBlindPutProto(ctx, eng, nil, tombstoneKey, + hlc.Timestamp{}, tombstone, nil) } diff --git a/pkg/storage/replica_raftstorage.go b/pkg/storage/replica_raftstorage.go index 443a73cc4dd3..cded380c95a2 100644 --- a/pkg/storage/replica_raftstorage.go +++ b/pkg/storage/replica_raftstorage.go @@ -482,8 +482,8 @@ func (s *OutgoingSnapshot) Close() { // IncomingSnapshot contains the data for an incoming streaming snapshot message. type IncomingSnapshot struct { SnapUUID uuid.UUID - // The RocksDB BatchReprs that make up this snapshot. - Batches [][]byte + // The storage interface for the underlying SSTs. + SSSS *SSTSnapshotStorageScratch // The Raft log entries for this snapshot. LogEntries [][]byte // The replica state at the time the snapshot was generated (never nil). @@ -593,9 +593,14 @@ func snapshot( // append is intentionally oblivious to the existence of sideloaded proposals. // They are managed by the caller, including cleaning up obsolete on-disk // payloads in case the log tail is replaced. +// +// NOTE: This method takes a engine.Writer because reads are unnecessary when +// prevLastIndex is 0 and prevLastTerm is invalidLastTerm. In the case where +// reading is necessary (I.E. entries are getting overwritten or deleted), a +// engine.ReadWriter must be passed in. func (r *Replica) append( ctx context.Context, - batch engine.ReadWriter, + eng engine.Writer, prevLastIndex uint64, prevLastTerm uint64, prevRaftLogSize int64, @@ -616,30 +621,43 @@ func (r *Replica) append( value.InitChecksum(key) var err error if ent.Index > prevLastIndex { - err = engine.MVCCBlindPut(ctx, batch, &diff, key, hlc.Timestamp{}, value, nil /* txn */) + err = engine.MVCCBlindPut(ctx, eng, &diff, key, hlc.Timestamp{}, value, nil /* txn */) } else { - err = engine.MVCCPut(ctx, batch, &diff, key, hlc.Timestamp{}, value, nil /* txn */) + // We type assert eng to also be an engine.Reader only in the case where + // we're replacing existing entries. + eng, ok := eng.(engine.ReadWriter) + if !ok { + return 0, 0, 0, errors.Errorf("expected eng to be a engine.ReadWriter when overwriting log entries") + } + err = engine.MVCCPut(ctx, eng, &diff, key, hlc.Timestamp{}, value, nil /* txn */) } if err != nil { return 0, 0, 0, err } } - // Delete any previously appended log entries which never committed. lastIndex := entries[len(entries)-1].Index lastTerm := entries[len(entries)-1].Term - for i := lastIndex + 1; i <= prevLastIndex; i++ { - // Note that the caller is in charge of deleting any sideloaded payloads - // (which they must only do *after* the batch has committed). - err := engine.MVCCDelete(ctx, batch, &diff, r.raftMu.stateLoader.RaftLogKey(i), - hlc.Timestamp{}, nil /* txn */) - if err != nil { - return 0, 0, 0, err + // Delete any previously appended log entries which never committed. + if prevLastIndex > 0 { + // We type assert eng to also be an engine.Reader only in the case where + // we're deleting existing entries. + eng, ok := eng.(engine.ReadWriter) + if !ok { + return 0, 0, 0, errors.Errorf("expected eng to be a engine.ReadWriter when deleting log entries") + } + for i := lastIndex + 1; i <= prevLastIndex; i++ { + // Note that the caller is in charge of deleting any sideloaded payloads + // (which they must only do *after* the batch has committed). + err := engine.MVCCDelete(ctx, eng, &diff, r.raftMu.stateLoader.RaftLogKey(i), + hlc.Timestamp{}, nil /* txn */) + if err != nil { + return 0, 0, 0, err + } } } raftLogSize := prevRaftLogSize + diff.SysBytes - return lastIndex, lastTerm, raftLogSize, nil } @@ -673,62 +691,38 @@ func (r *Replica) updateRangeInfo(desc *roachpb.RangeDescriptor) error { return nil } +// clearRangeData clears the data associated with a range descriptor. If +// rangeIDLocalOnly is true, then only the range-id local keys are deleted. +// Otherwise, the range-id local keys, range local keys, and user keys are all +// deleted. If mustClearRange is true, ClearRange will always be used to remove +// the keys. Otherwise, ClearRangeWithHeuristic will be used, which chooses +// ClearRange or ClearIterRange depending on how many keys there are in the +// range. func clearRangeData( - ctx context.Context, desc *roachpb.RangeDescriptor, eng engine.Reader, - batch engine.Batch, - destroyData bool, + writer engine.Writer, + rangeIDLocalOnly bool, + mustClearRange bool, ) error { - iter := eng.NewIterator(engine.IterOptions{UpperBound: desc.EndKey.AsRawKey()}) - defer iter.Close() - - // It is expensive for there to be many range deletion tombstones in the same - // sstable because all of the tombstones in an sstable are loaded whenever the - // sstable is accessed. So we avoid using range deletion unless there is some - // minimum number of keys. The value here was pulled out of thin air. It might - // be better to make this dependent on the size of the data being deleted. Or - // perhaps we should fix RocksDB to handle large numbers of tombstones in an - // sstable better. - const clearRangeMinKeys = 64 - keyRanges := rditer.MakeAllKeyRanges(desc) - if !destroyData { - // TODO(benesch): The fact that we hardcode the number of - // "metadata" ranges (i.e. non-user-keyspace) suggests that - // rditer.MakeAllKeyRanges has the wrong API. - keyRanges = keyRanges[:1] + var keyRanges []rditer.KeyRange + if rangeIDLocalOnly { + keyRanges = []rditer.KeyRange{rditer.MakeRangeIDLocalKeyRange(desc.RangeID, false)} + } else { + keyRanges = rditer.MakeAllKeyRanges(desc) } - for _, keyRange := range keyRanges { - // Peek into the range to see whether it's large enough to justify - // ClearRange. Note that the work done here is bounded by - // clearRangeMinKeys, so it will be fairly cheap even for large - // ranges. - // - // TODO(bdarnell): Move this into ClearIterRange so we don't have - // to do this scan twice. - count := 0 - iter.Seek(keyRange.Start) - for { - valid, err := iter.Valid() - if err != nil { - return err - } - if !valid || !iter.Key().Less(keyRange.End) { - break - } - count++ - if count > clearRangeMinKeys { - break - } - iter.Next() - } - var err error - if count > clearRangeMinKeys { - err = batch.ClearRange(keyRange.Start, keyRange.End) - } else { - err = batch.ClearIterRange(iter, keyRange.Start, keyRange.End) + + var clearRangeFn func(engine.Reader, engine.Writer, engine.MVCCKey, engine.MVCCKey) error + if mustClearRange { + clearRangeFn = func(eng engine.Reader, writer engine.Writer, start, end engine.MVCCKey) error { + return writer.ClearRange(start, end) } - if err != nil { + } else { + clearRangeFn = engine.ClearRangeWithHeuristic + } + + for _, keyRange := range keyRanges { + if err := clearRangeFn(eng, writer, keyRange.Start, keyRange.End); err != nil { return err } } @@ -806,172 +800,144 @@ func (r *Replica) applySnapshot( } var stats struct { - clear time.Time - batch time.Time - entries time.Time - commit time.Time - } - - var size int - for _, b := range inSnap.Batches { - size += len(b) - } - for _, e := range inSnap.LogEntries { - size += len(e) - } - - log.Infof(ctx, "applying %s snapshot at index %d "+ - "(id=%s, encoded size=%d, %d rocksdb batches, %d log entries)", - snapType, snap.Metadata.Index, inSnap.SnapUUID.Short(), - size, len(inSnap.Batches), len(inSnap.LogEntries)) + // Time to clear unreplicated range-ID local keys and update unreplicated + // state. + unreplicatedState time.Time + // Time to process subsumed replicas. + subsumedReplicas time.Time + // Time to ingest SSTs. + ingestion time.Time + } + log.Infof(ctx, "applying %s snapshot [id=%s index=%d]", + snapType, inSnap.SnapUUID.Short(), snap.Metadata.Index) defer func(start time.Time) { now := timeutil.Now() - log.Infof(ctx, "applied %s snapshot in %0.0fms [clear=%0.0fms batch=%0.0fms entries=%0.0fms commit=%0.0fms]", - snapType, now.Sub(start).Seconds()*1000, - stats.clear.Sub(start).Seconds()*1000, - stats.batch.Sub(stats.clear).Seconds()*1000, - stats.entries.Sub(stats.batch).Seconds()*1000, - stats.commit.Sub(stats.entries).Seconds()*1000) - }(timeutil.Now()) - - // Use a more efficient write-only batch because we don't need to do any - // reads from the batch. - batch := r.store.Engine().NewWriteOnlyBatch() - defer batch.Close() - - // If we're subsuming a replica below, we don't have its last NextReplicaID, - // nor can we obtain it. That's OK: we can just be conservative and use the - // maximum possible replica ID. preDestroyRaftMuLocked will write a replica - // tombstone using this maximum possible replica ID, which would normally be - // problematic, as it would prevent this store from ever having a new replica - // of the removed range. In this case, however, it's copacetic, as subsumed - // ranges _can't_ have new replicas. - const subsumedNextReplicaID = math.MaxInt32 - - // As part of applying the snapshot, we may need to subsume replicas that have - // been merged into this range. Destroy their data in the same batch in which - // we apply the snapshot. - for _, sr := range subsumedRepls { - if err := sr.preDestroyRaftMuLocked( - ctx, r.store.Engine(), batch, subsumedNextReplicaID, true, /* destroyData */ - ); err != nil { - return err + totalLog := fmt.Sprintf( + "total=%0.0fms ", + now.Sub(start).Seconds()*1000, + ) + unreplicatedStateLog := fmt.Sprintf( + "unreplicatedState=%0.0fms ", + stats.unreplicatedState.Sub(start).Seconds()*1000, + ) + var subsumedReplicasLog string + if len(subsumedRepls) > 0 { + subsumedReplicasLog = fmt.Sprintf( + "subsumedReplicas=%d@%0.0fms ", + len(subsumedRepls), + stats.subsumedReplicas.Sub(stats.unreplicatedState).Seconds()*1000, + ) } - } + ingestionLog := fmt.Sprintf( + "ingestion=%d@%0.0fms ", + len(inSnap.SSSS.SSTs()), + stats.ingestion.Sub(stats.subsumedReplicas).Seconds()*1000, + ) + log.Infof(ctx, "applied %s snapshot [%s%s%s%sid=%s index=%d]", + snapType, totalLog, unreplicatedStateLog, subsumedReplicasLog, + ingestionLog, inSnap.SnapUUID.Short(), snap.Metadata.Index) + }(timeutil.Now()) - // Delete everything in the range and recreate it from the snapshot. - // We need to delete any old Raft log entries here because any log entries - // that predate the snapshot will be orphaned and never truncated or GC'd. - if err := clearRangeData(ctx, s.Desc, r.store.Engine(), batch, true /* destroyData */); err != nil { + unreplicatedSST, err := engine.MakeRocksDBSstFileWriter() + if err != nil { return err } - // Clear the cached raft log entries to ensure that old or uncommitted - // entries don't impact the in-memory state. - r.store.raftEntryCache.Drop(r.RangeID) - stats.clear = timeutil.Now() + defer unreplicatedSST.Close() - // Write the snapshot into the range. - for _, batchRepr := range inSnap.Batches { - if err := batch.ApplyBatchRepr(batchRepr, false); err != nil { - return err - } + // Clearing the unreplicated state. + unreplicatedPrefixKey := keys.MakeRangeIDUnreplicatedPrefix(r.RangeID) + unreplicatedStart := engine.MakeMVCCMetadataKey(unreplicatedPrefixKey) + unreplicatedEnd := engine.MakeMVCCMetadataKey(unreplicatedPrefixKey.PrefixEnd()) + if err = unreplicatedSST.ClearRange(unreplicatedStart, unreplicatedEnd); err != nil { + return errors.Wrapf(err, "error clearing range of unreplicated SST writer") } - // The log entries are all written to distinct keys so we can use a - // distinct batch. - distinctBatch := batch.Distinct() - stats.batch = timeutil.Now() + // Update HardState. + if err := r.raftMu.stateLoader.SetHardState(ctx, &unreplicatedSST, hs); err != nil { + return errors.Wrapf(err, "unable to write HardState to unreplicated SST writer") + } + // Update TruncatedState if it is unreplicated. if inSnap.UsesUnreplicatedTruncatedState { - // We're using the unreplicated truncated state, which we need to - // manually persist to disk. If we're not taking this branch, the - // snapshot contains a legacy TruncatedState and we don't need to do - // anything (in fact, must not -- the invariant is that exactly one of - // them exists at any given point in the state machine). - if err := stateloader.Make(s.Desc.RangeID).SetRaftTruncatedState( - ctx, distinctBatch, s.TruncatedState, + if err := r.raftMu.stateLoader.SetRaftTruncatedState( + ctx, &unreplicatedSST, s.TruncatedState, ); err != nil { - return err + return errors.Wrapf(err, "unable to write UnreplicatedTruncatedState to unreplicated SST writer") } } - logEntries := make([]raftpb.Entry, len(inSnap.LogEntries)) - for i, bytes := range inSnap.LogEntries { - if err := protoutil.Unmarshal(bytes, &logEntries[i]); err != nil { - return err - } - } - // If this replica doesn't know its ReplicaID yet, we're applying a - // preemptive snapshot. In this case, we're going to have to write the - // sideloaded proposals into the Raft log. Otherwise, sideload. + // Update Raft entries. + var lastTerm uint64 var raftLogSize int64 - thinEntries := logEntries - if replicaID != 0 { + if len(inSnap.LogEntries) > 0 { + logEntries := make([]raftpb.Entry, len(inSnap.LogEntries)) + for i, bytes := range inSnap.LogEntries { + if err := protoutil.Unmarshal(bytes, &logEntries[i]); err != nil { + return err + } + } + // If this replica doesn't know its ReplicaID yet, we're applying a + // preemptive snapshot. In this case, we're going to have to write the + // sideloaded proposals into the Raft log. Otherwise, sideload. + if replicaID != 0 { + var err error + var sideloadedEntriesSize int64 + logEntries, sideloadedEntriesSize, err = r.maybeSideloadEntriesRaftMuLocked(ctx, logEntries) + if err != nil { + return err + } + raftLogSize += sideloadedEntriesSize + } var err error - var sideloadedEntriesSize int64 - thinEntries, sideloadedEntriesSize, err = r.maybeSideloadEntriesRaftMuLocked(ctx, logEntries) + _, lastTerm, raftLogSize, err = r.append(ctx, &unreplicatedSST, 0, invalidLastTerm, raftLogSize, logEntries) if err != nil { return err } - raftLogSize += sideloadedEntriesSize + } else { + lastTerm = invalidLastTerm } + r.store.raftEntryCache.Drop(r.RangeID) - // Write the snapshot's Raft log into the range. - var lastTerm uint64 - _, lastTerm, raftLogSize, err = r.append( - ctx, distinctBatch, 0, invalidLastTerm, raftLogSize, thinEntries, - ) - if err != nil { + stats.unreplicatedState = timeutil.Now() + if err := inSnap.SSSS.WriteSST(ctx, &unreplicatedSST); err != nil { return err } - stats.entries = timeutil.Now() - - // Note that since this snapshot comes from Raft, we don't have to synthesize - // the HardState -- Raft wouldn't ask us to update the HardState in incorrect - // ways. - if err := r.raftMu.stateLoader.SetHardState(ctx, distinctBatch, hs); err != nil { - return errors.Wrapf(err, "unable to persist HardState %+v", &hs) - } - - // We need to close the distinct batch and start using the normal batch for - // the read below. - distinctBatch.Close() - // As outlined above, last and applied index are the same after applying - // the snapshot (i.e. the snapshot has no uncommitted tail). if s.RaftAppliedIndex != snap.Metadata.Index { log.Fatalf(ctx, "snapshot RaftAppliedIndex %d doesn't match its metadata index %d", s.RaftAppliedIndex, snap.Metadata.Index) } - // We've written Raft log entries, so we need to sync the WAL. - if err := batch.Commit(!disableSyncRaftLog.Get(&r.store.cfg.Settings.SV)); err != nil { + // If we're subsuming a replica below, we don't have its last NextReplicaID, + // nor can we obtain it. That's OK: we can just be conservative and use the + // maximum possible replica ID. preDestroyRaftMuLocked will write a replica + // tombstone using this maximum possible replica ID, which would normally be + // problematic, as it would prevent this store from ever having a new replica + // of the removed range. In this case, however, it's copacetic, as subsumed + // ranges _can't_ have new replicas. + const subsumedNextReplicaID = math.MaxInt32 + if err := r.clearSubsumedReplicaDiskData(ctx, inSnap.SSSS, s.Desc, subsumedRepls, subsumedNextReplicaID); err != nil { return err } - stats.commit = timeutil.Now() + stats.subsumedReplicas = timeutil.Now() + + // Ingest all SSTs atomically. + if fn := r.store.cfg.TestingKnobs.BeforeSnapshotSSTIngestion; fn != nil { + if err := fn(inSnap, snapType, inSnap.SSSS.SSTs()); err != nil { + return err + } + } + if err := r.store.engine.IngestExternalFiles(ctx, inSnap.SSSS.SSTs(), true /* skipWritingSeqNo */, true /* modify */); err != nil { + return errors.Wrapf(err, "while ingesting %s", inSnap.SSSS.SSTs()) + } + stats.ingestion = timeutil.Now() // The on-disk state is now committed, but the corresponding in-memory state // has not yet been updated. Any errors past this point must therefore be // treated as fatal. - for _, sr := range subsumedRepls { - // We removed sr's data when we committed the batch. Finish subsumption by - // updating the in-memory bookkeping. - if err := sr.postDestroyRaftMuLocked(ctx, sr.GetMVCCStats()); err != nil { - log.Fatalf(ctx, "unable to finish destroying %s while applying snapshot: %+v", sr, err) - } - // We already hold sr's raftMu, so we must call removeReplicaImpl directly. - // Note that it's safe to update the store's metadata for sr's removal - // separately from updating the store's metadata for r's new descriptor - // (i.e., under a different store.mu acquisition). Each store.mu acquisition - // leaves the store in a consistent state, and access to the replicas - // themselves is protected by their raftMus, which are held from start to - // finish. - if err := r.store.removeReplicaImpl(ctx, sr, subsumedNextReplicaID, RemoveOptions{ - DestroyData: false, // data is already destroyed - }); err != nil { - log.Fatalf(ctx, "unable to remove %s while applying snapshot: %+v", sr, err) - } + if err := r.clearSubsumedReplicaInMemoryData(ctx, subsumedRepls, subsumedNextReplicaID); err != nil { + log.Fatalf(ctx, "failed to clear in-memory data of subsumed replicas while applying snapshot: %+v", err) } // Atomically swap the placeholder, if any, for the replica, and update the @@ -1034,6 +1000,137 @@ func (r *Replica) applySnapshot( return nil } +// clearSubsumedReplicaDiskData clears the on disk data of the subsumed +// replicas by creating SSTs with range deletion tombstones. We have to be +// careful here not to have overlapping ranges with the SSTs we have already +// created since that will throw an error while we are ingesting them. This +// method requires that each of the subsumed replicas raftMu is held. +func (r *Replica) clearSubsumedReplicaDiskData( + ctx context.Context, + ssss *SSTSnapshotStorageScratch, + desc *roachpb.RangeDescriptor, + subsumedRepls []*Replica, + subsumedNextReplicaID roachpb.ReplicaID, +) error { + getKeyRanges := func(desc *roachpb.RangeDescriptor) [2]rditer.KeyRange { + return [...]rditer.KeyRange{ + rditer.MakeRangeLocalKeyRange(desc), + rditer.MakeUserKeyRange(desc), + } + } + keyRanges := getKeyRanges(desc) + totalKeyRanges := append([]rditer.KeyRange(nil), keyRanges[:]...) + for _, sr := range subsumedRepls { + // We have to create an SST for the subsumed replica's range-id local keys. + subsumedReplSST, err := engine.MakeRocksDBSstFileWriter() + if err != nil { + return err + } + // NOTE: We set mustClearRange to true because we are setting + // RaftTombstoneKey. Since Clears and Puts need to be done in increasing + // order of keys, it is not safe to use ClearRangeIter. + if err := sr.preDestroyRaftMuLocked( + ctx, + r.store.Engine(), + &subsumedReplSST, + subsumedNextReplicaID, + true, /* rangeIDLocalOnly */ + true, /* mustClearRange */ + ); err != nil { + subsumedReplSST.Close() + return err + } + if err := ssss.WriteSST(ctx, &subsumedReplSST); err != nil { + return err + } + + srKeyRanges := getKeyRanges(sr.Desc()) + // Compute the total key space covered by the current replica and all + // subsumed replicas. + for i := range srKeyRanges { + if srKeyRanges[i].Start.Key.Compare(totalKeyRanges[i].Start.Key) < 0 { + totalKeyRanges[i].Start = srKeyRanges[i].Start + } + if srKeyRanges[i].End.Key.Compare(totalKeyRanges[i].End.Key) > 0 { + totalKeyRanges[i].End = srKeyRanges[i].End + } + } + } + + // We might have to create SSTs for the range local keys and user keys + // depending on if the subsumed replicas are not fully contained by the + // replica in our snapshot. The following is an example to this case + // happening. + // + // a b c d + // |---1---|-------2-------| S1 + // |---1-------------------| S2 + // |---1-----------|---3---| S3 + // + // Since the merge is the first operation to happen, a follower could be down + // before it completes. It is reasonable for a snapshot for r1 from S3 to + // subsume both r1 and r2 in S1. + for i := range keyRanges { + if totalKeyRanges[i].End.Key.Compare(keyRanges[i].End.Key) > 0 { + subsumedReplSST, err := engine.MakeRocksDBSstFileWriter() + if err != nil { + return err + } + if err := engine.ClearRangeWithHeuristic( + r.store.Engine(), + &subsumedReplSST, + keyRanges[i].End, + totalKeyRanges[i].End, + ); err != nil { + subsumedReplSST.Close() + return err + } + if err := ssss.WriteSST(ctx, &subsumedReplSST); err != nil { + return err + } + } + // The snapshot must never subsume a replica that extends the range of the + // replica to the left. This is because splits and merges (the only + // operation that change the key bounds) always leave the start key intact. + // Extending to the left implies that either we merged "to the left" (we + // don't), or that we're applying a snapshot for another range (we don't do + // that either). Something is severely wrong for this to happen. + if totalKeyRanges[i].Start.Key.Compare(keyRanges[i].Start.Key) < 0 { + log.Fatalf(ctx, "subsuming replica to our left; key range: %v; total key range %v", + keyRanges[i], totalKeyRanges[i]) + } + } + return nil +} + +// clearSubsumedReplicaInMemoryData clears the in-memory data of the subsumed +// replicas. This method requires that each of the subsumed replicas raftMu is +// held. +func (r *Replica) clearSubsumedReplicaInMemoryData( + ctx context.Context, subsumedRepls []*Replica, subsumedNextReplicaID roachpb.ReplicaID, +) error { + for _, sr := range subsumedRepls { + // We removed sr's data when we committed the batch. Finish subsumption by + // updating the in-memory bookkeping. + if err := sr.postDestroyRaftMuLocked(ctx, sr.GetMVCCStats()); err != nil { + return err + } + // We already hold sr's raftMu, so we must call removeReplicaImpl directly. + // Note that it's safe to update the store's metadata for sr's removal + // separately from updating the store's metadata for r's new descriptor + // (i.e., under a different store.mu acquisition). Each store.mu + // acquisition leaves the store in a consistent state, and access to the + // replicas themselves is protected by their raftMus, which are held from + // start to finish. + if err := r.store.removeReplicaImpl(ctx, sr, subsumedNextReplicaID, RemoveOptions{ + DestroyData: false, // data is already destroyed + }); err != nil { + return err + } + } + return nil +} + type raftCommandEncodingVersion byte // Raft commands are encoded with a 1-byte version (currently 0 or 1), an 8-byte diff --git a/pkg/storage/stateloader/stateloader.go b/pkg/storage/stateloader/stateloader.go index 9174d0a7ec7e..b401dfffa657 100644 --- a/pkg/storage/stateloader/stateloader.go +++ b/pkg/storage/stateloader/stateloader.go @@ -541,13 +541,21 @@ func (rsl StateLoader) LoadRaftTruncatedState( // SetRaftTruncatedState overwrites the truncated state. func (rsl StateLoader) SetRaftTruncatedState( - ctx context.Context, eng engine.ReadWriter, truncState *roachpb.RaftTruncatedState, + ctx context.Context, eng engine.Writer, truncState *roachpb.RaftTruncatedState, ) error { if (*truncState == roachpb.RaftTruncatedState{}) { return errors.New("cannot persist empty RaftTruncatedState") } - return engine.MVCCPutProto(ctx, eng, nil, /* ms */ - rsl.RaftTruncatedStateKey(), hlc.Timestamp{}, nil, truncState) + // "Blind" because ms == nil and timestamp == hlc.Timestamp{}. + return engine.MVCCBlindPutProto( + ctx, + eng, + nil, /* ms */ + rsl.RaftTruncatedStateKey(), + hlc.Timestamp{}, /* timestamp */ + truncState, + nil, /* txn */ + ) } // LoadHardState loads the HardState. @@ -566,10 +574,18 @@ func (rsl StateLoader) LoadHardState( // SetHardState overwrites the HardState. func (rsl StateLoader) SetHardState( - ctx context.Context, batch engine.ReadWriter, st raftpb.HardState, + ctx context.Context, batch engine.Writer, st raftpb.HardState, ) error { - return engine.MVCCPutProto(ctx, batch, nil, - rsl.RaftHardStateKey(), hlc.Timestamp{}, nil, &st) + // "Blind" because ms == nil and timestamp == hlc.Timestamp{}. + return engine.MVCCBlindPutProto( + ctx, + batch, + nil, /* ms */ + rsl.RaftHardStateKey(), + hlc.Timestamp{}, /* timestamp */ + &st, + nil, /* txn */ + ) } // SynthesizeRaftState creates a Raft state which synthesizes both a HardState diff --git a/pkg/storage/store.go b/pkg/storage/store.go index befd75eeed12..05a276f6b239 100644 --- a/pkg/storage/store.go +++ b/pkg/storage/store.go @@ -412,6 +412,7 @@ type Store struct { raftEntryCache *raftentry.Cache limiters batcheval.Limiters txnWaitMetrics *txnwait.Metrics + sss SSTSnapshotStorage // gossipRangeCountdown and leaseRangeCountdown are countdowns of // changes to range and leaseholder counts, after which the store @@ -865,6 +866,16 @@ func NewStore( s.limiters.ConcurrentExportRequests = limit.MakeConcurrentRequestLimiter( "exportRequestLimiter", int(ExportRequestsLimit.Get(&cfg.Settings.SV)), ) + + // The snapshot storage is usually empty at this point since it is cleared + // after each snapshot application, except when the node crashed right before + // it can clean it up. If this fails it's not a correctness issue since the + // storage is also cleared before receiving a snapshot. + s.sss = NewSSTSnapshotStorage(s.engine, s.limiters.BulkIOWriteRate) + if err := s.sss.Clear(); err != nil { + log.Warningf(ctx, "failed to clear snapshot storage: %v", err) + } + // On low-CPU instances, a default limit value may still allow ExportRequests // to tie up all cores so cap limiter at cores-1 when setting value is higher. exportCores := runtime.NumCPU() - 1 diff --git a/pkg/storage/store_snapshot.go b/pkg/storage/store_snapshot.go index 7aafe83985a4..b11fc4fb6596 100644 --- a/pkg/storage/store_snapshot.go +++ b/pkg/storage/store_snapshot.go @@ -21,6 +21,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/storage/engine" + "github.com/cockroachdb/cockroach/pkg/storage/rditer" "github.com/cockroachdb/cockroach/pkg/util/envutil" "github.com/cockroachdb/cockroach/pkg/util/humanizeutil" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -78,6 +79,9 @@ type snapshotStrategy interface { // Status provides a status report on the work performed during the // snapshot. Only valid if the strategy succeeded. Status() string + + // Close cleans up any resources associated with the snapshot strategy. + Close(context.Context) } func assertStrategy( @@ -94,47 +98,225 @@ type kvBatchSnapshotStrategy struct { raftCfg *base.RaftConfig status string - // Fields used when sending snapshots. + // The size of the batches of PUT operations to send to the receiver of the + // snapshot. Only used on the sender side. batchSize int64 - limiter *rate.Limiter - newBatch func() engine.Batch + // Limiter for sending KV batches. Only used on the sender side. + limiter *rate.Limiter + // Only used on the sender side. + newBatch func() engine.Batch + + // The approximate size of the SST chunk to buffer in memory on the receiver + // before flushing to disk. Only used on the receiver side. + sstChunkSize int64 + // Only used on the receiver side. + ssss *SSTSnapshotStorageScratch +} + +// multiSSTWriter is a wrapper around RocksDBSstFileWriter and +// SSTSnapshotStorageScratch that handles chunking SSTs and persisting them to +// disk. +type multiSSTWriter struct { + ssss *SSTSnapshotStorageScratch + currSST engine.RocksDBSstFileWriter + currSSTFile *SSTSnapshotStorageFile + keyRanges []rditer.KeyRange + currRange int + // The size of the SST the last time the SST file writer was truncated. This + // size is used to determine the size of the SST chunk buffered in-memory. + truncatedSize int64 + // The approximate size of the SST chunk to buffer in memory on the receiver + // before flushing to disk. + sstChunkSize int64 +} + +func newMultiSSTWriter( + ssss *SSTSnapshotStorageScratch, keyRanges []rditer.KeyRange, sstChunkSize int64, +) (multiSSTWriter, error) { + msstw := multiSSTWriter{ + ssss: ssss, + keyRanges: keyRanges, + sstChunkSize: sstChunkSize, + } + if err := msstw.initSST(); err != nil { + return msstw, err + } + return msstw, nil +} + +func (msstw *multiSSTWriter) initSST() error { + newSSTFile, err := msstw.ssss.NewFile() + if err != nil { + return errors.Wrap(err, "failed to create new sst file") + } + msstw.currSSTFile = newSSTFile + newSST, err := engine.MakeRocksDBSstFileWriter() + if err != nil { + return errors.Wrap(err, "failed to create sst file writer") + } + msstw.currSST = newSST + if err := msstw.currSST.ClearRange(msstw.keyRanges[msstw.currRange].Start, msstw.keyRanges[msstw.currRange].End); err != nil { + msstw.currSST.Close() + return errors.Wrap(err, "failed to clear range on sst file writer") + } + msstw.truncatedSize = 0 + return nil +} + +func (msstw *multiSSTWriter) finalizeSST(ctx context.Context) error { + chunk, err := msstw.currSST.Finish() + if err != nil { + return errors.Wrap(err, "failed to finish sst") + } + if err := msstw.currSSTFile.Write(ctx, chunk); err != nil { + return errors.Wrap(err, "failed to write to sst file") + } + if err := msstw.currSSTFile.Close(); err != nil { + return errors.Wrap(err, "failed to close sst file") + } + msstw.currRange++ + msstw.currSST.Close() + return nil +} + +func (msstw *multiSSTWriter) Put(ctx context.Context, key engine.MVCCKey, value []byte) error { + for msstw.keyRanges[msstw.currRange].End.Key.Compare(key.Key) <= 0 { + // Finish the current SST, write to the file, and move to the next key + // range. + if err := msstw.finalizeSST(ctx); err != nil { + return err + } + if err := msstw.initSST(); err != nil { + return err + } + } + if msstw.keyRanges[msstw.currRange].Start.Key.Compare(key.Key) > 0 { + return crdberrors.AssertionFailedf("client error: expected %s to fall in one of %s", key.Key, msstw.keyRanges) + } + if err := msstw.currSST.Put(key, value); err != nil { + return errors.Wrap(err, "failed to put in sst") + } + if msstw.currSST.DataSize-msstw.truncatedSize > msstw.sstChunkSize { + msstw.truncatedSize = msstw.currSST.DataSize + chunk, err := msstw.currSST.Truncate() + if err != nil { + return errors.Wrap(err, "failed to truncate sst") + } + // NOTE: Chunk may be empty due to the semantics of Truncate(), but Write() + // handles an empty chunk as a noop. + if err := msstw.currSSTFile.Write(ctx, chunk); err != nil { + return errors.Wrap(err, "failed to write to sst file") + } + } + return nil +} + +func (msstw *multiSSTWriter) Finish(ctx context.Context) error { + if msstw.currRange < len(msstw.keyRanges) { + for { + if err := msstw.finalizeSST(ctx); err != nil { + return err + } + if msstw.currRange >= len(msstw.keyRanges) { + break + } + if err := msstw.initSST(); err != nil { + return err + } + } + } + return nil +} + +func (msstw *multiSSTWriter) Close() error { + msstw.currSST.Close() + return msstw.currSSTFile.Close() } // Receive implements the snapshotStrategy interface. +// +// NOTE: This function assumes that the key-value pairs are sent in sorted +// order. The key-value pairs are sent in the following sorted order: +// +// 1. Replicated range-id local key range +// 2. Range-local key range +// 3. User key range func (kvSS *kvBatchSnapshotStrategy) Receive( ctx context.Context, stream incomingSnapshotStream, header SnapshotRequest_Header, ) (IncomingSnapshot, error) { assertStrategy(ctx, header, SnapshotRequest_KV_BATCH) - var batches [][]byte + // At the moment we'll write at most three SSTs. + // TODO(jeffreyxiao): Re-evaluate as the default range size grows. + keyRanges := rditer.MakeReplicatedKeyRanges(header.State.Desc) + msstw, err := newMultiSSTWriter(kvSS.ssss, keyRanges, kvSS.sstChunkSize) + if err != nil { + return noSnap, err + } + defer func() { + // Nothing actionable if closing multiSSTWriter. Closing the same SST and + // SST file multiple times is idempotent. + if err := msstw.Close(); err != nil { + log.Warningf(ctx, "failed to close multiSSTWriter: %v", err) + } + }() var logEntries [][]byte + for { req, err := stream.Recv() if err != nil { - return IncomingSnapshot{}, err + return noSnap, err } if req.Header != nil { err := errors.New("client error: provided a header mid-stream") - return IncomingSnapshot{}, sendSnapshotError(stream, err) + return noSnap, sendSnapshotError(stream, err) } if req.KVBatch != nil { - batches = append(batches, req.KVBatch) + batchReader, err := engine.NewRocksDBBatchReader(req.KVBatch) + if err != nil { + return noSnap, errors.Wrap(err, "failed to decode batch") + } + // All operations in the batch are guaranteed to be puts. + for batchReader.Next() { + if batchReader.BatchType() != engine.BatchTypeValue { + return noSnap, crdberrors.AssertionFailedf("expected type %d, found type %d", engine.BatchTypeValue, batchReader.BatchType()) + } + key, err := batchReader.MVCCKey() + if err != nil { + return noSnap, errors.Wrap(err, "failed to decode mvcc key") + } + if err := msstw.Put(ctx, key, batchReader.Value()); err != nil { + return noSnap, err + } + } } if req.LogEntries != nil { logEntries = append(logEntries, req.LogEntries...) } if req.Final { + // We finished receiving all batches and log entries. It's possible that + // we did not receive any key-value pairs for some of the key ranges, but + // we must still construct SSTs with range deletion tombstones to remove + // the data. + if err := msstw.Finish(ctx); err != nil { + return noSnap, err + } + + if err := msstw.Close(); err != nil { + return noSnap, err + } + snapUUID, err := uuid.FromBytes(header.RaftMessageRequest.Message.Snapshot.Data) if err != nil { - err = errors.Wrap(err, "invalid snapshot") - return IncomingSnapshot{}, sendSnapshotError(stream, err) + err = errors.Wrap(err, "client error: invalid snapshot") + return noSnap, sendSnapshotError(stream, err) } inSnap := IncomingSnapshot{ UsesUnreplicatedTruncatedState: header.UnreplicatedTruncatedState, SnapUUID: snapUUID, - Batches: batches, + SSSS: kvSS.ssss, LogEntries: logEntries, State: &header.State, snapType: header.Type, @@ -153,7 +335,7 @@ func (kvSS *kvBatchSnapshotStrategy) Receive( inSnap.snapType = SnapshotRequest_PREEMPTIVE } - kvSS.status = fmt.Sprintf("kv batches: %d, log entries: %d", len(batches), len(logEntries)) + kvSS.status = fmt.Sprintf("log entries: %d, ssts: %d", len(logEntries), len(kvSS.ssss.SSTs())) return inSnap, nil } } @@ -190,10 +372,7 @@ func (kvSS *kvBatchSnapshotStrategy) Send( } if int64(b.Len()) >= kvSS.batchSize { - if err := kvSS.limiter.WaitN(ctx, 1); err != nil { - return err - } - if err := kvSS.sendBatch(stream, b); err != nil { + if err := kvSS.sendBatch(ctx, stream, b); err != nil { return err } b = nil @@ -204,10 +383,7 @@ func (kvSS *kvBatchSnapshotStrategy) Send( } } if b != nil { - if err := kvSS.limiter.WaitN(ctx, 1); err != nil { - return err - } - if err := kvSS.sendBatch(stream, b); err != nil { + if err := kvSS.sendBatch(ctx, stream, b); err != nil { return err } } @@ -330,8 +506,11 @@ func (kvSS *kvBatchSnapshotStrategy) Send( } func (kvSS *kvBatchSnapshotStrategy) sendBatch( - stream outgoingSnapshotStream, batch engine.Batch, + ctx context.Context, stream outgoingSnapshotStream, batch engine.Batch, ) error { + if err := kvSS.limiter.WaitN(ctx, 1); err != nil { + return err + } repr := batch.Repr() batch.Close() return stream.Send(&SnapshotRequest{KVBatch: repr}) @@ -340,6 +519,18 @@ func (kvSS *kvBatchSnapshotStrategy) sendBatch( // Status implements the snapshotStrategy interface. func (kvSS *kvBatchSnapshotStrategy) Status() string { return kvSS.status } +// Close implements the snapshotStrategy interface. +func (kvSS *kvBatchSnapshotStrategy) Close(ctx context.Context) { + if kvSS.ssss != nil { + // A failure to clean up the storage is benign except that it will leak + // disk space (which is reclaimed on node restart). It is unexpected + // though, so log a warning. + if err := kvSS.ssss.Clear(); err != nil { + log.Warningf(ctx, "error closing kvBatchSnapshotStrategy: %v", err) + } + } +} + // reserveSnapshot throttles incoming snapshots. The returned closure is used // to cleanup the reservation and release its resources. A nil cleanup function // and a non-empty rejectionMessage indicates the reservation was declined. @@ -631,9 +822,18 @@ func (s *Store) receiveSnapshot( var ss snapshotStrategy switch header.Strategy { case SnapshotRequest_KV_BATCH: + snapUUID, err := uuid.FromBytes(header.RaftMessageRequest.Message.Snapshot.Data) + if err != nil { + err = errors.Wrap(err, "invalid snapshot") + return sendSnapshotError(stream, err) + } + ss = &kvBatchSnapshotStrategy{ - raftCfg: &s.cfg.RaftConfig, + raftCfg: &s.cfg.RaftConfig, + ssss: s.sss.NewSSTSnapshotStorageScratch(header.State.Desc.RangeID, snapUUID), + sstChunkSize: snapshotSSTWriteSyncRate.Get(&s.cfg.Settings.SV), } + defer ss.Close(ctx) default: return sendSnapshotError(stream, errors.Errorf("%s,r%d: unknown snapshot strategy: %s", @@ -697,6 +897,15 @@ var recoverySnapshotRate = settings.RegisterByteSizeSetting( envutil.EnvOrDefaultBytes("COCKROACH_RAFT_SNAPSHOT_RATE", 8<<20), ) +// snapshotSSTWriteSyncRate is the size of chunks to write before fsync-ing. +// The default of 2 MiB was chosen to be in line with the behavior in bulk-io. +// See sstWriteSyncRate. +var snapshotSSTWriteSyncRate = settings.RegisterByteSizeSetting( + "kv.snapshot_sst.sync_size", + "threshold after which snapshot SST writes must fsync", + 2<<20, /* 2 MiB */ +) + func snapshotRateLimit( st *cluster.Settings, priority SnapshotRequest_Priority, ) (rate.Limit, error) { diff --git a/pkg/storage/testing_knobs.go b/pkg/storage/testing_knobs.go index 30cf76e21458..be1e7a656063 100644 --- a/pkg/storage/testing_knobs.go +++ b/pkg/storage/testing_knobs.go @@ -195,6 +195,9 @@ type StoreTestingKnobs struct { // This ensures the `*Replica` will be materialized on the Store when it // returns. ReplicaAddStopAfterLearnerSnapshot func() bool + // BeforeSnapshotSSTIngestion is run just before the SSTs are ingested when + // applying a snapshot. + BeforeSnapshotSSTIngestion func(IncomingSnapshot, SnapshotRequest_Type, []string) error // MaxApplicationBatchSize enforces a maximum size on application batches. // This can be useful for testing conditions which require commands to be