From b68cde0854f73b50fbb0fd6a05f01b5d3274ff5b Mon Sep 17 00:00:00 2001 From: iosmanthus Date: Mon, 20 Mar 2023 20:07:54 +0800 Subject: [PATCH 1/3] refine keyspace request encoding Signed-off-by: iosmanthus --- internal/apicodec/codec.go | 12 +++-- internal/apicodec/codec_v1.go | 2 +- internal/apicodec/codec_v2.go | 79 ++++++++++++++---------------- internal/apicodec/codec_v2_test.go | 23 +++++++++ 4 files changed, 70 insertions(+), 46 deletions(-) diff --git a/internal/apicodec/codec.go b/internal/apicodec/codec.go index be1a1365f3..8e80b1584e 100644 --- a/internal/apicodec/codec.go +++ b/internal/apicodec/codec.go @@ -86,7 +86,7 @@ func DecodeKey(encoded []byte, version kvrpcpb.APIVersion) ([]byte, []byte, erro return nil, nil, errors.Errorf("unsupported api version %s", version.String()) } -func attachAPICtx(c Codec, req *tikvrpc.Request) (*tikvrpc.Request, error) { +func attachAPICtx(c Codec, req *tikvrpc.Request) *tikvrpc.Request { // Shallow copy the request to avoid concurrent modification. r := *req @@ -97,9 +97,13 @@ func attachAPICtx(c Codec, req *tikvrpc.Request) (*tikvrpc.Request, error) { switch r.Type { case tikvrpc.CmdMPPTask: mpp := *r.DispatchMPPTask() - mpp.Meta.KeyspaceId = ctx.KeyspaceId - mpp.Meta.ApiVersion = ctx.ApiVersion + // Shallow copy the request to avoid concurrent modification. + meta := *mpp.Meta + meta.KeyspaceId = ctx.KeyspaceId + meta.ApiVersion = ctx.ApiVersion + mpp.Meta = &meta r.Req = &mpp + case tikvrpc.CmdCompact: compact := *r.Compact() compact.KeyspaceId = ctx.KeyspaceId @@ -109,5 +113,5 @@ func attachAPICtx(c Codec, req *tikvrpc.Request) (*tikvrpc.Request, error) { tikvrpc.AttachContext(&r, ctx) - return &r, nil + return &r } diff --git a/internal/apicodec/codec_v1.go b/internal/apicodec/codec_v1.go index d2434c2cd6..4549213d16 100644 --- a/internal/apicodec/codec_v1.go +++ b/internal/apicodec/codec_v1.go @@ -35,7 +35,7 @@ func (c *codecV1) GetKeyspaceID() KeyspaceID { } func (c *codecV1) EncodeRequest(req *tikvrpc.Request) (*tikvrpc.Request, error) { - return attachAPICtx(c, req) + return attachAPICtx(c, req), nil } func (c *codecV1) DecodeResponse(req *tikvrpc.Request, resp *tikvrpc.Response) (*tikvrpc.Response, error) { diff --git a/internal/apicodec/codec_v2.go b/internal/apicodec/codec_v2.go index 48005aa920..543aa6415f 100644 --- a/internal/apicodec/codec_v2.go +++ b/internal/apicodec/codec_v2.go @@ -109,170 +109,167 @@ func (c *codecV2) GetAPIVersion() kvrpcpb.APIVersion { // EncodeRequest encodes with the given Codec. // NOTE: req is reused on retry. MUST encode on cloned request, other than overwrite the original. func (c *codecV2) EncodeRequest(req *tikvrpc.Request) (*tikvrpc.Request, error) { - newReq, err := attachAPICtx(c, req) - if err != nil { - return nil, err - } + // attachAPICtx will shallow copy the request. + req = attachAPICtx(c, req) // Encode requests based on command type. switch req.Type { // Transaction Request Types. case tikvrpc.CmdGet: r := *req.Get() r.Key = c.EncodeKey(r.Key) - newReq.Req = &r + req.Req = &r case tikvrpc.CmdScan: r := *req.Scan() r.StartKey, r.EndKey = c.encodeRange(r.StartKey, r.EndKey, r.Reverse) - newReq.Req = &r + req.Req = &r case tikvrpc.CmdPrewrite: r := *req.Prewrite() r.Mutations = c.encodeMutations(r.Mutations) r.PrimaryLock = c.EncodeKey(r.PrimaryLock) r.Secondaries = c.encodeKeys(r.Secondaries) - newReq.Req = &r + req.Req = &r case tikvrpc.CmdCommit: r := *req.Commit() r.Keys = c.encodeKeys(r.Keys) - newReq.Req = &r + req.Req = &r case tikvrpc.CmdCleanup: r := *req.Cleanup() r.Key = c.EncodeKey(r.Key) - newReq.Req = &r + req.Req = &r case tikvrpc.CmdBatchGet: r := *req.BatchGet() r.Keys = c.encodeKeys(r.Keys) - newReq.Req = &r + req.Req = &r case tikvrpc.CmdBatchRollback: r := *req.BatchRollback() r.Keys = c.encodeKeys(r.Keys) - newReq.Req = &r + req.Req = &r case tikvrpc.CmdScanLock: r := *req.ScanLock() r.StartKey, r.EndKey = c.encodeRange(r.StartKey, r.EndKey, false) - newReq.Req = &r + req.Req = &r case tikvrpc.CmdResolveLock: r := *req.ResolveLock() r.Keys = c.encodeKeys(r.Keys) - newReq.Req = &r + req.Req = &r case tikvrpc.CmdGC: // TODO: Deprecate Central GC Mode. case tikvrpc.CmdDeleteRange: r := *req.DeleteRange() r.StartKey, r.EndKey = c.encodeRange(r.StartKey, r.EndKey, false) - newReq.Req = &r + req.Req = &r case tikvrpc.CmdPessimisticLock: r := *req.PessimisticLock() r.Mutations = c.encodeMutations(r.Mutations) r.PrimaryLock = c.EncodeKey(r.PrimaryLock) - newReq.Req = &r + req.Req = &r case tikvrpc.CmdPessimisticRollback: r := *req.PessimisticRollback() r.Keys = c.encodeKeys(r.Keys) - newReq.Req = &r + req.Req = &r case tikvrpc.CmdTxnHeartBeat: r := *req.TxnHeartBeat() r.PrimaryLock = c.EncodeKey(r.PrimaryLock) - newReq.Req = &r + req.Req = &r case tikvrpc.CmdCheckTxnStatus: r := *req.CheckTxnStatus() r.PrimaryKey = c.EncodeKey(r.PrimaryKey) - newReq.Req = &r + req.Req = &r case tikvrpc.CmdCheckSecondaryLocks: r := *req.CheckSecondaryLocks() r.Keys = c.encodeKeys(r.Keys) - newReq.Req = &r + req.Req = &r // Raw Request Types. case tikvrpc.CmdRawGet: r := *req.RawGet() r.Key = c.EncodeKey(r.Key) - newReq.Req = &r + req.Req = &r case tikvrpc.CmdRawBatchGet: r := *req.RawBatchGet() r.Keys = c.encodeKeys(r.Keys) - newReq.Req = &r + req.Req = &r case tikvrpc.CmdRawPut: r := *req.RawPut() r.Key = c.EncodeKey(r.Key) - newReq.Req = &r + req.Req = &r case tikvrpc.CmdRawBatchPut: r := *req.RawBatchPut() r.Pairs = c.encodeParis(r.Pairs) - newReq.Req = &r + req.Req = &r case tikvrpc.CmdRawDelete: r := *req.RawDelete() r.Key = c.EncodeKey(r.Key) - newReq.Req = &r + req.Req = &r case tikvrpc.CmdRawBatchDelete: r := *req.RawBatchDelete() r.Keys = c.encodeKeys(r.Keys) - newReq.Req = &r + req.Req = &r case tikvrpc.CmdRawDeleteRange: r := *req.RawDeleteRange() r.StartKey, r.EndKey = c.encodeRange(r.StartKey, r.EndKey, false) - newReq.Req = &r + req.Req = &r case tikvrpc.CmdRawScan: r := *req.RawScan() r.StartKey, r.EndKey = c.encodeRange(r.StartKey, r.EndKey, r.Reverse) - newReq.Req = &r + req.Req = &r case tikvrpc.CmdGetKeyTTL: r := *req.RawGetKeyTTL() r.Key = c.EncodeKey(r.Key) - newReq.Req = &r + req.Req = &r case tikvrpc.CmdRawCompareAndSwap: r := *req.RawCompareAndSwap() r.Key = c.EncodeKey(r.Key) - newReq.Req = &r + req.Req = &r case tikvrpc.CmdRawChecksum: r := *req.RawChecksum() r.Ranges = c.encodeKeyRanges(r.Ranges) - newReq.Req = &r + req.Req = &r // TiFlash Requests case tikvrpc.CmdBatchCop: r := *req.BatchCop() r.Regions = c.encodeRegionInfos(r.Regions) r.TableRegions = c.encodeTableRegions(r.TableRegions) - newReq.Req = &r + req.Req = &r case tikvrpc.CmdMPPTask: r := *req.DispatchMPPTask() - r.Meta.KeyspaceId = uint32(c.GetKeyspaceID()) r.Regions = c.encodeRegionInfos(r.Regions) r.TableRegions = c.encodeTableRegions(r.TableRegions) - newReq.Req = &r + req.Req = &r // Other requests. case tikvrpc.CmdUnsafeDestroyRange: r := *req.UnsafeDestroyRange() r.StartKey, r.EndKey = c.encodeRange(r.StartKey, r.EndKey, false) - newReq.Req = &r + req.Req = &r case tikvrpc.CmdPhysicalScanLock: r := *req.PhysicalScanLock() r.StartKey = c.EncodeKey(r.StartKey) - newReq.Req = &r + req.Req = &r case tikvrpc.CmdStoreSafeTS: r := *req.StoreSafeTS() r.KeyRange = c.encodeKeyRange(r.KeyRange) - newReq.Req = &r + req.Req = &r case tikvrpc.CmdCop: r := *req.Cop() r.Ranges = c.encodeCopRanges(r.Ranges) - newReq.Req = &r + req.Req = &r case tikvrpc.CmdCopStream: r := *req.Cop() r.Ranges = c.encodeCopRanges(r.Ranges) - newReq.Req = &r + req.Req = &r case tikvrpc.CmdMvccGetByKey: r := *req.MvccGetByKey() r.Key = c.EncodeRegionKey(r.Key) - newReq.Req = &r + req.Req = &r case tikvrpc.CmdSplitRegion: r := *req.SplitRegion() r.SplitKeys = c.encodeKeys(r.SplitKeys) - newReq.Req = &r + req.Req = &r } - return newReq, nil + return req, nil } // DecodeResponse decode the resp with the given codec. diff --git a/internal/apicodec/codec_v2_test.go b/internal/apicodec/codec_v2_test.go index 104bc00950..a21470fb01 100644 --- a/internal/apicodec/codec_v2_test.go +++ b/internal/apicodec/codec_v2_test.go @@ -1,6 +1,8 @@ package apicodec import ( + "github.com/pingcap/kvproto/pkg/coprocessor" + "github.com/pingcap/kvproto/pkg/mpp" "math" "testing" @@ -270,3 +272,24 @@ func (suite *testCodecV2Suite) TestDecodeEpochNotMatch() { func (suite *testCodecV2Suite) TestGetKeyspaceID() { suite.Equal(KeyspaceID(testKeyspaceID), suite.codec.GetKeyspaceID()) } + +func (suite *testCodecV2Suite) TestEncodeMPPRequest() { + req, err := suite.codec.EncodeRequest(&tikvrpc.Request{ + Type: tikvrpc.CmdMPPTask, + Req: &mpp.DispatchTaskRequest{ + Meta: &mpp.TaskMeta{}, + Regions: []*coprocessor.RegionInfo{ + { + Ranges: []*coprocessor.KeyRange{{Start: []byte("a"), End: []byte("b")}}, + }, + }, + }, + }) + suite.Nil(err) + task, ok := req.Req.(*mpp.DispatchTaskRequest) + suite.True(ok) + suite.Equal(task.Meta.KeyspaceId, testKeyspaceID) + suite.Equal(task.Meta.ApiVersion, kvrpcpb.APIVersion_V2) + suite.Equal(task.Regions[0].Ranges[0].Start, suite.codec.EncodeKey([]byte("a"))) + suite.Equal(task.Regions[0].Ranges[0].End, suite.codec.EncodeKey([]byte("b"))) +} From ed75a6ddf61dbf8a643d3b5ea7cb667e882ca30c Mon Sep 17 00:00:00 2001 From: iosmanthus Date: Mon, 20 Mar 2023 20:09:47 +0800 Subject: [PATCH 2/3] goimports -w ./ Signed-off-by: iosmanthus --- internal/apicodec/codec_v2_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/apicodec/codec_v2_test.go b/internal/apicodec/codec_v2_test.go index a21470fb01..e37ca06e89 100644 --- a/internal/apicodec/codec_v2_test.go +++ b/internal/apicodec/codec_v2_test.go @@ -1,14 +1,14 @@ package apicodec import ( - "github.com/pingcap/kvproto/pkg/coprocessor" - "github.com/pingcap/kvproto/pkg/mpp" "math" "testing" + "github.com/pingcap/kvproto/pkg/coprocessor" "github.com/pingcap/kvproto/pkg/errorpb" "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/kvproto/pkg/metapb" + "github.com/pingcap/kvproto/pkg/mpp" "github.com/stretchr/testify/suite" "github.com/tikv/client-go/v2/tikvrpc" ) From b35d399bc43d947acbdfc22fad52a5a4ff6a8b29 Mon Sep 17 00:00:00 2001 From: iosmanthus Date: Tue, 21 Mar 2023 14:32:11 +0800 Subject: [PATCH 3/3] address comments from yongman Signed-off-by: iosmanthus --- internal/apicodec/codec.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/apicodec/codec.go b/internal/apicodec/codec.go index 8e80b1584e..7b6d73acb4 100644 --- a/internal/apicodec/codec.go +++ b/internal/apicodec/codec.go @@ -97,7 +97,7 @@ func attachAPICtx(c Codec, req *tikvrpc.Request) *tikvrpc.Request { switch r.Type { case tikvrpc.CmdMPPTask: mpp := *r.DispatchMPPTask() - // Shallow copy the request to avoid concurrent modification. + // Shallow copy the meta to avoid concurrent modification. meta := *mpp.Meta meta.KeyspaceId = ctx.KeyspaceId meta.ApiVersion = ctx.ApiVersion