Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[RawKV] support checksum #519

Merged
merged 6 commits into from
Jun 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions internal/mockstore/mocktikv/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,7 @@ type RawKV interface {
RawBatchDelete(cf string, keys [][]byte)
RawDeleteRange(cf string, startKey, endKey []byte)
RawCompareAndSwap(cf string, key, expectedValue, newvalue []byte) ([]byte, bool, error)
RawChecksum(cf string, startKey, endKey []byte) (uint64, uint64, uint64, error)
}

// MVCCDebugger is for debugging.
Expand Down
39 changes: 39 additions & 0 deletions internal/mockstore/mocktikv/mvcc_leveldb.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ package mocktikv

import (
"bytes"
"hash/crc64"
"math"
"sync"

Expand Down Expand Up @@ -1778,6 +1779,44 @@ func (mvcc *MVCCLevelDB) RawReverseScan(cf string, startKey, endKey []byte, limi
return pairs
}

// RawChecksum implements the RawKV interface.
//
// It walks the pairs of column family cf starting at startKey and stops at the
// first key that is >= endKey (an empty endKey means no upper bound). It
// returns, in order:
//   - the XOR of the CRC64-ECMA digests of every visited key followed by its value,
//   - the number of visited pairs,
//   - the summed length in bytes of the visited keys and values.
//
// When getDB yields no database for cf, all results are zero and the error is nil.
// If the iterator reports an error, all results are zero and that error is returned.
func (mvcc *MVCCLevelDB) RawChecksum(cf string, startKey, endKey []byte) (uint64, uint64, uint64, error) {
	mvcc.mu.Lock()
	defer mvcc.mu.Unlock()

	db := mvcc.getDB(cf)
	if db == nil {
		return 0, 0, 0, nil
	}

	iter := db.NewIterator(&util.Range{
		Start: startKey,
	}, nil)
	// An iterator keeps its resources until it is explicitly released.
	defer iter.Release()

	var crc64Xor, totalKvs, totalBytes uint64
	digest := crc64.New(crc64.MakeTable(crc64.ECMA))
	for iter.Next() {
		key := iter.Key()
		if len(endKey) > 0 && bytes.Compare(key, endKey) >= 0 {
			break
		}
		value := iter.Value()

		// Each pair is hashed on its own; XOR makes the result independent
		// of the order in which pairs are combined.
		digest.Reset()
		digest.Write(key)
		digest.Write(value)
		crc64Xor ^= digest.Sum64()
		totalKvs++
		totalBytes += uint64(len(key) + len(value))
	}
	// A failing iterator makes Next return false, so the error can only be
	// observed once the loop has finished.
	if err := iter.Error(); err != nil {
		return 0, 0, 0, err
	}
	return crc64Xor, totalKvs, totalBytes, nil
}

// RawDeleteRange implements the RawKV interface.
func (mvcc *MVCCLevelDB) RawDeleteRange(cf string, startKey, endKey []byte) {
tikverr.Log(mvcc.doRawDeleteRange(cf, startKey, endKey))
Expand Down
49 changes: 49 additions & 0 deletions internal/mockstore/mocktikv/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -596,6 +596,48 @@ func (h kvHandler) handleKvRawScan(req *kvrpcpb.RawScanRequest) *kvrpcpb.RawScan
}
}

// handleKvRawChecksum serves a RawChecksum request from the mock store.
//
// Every range in the request is checksummed in "CF_DEFAULT" from its start key
// up to the tighter of the range's own end key and h.endKey. The per-range
// results are merged by XOR-ing the checksums and adding up the pair and byte
// counters. If the store does not implement RawKV, or a range fails, the
// failure is reported through the response's RegionError message.
func (h kvHandler) handleKvRawChecksum(req *kvrpcpb.RawChecksumRequest) *kvrpcpb.RawChecksumResponse {
	store, supported := h.mvccStore.(RawKV)
	if !supported {
		return &kvrpcpb.RawChecksumResponse{
			RegionError: &errorpb.Error{Message: "not implemented"},
		}
	}

	var checksum, kvs, size uint64
	for _, keyRange := range req.Ranges {
		// Start from h.endKey and narrow it only when the request carries a
		// non-empty end key that is smaller (or h.endKey is unbounded).
		end := h.endKey
		if len(keyRange.EndKey) > 0 {
			if len(end) == 0 || bytes.Compare(keyRange.EndKey, end) < 0 {
				end = keyRange.EndKey
			}
		}

		partChecksum, partKvs, partSize, err := store.RawChecksum("CF_DEFAULT", keyRange.StartKey, end)
		if err != nil {
			return &kvrpcpb.RawChecksumResponse{
				RegionError: &errorpb.Error{Message: err.Error()},
			}
		}

		checksum ^= partChecksum
		kvs += partKvs
		size += partSize
	}

	return &kvrpcpb.RawChecksumResponse{
		Checksum:   checksum,
		TotalKvs:   kvs,
		TotalBytes: size,
	}
}

func (h kvHandler) handleSplitRegion(req *kvrpcpb.SplitRegionRequest) *kvrpcpb.SplitRegionResponse {
keys := req.GetSplitKeys()
resp := &kvrpcpb.SplitRegionResponse{Regions: make([]*metapb.Region, 0, len(keys)+1)}
Expand Down Expand Up @@ -933,6 +975,13 @@ func (c *RPCClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.R
return resp, nil
}
resp.Resp = kvHandler{session}.HandleKvRawCompareAndSwap(r)
case tikvrpc.CmdRawChecksum:
	r := req.RawChecksum()
	if err := session.checkRequest(reqCtx, r.Size()); err != nil {
		// The region error must travel in the response type of this
		// command so callers can type-assert the payload safely.
		resp.Resp = &kvrpcpb.RawChecksumResponse{RegionError: err}
		return resp, nil
	}
	resp.Resp = kvHandler{session}.handleKvRawChecksum(r)
case tikvrpc.CmdUnsafeDestroyRange:
panic("unimplemented")
case tikvrpc.CmdRegisterLockObserver:
Expand Down
2 changes: 2 additions & 0 deletions metrics/shortcuts.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ var (
RawkvCmdHistogramWithRawReversScan prometheus.Observer
RawkvSizeHistogramWithKey prometheus.Observer
RawkvSizeHistogramWithValue prometheus.Observer
RawkvCmdHistogramWithRawChecksum prometheus.Observer

BackoffHistogramRPC prometheus.Observer
BackoffHistogramLock prometheus.Observer
Expand Down Expand Up @@ -152,6 +153,7 @@ func initShortcuts() {
RawkvCmdHistogramWithRawReversScan = TiKVRawkvCmdHistogram.WithLabelValues("raw_reverse_scan")
RawkvSizeHistogramWithKey = TiKVRawkvSizeHistogram.WithLabelValues("key")
RawkvSizeHistogramWithValue = TiKVRawkvSizeHistogram.WithLabelValues("value")
// Checksum latency is a command duration, so it is recorded in the command histogram.
RawkvCmdHistogramWithRawChecksum = TiKVRawkvCmdHistogram.WithLabelValues("raw_checksum")

BackoffHistogramRPC = TiKVBackoffHistogram.WithLabelValues("tikvRPC")
BackoffHistogramLock = TiKVBackoffHistogram.WithLabelValues("txnLock")
Expand Down
48 changes: 48 additions & 0 deletions rawkv/rawkv.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,16 @@ type rawOptions struct {
KeyOnly bool
}

// RawChecksum represents the checksum result of raw kv pairs in TiKV cluster.
// Client.Checksum fills it by merging the per-request responses: checksums are
// XOR-ed together while the pair and byte counters are summed.
type RawChecksum struct {
// Crc64Xor is the checksum result with crc64 algorithm, combined with XOR.
Crc64Xor uint64
// TotalKvs is the total number of kv pairs covered by the checksum.
TotalKvs uint64
// TotalBytes is the total bytes of kv pairs, including prefix in APIV2.
TotalBytes uint64
}

// RawOption represents possible options that can be controlled by the user
// to tweak the API behavior.
//
Expand Down Expand Up @@ -504,6 +514,44 @@ func (c *Client) ReverseScan(ctx context.Context, startKey, endKey []byte, limit
return
}

// Checksum computes the checksum of the continuous kv pairs in range [startKey, endKey).
// If endKey is empty, it means unbounded.
// If you want to exclude the startKey or include the endKey, push a '\0' to the key. For example, to checksum
// (startKey, endKey], you can write:
// `Checksum(ctx, push(startKey, '\0'), push(endKey, '\0'))`.
//
// The range is processed one request at a time: each request starts at the
// current start key, and the next one resumes from the EndKey of the location
// returned for the previous request, until that key is empty or reaches endKey.
// Partial results are merged by XOR-ing the checksums and summing the pair and
// byte counters. On any failure a zero RawChecksum is returned together with
// the error, so no partial result leaks to the caller.
//
// NOTE(review): options is accepted but not consulted by this method.
func (c *Client) Checksum(ctx context.Context, startKey, endKey []byte, options ...RawOption,
) (check RawChecksum, err error) {
	start := time.Now()
	defer func() { metrics.RawkvCmdHistogramWithRawChecksum.Observe(time.Since(start).Seconds()) }()

	for len(endKey) == 0 || bytes.Compare(startKey, endKey) < 0 {
		req := tikvrpc.NewRequest(tikvrpc.CmdRawChecksum, &kvrpcpb.RawChecksumRequest{
			Algorithm: kvrpcpb.ChecksumAlgorithm_Crc64_Xor,
			Ranges: []*kvrpcpb.KeyRange{{
				StartKey: startKey,
				EndKey:   endKey,
			}},
		})
		resp, loc, err := c.sendReq(ctx, startKey, req, false)
		if err != nil {
			return RawChecksum{}, err
		}
		if resp.Resp == nil {
			return RawChecksum{}, errors.WithStack(tikverr.ErrBodyMissing)
		}
		cmdResp := resp.Resp.(*kvrpcpb.RawChecksumResponse)
		// A failure reported in the response body must abort the whole
		// operation; otherwise its zero counters would be folded in silently.
		if cmdResp.GetError() != "" {
			return RawChecksum{}, errors.New(cmdResp.GetError())
		}
		check.Crc64Xor ^= cmdResp.GetChecksum()
		check.TotalKvs += cmdResp.GetTotalKvs()
		check.TotalBytes += cmdResp.GetTotalBytes()

		// Continue from where the just-processed location ends; an empty
		// end key means there is nothing after it.
		startKey = loc.EndKey
		if len(startKey) == 0 {
			break
		}
	}
	return check, nil
}

// CompareAndSwap results in an atomic compare-and-set operation for the given key while SetAtomicForCAS(true)
// If the value retrieved is equal to previousValue, newValue is written.
// It returns the previous value and whether the value is successfully swapped.
Expand Down
54 changes: 54 additions & 0 deletions rawkv/rawkv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"bytes"
"context"
"fmt"
"hash/crc64"
"testing"

"github.com/stretchr/testify/suite"
Expand Down Expand Up @@ -571,3 +572,56 @@ func (s *testRawkvSuite) TestCompareAndSwap() {
s.Nil(err)
s.Equal(string(v), string(newValue))
}

// TestRawChecksum stores a fixed set of pairs with BatchPut and checks that
// Checksum, run from "db" with no upper bound, reports the same CRC64-ECMA XOR,
// pair count and byte total as computed locally from those pairs.
func (s *testRawkvSuite) TestRawChecksum() {
	mvccStore := mocktikv.MustNewMVCCStore()
	defer mvccStore.Close()

	client := &Client{
		clusterID:   0,
		regionCache: locate.NewRegionCache(mocktikv.NewPDClient(s.cluster)),
		rpcClient:   mocktikv.NewRPCClient(s.cluster, mvccStore, nil),
	}
	defer client.Close()

	cf := "CF_DEFAULT"
	ctx := context.Background()
	pairs := map[string]string{
		"db":   "TiDB",
		"key2": "value2",
		"key1": "value1",
		"key4": "value4",
		"key3": "value3",
		"kv":   "TiKV",
	}

	// Collect the pairs to write and fold the expected result in the same
	// pass; XOR and addition do not depend on map iteration order.
	keys := make([]key, 0)
	values := make([]value, 0)
	var wantCrc64Xor, wantTotalKvs, wantTotalBytes uint64
	hasher := crc64.New(crc64.MakeTable(crc64.ECMA))
	for k, v := range pairs {
		rawKey, rawValue := []byte(k), []byte(v)
		keys = append(keys, rawKey)
		values = append(values, rawValue)

		hasher.Reset()
		hasher.Write(rawKey)
		hasher.Write(rawValue)
		wantCrc64Xor ^= hasher.Sum64()
		wantTotalKvs++
		wantTotalBytes += uint64(len(rawKey) + len(rawValue))
	}

	// Write all pairs in one batch.
	err := client.BatchPut(ctx, keys, values, SetColumnFamily(cf))
	s.Nil(err)

	// Checksum everything from "db" onwards.
	check, err := client.Checksum(ctx, []byte("db"), nil, SetColumnFamily(cf))
	s.Nil(err)
	s.Equal(wantCrc64Xor, check.Crc64Xor)
	s.Equal(wantTotalKvs, check.TotalKvs)
	s.Equal(wantTotalBytes, check.TotalBytes)
}
16 changes: 16 additions & 0 deletions tikvrpc/tikvrpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ const (
CmdRawScan
CmdGetKeyTTL
CmdRawCompareAndSwap
CmdRawChecksum

CmdUnsafeDestroyRange

Expand Down Expand Up @@ -156,6 +157,8 @@ func (t CmdType) String() string {
return "RawDeleteRange"
case CmdRawScan:
return "RawScan"
case CmdRawChecksum:
return "RawChecksum"
case CmdUnsafeDestroyRange:
return "UnsafeDestroyRange"
case CmdRegisterLockObserver:
Expand Down Expand Up @@ -388,6 +391,11 @@ func (req *Request) RawCompareAndSwap() *kvrpcpb.RawCASRequest {
return req.Req.(*kvrpcpb.RawCASRequest)
}

// RawChecksum returns the request payload as a *kvrpcpb.RawChecksumRequest.
// It panics if req.Req holds a value of any other type.
func (req *Request) RawChecksum() *kvrpcpb.RawChecksumRequest {
	checksumReq := req.Req.(*kvrpcpb.RawChecksumRequest)
	return checksumReq
}

// RegisterLockObserver returns RegisterLockObserverRequest in request.
func (req *Request) RegisterLockObserver() *kvrpcpb.RegisterLockObserverRequest {
return req.Req.(*kvrpcpb.RegisterLockObserverRequest)
Expand Down Expand Up @@ -713,6 +721,8 @@ func SetContext(req *Request, region *metapb.Region, peer *metapb.Peer) error {
req.RawGetKeyTTL().Context = ctx
case CmdRawCompareAndSwap:
req.RawCompareAndSwap().Context = ctx
case CmdRawChecksum:
req.RawChecksum().Context = ctx
case CmdRegisterLockObserver:
req.RegisterLockObserver().Context = ctx
case CmdCheckLockObserver:
Expand Down Expand Up @@ -851,6 +861,10 @@ func GenRegionErrorResp(req *Request, e *errorpb.Error) (*Response, error) {
p = &kvrpcpb.RawCASResponse{
RegionError: e,
}
case CmdRawChecksum:
p = &kvrpcpb.RawChecksumResponse{
RegionError: e,
}
case CmdCop:
p = &coprocessor.Response{
RegionError: e,
Expand Down Expand Up @@ -967,6 +981,8 @@ func CallRPC(ctx context.Context, client tikvpb.TikvClient, req *Request) (*Resp
resp.Resp, err = client.RawGetKeyTTL(ctx, req.RawGetKeyTTL())
case CmdRawCompareAndSwap:
resp.Resp, err = client.RawCompareAndSwap(ctx, req.RawCompareAndSwap())
case CmdRawChecksum:
resp.Resp, err = client.RawChecksum(ctx, req.RawChecksum())
case CmdRegisterLockObserver:
resp.Resp, err = client.RegisterLockObserver(ctx, req.RegisterLockObserver())
case CmdCheckLockObserver:
Expand Down