Skip to content

Commit

Permalink
storage/engine: migrate Pebble timeseries merge API
Browse files Browse the repository at this point in the history
Migrate to the new batch merge API introduced in
cockroachdb/pebble#367. This PR also includes a
version update of Pebble to
cockroachdb/pebble@a8b23ef
in order to obtain the API change.

Release note: None
  • Loading branch information
ajkr committed Nov 1, 2019
1 parent cb127fd commit 0cbddc4
Show file tree
Hide file tree
Showing 5 changed files with 193 additions and 107 deletions.
4 changes: 2 additions & 2 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

64 changes: 48 additions & 16 deletions pkg/storage/engine/merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,9 +178,13 @@ func TestGoMergeCorruption(t *testing.T) {
if err == nil {
t.Errorf("goMerge: %d: expected error", i)
}
_, err = merge(nil /* key */, c.existing, c.update, nil /* buf */)
_, err = mergeValuesPebble(false /* reverse */, [][]byte{c.existing, c.update})
if err == nil {
t.Fatalf("pebble merge: %d: expected error", i)
t.Fatalf("pebble merge forward: %d: expected error", i)
}
_, err = mergeValuesPebble(true /* reverse */, [][]byte{c.existing, c.update})
if err == nil {
t.Fatalf("pebble merge reverse: %d: expected error", i)
}
}
}
Expand Down Expand Up @@ -222,17 +226,43 @@ func TestGoMergeAppend(t *testing.T) {
}
}

func MergeInternalTimeSeriesDataPebble(
sources ...roachpb.InternalTimeSeriesData,
func mergeValuesPebble(reverse bool, srcBytes [][]byte) ([]byte, error) {
if reverse {
valueMerger, err := MVCCMerger.Merge(nil /* key */, srcBytes[len(srcBytes)-1])
if err != nil {
return nil, err
}
for i := len(srcBytes) - 2; i >= 0; i-- {
err := valueMerger.MergeOlder(srcBytes[i])
if err != nil {
return nil, err
}
}
return valueMerger.Finish()
}
valueMerger, err := MVCCMerger.Merge(nil /* key */, srcBytes[0])
if err != nil {
return nil, err
}
for _, bytes := range srcBytes[1:] {
err := valueMerger.MergeNewer(bytes)
if err != nil {
return nil, err
}
}
return valueMerger.Finish()
}

func mergeInternalTimeSeriesDataPebble(
reverse bool, sources ...roachpb.InternalTimeSeriesData,
) (roachpb.InternalTimeSeriesData, error) {
srcBytes, err := serializeMergeInputs(sources...)
if err != nil {
return roachpb.InternalTimeSeriesData{}, nil
return roachpb.InternalTimeSeriesData{}, err
}
merger := MVCCMerger
var mergedBytes = srcBytes[0]
for _, bytes := range srcBytes[1:] {
mergedBytes = merger.Merge(nil /* key */, mergedBytes, bytes, nil /* buf */)
mergedBytes, err := mergeValuesPebble(reverse, srcBytes)
if err != nil {
return roachpb.InternalTimeSeriesData{}, err
}
return deserializeMergeOutput(mergedBytes)
}
Expand Down Expand Up @@ -539,17 +569,19 @@ func TestGoMergeTimeSeries(t *testing.T) {
}
}
}
if len(operands) < 2 {
// TODO(ajkr): Pebble merge operator isn't currently called with one operand,
// though maybe it should be to match RocksDB behavior.
return
resultTS, err := mergeInternalTimeSeriesDataPebble(false /* reverse */, operands...)
if err != nil {
t.Errorf("pebble merge forward error: %s", err.Error())
}
if a, e := resultTS, expectedTS; !reflect.DeepEqual(a, e) {
t.Errorf("pebble merge forward returned wrong result got %v, wanted %v", a, e)
}
resultTS, err := MergeInternalTimeSeriesDataPebble(operands...)
resultTS, err = mergeInternalTimeSeriesDataPebble(true /* reverse */, operands...)
if err != nil {
t.Errorf("MergeInternalTimeSeriesDataPebble error: %s", err.Error())
t.Errorf("pebble merge reverse error: %s", err.Error())
}
if a, e := resultTS, expectedTS; !reflect.DeepEqual(a, e) {
t.Errorf("MergeInternalTimeSeriesDataPebble returned wrong result got %v, wanted %v", a, e)
t.Errorf("pebble merge reverse returned wrong result got %v, wanted %v", a, e)
}
})
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/storage/engine/pebble.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/pebble"
"github.com/cockroachdb/pebble/vfs"
Expand Down Expand Up @@ -81,12 +80,13 @@ var MVCCComparer = &pebble.Comparer{
// by Cockroach.
var MVCCMerger = &pebble.Merger{
Name: "cockroach_merge_operator",
Merge: func(key, oldValue, newValue, buf []byte) []byte {
res, err := merge(key, oldValue, newValue, buf)
Merge: func(_, value []byte) (pebble.ValueMerger, error) {
res := &MVCCValueMerger{}
err := res.MergeNewer(value)
if err != nil {
log.Fatalf(context.Background(), "merge: %v", err)
return nil, err
}
return res
return res, nil
},
}

Expand Down
220 changes: 137 additions & 83 deletions pkg/storage/engine/pebble_merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,110 +156,164 @@ func ensureColumnar(ts *roachpb.InternalTimeSeriesData) {
ts.Samples = ts.Samples[:0]
}

// mergeTimeSeries combines two `InternalTimeSeriesData`s and returns the result as an
// `InternalTimeSeriesData`. The inputs cannot be merged if they have different start
// timestamps or sample durations.
func mergeTimeSeries(
oldTs, newTs roachpb.InternalTimeSeriesData,
) (roachpb.InternalTimeSeriesData, error) {
if oldTs.StartTimestampNanos != newTs.StartTimestampNanos {
return roachpb.InternalTimeSeriesData{}, errors.Errorf("start timestamp mismatch")
// MVCCValueMerger implements the `ValueMerger` interface. It buffers
// deserialized values in a slice in order specified by `oldToNew`.
// It determines the order of incoming operands by whether they were added
// with `MergeNewer()` or `MergeOlder()`, reversing the slice as necessary
// to ensure operands are always appended. It merges these deserialized
// operands when `Finish()` is called.
//
// It supports merging either all `roachpb.InternalTimeSeriesData` values
// or all non-timeseries values. Attempting to merge a mixture of timeseries
// and non-timeseries values will result in an error.
type MVCCValueMerger struct {
timeSeriesOps []roachpb.InternalTimeSeriesData
rawByteOps [][]byte
oldToNew bool

// Used to avoid heap allocations when passing pointer to `Unmarshal()`.
meta enginepb.MVCCMetadata
}

const (
mvccChecksumSize = 4
mvccTagPos = mvccChecksumSize
mvccHeaderSize = mvccChecksumSize + 1
)

func (t *MVCCValueMerger) ensureOrder(oldToNew bool) {
if oldToNew == t.oldToNew {
return
}
if oldTs.SampleDurationNanos != newTs.SampleDurationNanos {
return roachpb.InternalTimeSeriesData{}, errors.Errorf("sample duration mismatch")
// Only one of the two loop bodies should actually execute under error-free
// conditions, i.e., all operands are either timeseries or all are non-
// timeseries.
for i := 0; i < len(t.timeSeriesOps)/2; i++ {
t.timeSeriesOps[i], t.timeSeriesOps[len(t.timeSeriesOps)-1-i] = t.timeSeriesOps[len(t.timeSeriesOps)-1-i], t.timeSeriesOps[i]
}

// TODO(ajkr): confirm it is the case that (1) today's CRDB always merges timeseries
// values in columnar format, and (2) today's CRDB does not need to be downgrade-
// compatible with any version that supports row format only. Then we can drop support
// for row format entirely. It requires significant cleanup effort as many tests target
// the row format.
if len(oldTs.Offset) > 0 || len(newTs.Offset) > 0 {
ensureColumnar(&oldTs)
ensureColumnar(&newTs)
proto.Merge(&oldTs, &newTs)
sortAndDeduplicateColumns(&oldTs)
} else {
proto.Merge(&oldTs, &newTs)
sortAndDeduplicateRows(&oldTs)
for i := 0; i < len(t.rawByteOps)/2; i++ {
t.rawByteOps[i], t.rawByteOps[len(t.rawByteOps)-1-i] = t.rawByteOps[len(t.rawByteOps)-1-i], t.rawByteOps[i]
}
return oldTs, nil
t.oldToNew = oldToNew
}

// mergeTimeSeriesValues attempts to merge two values which contain
// InternalTimeSeriesData messages.
func mergeTimeSeriesValues(oldTsBytes, newTsBytes []byte) ([]byte, error) {
var oldTs, newTs, mergedTs roachpb.InternalTimeSeriesData
if err := protoutil.Unmarshal(oldTsBytes, &oldTs); err != nil {
return nil, errors.Errorf("corrupted old timeseries: %v", err)
func (t *MVCCValueMerger) deserializeMVCCValueAndAppend(value []byte) error {
if err := protoutil.Unmarshal(value, &t.meta); err != nil {
return errors.Errorf("corrupted operand value: %v", err)
}
if err := protoutil.Unmarshal(newTsBytes, &newTs); err != nil {
return nil, errors.Errorf("corrupted new timeseries: %v", err)
if len(t.meta.RawBytes) < mvccHeaderSize {
return errors.Errorf("operand value too short")
}

var err error
if mergedTs, err = mergeTimeSeries(oldTs, newTs); err != nil {
return nil, errors.Errorf("mergeTimeSeries: %v", err)
if t.meta.RawBytes[mvccTagPos] == byte(roachpb.ValueType_TIMESERIES) {
if t.rawByteOps != nil {
return errors.Errorf("inconsistent value types for timeseries merge")
}
t.timeSeriesOps = append(t.timeSeriesOps, roachpb.InternalTimeSeriesData{})
ts := &t.timeSeriesOps[len(t.timeSeriesOps)-1]
if err := protoutil.Unmarshal(t.meta.RawBytes[mvccHeaderSize:], ts); err != nil {
return errors.Errorf("corrupted timeseries: %v", err)
}
} else {
if t.timeSeriesOps != nil {
return errors.Errorf("inconsistent value types for non-timeseries merge")
}
t.rawByteOps = append(t.rawByteOps, t.meta.RawBytes[mvccHeaderSize:])
}
return nil
}

res, err := protoutil.Marshal(&mergedTs)
if err != nil {
return nil, errors.Errorf("corrupted merged timeseries: %v", err)
// MergeNewer deserializes the value and appends it to the slice corresponding to its type
// (timeseries or non-timeseries). The slice will be reversed if needed such that it is in
// old-to-new order.
func (t *MVCCValueMerger) MergeNewer(value []byte) error {
t.ensureOrder(true /* oldToNew */)
if err := t.deserializeMVCCValueAndAppend(value); err != nil {
return err
}
return res, nil
return nil
}

// merge combines two serialized `MVCCMetadata`s and returns the result as a serialized
// `MVCCMetadata`.
//
// Replay Advisory: Because merge commands pass through raft, it is possible
// for merging values to be "replayed". Currently, the only actual use of
// the merge system is for time series data, which is safe against replay;
// however, this property is not general for all potential mergeable types.
// If a future need arises to merge another type of data, replay protection
// will likely need to be a consideration.
func merge(key, oldValue, newValue, buf []byte) ([]byte, error) {
const (
checksumSize = 4
tagPos = checksumSize
headerSize = checksumSize + 1
)

var oldMeta, newMeta, mergedMeta enginepb.MVCCMetadata
if err := protoutil.Unmarshal(oldValue, &oldMeta); err != nil {
return nil, errors.Errorf("corrupted old operand value: %v", err)
}
if len(oldMeta.RawBytes) < headerSize {
return nil, errors.Errorf("old operand value too short")
// MergeOlder deserializes the value and appends it to the slice corresponding to its type
// (timeseries or non-timeseries). The slice will be reversed if needed such that it is in
// new-to-old order.
func (t *MVCCValueMerger) MergeOlder(value []byte) error {
t.ensureOrder(false /* oldToNew */)
if err := t.deserializeMVCCValueAndAppend(value); err != nil {
return err
}
if err := protoutil.Unmarshal(newValue, &newMeta); err != nil {
return nil, errors.Errorf("corrupted new operand value: %v", err)
return nil
}

// Finish combines the buffered values from all `Merge*()` calls and marshals the result.
// In case of non-timeseries the values are simply concatenated from old to new. In case
// of timeseries the values are sorted, deduplicated, and potentially migrated to columnar
// format. When deduplicating, only the latest sample for a given offset is retained.
func (t *MVCCValueMerger) Finish() ([]byte, error) {
isColumnar := false
if t.timeSeriesOps == nil && t.rawByteOps == nil {
return nil, errors.Errorf("empty merge unsupported")
}
if len(newMeta.RawBytes) < headerSize {
return nil, errors.Errorf("new operand value too short")
t.ensureOrder(true /* oldToNew */)
if t.timeSeriesOps == nil {
// Concatenate non-timeseries operands from old to new
totalLen := 0
for _, rawByteOp := range t.rawByteOps {
totalLen += len(rawByteOp)
}
var meta enginepb.MVCCMetadata
meta.RawBytes = make([]byte, mvccHeaderSize, mvccHeaderSize+totalLen)
meta.RawBytes[mvccTagPos] = byte(roachpb.ValueType_BYTES)
for _, rawByteOp := range t.rawByteOps {
meta.RawBytes = append(meta.RawBytes, rawByteOp...)
}
res, err := protoutil.Marshal(&meta)
if err != nil {
return nil, err
}
return res, nil
}

tsTag := byte(roachpb.ValueType_TIMESERIES)
if oldMeta.RawBytes[tagPos] == tsTag || newMeta.RawBytes[tagPos] == tsTag {
if oldMeta.RawBytes[tagPos] != tsTag || newMeta.RawBytes[tagPos] != tsTag {
return nil, errors.Errorf("inconsistent value types for timeseries merge")
// TODO(ajkr): confirm it is the case that (1) today's CRDB always merges timeseries
// values in columnar format, and (2) today's CRDB does not need to be downgrade-
// compatible with any version that supports row format only. Then we can drop support
// for row format entirely. It requires significant cleanup effort as many tests target
// the row format.
var merged roachpb.InternalTimeSeriesData
merged.StartTimestampNanos = t.timeSeriesOps[0].StartTimestampNanos
merged.SampleDurationNanos = t.timeSeriesOps[0].SampleDurationNanos
for _, timeSeriesOp := range t.timeSeriesOps {
if timeSeriesOp.StartTimestampNanos != merged.StartTimestampNanos {
return nil, errors.Errorf("start timestamp mismatch")
}
tsBytes, err := mergeTimeSeriesValues(
oldMeta.RawBytes[headerSize:], newMeta.RawBytes[headerSize:])
if err != nil {
return nil, errors.Errorf("mergeTimeSeriesValues: %v", err)
if timeSeriesOp.SampleDurationNanos != merged.SampleDurationNanos {
return nil, errors.Errorf("sample duration mismatch")
}
if !isColumnar && len(timeSeriesOp.Offset) > 0 {
ensureColumnar(&merged)
ensureColumnar(&timeSeriesOp)
isColumnar = true
} else if isColumnar {
ensureColumnar(&timeSeriesOp)
}
header := make([]byte, headerSize)
header[tagPos] = tsTag
mergedMeta.RawBytes = append(header, tsBytes...)
proto.Merge(&merged, &timeSeriesOp)
}
if isColumnar {
sortAndDeduplicateColumns(&merged)
} else {
// For non-timeseries values, merge is a simple append.
mergedMeta.RawBytes = append(oldMeta.RawBytes, newMeta.RawBytes[headerSize:]...)
sortAndDeduplicateRows(&merged)
}

res, err := protoutil.Marshal(&mergedMeta)
tsBytes, err := protoutil.Marshal(&merged)
if err != nil {
return nil, err
}
var meta enginepb.MVCCMetadata
tsTag := byte(roachpb.ValueType_TIMESERIES)
header := make([]byte, mvccHeaderSize)
header[mvccTagPos] = tsTag
meta.RawBytes = append(header, tsBytes...)
res, err := protoutil.Marshal(&meta)
if err != nil {
return nil, errors.Errorf("corrupted merged value: %v", err)
return nil, err
}
return res, nil
}

0 comments on commit 0cbddc4

Please sign in to comment.