Skip to content

Commit

Permalink
[RawKV] support checksum (#519)
Browse files Browse the repository at this point in the history
Signed-off-by: haojinming <jinming.hao@pingcap.com>

Co-authored-by: iosmanthus <dengliming@pingcap.com>
  • Loading branch information
haojinming and iosmanthus authored Jun 22, 2022
1 parent 98a4e27 commit 681fb6e
Show file tree
Hide file tree
Showing 7 changed files with 209 additions and 0 deletions.
1 change: 1 addition & 0 deletions internal/mockstore/mocktikv/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,7 @@ type RawKV interface {
RawBatchDelete(cf string, keys [][]byte)
RawDeleteRange(cf string, startKey, endKey []byte)
RawCompareAndSwap(cf string, key, expectedValue, newvalue []byte) ([]byte, bool, error)
RawChecksum(cf string, startKey, endKey []byte) (uint64, uint64, uint64, error)
}

// MVCCDebugger is for debugging.
Expand Down
39 changes: 39 additions & 0 deletions internal/mockstore/mocktikv/mvcc_leveldb.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ package mocktikv

import (
"bytes"
"hash/crc64"
"math"
"sync"

Expand Down Expand Up @@ -1778,6 +1779,44 @@ func (mvcc *MVCCLevelDB) RawReverseScan(cf string, startKey, endKey []byte, limi
return pairs
}

// RawChecksum implements the RawKV interface.
func (mvcc *MVCCLevelDB) RawChecksum(cf string, startKey, endKey []byte) (uint64, uint64, uint64, error) {
mvcc.mu.Lock()
defer mvcc.mu.Unlock()

db := mvcc.getDB(cf)
if db == nil {
return 0, 0, 0, nil
}

iter := db.NewIterator(&util.Range{
Start: startKey,
}, nil)

crc64Xor := uint64(0)
totalKvs := uint64(0)
totalBytes := uint64(0)
digest := crc64.New(crc64.MakeTable(crc64.ECMA))
for iter.Next() {
key := iter.Key()
value := iter.Value()
err := iter.Error()
if err != nil {
return 0, 0, 0, err
}
if len(endKey) > 0 && bytes.Compare(key, endKey) >= 0 {
break
}
digest.Reset()
digest.Write(key)
digest.Write(value)
crc64Xor ^= digest.Sum64()
totalKvs++
totalBytes += (uint64)(len(key) + len(value))
}
return crc64Xor, totalKvs, totalBytes, nil
}

// RawDeleteRange implements the RawKV interface.
func (mvcc *MVCCLevelDB) RawDeleteRange(cf string, startKey, endKey []byte) {
tikverr.Log(mvcc.doRawDeleteRange(cf, startKey, endKey))
Expand Down
49 changes: 49 additions & 0 deletions internal/mockstore/mocktikv/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -596,6 +596,48 @@ func (h kvHandler) handleKvRawScan(req *kvrpcpb.RawScanRequest) *kvrpcpb.RawScan
}
}

func (h kvHandler) handleKvRawChecksum(req *kvrpcpb.RawChecksumRequest) *kvrpcpb.RawChecksumResponse {
rawKV, ok := h.mvccStore.(RawKV)
if !ok {
errStr := "not implemented"
return &kvrpcpb.RawChecksumResponse{
RegionError: &errorpb.Error{
Message: errStr,
},
}
}

crc64Xor := uint64(0)
totalKvs := uint64(0)
totalBytes := uint64(0)
for _, r := range req.Ranges {
upperBound := h.endKey
if len(r.EndKey) > 0 && (len(upperBound) == 0 || bytes.Compare(r.EndKey, upperBound) < 0) {
upperBound = r.EndKey
}
rangeCrc64Xor, rangeKvs, rangeBytes, err := rawKV.RawChecksum(
"CF_DEFAULT",
r.StartKey,
upperBound,
)
if err != nil {
return &kvrpcpb.RawChecksumResponse{
RegionError: &errorpb.Error{
Message: err.Error(),
},
}
}
crc64Xor ^= rangeCrc64Xor
totalKvs += rangeKvs
totalBytes += rangeBytes
}
return &kvrpcpb.RawChecksumResponse{
Checksum: crc64Xor,
TotalKvs: totalKvs,
TotalBytes: totalBytes,
}
}

func (h kvHandler) handleSplitRegion(req *kvrpcpb.SplitRegionRequest) *kvrpcpb.SplitRegionResponse {
keys := req.GetSplitKeys()
resp := &kvrpcpb.SplitRegionResponse{Regions: make([]*metapb.Region, 0, len(keys)+1)}
Expand Down Expand Up @@ -933,6 +975,13 @@ func (c *RPCClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.R
return resp, nil
}
resp.Resp = kvHandler{session}.HandleKvRawCompareAndSwap(r)
case tikvrpc.CmdRawChecksum:
r := req.RawChecksum()
if err := session.checkRequest(reqCtx, r.Size()); err != nil {
resp.Resp = &kvrpcpb.RawScanResponse{RegionError: err}
return resp, nil
}
resp.Resp = kvHandler{session}.handleKvRawChecksum(r)
case tikvrpc.CmdUnsafeDestroyRange:
panic("unimplemented")
case tikvrpc.CmdRegisterLockObserver:
Expand Down
2 changes: 2 additions & 0 deletions metrics/shortcuts.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ var (
RawkvCmdHistogramWithRawReversScan prometheus.Observer
RawkvSizeHistogramWithKey prometheus.Observer
RawkvSizeHistogramWithValue prometheus.Observer
RawkvCmdHistogramWithRawChecksum prometheus.Observer

BackoffHistogramRPC prometheus.Observer
BackoffHistogramLock prometheus.Observer
Expand Down Expand Up @@ -152,6 +153,7 @@ func initShortcuts() {
RawkvCmdHistogramWithRawReversScan = TiKVRawkvCmdHistogram.WithLabelValues("raw_reverse_scan")
RawkvSizeHistogramWithKey = TiKVRawkvSizeHistogram.WithLabelValues("key")
RawkvSizeHistogramWithValue = TiKVRawkvSizeHistogram.WithLabelValues("value")
RawkvCmdHistogramWithRawChecksum = TiKVRawkvSizeHistogram.WithLabelValues("raw_checksum")

BackoffHistogramRPC = TiKVBackoffHistogram.WithLabelValues("tikvRPC")
BackoffHistogramLock = TiKVBackoffHistogram.WithLabelValues("txnLock")
Expand Down
48 changes: 48 additions & 0 deletions rawkv/rawkv.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,16 @@ type rawOptions struct {
KeyOnly bool
}

// RawChecksum represents the checksum result of raw kv pairs in TiKV cluster.
type RawChecksum struct {
// Crc64Xor is the checksum result with crc64 algorithm
Crc64Xor uint64
// TotalKvs is the total number of kvpairs
TotalKvs uint64
// TotalBytes is the total bytes of kvpairs, including prefix in APIV2
TotalBytes uint64
}

// RawOption represents possible options that can be cotrolled by the user
// to tweak the API behavior.
//
Expand Down Expand Up @@ -504,6 +514,44 @@ func (c *Client) ReverseScan(ctx context.Context, startKey, endKey []byte, limit
return
}

// Checksum do checksum of continuous kv pairs in range [startKey, endKey).
// If endKey is empty, it means unbounded.
// If you want to exclude the startKey or include the endKey, push a '\0' to the key. For example, to scan
// (startKey, endKey], you can write:
// `Checksum(ctx, push(startKey, '\0'), push(endKey, '\0'))`.
func (c *Client) Checksum(ctx context.Context, startKey, endKey []byte, options ...RawOption,
) (check RawChecksum, err error) {

start := time.Now()
defer func() { metrics.RawkvCmdHistogramWithRawChecksum.Observe(time.Since(start).Seconds()) }()

for len(endKey) == 0 || bytes.Compare(startKey, endKey) < 0 {
req := tikvrpc.NewRequest(tikvrpc.CmdRawChecksum, &kvrpcpb.RawChecksumRequest{
Algorithm: kvrpcpb.ChecksumAlgorithm_Crc64_Xor,
Ranges: []*kvrpcpb.KeyRange{{
StartKey: startKey,
EndKey: endKey,
}},
})
resp, loc, err := c.sendReq(ctx, startKey, req, false)
if err != nil {
return RawChecksum{0, 0, 0}, err
}
if resp.Resp == nil {
return RawChecksum{0, 0, 0}, errors.WithStack(tikverr.ErrBodyMissing)
}
cmdResp := resp.Resp.(*kvrpcpb.RawChecksumResponse)
check.Crc64Xor ^= cmdResp.GetChecksum()
check.TotalKvs += cmdResp.GetTotalKvs()
check.TotalBytes += cmdResp.GetTotalBytes()
startKey = loc.EndKey
if len(startKey) == 0 {
break
}
}
return
}

// CompareAndSwap results in an atomic compare-and-set operation for the given key while SetAtomicForCAS(true)
// If the value retrieved is equal to previousValue, newValue is written.
// It returns the previous value and whether the value is successfully swapped.
Expand Down
54 changes: 54 additions & 0 deletions rawkv/rawkv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"bytes"
"context"
"fmt"
"hash/crc64"
"testing"

"github.com/stretchr/testify/suite"
Expand Down Expand Up @@ -571,3 +572,56 @@ func (s *testRawkvSuite) TestCompareAndSwap() {
s.Nil(err)
s.Equal(string(v), string(newValue))
}

func (s *testRawkvSuite) TestRawChecksum() {
mvccStore := mocktikv.MustNewMVCCStore()
defer mvccStore.Close()

client := &Client{
clusterID: 0,
regionCache: locate.NewRegionCache(mocktikv.NewPDClient(s.cluster)),
rpcClient: mocktikv.NewRPCClient(s.cluster, mvccStore, nil),
}
defer client.Close()

cf := "CF_DEFAULT"
paris := map[string]string{
"db": "TiDB",
"key2": "value2",
"key1": "value1",
"key4": "value4",
"key3": "value3",
"kv": "TiKV",
}
keys := make([]key, 0)
values := make([]value, 0)
for k, v := range paris {
keys = append(keys, []byte(k))
values = append(values, []byte(v))
}

expectCrc64Xor := uint64(0)
expectTotalKvs := uint64(0)
expectTotalBytes := uint64(0)
digest := crc64.New(crc64.MakeTable(crc64.ECMA))
for i, key := range keys {
digest.Reset()
digest.Write(key)
digest.Write(values[i])
expectCrc64Xor ^= digest.Sum64()
expectTotalKvs++
expectTotalBytes += (uint64)(len(key) + len(values[i]))
}

// BatchPut
err := client.BatchPut(context.Background(), keys, values, SetColumnFamily(cf))
s.Nil(err)

// test Checksum
startKey, endKey := []byte("db"), []byte(nil)
check, err := client.Checksum(context.Background(), startKey, endKey, SetColumnFamily(cf))
s.Nil(err)
s.Equal(expectCrc64Xor, check.Crc64Xor)
s.Equal(expectTotalKvs, check.TotalKvs)
s.Equal(expectTotalBytes, check.TotalBytes)
}
16 changes: 16 additions & 0 deletions tikvrpc/tikvrpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ const (
CmdRawScan
CmdGetKeyTTL
CmdRawCompareAndSwap
CmdRawChecksum

CmdUnsafeDestroyRange

Expand Down Expand Up @@ -156,6 +157,8 @@ func (t CmdType) String() string {
return "RawDeleteRange"
case CmdRawScan:
return "RawScan"
case CmdRawChecksum:
return "RawChecksum"
case CmdUnsafeDestroyRange:
return "UnsafeDestroyRange"
case CmdRegisterLockObserver:
Expand Down Expand Up @@ -388,6 +391,11 @@ func (req *Request) RawCompareAndSwap() *kvrpcpb.RawCASRequest {
return req.Req.(*kvrpcpb.RawCASRequest)
}

// RawChecksum returns RawChecksumRequest in request.
func (req *Request) RawChecksum() *kvrpcpb.RawChecksumRequest {
return req.Req.(*kvrpcpb.RawChecksumRequest)
}

// RegisterLockObserver returns RegisterLockObserverRequest in request.
func (req *Request) RegisterLockObserver() *kvrpcpb.RegisterLockObserverRequest {
return req.Req.(*kvrpcpb.RegisterLockObserverRequest)
Expand Down Expand Up @@ -713,6 +721,8 @@ func SetContext(req *Request, region *metapb.Region, peer *metapb.Peer) error {
req.RawGetKeyTTL().Context = ctx
case CmdRawCompareAndSwap:
req.RawCompareAndSwap().Context = ctx
case CmdRawChecksum:
req.RawChecksum().Context = ctx
case CmdRegisterLockObserver:
req.RegisterLockObserver().Context = ctx
case CmdCheckLockObserver:
Expand Down Expand Up @@ -851,6 +861,10 @@ func GenRegionErrorResp(req *Request, e *errorpb.Error) (*Response, error) {
p = &kvrpcpb.RawCASResponse{
RegionError: e,
}
case CmdRawChecksum:
p = &kvrpcpb.RawChecksumResponse{
RegionError: e,
}
case CmdCop:
p = &coprocessor.Response{
RegionError: e,
Expand Down Expand Up @@ -967,6 +981,8 @@ func CallRPC(ctx context.Context, client tikvpb.TikvClient, req *Request) (*Resp
resp.Resp, err = client.RawGetKeyTTL(ctx, req.RawGetKeyTTL())
case CmdRawCompareAndSwap:
resp.Resp, err = client.RawCompareAndSwap(ctx, req.RawCompareAndSwap())
case CmdRawChecksum:
resp.Resp, err = client.RawChecksum(ctx, req.RawChecksum())
case CmdRegisterLockObserver:
resp.Resp, err = client.RegisterLockObserver(ctx, req.RegisterLockObserver())
case CmdCheckLockObserver:
Expand Down

0 comments on commit 681fb6e

Please sign in to comment.