From 6955c4f2893f32d15ec99312cd74f0f0bbc4a6a5 Mon Sep 17 00:00:00 2001 From: haojinming Date: Wed, 25 May 2022 17:38:01 +0800 Subject: [PATCH 1/4] support checksum Signed-off-by: haojinming --- internal/mockstore/mocktikv/mvcc.go | 1 + internal/mockstore/mocktikv/mvcc_leveldb.go | 39 +++++++++++++++ internal/mockstore/mocktikv/rpc.go | 49 +++++++++++++++++++ metrics/shortcuts.go | 2 + rawkv/rawkv.go | 41 ++++++++++++++++ rawkv/rawkv_test.go | 54 +++++++++++++++++++++ tikvrpc/tikvrpc.go | 16 ++++++ 7 files changed, 202 insertions(+) diff --git a/internal/mockstore/mocktikv/mvcc.go b/internal/mockstore/mocktikv/mvcc.go index e0a7ee6525..c279efb218 100644 --- a/internal/mockstore/mocktikv/mvcc.go +++ b/internal/mockstore/mocktikv/mvcc.go @@ -296,6 +296,7 @@ type RawKV interface { RawBatchDelete(cf string, keys [][]byte) RawDeleteRange(cf string, startKey, endKey []byte) RawCompareAndSwap(cf string, key, expectedValue, newvalue []byte) ([]byte, bool, error) + RawChecksum(cf string, startKey, endKey []byte) (uint64, uint64, uint64, error) } // MVCCDebugger is for debugging. diff --git a/internal/mockstore/mocktikv/mvcc_leveldb.go b/internal/mockstore/mocktikv/mvcc_leveldb.go index cd343801f0..05f5dd0e03 100644 --- a/internal/mockstore/mocktikv/mvcc_leveldb.go +++ b/internal/mockstore/mocktikv/mvcc_leveldb.go @@ -36,6 +36,7 @@ package mocktikv import ( "bytes" + "hash/crc64" "math" "sync" @@ -1778,6 +1779,44 @@ func (mvcc *MVCCLevelDB) RawReverseScan(cf string, startKey, endKey []byte, limi return pairs } +// RawChecksum implements the RawKV interface. +func (mvcc *MVCCLevelDB) RawChecksum(cf string, startKey, endKey []byte) (uint64, uint64, uint64, error) { + mvcc.mu.Lock() + defer mvcc.mu.Unlock() + + db := mvcc.getDB(cf) + if db == nil { + return 0, 0, 0, nil + } + + iter := db.NewIterator(&util.Range{ + Start: startKey, + }, nil) + + crc64Xor := uint64(0) + totalKvs := uint64(0) + totalBytes := uint64(0) + digest := crc64.New(crc64.MakeTable(crc64.ECMA)) + for iter.Next() { + key := iter.Key() + value := iter.Value() + err := iter.Error() + if err != nil { + return 0, 0, 0, err + } + if len(endKey) > 0 && bytes.Compare(key, endKey) >= 0 { + break + } + digest.Reset() + digest.Write(key) + digest.Write(value) + crc64Xor ^= digest.Sum64() + totalKvs += 1 + totalBytes += (uint64)(len(key) + len(value)) + } + return crc64Xor, totalKvs, totalBytes, nil +} + // RawDeleteRange implements the RawKV interface. func (mvcc *MVCCLevelDB) RawDeleteRange(cf string, startKey, endKey []byte) { tikverr.Log(mvcc.doRawDeleteRange(cf, startKey, endKey)) diff --git a/internal/mockstore/mocktikv/rpc.go b/internal/mockstore/mocktikv/rpc.go index f17d00d60a..7b5b0cc1b3 100644 --- a/internal/mockstore/mocktikv/rpc.go +++ b/internal/mockstore/mocktikv/rpc.go @@ -596,6 +596,48 @@ func (h kvHandler) handleKvRawScan(req *kvrpcpb.RawScanRequest) *kvrpcpb.RawScan } } +func (h kvHandler) handleKvRawChecksum(req *kvrpcpb.RawChecksumRequest) *kvrpcpb.RawChecksumResponse { + rawKV, ok := h.mvccStore.(RawKV) + if !ok { + errStr := "not implemented" + return &kvrpcpb.RawChecksumResponse{ + RegionError: &errorpb.Error{ + Message: errStr, + }, + } + } + + crc64Xor := uint64(0) + totalKvs := uint64(0) + totalBytes := uint64(0) + for _, r := range req.Ranges { + upperBound := h.endKey + if len(r.EndKey) > 0 && (len(upperBound) == 0 || bytes.Compare(r.EndKey, upperBound) < 0) { + upperBound = r.EndKey + } + rangeCrc64Xor, rangeKvs, rangeBytes, err := rawKV.RawChecksum( + "CF_DEFAULT", + r.StartKey, + upperBound, + ) + if err != nil { + return &kvrpcpb.RawChecksumResponse{ + RegionError: &errorpb.Error{ + Message: err.Error(), + }, + } + } + crc64Xor ^= rangeCrc64Xor + totalKvs += rangeKvs + totalBytes += rangeBytes + } + return &kvrpcpb.RawChecksumResponse{ + Checksum: crc64Xor, + TotalKvs: totalKvs, + TotalBytes: totalBytes, + } +} + func (h kvHandler) handleSplitRegion(req *kvrpcpb.SplitRegionRequest) *kvrpcpb.SplitRegionResponse { keys := req.GetSplitKeys() resp := &kvrpcpb.SplitRegionResponse{Regions: make([]*metapb.Region, 0, len(keys)+1)} @@ -933,6 +975,13 @@ func (c *RPCClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.R return resp, nil } resp.Resp = kvHandler{session}.HandleKvRawCompareAndSwap(r) + case tikvrpc.CmdRawChecksum: + r := req.RawChecksum() + if err := session.checkRequest(reqCtx, r.Size()); err != nil { + resp.Resp = &kvrpcpb.RawScanResponse{RegionError: err} + return resp, nil + } + resp.Resp = kvHandler{session}.handleKvRawChecksum(r) case tikvrpc.CmdUnsafeDestroyRange: panic("unimplemented") case tikvrpc.CmdRegisterLockObserver: diff --git a/metrics/shortcuts.go b/metrics/shortcuts.go index ebdf740347..0b8a84ef13 100644 --- a/metrics/shortcuts.go +++ b/metrics/shortcuts.go @@ -53,6 +53,7 @@ var ( RawkvCmdHistogramWithRawReversScan prometheus.Observer RawkvSizeHistogramWithKey prometheus.Observer RawkvSizeHistogramWithValue prometheus.Observer + RawkvCmdHistogramWithRawChecksum prometheus.Observer BackoffHistogramRPC prometheus.Observer BackoffHistogramLock prometheus.Observer @@ -146,6 +147,7 @@ func initShortcuts() { RawkvCmdHistogramWithRawReversScan = TiKVRawkvCmdHistogram.WithLabelValues("raw_reverse_scan") RawkvSizeHistogramWithKey = TiKVRawkvSizeHistogram.WithLabelValues("key") RawkvSizeHistogramWithValue = TiKVRawkvSizeHistogram.WithLabelValues("value") + RawkvCmdHistogramWithRawChecksum = TiKVRawkvSizeHistogram.WithLabelValues("raw_checksum") BackoffHistogramRPC = TiKVBackoffHistogram.WithLabelValues("tikvRPC") BackoffHistogramLock = TiKVBackoffHistogram.WithLabelValues("txnLock") diff --git a/rawkv/rawkv.go b/rawkv/rawkv.go index a93c07f145..116c9bc1e9 100644 --- a/rawkv/rawkv.go +++ b/rawkv/rawkv.go @@ -504,6 +504,47 @@ func (c *Client) ReverseScan(ctx context.Context, startKey, endKey []byte, limit return } +// Checksum do checksum of continuous kv pairs in range [startKey, endKey). +// If endKey is empty, it means unbounded. +// If you want to exclude the startKey or include the endKey, push a '\0' to the key. For example, to scan +// (startKey, endKey], you can write: +// `Scan(ctx, push(startKey, '\0'), push(endKey, '\0'), limit)`. +func (c *Client) Checksum(ctx context.Context, startKey, endKey []byte, options ...RawOption, +) (crc64Xor, totalKvs, totalBytes uint64, err error) { + + start := time.Now() + defer func() { metrics.RawkvCmdHistogramWithRawChecksum.Observe(time.Since(start).Seconds()) }() + + crc64Xor = 0 + totalKvs = 0 + totalBytes = 0 + for len(endKey) == 0 || bytes.Compare(startKey, endKey) < 0 { + req := tikvrpc.NewRequest(tikvrpc.CmdRawChecksum, &kvrpcpb.RawChecksumRequest{ + Algorithm: kvrpcpb.ChecksumAlgorithm_Crc64_Xor, + Ranges: []*kvrpcpb.KeyRange{{ + StartKey: startKey, + EndKey: endKey, + }}, + }) + resp, loc, err := c.sendReq(ctx, startKey, req, false) + if err != nil { + return 0, 0, 0, err + } + if resp.Resp == nil { + return 0, 0, 0, errors.WithStack(tikverr.ErrBodyMissing) + } + cmdResp := resp.Resp.(*kvrpcpb.RawChecksumResponse) + crc64Xor ^= cmdResp.GetChecksum() + totalKvs += cmdResp.GetTotalKvs() + totalBytes += cmdResp.GetTotalBytes() + startKey = loc.EndKey + if len(startKey) == 0 { + break + } + } + return +} + // CompareAndSwap results in an atomic compare-and-set operation for the given key while SetAtomicForCAS(true) // If the value retrieved is equal to previousValue, newValue is written. // It returns the previous value and whether the value is successfully swapped. diff --git a/rawkv/rawkv_test.go b/rawkv/rawkv_test.go index fb83f6056f..64731aee57 100644 --- a/rawkv/rawkv_test.go +++ b/rawkv/rawkv_test.go @@ -38,6 +38,7 @@ import ( "bytes" "context" "fmt" + "hash/crc64" "testing" "github.com/stretchr/testify/suite" @@ -571,3 +572,56 @@ func (s *testRawkvSuite) TestCompareAndSwap() { s.Nil(err) s.Equal(string(v), string(newValue)) } + +func (s *testRawkvSuite) TestRawChecksum() { + mvccStore := mocktikv.MustNewMVCCStore() + defer mvccStore.Close() + + client := &Client{ + clusterID: 0, + regionCache: locate.NewRegionCache(mocktikv.NewPDClient(s.cluster)), + rpcClient: mocktikv.NewRPCClient(s.cluster, mvccStore, nil), + } + defer client.Close() + + cf := "CF_DEFAULT" + paris := map[string]string{ + "db": "TiDB", + "key2": "value2", + "key1": "value1", + "key4": "value4", + "key3": "value3", + "kv": "TiKV", + } + keys := make([]key, 0) + values := make([]value, 0) + for k, v := range paris { + keys = append(keys, []byte(k)) + values = append(values, []byte(v)) + } + + expectCrc64Xor := uint64(0) + expectTotalKvs := uint64(0) + expectTotalBytes := uint64(0) + digest := crc64.New(crc64.MakeTable(crc64.ECMA)) + for i, key := range keys { + digest.Reset() + digest.Write(key) + digest.Write(values[i]) + expectCrc64Xor ^= digest.Sum64() + expectTotalKvs += 1 + expectTotalBytes += (uint64)(len(key) + len(values[i])) + } + + // BatchPut + err := client.BatchPut(context.Background(), keys, values, SetColumnFamily(cf)) + s.Nil(err) + + // test Checksum + startKey, endKey := []byte("db"), []byte(nil) + crc64Xor, totalKvs, totalBytes, err := client.Checksum(context.Background(), startKey, endKey, SetColumnFamily(cf)) + s.Nil(err) + s.Equal(expectCrc64Xor, crc64Xor) + s.Equal(expectTotalKvs, totalKvs) + s.Equal(expectTotalBytes, totalBytes) +} diff --git a/tikvrpc/tikvrpc.go b/tikvrpc/tikvrpc.go index 8ef8214e51..eae3f1a7a0 100644 --- a/tikvrpc/tikvrpc.go +++ b/tikvrpc/tikvrpc.go @@ -83,6 +83,7 @@ const ( CmdRawScan CmdGetKeyTTL CmdRawCompareAndSwap + CmdRawChecksum CmdUnsafeDestroyRange @@ -155,6 +156,8 @@ func (t CmdType) String() string { return "RawDeleteRange" case CmdRawScan: return "RawScan" + case CmdRawChecksum: + return "RawChecksum" case CmdUnsafeDestroyRange: return "UnsafeDestroyRange" case CmdRegisterLockObserver: @@ -385,6 +388,11 @@ func (req *Request) RawCompareAndSwap() *kvrpcpb.RawCASRequest { return req.Req.(*kvrpcpb.RawCASRequest) } +// RawChecksum returns RawChecksumRequest in request. +func (req *Request) RawChecksum() *kvrpcpb.RawChecksumRequest { + return req.Req.(*kvrpcpb.RawChecksumRequest) +} + // RegisterLockObserver returns RegisterLockObserverRequest in request. func (req *Request) RegisterLockObserver() *kvrpcpb.RegisterLockObserverRequest { return req.Req.(*kvrpcpb.RegisterLockObserverRequest) @@ -705,6 +713,8 @@ func SetContext(req *Request, region *metapb.Region, peer *metapb.Peer) error { req.RawGetKeyTTL().Context = ctx case CmdRawCompareAndSwap: req.RawCompareAndSwap().Context = ctx + case CmdRawChecksum: + req.RawChecksum().Context = ctx case CmdRegisterLockObserver: req.RegisterLockObserver().Context = ctx case CmdCheckLockObserver: @@ -843,6 +853,10 @@ func GenRegionErrorResp(req *Request, e *errorpb.Error) (*Response, error) { p = &kvrpcpb.RawCASResponse{ RegionError: e, } + case CmdRawChecksum: + p = &kvrpcpb.RawChecksumResponse{ + RegionError: e, + } case CmdCop: p = &coprocessor.Response{ RegionError: e, @@ -959,6 +973,8 @@ func CallRPC(ctx context.Context, client tikvpb.TikvClient, req *Request) (*Resp resp.Resp, err = client.RawGetKeyTTL(ctx, req.RawGetKeyTTL()) case CmdRawCompareAndSwap: resp.Resp, err = client.RawCompareAndSwap(ctx, req.RawCompareAndSwap()) + case CmdRawChecksum: + resp.Resp, err = client.RawChecksum(ctx, req.RawChecksum()) case CmdRegisterLockObserver: resp.Resp, err = client.RegisterLockObserver(ctx, req.RegisterLockObserver()) case CmdCheckLockObserver: From d490c632fa7609ccbb8a6b0d40b026b5374b50f8 Mon Sep 17 00:00:00 2001 From: haojinming Date: Thu, 2 Jun 2022 16:30:58 +0800 Subject: [PATCH 2/4] fix golang lint Signed-off-by: haojinming --- internal/mockstore/mocktikv/mvcc_leveldb.go | 2 +- rawkv/rawkv_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/mockstore/mocktikv/mvcc_leveldb.go b/internal/mockstore/mocktikv/mvcc_leveldb.go index 05f5dd0e03..aa17fde3e1 100644 --- a/internal/mockstore/mocktikv/mvcc_leveldb.go +++ b/internal/mockstore/mocktikv/mvcc_leveldb.go @@ -1811,7 +1811,7 @@ func (mvcc *MVCCLevelDB) RawChecksum(cf string, startKey, endKey []byte) (uint64 digest.Write(key) digest.Write(value) crc64Xor ^= digest.Sum64() - totalKvs += 1 + totalKvs++ totalBytes += (uint64)(len(key) + len(value)) } return crc64Xor, totalKvs, totalBytes, nil diff --git a/rawkv/rawkv_test.go b/rawkv/rawkv_test.go index 64731aee57..7c7f678b73 100644 --- a/rawkv/rawkv_test.go +++ b/rawkv/rawkv_test.go @@ -609,7 +609,7 @@ func (s *testRawkvSuite) TestRawChecksum() { digest.Write(key) digest.Write(values[i]) expectCrc64Xor ^= digest.Sum64() - expectTotalKvs += 1 + expectTotalKvs++ expectTotalBytes += (uint64)(len(key) + len(values[i])) } From 471e579944d8b94c227e8b5600d22c33a9d7944d Mon Sep 17 00:00:00 2001 From: haojinming Date: Fri, 10 Jun 2022 11:36:52 +0800 Subject: [PATCH 3/4] fix review comments Signed-off-by: haojinming --- rawkv/rawkv.go | 26 ++++++++++++++++---------- rawkv/rawkv_test.go | 8 ++++---- 2 files changed, 20 insertions(+), 14 deletions(-) diff --git a/rawkv/rawkv.go b/rawkv/rawkv.go index 116c9bc1e9..10342fe6c9 100644 --- a/rawkv/rawkv.go +++ b/rawkv/rawkv.go @@ -74,6 +74,15 @@ type rawOptions struct { KeyOnly bool } +type RawChecksum struct { + // Crc64Xor is the checksum result with crc64 algorithm + Crc64Xor uint64 + // TotalKvs is the total number of kvpairs + TotalKvs uint64 + // TotalBytes is the total bytes of kvpairs, including prefix in APIV2 + TotalBytes uint64 +} + // RawOption represents possible options that can be cotrolled by the user // to tweak the API behavior. // @@ -508,16 +517,13 @@ func (c *Client) ReverseScan(ctx context.Context, startKey, endKey []byte, limit // If endKey is empty, it means unbounded. // If you want to exclude the startKey or include the endKey, push a '\0' to the key. For example, to scan // (startKey, endKey], you can write: -// `Scan(ctx, push(startKey, '\0'), push(endKey, '\0'), limit)`. +// `Checksum(ctx, push(startKey, '\0'), push(endKey, '\0'))`. func (c *Client) Checksum(ctx context.Context, startKey, endKey []byte, options ...RawOption, -) (crc64Xor, totalKvs, totalBytes uint64, err error) { +) (check RawChecksum, err error) { start := time.Now() defer func() { metrics.RawkvCmdHistogramWithRawChecksum.Observe(time.Since(start).Seconds()) }() - crc64Xor = 0 - totalKvs = 0 - totalBytes = 0 for len(endKey) == 0 || bytes.Compare(startKey, endKey) < 0 { req := tikvrpc.NewRequest(tikvrpc.CmdRawChecksum, &kvrpcpb.RawChecksumRequest{ Algorithm: kvrpcpb.ChecksumAlgorithm_Crc64_Xor, @@ -528,15 +534,15 @@ func (c *Client) Checksum(ctx context.Context, startKey, endKey []byte, options }) resp, loc, err := c.sendReq(ctx, startKey, req, false) if err != nil { - return 0, 0, 0, err + return RawChecksum{0, 0, 0}, err } if resp.Resp == nil { - return 0, 0, 0, errors.WithStack(tikverr.ErrBodyMissing) + return RawChecksum{0, 0, 0}, errors.WithStack(tikverr.ErrBodyMissing) } cmdResp := resp.Resp.(*kvrpcpb.RawChecksumResponse) - crc64Xor ^= cmdResp.GetChecksum() - totalKvs += cmdResp.GetTotalKvs() - totalBytes += cmdResp.GetTotalBytes() + check.Crc64Xor ^= cmdResp.GetChecksum() + check.TotalKvs += cmdResp.GetTotalKvs() + check.TotalBytes += cmdResp.GetTotalBytes() startKey = loc.EndKey if len(startKey) == 0 { break diff --git a/rawkv/rawkv_test.go b/rawkv/rawkv_test.go index 7c7f678b73..f5eefaa2dc 100644 --- a/rawkv/rawkv_test.go +++ b/rawkv/rawkv_test.go @@ -619,9 +619,9 @@ func (s *testRawkvSuite) TestRawChecksum() { // test Checksum startKey, endKey := []byte("db"), []byte(nil) - crc64Xor, totalKvs, totalBytes, err := client.Checksum(context.Background(), startKey, endKey, SetColumnFamily(cf)) + check, err := client.Checksum(context.Background(), startKey, endKey, SetColumnFamily(cf)) s.Nil(err) - s.Equal(expectCrc64Xor, crc64Xor) - s.Equal(expectTotalKvs, totalKvs) - s.Equal(expectTotalBytes, totalBytes) + s.Equal(expectCrc64Xor, check.Crc64Xor) + s.Equal(expectTotalKvs, check.TotalKvs) + s.Equal(expectTotalBytes, check.TotalBytes) } From 01e1e672726bba19f3f6033b6850e28278213c57 Mon Sep 17 00:00:00 2001 From: haojinming Date: Tue, 21 Jun 2022 14:36:09 +0800 Subject: [PATCH 4/4] fix glangci-lint Signed-off-by: haojinming --- rawkv/rawkv.go | 1 + 1 file changed, 1 insertion(+) diff --git a/rawkv/rawkv.go b/rawkv/rawkv.go index 10342fe6c9..26117c6fd4 100644 --- a/rawkv/rawkv.go +++ b/rawkv/rawkv.go @@ -74,6 +74,7 @@ type rawOptions struct { KeyOnly bool } +// RawChecksum represents the checksum result of raw kv pairs in TiKV cluster. type RawChecksum struct { // Crc64Xor is the checksum result with crc64 algorithm Crc64Xor uint64