From cb0f30011a7738c7f9320b6d60e600720be50b3f Mon Sep 17 00:00:00 2001 From: crazycs Date: Fri, 21 May 2021 10:29:21 +0800 Subject: [PATCH 01/15] add benchmark test Signed-off-by: crazycs --- go.mod | 2 + go.sum | 2 + tools/check/go.mod | 1 - .../resource_group_tag_test.go | 129 ++++++++++++++++++ 4 files changed, 133 insertions(+), 1 deletion(-) diff --git a/go.mod b/go.mod index 82a394d8406de..ee8cf9c8d5e7c 100644 --- a/go.mod +++ b/go.mod @@ -92,3 +92,5 @@ go 1.13 // Fix panic in unit test with go >= 1.14, ref: etcd-io/bbolt#201 https://github.com/etcd-io/bbolt/pull/201 replace go.etcd.io/bbolt => go.etcd.io/bbolt v1.3.5 + +replace github.com/pingcap/tipb => github.com/crazycs520/tipb v0.0.0-20210521015927-2b3fd46a9cb4 diff --git a/go.sum b/go.sum index cf26e5ebe4089..491d73513c554 100644 --- a/go.sum +++ b/go.sum @@ -111,6 +111,8 @@ github.com/corona10/goimagehash v1.0.2/go.mod h1:/l9umBhvcHQXVtQO1V6Gp1yD20STawk github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwcJI5acqYI6dE= github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= github.com/cpuguy83/go-md2man/v2 v2.0.0/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= +github.com/crazycs520/tipb v0.0.0-20210521015927-2b3fd46a9cb4 h1:kYQX8FiFN02M17hGg0jcsM1/A3L6jTOtd/HO7i12bms= +github.com/crazycs520/tipb v0.0.0-20210521015927-2b3fd46a9cb4/go.mod h1:nsEhnMokcn7MRqd2J60yxpn/ac3ZH8A6GOJ9NslabUo= github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/cznic/golex v0.0.0-20181122101858-9c343928389c/go.mod h1:+bmmJDNmKlhWNG+gwWCkaBoTy39Fs+bzRxVBzoTQbIc= diff --git a/tools/check/go.mod b/tools/check/go.mod index 8754267765791..e9ad86eaa4c5e 100644 --- a/tools/check/go.mod +++ b/tools/check/go.mod @@ -18,7 +18,6 @@ require ( gopkg.in/alecthomas/gometalinter.v3 v3.0.0 // indirect gopkg.in/alecthomas/kingpin.v2 v2.2.6 // indirect gopkg.in/alecthomas/kingpin.v3-unstable v3.0.0-20170321130658-9670b87a702e // indirect - gopkg.in/yaml.v2 v2.2.2 // indirect honnef.co/go/tools v0.0.0-20180920025451-e3ad64cb4ed3 ) diff --git a/util/resourcegrouptag/resource_group_tag_test.go b/util/resourcegrouptag/resource_group_tag_test.go index a979b92fce315..6f7786196c15a 100644 --- a/util/resourcegrouptag/resource_group_tag_test.go +++ b/util/resourcegrouptag/resource_group_tag_test.go @@ -14,6 +14,10 @@ package resourcegrouptag import ( + "crypto/sha256" + "errors" + "github.com/pingcap/tidb/util/codec" + "github.com/pingcap/tipb/go-tipb" "math/rand" "testing" @@ -109,3 +113,128 @@ func genRandHex(length int) string { } return string(res) } + +func genRandDigest(str string) []byte { + hasher := sha256.New() + hasher.Write([]byte(str)) + return hasher.Sum(nil) +} + +func (s *testUtilsSuite) TestResourceGroupTagEncodingPB(c *C) { + digest1 := genRandDigest("abc") + digest2 := genRandDigest("abcdefg") + // Test for manualEncode + data := manualEncodeResourceGroupTag(digest1, digest2) + c.Assert(len(data), Equals, 69) + sqlDigest, planDigest, err := manualDecodeResourceGroupTag(data) + c.Assert(err, IsNil) + c.Assert(sqlDigest, DeepEquals, digest1) + c.Assert(planDigest, DeepEquals, digest2) + + // Test for protobuf + resourceTag := &tipb.ResourceGroupTag{ + SqlDigest: digest1, + PlanDigest: digest2, + } + buf, err := resourceTag.Marshal() + c.Assert(err, IsNil) + tag := &tipb.ResourceGroupTag{} + err = tag.Unmarshal(buf) + c.Assert(err, IsNil) + c.Assert(tag.SqlDigest, DeepEquals, digest1) + c.Assert(tag.PlanDigest, DeepEquals, digest2) +} + +func manualEncodeResourceGroupTag(sqlDigest []byte, planDigest []byte) []byte { + buf := make([]byte, 1, len(sqlDigest)+len(planDigest)+8) + buf[0] = 1 // version + if len(sqlDigest) > 0 { + buf = append(buf, 1) // sql digest flag + buf = codec.EncodeVarint(buf, int64(len(sqlDigest))) + buf = append(buf, sqlDigest...) + } + buf = append(buf, 2) // plan digest flag + buf = codec.EncodeVarint(buf, int64(len(planDigest))) + buf = append(buf, planDigest...) + return buf +} + +func manualDecodeResourceGroupTag(buf []byte) (sqlDigest []byte, planDigest []byte, err error) { + if len(buf) == 0 { + return nil, nil, errors.New("invalid") + } + if buf[0] != 1 { + return nil, nil, errors.New("invalid") + } + buf = buf[1:] + var l int64 + for len(buf) > 0 { + flag := buf[0] + buf, l, err = codec.DecodeVarint(buf[1:]) + if err != nil { + return nil, nil, errors.New("invalid") + } + if len(buf) < int(l) { + return nil, nil, errors.New("invalid") + } + data := make([]byte, l) + copy(data, buf[:l]) + buf = buf[l:] + switch flag { + case 1: // sql_digest + sqlDigest = data + case 2: // plan digest + planDigest = data + default: + return nil, nil, errors.New("invalid") + } + } + return +} + +func BenchmarkResourceGroupManualEncode(b *testing.B) { + digest1 := genRandDigest("abc") + digest2 := genRandDigest("abcdefg") + b.ResetTimer() + for i := 0; i < b.N; i++ { + manualEncodeResourceGroupTag(digest1, digest2) + } +} + +func BenchmarkResourceGroupTagPBEncode(b *testing.B) { + digest1 := genRandDigest("abc") + digest2 := genRandDigest("abcdefg") + b.ResetTimer() + for i := 0; i < b.N; i++ { + resourceTag := &tipb.ResourceGroupTag{ + SqlDigest: digest1, + PlanDigest: digest2, + } + resourceTag.Marshal() + } +} + +func BenchmarkResourceGroupTagManualDecode(b *testing.B) { + digest1 := genRandDigest("abc") + digest2 := genRandDigest("abcdefg") + data := manualEncodeResourceGroupTag(digest1, digest2) + b.ResetTimer() + for i := 0; i < b.N; i++ { + manualDecodeResourceGroupTag(data) + } +} + +func BenchmarkResourceGroupTagPBDecode(b *testing.B) { + digest1 := genRandDigest("abc") + digest2 := genRandDigest("abcdefg") + resourceTag := &tipb.ResourceGroupTag{ + SqlDigest: digest1, + PlanDigest: digest2, + } + data, _ := resourceTag.Marshal() + b.ResetTimer() + for i := 0; i < b.N; i++ { + tag := &tipb.ResourceGroupTag{} + tag.Unmarshal(data) + } +} From e434f6de904d1b47e5d5cc4afbb68d3105fb7ea0 Mon Sep 17 00:00:00 2001 From: crazycs Date: Fri, 21 May 2021 10:56:19 +0800 Subject: [PATCH 02/15] init Signed-off-by: crazycs --- .../resource_group_tag_test.go | 42 +++++++++++++++++-- 1 file changed, 39 insertions(+), 3 deletions(-) diff --git a/util/resourcegrouptag/resource_group_tag_test.go b/util/resourcegrouptag/resource_group_tag_test.go index 6f7786196c15a..a996c72b8d7e6 100644 --- a/util/resourcegrouptag/resource_group_tag_test.go +++ b/util/resourcegrouptag/resource_group_tag_test.go @@ -15,7 +15,9 @@ package resourcegrouptag import ( "crypto/sha256" + "encoding/hex" "errors" + "fmt" "github.com/pingcap/tidb/util/codec" "github.com/pingcap/tipb/go-tipb" "math/rand" @@ -131,6 +133,14 @@ func (s *testUtilsSuite) TestResourceGroupTagEncodingPB(c *C) { c.Assert(sqlDigest, DeepEquals, digest1) c.Assert(planDigest, DeepEquals, digest2) + // Test for manualEncode sql_digest only + data = manualEncodeResourceGroupTag(digest1, nil) + c.Assert(len(data), Equals, 35) + sqlDigest, planDigest, err = manualDecodeResourceGroupTag(data) + c.Assert(err, IsNil) + c.Assert(sqlDigest, DeepEquals, digest1) + c.Assert(planDigest, IsNil) + // Test for protobuf resourceTag := &tipb.ResourceGroupTag{ SqlDigest: digest1, @@ -138,11 +148,26 @@ func (s *testUtilsSuite) TestResourceGroupTagEncodingPB(c *C) { } buf, err := resourceTag.Marshal() c.Assert(err, IsNil) + c.Assert(len(buf), Equals, 68) tag := &tipb.ResourceGroupTag{} err = tag.Unmarshal(buf) c.Assert(err, IsNil) c.Assert(tag.SqlDigest, DeepEquals, digest1) c.Assert(tag.PlanDigest, DeepEquals, digest2) + + // Test for protobuf sql_digest only + resourceTag = &tipb.ResourceGroupTag{ + SqlDigest: digest1, + } + buf, err = resourceTag.Marshal() + c.Assert(err, IsNil) + c.Assert(len(buf), Equals, 34) + tag = &tipb.ResourceGroupTag{} + err = tag.Unmarshal(buf) + c.Assert(err, IsNil) + c.Assert(tag.SqlDigest, DeepEquals, digest1) + c.Assert(tag.PlanDigest, IsNil) + } func manualEncodeResourceGroupTag(sqlDigest []byte, planDigest []byte) []byte { @@ -153,9 +178,11 @@ func manualEncodeResourceGroupTag(sqlDigest []byte, planDigest []byte) []byte { buf = codec.EncodeVarint(buf, int64(len(sqlDigest))) buf = append(buf, sqlDigest...) } - buf = append(buf, 2) // plan digest flag - buf = codec.EncodeVarint(buf, int64(len(planDigest))) - buf = append(buf, planDigest...) + if len(planDigest) > 0 { + buf = append(buf, 2) // plan digest flag + buf = codec.EncodeVarint(buf, int64(len(planDigest))) + buf = append(buf, planDigest...) + } return buf } @@ -238,3 +265,12 @@ func BenchmarkResourceGroupTagPBDecode(b *testing.B) { tag.Unmarshal(data) } } + +func BenchmarkHexDecode(b *testing.B) { + digest1 := genRandDigest("abc") + b.ResetTimer() + hash := fmt.Sprintf("%x", digest1) + for i := 0; i < b.N; i++ { + hex.DecodeString(hash) + } +} From 83b7ccd6192632fdff8171a87e3d6e6e7d74493e Mon Sep 17 00:00:00 2001 From: crazycs Date: Fri, 21 May 2021 14:43:13 +0800 Subject: [PATCH 03/15] init Signed-off-by: crazycs --- bindinfo/handle.go | 12 +-- bindinfo/session_handle.go | 6 +- executor/adapter.go | 8 +- executor/executor.go | 3 +- go.mod | 2 + go.sum | 2 + planner/core/cache.go | 3 +- planner/optimize.go | 6 +- session/session.go | 3 +- sessionctx/stmtctx/stmtctx.go | 6 +- util/deadlockhistory/deadlock_history.go | 2 +- util/deadlockhistory/deadlock_history_test.go | 23 +++-- util/resourcegrouptag/resource_group_tag.go | 85 ++++-------------- .../resource_group_tag_test.go | 87 +++++-------------- 14 files changed, 86 insertions(+), 162 deletions(-) diff --git a/bindinfo/handle.go b/bindinfo/handle.go index 6111910395d55..2281af3c88bd3 100644 --- a/bindinfo/handle.go +++ b/bindinfo/handle.go @@ -210,7 +210,7 @@ func (h *BindHandle) CreateBindRecord(sctx sessionctx.Context, record *BindRecor } sqlDigest := parser.DigestNormalized(record.OriginalSQL) - h.setBindRecord(sqlDigest, record) + h.setBindRecord(sqlDigest.String(), record) }() // Lock mysql.bind_info to synchronize with CreateBindRecord / AddBindRecord / DropBindRecord on other tidb instances. @@ -256,7 +256,7 @@ func (h *BindHandle) AddBindRecord(sctx sessionctx.Context, record *BindRecord) } record.Db = strings.ToLower(record.Db) - oldRecord := h.GetBindRecord(parser.DigestNormalized(record.OriginalSQL), record.OriginalSQL, record.Db) + oldRecord := h.GetBindRecord(parser.DigestNormalized(record.OriginalSQL).String(), record.OriginalSQL, record.Db) var duplicateBinding *Binding if oldRecord != nil { binding := oldRecord.FindBinding(record.Bindings[0].ID) @@ -294,7 +294,7 @@ func (h *BindHandle) AddBindRecord(sctx sessionctx.Context, record *BindRecord) return } - h.appendBindRecord(parser.DigestNormalized(record.OriginalSQL), record) + h.appendBindRecord(parser.DigestNormalized(record.OriginalSQL).String(), record) }() // Lock mysql.bind_info to synchronize with CreateBindRecord / AddBindRecord / DropBindRecord on other tidb instances. @@ -367,7 +367,7 @@ func (h *BindHandle) DropBindRecord(originalSQL, db string, binding *Binding) (e if binding != nil { record.Bindings = append(record.Bindings, *binding) } - h.removeBindRecord(parser.DigestNormalized(originalSQL), record) + h.removeBindRecord(parser.DigestNormalized(originalSQL).String(), record) }() // Lock mysql.bind_info to synchronize with CreateBindRecord / AddBindRecord / DropBindRecord on other tidb instances. @@ -515,7 +515,7 @@ func (h *BindHandle) newBindRecord(row chunk.Row) (string, *BindRecord, error) { defer h.sctx.Unlock() h.sctx.GetSessionVars().CurrentDB = bindRecord.Db err := bindRecord.prepareHints(h.sctx.Context) - return hash, bindRecord, err + return hash.String(), bindRecord, err } // setBindRecord sets the BindRecord to the cache, if there already exists a BindRecord, @@ -624,7 +624,7 @@ func (h *BindHandle) CaptureBaselines() { } dbName := utilparser.GetDefaultDB(stmt, bindableStmt.Schema) normalizedSQL, digest := parser.NormalizeDigest(utilparser.RestoreWithDefaultDB(stmt, dbName, bindableStmt.Query)) - if r := h.GetBindRecord(digest, normalizedSQL, dbName); r != nil && r.HasUsingBinding() { + if r := h.GetBindRecord(digest.String(), normalizedSQL, dbName); r != nil && r.HasUsingBinding() { continue } bindSQL := GenerateBindSQL(context.TODO(), stmt, bindableStmt.PlanHint, true, dbName) diff --git a/bindinfo/session_handle.go b/bindinfo/session_handle.go index 2604d5b563f52..6b54aa9118f77 100644 --- a/bindinfo/session_handle.go +++ b/bindinfo/session_handle.go @@ -60,7 +60,7 @@ func (h *SessionHandle) CreateBindRecord(sctx sessionctx.Context, record *BindRe } // update the BindMeta to the cache. - h.appendBindRecord(parser.DigestNormalized(record.OriginalSQL), record) + h.appendBindRecord(parser.DigestNormalized(record.OriginalSQL).String(), record) return nil } @@ -78,14 +78,14 @@ func (h *SessionHandle) DropBindRecord(originalSQL, db string, binding *Binding) } else { newRecord = record } - h.ch.setBindRecord(parser.DigestNormalized(record.OriginalSQL), newRecord) + h.ch.setBindRecord(parser.DigestNormalized(record.OriginalSQL).String(), newRecord) updateMetrics(metrics.ScopeSession, oldRecord, newRecord, false) return nil } // GetBindRecord return the BindMeta of the (normdOrigSQL,db) if BindMeta exist. func (h *SessionHandle) GetBindRecord(normdOrigSQL, db string) *BindRecord { - hash := parser.DigestNormalized(normdOrigSQL) + hash := parser.DigestNormalized(normdOrigSQL).String() bindRecords := h.ch[hash] for _, bindRecord := range bindRecords { if bindRecord.OriginalSQL == normdOrigSQL { diff --git a/executor/adapter.go b/executor/adapter.go index 784696996cb94..f36d822123543 100644 --- a/executor/adapter.go +++ b/executor/adapter.go @@ -923,7 +923,7 @@ func (a *ExecStmt) LogSlowQuery(txnTS uint64, succ bool, hasMoreResults bool) { slowItems := &variable.SlowQueryLogItems{ TxnTS: txnTS, SQL: sql.String(), - Digest: digest, + Digest: digest.String(), TimeTotal: costTime, TimeParse: sessVars.DurationParse, TimeCompile: sessVars.DurationCompile, @@ -981,7 +981,7 @@ func (a *ExecStmt) LogSlowQuery(txnTS uint64, succ bool, hasMoreResults bool) { } domain.GetDomain(a.Ctx).LogSlowQuery(&domain.SlowQueryInfo{ SQL: sql.String(), - Digest: digest, + Digest: digest.String(), Start: sessVars.StartTime, Duration: costTime, Detail: sessVars.StmtCtx.GetExecDetails(), @@ -1079,7 +1079,7 @@ func (a *ExecStmt) SummaryStmt(succ bool) { } prevSQL = sessVars.PrevStmt.String() } - sessVars.SetPrevStmtDigest(digest) + sessVars.SetPrevStmtDigest(digest.String()) // No need to encode every time, so encode lazily. planGenerator := func() (string, string) { @@ -1120,7 +1120,7 @@ func (a *ExecStmt) SummaryStmt(succ bool) { Charset: charset, Collation: collation, NormalizedSQL: normalizedSQL, - Digest: digest, + Digest: digest.String(), PrevSQL: prevSQL, PrevSQLDigest: prevSQLDigest, PlanGenerator: planGenerator, diff --git a/executor/executor.go b/executor/executor.go index 2b9b8f0f52954..4b039db365e38 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -16,6 +16,7 @@ package executor import ( "context" "fmt" + "github.com/pingcap/tidb/util/resourcegrouptag" "math" "runtime" "runtime/trace" @@ -64,7 +65,6 @@ import ( "github.com/pingcap/tidb/util/execdetails" "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/memory" - "github.com/pingcap/tidb/util/resourcegrouptag" "go.uber.org/zap" ) @@ -975,6 +975,7 @@ func (e *SelectLockExec) Next(ctx context.Context, req *chunk.Chunk) error { func newLockCtx(seVars *variable.SessionVars, lockWaitTime int64) *tikvstore.LockCtx { _, sqlDigest := seVars.StmtCtx.SQLDigest() + return &tikvstore.LockCtx{ Killed: &seVars.Killed, ForUpdateTS: seVars.TxnCtx.GetForUpdateTS(), diff --git a/go.mod b/go.mod index ee8cf9c8d5e7c..8d60f8d47602c 100644 --- a/go.mod +++ b/go.mod @@ -94,3 +94,5 @@ go 1.13 replace go.etcd.io/bbolt => go.etcd.io/bbolt v1.3.5 replace github.com/pingcap/tipb => github.com/crazycs520/tipb v0.0.0-20210521015927-2b3fd46a9cb4 + +replace github.com/pingcap/parser => github.com/crazycs520/parser v0.0.0-20210521055836-aac8c026347f diff --git a/go.sum b/go.sum index 491d73513c554..16e37153624e9 100644 --- a/go.sum +++ b/go.sum @@ -111,6 +111,8 @@ github.com/corona10/goimagehash v1.0.2/go.mod h1:/l9umBhvcHQXVtQO1V6Gp1yD20STawk github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwcJI5acqYI6dE= github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= github.com/cpuguy83/go-md2man/v2 v2.0.0/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= +github.com/crazycs520/parser v0.0.0-20210521055836-aac8c026347f h1:rysp5oQNInNDCQajyHhskf7hqozsfMS71riVoAwf+H4= +github.com/crazycs520/parser v0.0.0-20210521055836-aac8c026347f/go.mod h1:xZC8I7bug4GJ5KtHhgAikjTfU4kBv1Sbo3Pf1MZ6lVw= github.com/crazycs520/tipb v0.0.0-20210521015927-2b3fd46a9cb4 h1:kYQX8FiFN02M17hGg0jcsM1/A3L6jTOtd/HO7i12bms= github.com/crazycs520/tipb v0.0.0-20210521015927-2b3fd46a9cb4/go.mod h1:nsEhnMokcn7MRqd2J60yxpn/ac3ZH8A6GOJ9NslabUo= github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY= diff --git a/planner/core/cache.go b/planner/core/cache.go index f97c207d189de..44408770abbeb 100644 --- a/planner/core/cache.go +++ b/planner/core/cache.go @@ -14,6 +14,7 @@ package core import ( + "github.com/pingcap/parser" "math" "sync/atomic" "time" @@ -199,7 +200,7 @@ type CachedPrepareStmt struct { Executor interface{} NormalizedSQL string NormalizedPlan string - SQLDigest string + SQLDigest *parser.Digest PlanDigest string ForUpdateRead bool } diff --git a/planner/optimize.go b/planner/optimize.go index ec9bfef67d0a7..c667d6b124ef9 100644 --- a/planner/optimize.go +++ b/planner/optimize.go @@ -306,7 +306,7 @@ func extractSelectAndNormalizeDigest(stmtNode ast.StmtNode, specifiledDB string) normalizeSQL := parser.Normalize(utilparser.RestoreWithDefaultDB(x.Stmt, specifiledDB, x.Text())) normalizeSQL = plannercore.EraseLastSemicolonInSQL(normalizeSQL) hash := parser.DigestNormalized(normalizeSQL) - return x.Stmt, normalizeSQL, hash, nil + return x.Stmt, normalizeSQL, hash.String(), nil case *ast.SetOprStmt: plannercore.EraseLastSemicolon(x) var normalizeExplainSQL string @@ -322,7 +322,7 @@ func extractSelectAndNormalizeDigest(stmtNode ast.StmtNode, specifiledDB string) } normalizeSQL := normalizeExplainSQL[idx:] hash := parser.DigestNormalized(normalizeSQL) - return x.Stmt, normalizeSQL, hash, nil + return x.Stmt, normalizeSQL, hash.String(), nil } case *ast.SelectStmt, *ast.SetOprStmt, *ast.DeleteStmt, *ast.UpdateStmt, *ast.InsertStmt: plannercore.EraseLastSemicolon(x) @@ -335,7 +335,7 @@ func extractSelectAndNormalizeDigest(stmtNode ast.StmtNode, specifiledDB string) return x, "", "", nil } normalizedSQL, hash := parser.NormalizeDigest(utilparser.RestoreWithDefaultDB(x, specifiledDB, x.Text())) - return x, normalizedSQL, hash, nil + return x, normalizedSQL, hash.String(), nil } return nil, "", "", nil } diff --git a/session/session.go b/session/session.go index 2874330a02fd3..af2a384d25f7f 100644 --- a/session/session.go +++ b/session/session.go @@ -1226,7 +1226,8 @@ func (s *session) SetProcessInfo(sql string, t time.Time, command byte, maxExecu if oldPi != nil && oldPi.Info == pi.Info { pi.Time = oldPi.Time } - _, pi.Digest = s.sessionVars.StmtCtx.SQLDigest() + _, digest := s.sessionVars.StmtCtx.SQLDigest() + pi.Digest = digest.String() // DO NOT reset the currentPlan to nil until this query finishes execution, otherwise reentrant calls // of SetProcessInfo would override Plan and PlanExplainRows to nil. if command == mysql.ComSleep { diff --git a/sessionctx/stmtctx/stmtctx.go b/sessionctx/stmtctx/stmtctx.go index d8a75aec48610..18318d562435d 100644 --- a/sessionctx/stmtctx/stmtctx.go +++ b/sessionctx/stmtctx/stmtctx.go @@ -145,7 +145,7 @@ type StatementContext struct { digestMemo struct { sync.Once normalized string - digest string + digest *parser.Digest } // planNormalized use for cache the normalized plan, avoid duplicate builds. planNormalized string @@ -229,7 +229,7 @@ func (sc *StatementContext) ResetStmtCache() { // SQLDigest gets normalized and digest for provided sql. // it will cache result after first calling. -func (sc *StatementContext) SQLDigest() (normalized, sqlDigest string) { +func (sc *StatementContext) SQLDigest() (normalized string, sqlDigest *parser.Digest) { sc.digestMemo.Do(func() { sc.digestMemo.normalized, sc.digestMemo.digest = parser.NormalizeDigest(sc.OriginalSQL) }) @@ -237,7 +237,7 @@ func (sc *StatementContext) SQLDigest() (normalized, sqlDigest string) { } // InitSQLDigest sets the normalized and digest for sql. -func (sc *StatementContext) InitSQLDigest(normalized, digest string) { +func (sc *StatementContext) InitSQLDigest(normalized string, digest *parser.Digest) { sc.digestMemo.Do(func() { sc.digestMemo.normalized, sc.digestMemo.digest = normalized, digest }) diff --git a/util/deadlockhistory/deadlock_history.go b/util/deadlockhistory/deadlock_history.go index ddb78067ffe7c..c219442cf5bf1 100644 --- a/util/deadlockhistory/deadlock_history.go +++ b/util/deadlockhistory/deadlock_history.go @@ -183,7 +183,7 @@ func ErrDeadlockToDeadlockRecord(dl *tikverr.ErrDeadlock) *DeadlockRecord { } waitChain = append(waitChain, WaitChainItem{ TryLockTxn: rawItem.Txn, - SQLDigest: sqlDigest, + SQLDigest: hex.EncodeToString(sqlDigest), Key: rawItem.Key, AllSQLs: nil, TxnHoldingLock: rawItem.WaitForTxn, diff --git a/util/deadlockhistory/deadlock_history_test.go b/util/deadlockhistory/deadlock_history_test.go index 35cbb6c8513cd..3374f8620eb15 100644 --- a/util/deadlockhistory/deadlock_history_test.go +++ b/util/deadlockhistory/deadlock_history_test.go @@ -14,6 +14,9 @@ package deadlockhistory import ( + "crypto/sha256" + "github.com/pingcap/parser" + "github.com/pingcap/tipb/go-tipb" "testing" "time" @@ -22,7 +25,6 @@ import ( "github.com/pingcap/kvproto/pkg/kvrpcpb" tikverr "github.com/pingcap/tidb/store/tikv/error" "github.com/pingcap/tidb/types" - "github.com/pingcap/tidb/util/resourcegrouptag" ) type testDeadlockHistorySuite struct{} @@ -227,7 +229,18 @@ func (s *testDeadlockHistorySuite) TestGetDatum(c *C) { c.Assert(res[3][7].GetValue(), Equals, uint64(201)) // TRX_HOLDING_LOCK } +func genDigest(str string) *parser.Digest { + hasher := sha256.New() + hasher.Write([]byte(str)) + return parser.NewDigest(hasher.Sum(nil)) +} + func (s *testDeadlockHistorySuite) TestErrDeadlockToDeadlockRecord(c *C) { + digest1, digest2 := parser.NewDigest([]byte("aabbccdd")), parser.NewDigest([]byte("ddccbbaa")) + tag1 := tipb.ResourceGroupTag{SqlDigest: digest1.Bytes()} + tag2 := tipb.ResourceGroupTag{SqlDigest: digest2.Bytes()} + tag1Data, _ := tag1.Marshal() + tag2Data, _ := tag2.Marshal() err := &tikverr.ErrDeadlock{ Deadlock: &kvrpcpb.Deadlock{ LockTs: 101, @@ -238,13 +251,13 @@ func (s *testDeadlockHistorySuite) TestErrDeadlockToDeadlockRecord(c *C) { Txn: 100, WaitForTxn: 101, Key: []byte("k2"), - ResourceGroupTag: resourcegrouptag.EncodeResourceGroupTag("aabbccdd"), + ResourceGroupTag: tag1Data, }, { Txn: 101, WaitForTxn: 100, Key: []byte("k1"), - ResourceGroupTag: resourcegrouptag.EncodeResourceGroupTag("ddccbbaa"), + ResourceGroupTag: tag2Data, }, }, }, @@ -256,13 +269,13 @@ func (s *testDeadlockHistorySuite) TestErrDeadlockToDeadlockRecord(c *C) { WaitChain: []WaitChainItem{ { TryLockTxn: 100, - SQLDigest: "aabbccdd", + SQLDigest: digest1.String(), Key: []byte("k2"), TxnHoldingLock: 101, }, { TryLockTxn: 101, - SQLDigest: "ddccbbaa", + SQLDigest: digest2.String(), Key: []byte("k1"), TxnHoldingLock: 100, }, diff --git a/util/resourcegrouptag/resource_group_tag.go b/util/resourcegrouptag/resource_group_tag.go index cacbf574b91fb..3a0b20da5f90b 100644 --- a/util/resourcegrouptag/resource_group_tag.go +++ b/util/resourcegrouptag/resource_group_tag.go @@ -1,85 +1,32 @@ package resourcegrouptag import ( - "encoding/hex" - "github.com/pingcap/errors" - "github.com/pingcap/tidb/util/logutil" - "go.uber.org/zap" -) - -const ( - resourceGroupTagPrefixSQLDigest = byte(1) + "github.com/pingcap/parser" + "github.com/pingcap/tipb/go-tipb" ) // EncodeResourceGroupTag encodes sqlDigest into resource group tag. -// A resource group tag can be carried in the Context field of TiKV requests, which is a byte array, and sent to TiKV as -// diagnostic information. Currently it contains only the SQL Digest, and the codec method is naive but extendable. -// This function doesn't return error. When there's some error, which can only be caused by unexpected format of the -// arguments, it simply returns an empty result. -// The format: -// +-----------+-----------------------+----------------------------+---------------+----------------+---- -// | version=1 | field1 prefix (1byte) | field1 content (var bytes) | field2 prefix | field2 content | ... -// +-----------+-----------------------+----------------------------+---------------+----------------+---- -// The `version` section marks the codec version, which makes it easier for changing the format in the future. -// Each field starts with a byte to mark what field it is, and the length of the content depends on the field's -// definition. -// Currently there's only one field (SQL Digest), and its content starts with a byte `B` describing it's length, and -// then follows by exactly `B` bytes. -func EncodeResourceGroupTag(sqlDigest string) []byte { - if len(sqlDigest) == 0 { - return nil - } - if len(sqlDigest) >= 512 { - logutil.BgLogger().Warn("failed to encode sql digest to resource group tag: length too long", zap.String("sqlDigest", sqlDigest)) +func EncodeResourceGroupTag(sqlDigest *parser.Digest) []byte { + if sqlDigest == nil { return nil } - - res := make([]byte, 3+len(sqlDigest)/2) - - const encodingVersion = 1 - res[0] = encodingVersion - - res[1] = resourceGroupTagPrefixSQLDigest - // The SQL Digest is expected to be a hex string. Convert it back to bytes to save half of the memory. - res[2] = byte(len(sqlDigest) / 2) - _, err := hex.Decode(res[3:], []byte(sqlDigest)) - if err != nil { - logutil.BgLogger().Warn("failed to encode sql digest to resource group tag: invalid hex string", zap.String("sqlDigest", sqlDigest)) - return nil + tag := &tipb.ResourceGroupTag{ + SqlDigest: sqlDigest.Bytes(), } - - return res + b, _ := tag.Marshal() + return b } -// DecodeResourceGroupTag decodes a resource group tag into various information contained in it. Currently it contains -// only the SQL Digest. -func DecodeResourceGroupTag(data []byte) (sqlDigest string, err error) { +// DecodeResourceGroupTag decodes a resource group tag and return the sql digest. +func DecodeResourceGroupTag(data []byte) (sqlDigest []byte, err error) { if len(data) == 0 { - return "", nil - } - - encodingVersion := data[0] - if encodingVersion != 1 { - return "", errors.Errorf("unsupported resource group tag version %v", data[0]) + return nil, nil } - rem := data[1:] - - for len(rem) > 0 { - switch rem[0] { - case resourceGroupTagPrefixSQLDigest: - // There must be one more byte at rem[1] to represent the content's length, and the remaining bytes should - // not be shorter than the length specified by rem[1]. - if len(rem) < 2 || len(rem)-2 < int(rem[1]) { - return "", errors.Errorf("cannot parse resource group tag: field length mismatch, tag: %v", hex.EncodeToString(data)) - } - fieldLen := int(rem[1]) - sqlDigest = hex.EncodeToString(rem[2 : 2+fieldLen]) - rem = rem[2+fieldLen:] - default: - return "", errors.Errorf("resource group tag field not recognized, prefix: %v, tag: %v", rem[0], hex.EncodeToString(data)) - } + tag := &tipb.ResourceGroupTag{} + err = tag.Unmarshal(data) + if err != nil { + return nil, errors.Errorf("invalid resource group tag data %x", data) } - - return + return tag.SqlDigest, nil } diff --git a/util/resourcegrouptag/resource_group_tag_test.go b/util/resourcegrouptag/resource_group_tag_test.go index a996c72b8d7e6..cd32449843499 100644 --- a/util/resourcegrouptag/resource_group_tag_test.go +++ b/util/resourcegrouptag/resource_group_tag_test.go @@ -18,6 +18,7 @@ import ( "encoding/hex" "errors" "fmt" + "github.com/pingcap/parser" "github.com/pingcap/tidb/util/codec" "github.com/pingcap/tipb/go-tipb" "math/rand" @@ -35,96 +36,52 @@ func TestT(t *testing.T) { } func (s *testUtilsSuite) TestResourceGroupTagEncoding(c *C) { - sqlDigest := "" + sqlDigest := parser.NewDigest(nil) tag := EncodeResourceGroupTag(sqlDigest) c.Assert(len(tag), Equals, 0) decodedSQLDigest, err := DecodeResourceGroupTag(tag) c.Assert(err, IsNil) c.Assert(len(decodedSQLDigest), Equals, 0) - sqlDigest = "aa" + sqlDigest = parser.NewDigest([]byte{'a', 'a'}) tag = EncodeResourceGroupTag(sqlDigest) // version(1) + prefix(1) + length(1) + content(2hex -> 1byte) c.Assert(len(tag), Equals, 4) decodedSQLDigest, err = DecodeResourceGroupTag(tag) c.Assert(err, IsNil) - c.Assert(decodedSQLDigest, Equals, sqlDigest) + c.Assert(decodedSQLDigest, DeepEquals, sqlDigest.Bytes()) - sqlDigest = genRandHex(64) + sqlDigest = parser.NewDigest(genRandHex(64)) tag = EncodeResourceGroupTag(sqlDigest) decodedSQLDigest, err = DecodeResourceGroupTag(tag) c.Assert(err, IsNil) - c.Assert(decodedSQLDigest, Equals, sqlDigest) + c.Assert(decodedSQLDigest, DeepEquals, sqlDigest.Bytes()) - sqlDigest = genRandHex(510) + sqlDigest = parser.NewDigest(genRandHex(510)) tag = EncodeResourceGroupTag(sqlDigest) decodedSQLDigest, err = DecodeResourceGroupTag(tag) c.Assert(err, IsNil) - c.Assert(decodedSQLDigest, Equals, sqlDigest) - - // The max supported length is 255 bytes (510 hex digits). - sqlDigest = genRandHex(512) - tag = EncodeResourceGroupTag(sqlDigest) - c.Assert(len(tag), Equals, 0) - - // A hex string can't have odd length. - sqlDigest = genRandHex(15) - tag = EncodeResourceGroupTag(sqlDigest) - c.Assert(len(tag), Equals, 0) - - // Non-hexadecimal character is invalid - sqlDigest = "aabbccddgg" - tag = EncodeResourceGroupTag(sqlDigest) - c.Assert(len(tag), Equals, 0) - - // A tag should start with a supported version - tag = []byte("\x00") - _, err = DecodeResourceGroupTag(tag) - c.Assert(err, NotNil) - - // The fields should have format like `[prefix, length, content...]`, otherwise decoding it should returns error. - tag = []byte("\x01\x01") - _, err = DecodeResourceGroupTag(tag) - c.Assert(err, NotNil) - - tag = []byte("\x01\x01\x02") - _, err = DecodeResourceGroupTag(tag) - c.Assert(err, NotNil) - - tag = []byte("\x01\x01\x02AB") - decodedSQLDigest, err = DecodeResourceGroupTag(tag) - c.Assert(err, IsNil) - c.Assert(decodedSQLDigest, Equals, "4142") - - tag = []byte("\x01\x01\x00") - decodedSQLDigest, err = DecodeResourceGroupTag(tag) - c.Assert(err, IsNil) - c.Assert(len(decodedSQLDigest), Equals, 0) - - // Unsupported field - tag = []byte("\x01\x99") - _, err = DecodeResourceGroupTag(tag) - c.Assert(err, NotNil) + c.Assert(decodedSQLDigest, DeepEquals, sqlDigest.Bytes()) } -func genRandHex(length int) string { +func genRandHex(length int) []byte { const chars = "0123456789abcdef" res := make([]byte, length) for i := 0; i < length; i++ { res[i] = chars[rand.Intn(len(chars))] } - return string(res) + return res } -func genRandDigest(str string) []byte { +func genDigest(str string) []byte { hasher := sha256.New() hasher.Write([]byte(str)) return hasher.Sum(nil) } func (s *testUtilsSuite) TestResourceGroupTagEncodingPB(c *C) { - digest1 := genRandDigest("abc") - digest2 := genRandDigest("abcdefg") + digest1 := genDigest("abc") + digest2 := genDigest("abcdefg") // Test for manualEncode data := manualEncodeResourceGroupTag(digest1, digest2) c.Assert(len(data), Equals, 69) @@ -220,8 +177,8 @@ func manualDecodeResourceGroupTag(buf []byte) (sqlDigest []byte, planDigest []by } func BenchmarkResourceGroupManualEncode(b *testing.B) { - digest1 := genRandDigest("abc") - digest2 := genRandDigest("abcdefg") + digest1 := genDigest("abc") + digest2 := genDigest("abcdefg") b.ResetTimer() for i := 0; i < b.N; i++ { manualEncodeResourceGroupTag(digest1, digest2) @@ -229,8 +186,8 @@ func BenchmarkResourceGroupManualEncode(b *testing.B) { } func BenchmarkResourceGroupTagPBEncode(b *testing.B) { - digest1 := genRandDigest("abc") - digest2 := genRandDigest("abcdefg") + digest1 := genDigest("abc") + digest2 := genDigest("abcdefg") b.ResetTimer() for i := 0; i < b.N; i++ { resourceTag := &tipb.ResourceGroupTag{ @@ -242,8 +199,8 @@ func BenchmarkResourceGroupTagPBEncode(b *testing.B) { } func BenchmarkResourceGroupTagManualDecode(b *testing.B) { - digest1 := genRandDigest("abc") - digest2 := genRandDigest("abcdefg") + digest1 := genDigest("abc") + digest2 := genDigest("abcdefg") data := manualEncodeResourceGroupTag(digest1, digest2) b.ResetTimer() for i := 0; i < b.N; i++ { @@ -252,8 +209,8 @@ func BenchmarkResourceGroupTagManualDecode(b *testing.B) { } func BenchmarkResourceGroupTagPBDecode(b *testing.B) { - digest1 := genRandDigest("abc") - digest2 := genRandDigest("abcdefg") + digest1 := genDigest("abc") + digest2 := genDigest("abcdefg") resourceTag := &tipb.ResourceGroupTag{ SqlDigest: digest1, PlanDigest: digest2, @@ -267,7 +224,7 @@ func BenchmarkResourceGroupTagPBDecode(b *testing.B) { } func BenchmarkHexDecode(b *testing.B) { - digest1 := genRandDigest("abc") + digest1 := genDigest("abc") b.ResetTimer() hash := fmt.Sprintf("%x", digest1) for i := 0; i < b.N; i++ { From d63cc22451fa5502099e3a15d62040782964e274 Mon Sep 17 00:00:00 2001 From: crazycs Date: Fri, 21 May 2021 19:11:07 +0800 Subject: [PATCH 04/15] set resource group task for each kv request Signed-off-by: crazycs --- config/config.go | 15 +++++++++++++++ distsql/request_builder.go | 10 ++++++++++ executor/adapter.go | 14 +++++++------- executor/analyze.go | 14 ++++++++++++++ executor/batch_point_get.go | 12 +++++++++--- executor/checksum.go | 7 +++++++ executor/executor.go | 4 ++-- executor/insert.go | 4 ++++ executor/point_get.go | 4 ++++ kv/kv.go | 2 ++ kv/option.go | 2 ++ planner/core/cache.go | 2 +- planner/core/encode.go | 8 ++++---- sessionctx/stmtctx/stmtctx.go | 19 ++++++++++++++++--- store/driver/txn/snapshot.go | 2 ++ store/driver/txn/txn_driver.go | 2 ++ store/tikv/scan.go | 7 ++++--- store/tikv/snapshot.go | 7 +++++++ util/deadlockhistory/deadlock_history_test.go | 11 ++--------- util/resourcegrouptag/resource_group_tag.go | 13 +++++++++---- .../resource_group_tag_test.go | 8 ++++---- 21 files changed, 127 insertions(+), 40 deletions(-) diff --git a/config/config.go b/config/config.go index 664cb5bceaa7d..10eafeb738458 100644 --- a/config/config.go +++ b/config/config.go @@ -137,6 +137,7 @@ type Config struct { DelayCleanTableLock uint64 `toml:"delay-clean-table-lock" json:"delay-clean-table-lock"` SplitRegionMaxNum uint64 `toml:"split-region-max-num" json:"split-region-max-num"` StmtSummary StmtSummary `toml:"stmt-summary" json:"stmt-summary"` + TopStmt TopStmt `toml:"top-stmt" json:"top-stmt"` // RepairMode indicates that the TiDB is in the repair mode for table meta. RepairMode bool `toml:"repair-mode" json:"repair-mode"` RepairTableList []string `toml:"repair-table-list" json:"repair-table-list"` @@ -527,6 +528,15 @@ type StmtSummary struct { HistorySize int `toml:"history-size" json:"history-size"` } +type TopStmt struct { + // Enable statement summary or not. + Enable bool `toml:"enable" json:"enable"` + // The refresh interval of statement summary. + RefreshInterval int `toml:"refresh-interval" json:"refresh-interval"` + // The maximum number of statements kept in memory. + MaxStmtCount uint `toml:"max-stmt-count" json:"max-stmt-count"` +} + // IsolationRead is the config for isolation read. type IsolationRead struct { // Engines filters tidb-server access paths by engine type. @@ -656,6 +666,11 @@ var defaultConf = Config{ RefreshInterval: 1800, HistorySize: 24, }, + TopStmt: TopStmt{ + Enable: true, + RefreshInterval: 1, + MaxStmtCount: 5000, + }, IsolationRead: IsolationRead{ Engines: []string{"tikv", "tiflash", "tidb"}, }, diff --git a/distsql/request_builder.go b/distsql/request_builder.go index aced1a71aaa7b..9864f25ded939 100644 --- a/distsql/request_builder.go +++ b/distsql/request_builder.go @@ -15,6 +15,7 @@ package distsql import ( "fmt" + "github.com/pingcap/tidb/config" "math" "sort" @@ -243,6 +244,9 @@ func (builder *RequestBuilder) SetFromSessionVars(sv *variable.SessionVars) *Req }, } } + if config.GetGlobalConfig().TopStmt.Enable { + builder.SetResourceGroupTag(sv.StmtCtx.GetResourceGroupTag()) + } return builder } @@ -276,6 +280,12 @@ func (builder *RequestBuilder) SetFromInfoSchema(is infoschema.InfoSchema) *Requ return builder } +// SetResourceGroupTag sets the request resource group tag. +func (builder *RequestBuilder) SetResourceGroupTag(tag []byte) *RequestBuilder { + builder.Request.ResourceGroupTag = tag + return builder +} + func (builder *RequestBuilder) verifyTxnScope() error { if builder.txnScope == "" { builder.txnScope = kv.GlobalTxnScope diff --git a/executor/adapter.go b/executor/adapter.go index f36d822123543..18bd023e55f27 100644 --- a/executor/adapter.go +++ b/executor/adapter.go @@ -919,7 +919,7 @@ func (a *ExecStmt) LogSlowQuery(txnTS uint64, succ bool, hasMoreResults bool) { statsInfos := plannercore.GetStatsInfo(a.Plan) memMax := sessVars.StmtCtx.MemTracker.MaxConsumed() diskMax := sessVars.StmtCtx.DiskTracker.MaxConsumed() - _, planDigest := getPlanDigest(a.Ctx, a.Plan) + planDigest := getPlanDigest(a.Ctx, a.Plan) slowItems := &variable.SlowQueryLogItems{ TxnTS: txnTS, SQL: sql.String(), @@ -1011,14 +1011,14 @@ func getPlanTree(sctx sessionctx.Context, p plannercore.Plan) string { } // getPlanDigest will try to get the select plan tree if the plan is select or the select plan of delete/update/insert statement. -func getPlanDigest(sctx sessionctx.Context, p plannercore.Plan) (normalized, planDigest string) { - normalized, planDigest = sctx.GetSessionVars().StmtCtx.GetPlanDigest() +func getPlanDigest(sctx sessionctx.Context, p plannercore.Plan) string { + normalized, planDigest := sctx.GetSessionVars().StmtCtx.GetPlanDigest() if len(normalized) > 0 { - return + return "" } normalized, planDigest = plannercore.NormalizePlan(p) sctx.GetSessionVars().StmtCtx.SetPlanDigest(normalized, planDigest) - return + return planDigest.String() } // getEncodedPlan gets the encoded plan, and generates the hint string if indicated. @@ -1092,11 +1092,11 @@ func (a *ExecStmt) SummaryStmt(succ bool) { var planDigestGen func() string if a.Plan.TP() == plancodec.TypePointGet { planDigestGen = func() string { - _, planDigest := getPlanDigest(a.Ctx, a.Plan) + planDigest := getPlanDigest(a.Ctx, a.Plan) return planDigest } } else { - _, planDigest = getPlanDigest(a.Ctx, a.Plan) + planDigest = getPlanDigest(a.Ctx, a.Plan) } execDetail := stmtCtx.GetExecDetails() diff --git a/executor/analyze.go b/executor/analyze.go index 633f6e4dcc69e..25325f47160f1 100644 --- a/executor/analyze.go +++ b/executor/analyze.go @@ -17,6 +17,7 @@ import ( "bytes" "context" "fmt" + "github.com/pingcap/tidb/config" "math" "math/rand" "runtime" @@ -355,6 +356,9 @@ func (e *AnalyzeIndexExec) fetchAnalyzeResult(ranges []*ranger.Range, isNullRang } else { kvReqBuilder = builder.SetIndexRangesForTables(e.ctx.GetSessionVars().StmtCtx, []int64{e.tableID.GetStatisticsID()}, e.idxInfo.ID, ranges) } + if config.GetGlobalConfig().TopStmt.Enable { + kvReqBuilder.SetResourceGroupTag(e.ctx.GetSessionVars().StmtCtx.GetResourceGroupTag()) + } kvReq, err := kvReqBuilder. SetAnalyzeRequest(e.analyzePB). SetStartTS(math.MaxUint64). @@ -653,6 +657,10 @@ func (e *AnalyzeColumnsExec) open(ranges []*ranger.Range) error { func (e *AnalyzeColumnsExec) buildResp(ranges []*ranger.Range) (distsql.SelectResult, error) { var builder distsql.RequestBuilder reqBuilder := builder.SetHandleRangesForTables(e.ctx.GetSessionVars().StmtCtx, []int64{e.tableID.GetStatisticsID()}, e.handleCols != nil && !e.handleCols.IsInt(), ranges, nil) + if config.GetGlobalConfig().TopStmt.Enable { + builder.SetResourceGroupTag(e.ctx.GetSessionVars().StmtCtx.GetResourceGroupTag()) + } + // Always set KeepOrder of the request to be true, in order to compute // correct `correlation` of columns. kvReq, err := reqBuilder. @@ -1323,6 +1331,9 @@ func (e *AnalyzeFastExec) handleScanTasks(bo *tikv.Backoffer) (keysSize int, err if e.ctx.GetSessionVars().GetReplicaRead().IsFollowerRead() { snapshot.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower) } + if config.GetGlobalConfig().TopStmt.Enable { + snapshot.SetOption(kv.ResourceGroupTag, e.ctx.GetSessionVars().StmtCtx.GetResourceGroupTag()) + } for _, t := range e.scanTasks { iter, err := snapshot.Iter(kv.Key(t.StartKey), kv.Key(t.EndKey)) if err != nil { @@ -1343,6 +1354,9 @@ func (e *AnalyzeFastExec) handleSampTasks(workID int, step uint32, err *error) { snapshot.SetOption(kv.NotFillCache, true) snapshot.SetOption(kv.IsolationLevel, kv.RC) snapshot.SetOption(kv.Priority, kv.PriorityLow) + if config.GetGlobalConfig().TopStmt.Enable { + snapshot.SetOption(kv.ResourceGroupTag, e.ctx.GetSessionVars().StmtCtx.GetResourceGroupTag()) + } if e.ctx.GetSessionVars().GetReplicaRead().IsFollowerRead() { snapshot.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower) } diff --git a/executor/batch_point_get.go b/executor/batch_point_get.go index 726603a0ff88f..17781dffabd97 100644 --- a/executor/batch_point_get.go +++ b/executor/batch_point_get.go @@ -16,6 +16,7 @@ package executor import ( "context" "fmt" + "github.com/pingcap/tidb/config" "sort" "sync/atomic" @@ -90,7 +91,9 @@ func (e *BatchPointGetExec) buildVirtualColumnInfo() { // Open implements the Executor interface. func (e *BatchPointGetExec) Open(context.Context) error { e.snapshotTS = e.startTS - txnCtx := e.ctx.GetSessionVars().TxnCtx + sessVars := e.ctx.GetSessionVars() + txnCtx := sessVars.TxnCtx + stmtCtx := sessVars.StmtCtx if e.lock { e.snapshotTS = txnCtx.GetForUpdateTS() } @@ -113,12 +116,12 @@ func (e *BatchPointGetExec) Open(context.Context) error { SnapshotRuntimeStats: snapshotStats, } snapshot.SetOption(kv.CollectRuntimeStats, snapshotStats) - e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats) + stmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats) } if e.ctx.GetSessionVars().GetReplicaRead().IsFollowerRead() { snapshot.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower) } - snapshot.SetOption(kv.TaskID, e.ctx.GetSessionVars().StmtCtx.TaskID) + snapshot.SetOption(kv.TaskID, stmtCtx.TaskID) isStaleness := e.ctx.GetSessionVars().TxnCtx.IsStaleness snapshot.SetOption(kv.IsStalenessReadOnly, isStaleness) if isStaleness && e.ctx.GetSessionVars().TxnCtx.TxnScope != kv.GlobalTxnScope { @@ -129,6 +132,9 @@ func (e *BatchPointGetExec) Open(context.Context) error { }, }) } + if config.GetGlobalConfig().TopStmt.Enable { + snapshot.SetOption(kv.ResourceGroupTag, stmtCtx.GetResourceGroupTag()) + } var batchGetter kv.BatchGetter = snapshot if txn.Valid() { lock := e.tblInfo.Lock diff --git a/executor/checksum.go b/executor/checksum.go index 63f622d2f8140..c152b6d6d8664 100644 --- a/executor/checksum.go +++ b/executor/checksum.go @@ -18,6 +18,7 @@ import ( "strconv" "github.com/pingcap/parser/model" + "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/distsql" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/sessionctx" @@ -240,6 +241,9 @@ func (c *checksumContext) buildTableRequest(ctx sessionctx.Context, tableID int6 } var builder distsql.RequestBuilder + if config.GetGlobalConfig().TopStmt.Enable { + builder.SetResourceGroupTag(ctx.GetSessionVars().StmtCtx.GetResourceGroupTag()) + } return builder.SetHandleRanges(ctx.GetSessionVars().StmtCtx, tableID, c.TableInfo.IsCommonHandle, ranges, nil). SetChecksumRequest(checksum). SetStartTS(c.StartTs). @@ -256,6 +260,9 @@ func (c *checksumContext) buildIndexRequest(ctx sessionctx.Context, tableID int6 ranges := ranger.FullRange() var builder distsql.RequestBuilder + if config.GetGlobalConfig().TopStmt.Enable { + builder.SetResourceGroupTag(ctx.GetSessionVars().StmtCtx.GetResourceGroupTag()) + } return builder.SetIndexRanges(ctx.GetSessionVars().StmtCtx, tableID, indexInfo.ID, ranges). SetChecksumRequest(checksum). SetStartTS(c.StartTs). diff --git a/executor/executor.go b/executor/executor.go index 4b039db365e38..6eb617ea116a5 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -16,7 +16,6 @@ package executor import ( "context" "fmt" - "github.com/pingcap/tidb/util/resourcegrouptag" "math" "runtime" "runtime/trace" @@ -65,6 +64,7 @@ import ( "github.com/pingcap/tidb/util/execdetails" "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/memory" + "github.com/pingcap/tidb/util/resourcegrouptag" "go.uber.org/zap" ) @@ -985,7 +985,7 @@ func newLockCtx(seVars *variable.SessionVars, lockWaitTime int64) *tikvstore.Loc LockKeysDuration: &seVars.StmtCtx.LockKeysDuration, LockKeysCount: &seVars.StmtCtx.LockKeysCount, LockExpired: &seVars.TxnCtx.LockExpire, - ResourceGroupTag: resourcegrouptag.EncodeResourceGroupTag(sqlDigest), + ResourceGroupTag: resourcegrouptag.EncodeResourceGroupTag(sqlDigest, nil), OnDeadlock: func(deadlock *tikverr.ErrDeadlock) { // TODO: Support collecting retryable deadlocks according to the config. if !deadlock.IsRetryable { diff --git a/executor/insert.go b/executor/insert.go index 178aefed5fb8b..15927842127e7 100644 --- a/executor/insert.go +++ b/executor/insert.go @@ -17,6 +17,7 @@ import ( "context" "encoding/hex" "fmt" + "github.com/pingcap/tidb/config" "runtime/trace" "time" @@ -63,6 +64,9 @@ func (e *InsertExec) exec(ctx context.Context, rows [][]types.Datum) error { if err != nil { return err } + if snapshot := txn.GetSnapshot(); snapshot != nil && config.GetGlobalConfig().TopStmt.Enable { + snapshot.SetOption(kv.ResourceGroupTag, sessVars.StmtCtx.GetResourceGroupTag()) + } txnSize := txn.Size() sessVars.StmtCtx.AddRecordRows(uint64(len(rows))) // If you use the IGNORE keyword, duplicate-key error that occurs while executing the INSERT statement are ignored. diff --git a/executor/point_get.go b/executor/point_get.go index c5ff4b98fa2ba..076f1cb497999 100644 --- a/executor/point_get.go +++ b/executor/point_get.go @@ -16,6 +16,7 @@ package executor import ( "context" "fmt" + "github.com/pingcap/tidb/config" "github.com/pingcap/errors" "github.com/pingcap/failpoint" @@ -160,6 +161,9 @@ func (e *PointGetExecutor) Open(context.Context) error { }, }) } + if config.GetGlobalConfig().TopStmt.Enable { + e.snapshot.SetOption(kv.ResourceGroupTag, e.ctx.GetSessionVars().StmtCtx.GetResourceGroupTag()) + } return nil } diff --git a/kv/kv.go b/kv/kv.go index 471dfe09a110b..56946d05fcb66 100644 --- a/kv/kv.go +++ b/kv/kv.go @@ -291,6 +291,8 @@ type Request struct { IsStaleness bool // MatchStoreLabels indicates the labels the store should be matched MatchStoreLabels []*metapb.StoreLabel + // ResourceGroupTag indicates the kv request task group. + ResourceGroupTag []byte } // ResultSubset represents a result subset from a single storage unit. diff --git a/kv/option.go b/kv/option.go index dc0d700666d5a..de5a1d8834c40 100644 --- a/kv/option.go +++ b/kv/option.go @@ -59,6 +59,8 @@ const ( IsStalenessReadOnly // MatchStoreLabels indicates the labels the store should be matched MatchStoreLabels + // ResourceGroupTag indicates the resource group of the kv request. + ResourceGroupTag ) // ReplicaReadType is the type of replica to read data from diff --git a/planner/core/cache.go b/planner/core/cache.go index 44408770abbeb..6328003fa955b 100644 --- a/planner/core/cache.go +++ b/planner/core/cache.go @@ -201,6 +201,6 @@ type CachedPrepareStmt struct { NormalizedSQL string NormalizedPlan string SQLDigest *parser.Digest - PlanDigest string + PlanDigest *parser.Digest ForUpdateRead bool } diff --git a/planner/core/encode.go b/planner/core/encode.go index d1cad479d52f8..e9f31ccb024e6 100644 --- a/planner/core/encode.go +++ b/planner/core/encode.go @@ -16,7 +16,7 @@ package core import ( "bytes" "crypto/sha256" - "fmt" + "github.com/pingcap/parser" "hash" "sync" @@ -120,10 +120,10 @@ type planDigester struct { } // NormalizePlan is used to normalize the plan and generate plan digest. -func NormalizePlan(p Plan) (normalized, digest string) { +func NormalizePlan(p Plan) (normalized string, digest *parser.Digest) { selectPlan := getSelectPlan(p) if selectPlan == nil { - return "", "" + return "", nil } d := digesterPool.Get().(*planDigester) defer digesterPool.Put(d) @@ -134,7 +134,7 @@ func NormalizePlan(p Plan) (normalized, digest string) { panic(err) } d.buf.Reset() - digest = fmt.Sprintf("%x", d.hasher.Sum(nil)) + digest = parser.NewDigest(d.hasher.Sum(nil)) d.hasher.Reset() return } diff --git a/sessionctx/stmtctx/stmtctx.go b/sessionctx/stmtctx/stmtctx.go index 18318d562435d..5aef33f5deca7 100644 --- a/sessionctx/stmtctx/stmtctx.go +++ b/sessionctx/stmtctx/stmtctx.go @@ -14,6 +14,7 @@ package stmtctx import ( + "github.com/pingcap/tidb/util/resourcegrouptag" "math" "sort" "strconv" @@ -149,7 +150,7 @@ type StatementContext struct { } // planNormalized use for cache the normalized plan, avoid duplicate builds. planNormalized string - planDigest string + planDigest *parser.Digest encodedPlan string planHint string planHintSet bool @@ -165,6 +166,8 @@ type StatementContext struct { // stmtCache is used to store some statement-related values. stmtCache map[StmtCacheKey]interface{} + // resourceGroupTag cache for the current statement resource group tag. + resourceGroupTag []byte } // StmtHints are SessionVars related sql hints. @@ -244,12 +247,22 @@ func (sc *StatementContext) InitSQLDigest(normalized string, digest *parser.Dige } // GetPlanDigest gets the normalized plan and plan digest. -func (sc *StatementContext) GetPlanDigest() (normalized, planDigest string) { +func (sc *StatementContext) GetPlanDigest() (normalized string, planDigest *parser.Digest) { return sc.planNormalized, sc.planDigest } +// GetResourceGroupTag gets the resource group of the statement. +func (sc *StatementContext) GetResourceGroupTag() []byte { + if len(sc.resourceGroupTag) > 0 { + return sc.resourceGroupTag + } + _, sqlDigest := sc.SQLDigest() + sc.resourceGroupTag = resourcegrouptag.EncodeResourceGroupTag(sqlDigest, sc.planDigest) + return sc.resourceGroupTag +} + // SetPlanDigest sets the normalized plan and plan digest. -func (sc *StatementContext) SetPlanDigest(normalized, planDigest string) { +func (sc *StatementContext) SetPlanDigest(normalized string, planDigest *parser.Digest) { sc.planNormalized, sc.planDigest = normalized, planDigest } diff --git a/store/driver/txn/snapshot.go b/store/driver/txn/snapshot.go index 6692f45a749a3..892a85e9ccebb 100644 --- a/store/driver/txn/snapshot.go +++ b/store/driver/txn/snapshot.go @@ -88,6 +88,8 @@ func (s *tikvSnapshot) SetOption(opt int, val interface{}) { s.KVSnapshot.SetIsStatenessReadOnly(val.(bool)) case kv.MatchStoreLabels: s.KVSnapshot.SetMatchStoreLabels(val.([]*metapb.StoreLabel)) + case kv.ResourceGroupTag: + s.KVSnapshot.SetResourceGroupTag(val.([]byte)) } } diff --git a/store/driver/txn/txn_driver.go b/store/driver/txn/txn_driver.go index 7f05f80139c12..6e66a73c86506 100644 --- a/store/driver/txn/txn_driver.go +++ b/store/driver/txn/txn_driver.go @@ -170,6 +170,8 @@ func (txn *tikvTxn) SetOption(opt int, val interface{}) { txn.KVTxn.GetSnapshot().SetIsStatenessReadOnly(val.(bool)) case kv.MatchStoreLabels: txn.KVTxn.GetSnapshot().SetMatchStoreLabels(val.([]*metapb.StoreLabel)) + case kv.ResourceGroupTag: + txn.KVTxn.GetSnapshot().SetResourceGroupTag(val.([]byte)) } } diff --git a/store/tikv/scan.go b/store/tikv/scan.go index 94ece80ff067f..4d5c87161daad 100644 --- a/store/tikv/scan.go +++ b/store/tikv/scan.go @@ -192,9 +192,10 @@ func (s *Scanner) getData(bo *Backoffer) error { } sreq := &pb.ScanRequest{ Context: &pb.Context{ - Priority: s.snapshot.priority.ToPB(), - NotFillCache: s.snapshot.notFillCache, - IsolationLevel: s.snapshot.isolationLevel.ToPB(), + Priority: s.snapshot.priority.ToPB(), + NotFillCache: s.snapshot.notFillCache, + IsolationLevel: s.snapshot.isolationLevel.ToPB(), + ResourceGroupTag: s.snapshot.resourceGroupTag, }, StartKey: s.nextStartKey, EndKey: reqEndKey, diff --git a/store/tikv/snapshot.go b/store/tikv/snapshot.go index 180ac59369aca..701a90abf5f09 100644 --- a/store/tikv/snapshot.go +++ b/store/tikv/snapshot.go @@ -108,6 +108,8 @@ type KVSnapshot struct { matchStoreLabels []*metapb.StoreLabel } sampleStep uint32 + // resourceGroupTag is use to set the kv request resource group tag. + resourceGroupTag []byte } // newTiKVSnapshot creates a snapshot of an TiKV store. @@ -629,6 +631,11 @@ func (s *KVSnapshot) SetMatchStoreLabels(labels []*metapb.StoreLabel) { s.mu.matchStoreLabels = labels } +// SetResourceGroupTag sets resource group of the kv request. +func (s *KVSnapshot) SetResourceGroupTag(tag []byte) { + s.resourceGroupTag = tag +} + // SnapCacheHitCount gets the snapshot cache hit count. Only for test. func (s *KVSnapshot) SnapCacheHitCount() int { return int(atomic.LoadInt64(&s.mu.hitCnt)) diff --git a/util/deadlockhistory/deadlock_history_test.go b/util/deadlockhistory/deadlock_history_test.go index 3374f8620eb15..dd9428a9f550a 100644 --- a/util/deadlockhistory/deadlock_history_test.go +++ b/util/deadlockhistory/deadlock_history_test.go @@ -14,17 +14,16 @@ package deadlockhistory import ( - "crypto/sha256" - "github.com/pingcap/parser" - "github.com/pingcap/tipb/go-tipb" "testing" "time" . "github.com/pingcap/check" "github.com/pingcap/kvproto/pkg/deadlock" "github.com/pingcap/kvproto/pkg/kvrpcpb" + "github.com/pingcap/parser" tikverr "github.com/pingcap/tidb/store/tikv/error" "github.com/pingcap/tidb/types" + "github.com/pingcap/tipb/go-tipb" ) type testDeadlockHistorySuite struct{} @@ -229,12 +228,6 @@ func (s *testDeadlockHistorySuite) TestGetDatum(c *C) { c.Assert(res[3][7].GetValue(), Equals, uint64(201)) // TRX_HOLDING_LOCK } -func genDigest(str string) *parser.Digest { - hasher := sha256.New() - hasher.Write([]byte(str)) - return parser.NewDigest(hasher.Sum(nil)) -} - func (s *testDeadlockHistorySuite) TestErrDeadlockToDeadlockRecord(c *C) { digest1, digest2 := parser.NewDigest([]byte("aabbccdd")), parser.NewDigest([]byte("ddccbbaa")) tag1 := tipb.ResourceGroupTag{SqlDigest: digest1.Bytes()} diff --git a/util/resourcegrouptag/resource_group_tag.go b/util/resourcegrouptag/resource_group_tag.go index 3a0b20da5f90b..7900759f0b43f 100644 --- a/util/resourcegrouptag/resource_group_tag.go +++ b/util/resourcegrouptag/resource_group_tag.go @@ -7,12 +7,17 @@ import ( ) // EncodeResourceGroupTag encodes sqlDigest into resource group tag. -func EncodeResourceGroupTag(sqlDigest *parser.Digest) []byte { - if sqlDigest == nil { +func EncodeResourceGroupTag(sqlDigest, planDigest *parser.Digest) []byte { + if sqlDigest == nil && planDigest == nil { return nil } - tag := &tipb.ResourceGroupTag{ - SqlDigest: sqlDigest.Bytes(), + + tag := &tipb.ResourceGroupTag{} + if sqlDigest != nil { + tag.SqlDigest = sqlDigest.Bytes() + } + if planDigest != nil { + tag.PlanDigest = planDigest.Bytes() } b, _ := tag.Marshal() return b diff --git a/util/resourcegrouptag/resource_group_tag_test.go b/util/resourcegrouptag/resource_group_tag_test.go index cd32449843499..9f68bd18bf6b1 100644 --- a/util/resourcegrouptag/resource_group_tag_test.go +++ b/util/resourcegrouptag/resource_group_tag_test.go @@ -37,14 +37,14 @@ func TestT(t *testing.T) { func (s *testUtilsSuite) TestResourceGroupTagEncoding(c *C) { sqlDigest := parser.NewDigest(nil) - tag := EncodeResourceGroupTag(sqlDigest) + tag := EncodeResourceGroupTag(sqlDigest, nil) c.Assert(len(tag), Equals, 0) decodedSQLDigest, err := DecodeResourceGroupTag(tag) c.Assert(err, IsNil) c.Assert(len(decodedSQLDigest), Equals, 0) sqlDigest = parser.NewDigest([]byte{'a', 'a'}) - tag = EncodeResourceGroupTag(sqlDigest) + tag = EncodeResourceGroupTag(sqlDigest, nil) // version(1) + prefix(1) + length(1) + content(2hex -> 1byte) c.Assert(len(tag), Equals, 4) decodedSQLDigest, err = DecodeResourceGroupTag(tag) @@ -52,13 +52,13 @@ func (s *testUtilsSuite) TestResourceGroupTagEncoding(c *C) { c.Assert(decodedSQLDigest, DeepEquals, sqlDigest.Bytes()) sqlDigest = parser.NewDigest(genRandHex(64)) - tag = EncodeResourceGroupTag(sqlDigest) + tag = EncodeResourceGroupTag(sqlDigest, nil) decodedSQLDigest, err = DecodeResourceGroupTag(tag) c.Assert(err, IsNil) c.Assert(decodedSQLDigest, DeepEquals, sqlDigest.Bytes()) sqlDigest = parser.NewDigest(genRandHex(510)) - tag = EncodeResourceGroupTag(sqlDigest) + tag = EncodeResourceGroupTag(sqlDigest, nil) decodedSQLDigest, err = DecodeResourceGroupTag(tag) c.Assert(err, IsNil) c.Assert(decodedSQLDigest, DeepEquals, sqlDigest.Bytes()) From 5769b9b79c81773688ef78beca2c0e0553cf9ef0 Mon Sep 17 00:00:00 2001 From: crazycs Date: Mon, 24 May 2021 10:22:07 +0800 Subject: [PATCH 05/15] refine code and fix test Signed-off-by: crazycs --- distsql/request_builder.go | 12 ++++---- executor/analyze.go | 11 ++----- executor/batch_point_get.go | 2 +- executor/checksum.go | 9 ++---- executor/executor.go | 8 +++-- executor/insert.go | 2 +- executor/point_get.go | 2 +- planner/core/cache.go | 2 +- planner/core/encode.go | 2 +- sessionctx/stmtctx/stmtctx.go | 2 +- util/resourcegrouptag/resource_group_tag.go | 2 +- .../resource_group_tag_test.go | 29 +++++++++++++++---- 12 files changed, 47 insertions(+), 36 deletions(-) diff --git a/distsql/request_builder.go b/distsql/request_builder.go index 9864f25ded939..40319c3834a31 100644 --- a/distsql/request_builder.go +++ b/distsql/request_builder.go @@ -15,13 +15,13 @@ package distsql import ( "fmt" - "github.com/pingcap/tidb/config" "math" "sort" "github.com/pingcap/errors" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/parser/mysql" + "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/ddl/placement" "github.com/pingcap/tidb/infoschema" "github.com/pingcap/tidb/kv" @@ -244,9 +244,7 @@ func (builder *RequestBuilder) SetFromSessionVars(sv *variable.SessionVars) *Req }, } } - if config.GetGlobalConfig().TopStmt.Enable { - builder.SetResourceGroupTag(sv.StmtCtx.GetResourceGroupTag()) - } + builder.SetResourceGroupTag(sv.StmtCtx) return builder } @@ -281,8 +279,10 @@ func (builder *RequestBuilder) SetFromInfoSchema(is infoschema.InfoSchema) *Requ } // SetResourceGroupTag sets the request resource group tag. -func (builder *RequestBuilder) SetResourceGroupTag(tag []byte) *RequestBuilder { - builder.Request.ResourceGroupTag = tag +func (builder *RequestBuilder) SetResourceGroupTag(sc *stmtctx.StatementContext) *RequestBuilder { + if config.GetGlobalConfig().TopStmt.Enable { + builder.Request.ResourceGroupTag = sc.GetResourceGroupTag() + } return builder } diff --git a/executor/analyze.go b/executor/analyze.go index 25325f47160f1..934ec353311f5 100644 --- a/executor/analyze.go +++ b/executor/analyze.go @@ -17,7 +17,6 @@ import ( "bytes" "context" "fmt" - "github.com/pingcap/tidb/config" "math" "math/rand" "runtime" @@ -35,6 +34,7 @@ import ( "github.com/pingcap/parser/model" "github.com/pingcap/parser/mysql" "github.com/pingcap/parser/terror" + "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/distsql" "github.com/pingcap/tidb/domain" "github.com/pingcap/tidb/infoschema" @@ -356,9 +356,7 @@ func (e *AnalyzeIndexExec) fetchAnalyzeResult(ranges []*ranger.Range, isNullRang } else { kvReqBuilder = builder.SetIndexRangesForTables(e.ctx.GetSessionVars().StmtCtx, []int64{e.tableID.GetStatisticsID()}, e.idxInfo.ID, ranges) } - if config.GetGlobalConfig().TopStmt.Enable { - kvReqBuilder.SetResourceGroupTag(e.ctx.GetSessionVars().StmtCtx.GetResourceGroupTag()) - } + kvReqBuilder.SetResourceGroupTag(e.ctx.GetSessionVars().StmtCtx) kvReq, err := kvReqBuilder. SetAnalyzeRequest(e.analyzePB). SetStartTS(math.MaxUint64). @@ -657,10 +655,7 @@ func (e *AnalyzeColumnsExec) open(ranges []*ranger.Range) error { func (e *AnalyzeColumnsExec) buildResp(ranges []*ranger.Range) (distsql.SelectResult, error) { var builder distsql.RequestBuilder reqBuilder := builder.SetHandleRangesForTables(e.ctx.GetSessionVars().StmtCtx, []int64{e.tableID.GetStatisticsID()}, e.handleCols != nil && !e.handleCols.IsInt(), ranges, nil) - if config.GetGlobalConfig().TopStmt.Enable { - builder.SetResourceGroupTag(e.ctx.GetSessionVars().StmtCtx.GetResourceGroupTag()) - } - + builder.SetResourceGroupTag(e.ctx.GetSessionVars().StmtCtx) // Always set KeepOrder of the request to be true, in order to compute // correct `correlation` of columns. kvReq, err := reqBuilder. diff --git a/executor/batch_point_get.go b/executor/batch_point_get.go index 17781dffabd97..06bd7409e965e 100644 --- a/executor/batch_point_get.go +++ b/executor/batch_point_get.go @@ -16,7 +16,6 @@ package executor import ( "context" "fmt" - "github.com/pingcap/tidb/config" "sort" "sync/atomic" @@ -24,6 +23,7 @@ import ( "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/parser/model" "github.com/pingcap/parser/mysql" + "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/ddl/placement" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/sessionctx" diff --git a/executor/checksum.go b/executor/checksum.go index c152b6d6d8664..62543068820e9 100644 --- a/executor/checksum.go +++ b/executor/checksum.go @@ -18,7 +18,6 @@ import ( "strconv" "github.com/pingcap/parser/model" - "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/distsql" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/sessionctx" @@ -241,9 +240,7 @@ func (c *checksumContext) buildTableRequest(ctx sessionctx.Context, tableID int6 } var builder distsql.RequestBuilder - if config.GetGlobalConfig().TopStmt.Enable { - builder.SetResourceGroupTag(ctx.GetSessionVars().StmtCtx.GetResourceGroupTag()) - } + builder.SetResourceGroupTag(ctx.GetSessionVars().StmtCtx) return builder.SetHandleRanges(ctx.GetSessionVars().StmtCtx, tableID, c.TableInfo.IsCommonHandle, ranges, nil). SetChecksumRequest(checksum). SetStartTS(c.StartTs). @@ -260,9 +257,7 @@ func (c *checksumContext) buildIndexRequest(ctx sessionctx.Context, tableID int6 ranges := ranger.FullRange() var builder distsql.RequestBuilder - if config.GetGlobalConfig().TopStmt.Enable { - builder.SetResourceGroupTag(ctx.GetSessionVars().StmtCtx.GetResourceGroupTag()) - } + builder.SetResourceGroupTag(ctx.GetSessionVars().StmtCtx) return builder.SetIndexRanges(ctx.GetSessionVars().StmtCtx, tableID, indexInfo.ID, ranges). SetChecksumRequest(checksum). SetStartTS(c.StartTs). diff --git a/executor/executor.go b/executor/executor.go index 6eb617ea116a5..72e1c09c4cd07 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -16,6 +16,7 @@ package executor import ( "context" "fmt" + "github.com/pingcap/parser" "math" "runtime" "runtime/trace" @@ -974,8 +975,11 @@ func (e *SelectLockExec) Next(ctx context.Context, req *chunk.Chunk) error { } func newLockCtx(seVars *variable.SessionVars, lockWaitTime int64) *tikvstore.LockCtx { + var planDigest *parser.Digest _, sqlDigest := seVars.StmtCtx.SQLDigest() - + if config.GetGlobalConfig().TopStmt.Enable { + _, planDigest = seVars.StmtCtx.GetPlanDigest() + } return &tikvstore.LockCtx{ Killed: &seVars.Killed, ForUpdateTS: seVars.TxnCtx.GetForUpdateTS(), @@ -985,7 +989,7 @@ func newLockCtx(seVars *variable.SessionVars, lockWaitTime int64) *tikvstore.Loc LockKeysDuration: &seVars.StmtCtx.LockKeysDuration, LockKeysCount: &seVars.StmtCtx.LockKeysCount, LockExpired: &seVars.TxnCtx.LockExpire, - ResourceGroupTag: resourcegrouptag.EncodeResourceGroupTag(sqlDigest, nil), + ResourceGroupTag: resourcegrouptag.EncodeResourceGroupTag(sqlDigest, planDigest), OnDeadlock: func(deadlock *tikverr.ErrDeadlock) { // TODO: Support collecting retryable deadlocks according to the config. if !deadlock.IsRetryable { diff --git a/executor/insert.go b/executor/insert.go index 15927842127e7..931fcf6f3fb42 100644 --- a/executor/insert.go +++ b/executor/insert.go @@ -17,12 +17,12 @@ import ( "context" "encoding/hex" "fmt" - "github.com/pingcap/tidb/config" "runtime/trace" "time" "github.com/opentracing/opentracing-go" "github.com/pingcap/parser/mysql" + "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/table" diff --git a/executor/point_get.go b/executor/point_get.go index 076f1cb497999..c572f6985e885 100644 --- a/executor/point_get.go +++ b/executor/point_get.go @@ -16,13 +16,13 @@ package executor import ( "context" "fmt" - "github.com/pingcap/tidb/config" "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/parser/model" "github.com/pingcap/parser/mysql" + "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/ddl" "github.com/pingcap/tidb/ddl/placement" "github.com/pingcap/tidb/distsql" diff --git a/planner/core/cache.go b/planner/core/cache.go index 6328003fa955b..0e5a624b3d635 100644 --- a/planner/core/cache.go +++ b/planner/core/cache.go @@ -14,11 +14,11 @@ package core import ( - "github.com/pingcap/parser" "math" "sync/atomic" "time" + "github.com/pingcap/parser" "github.com/pingcap/parser/ast" "github.com/pingcap/parser/model" "github.com/pingcap/parser/mysql" diff --git a/planner/core/encode.go b/planner/core/encode.go index e9f31ccb024e6..e10b00d6c2844 100644 --- a/planner/core/encode.go +++ b/planner/core/encode.go @@ -16,11 +16,11 @@ package core import ( "bytes" "crypto/sha256" - "github.com/pingcap/parser" "hash" "sync" "github.com/pingcap/failpoint" + "github.com/pingcap/parser" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/util/plancodec" ) diff --git a/sessionctx/stmtctx/stmtctx.go b/sessionctx/stmtctx/stmtctx.go index 5aef33f5deca7..1ad9058bf1b89 100644 --- a/sessionctx/stmtctx/stmtctx.go +++ b/sessionctx/stmtctx/stmtctx.go @@ -14,7 +14,6 @@ package stmtctx import ( - "github.com/pingcap/tidb/util/resourcegrouptag" "math" "sort" "strconv" @@ -29,6 +28,7 @@ import ( "github.com/pingcap/tidb/util/disk" "github.com/pingcap/tidb/util/execdetails" "github.com/pingcap/tidb/util/memory" + "github.com/pingcap/tidb/util/resourcegrouptag" atomic2 "go.uber.org/atomic" "go.uber.org/zap" ) diff --git a/util/resourcegrouptag/resource_group_tag.go b/util/resourcegrouptag/resource_group_tag.go index 7900759f0b43f..525714cbae41e 100644 --- a/util/resourcegrouptag/resource_group_tag.go +++ b/util/resourcegrouptag/resource_group_tag.go @@ -6,7 +6,7 @@ import ( "github.com/pingcap/tipb/go-tipb" ) -// EncodeResourceGroupTag encodes sqlDigest into resource group tag. +// EncodeResourceGroupTag encodes sql digest and plan digest into resource group tag. func EncodeResourceGroupTag(sqlDigest, planDigest *parser.Digest) []byte { if sqlDigest == nil && planDigest == nil { return nil diff --git a/util/resourcegrouptag/resource_group_tag_test.go b/util/resourcegrouptag/resource_group_tag_test.go index 9f68bd18bf6b1..841a1f23649cc 100644 --- a/util/resourcegrouptag/resource_group_tag_test.go +++ b/util/resourcegrouptag/resource_group_tag_test.go @@ -15,16 +15,16 @@ package resourcegrouptag import ( "crypto/sha256" + "encoding/binary" "encoding/hex" "errors" "fmt" - "github.com/pingcap/parser" - "github.com/pingcap/tidb/util/codec" - "github.com/pingcap/tipb/go-tipb" "math/rand" "testing" . "github.com/pingcap/check" + "github.com/pingcap/parser" + "github.com/pingcap/tipb/go-tipb" ) type testUtilsSuite struct{} @@ -127,17 +127,34 @@ func (s *testUtilsSuite) TestResourceGroupTagEncodingPB(c *C) { } +func encodeVarint(b []byte, v int64) []byte { + var data [binary.MaxVarintLen64]byte + n := binary.PutVarint(data[:], v) + return append(b, data[:n]...) +} + +func decodeVarint(b []byte) ([]byte, int64, error) { + v, n := binary.Varint(b) + if n > 0 { + return b[n:], v, nil + } + if n < 0 { + return nil, 0, errors.New("value larger than 64 bits") + } + return nil, 0, errors.New("insufficient bytes to decode value") +} + func manualEncodeResourceGroupTag(sqlDigest []byte, planDigest []byte) []byte { buf := make([]byte, 1, len(sqlDigest)+len(planDigest)+8) buf[0] = 1 // version if len(sqlDigest) > 0 { buf = append(buf, 1) // sql digest flag - buf = codec.EncodeVarint(buf, int64(len(sqlDigest))) + buf = encodeVarint(buf, int64(len(sqlDigest))) buf = append(buf, sqlDigest...) } if len(planDigest) > 0 { buf = append(buf, 2) // plan digest flag - buf = codec.EncodeVarint(buf, int64(len(planDigest))) + buf = encodeVarint(buf, int64(len(planDigest))) buf = append(buf, planDigest...) } return buf @@ -154,7 +171,7 @@ func manualDecodeResourceGroupTag(buf []byte) (sqlDigest []byte, planDigest []by var l int64 for len(buf) > 0 { flag := buf[0] - buf, l, err = codec.DecodeVarint(buf[1:]) + buf, l, err = decodeVarint(buf[1:]) if err != nil { return nil, nil, errors.New("invalid") } From 5624eb8d1b56a1d03db1c3cf11e7b3183be1c498 Mon Sep 17 00:00:00 2001 From: crazycs Date: Mon, 24 May 2021 10:22:43 +0800 Subject: [PATCH 06/15] remove useless benchmark test Signed-off-by: crazycs --- .../resource_group_tag_test.go | 122 ------------------ 1 file changed, 122 deletions(-) diff --git a/util/resourcegrouptag/resource_group_tag_test.go b/util/resourcegrouptag/resource_group_tag_test.go index 841a1f23649cc..245412dd95c8a 100644 --- a/util/resourcegrouptag/resource_group_tag_test.go +++ b/util/resourcegrouptag/resource_group_tag_test.go @@ -126,125 +126,3 @@ func (s *testUtilsSuite) TestResourceGroupTagEncodingPB(c *C) { c.Assert(tag.PlanDigest, IsNil) } - -func encodeVarint(b []byte, v int64) []byte { - var data [binary.MaxVarintLen64]byte - n := binary.PutVarint(data[:], v) - return append(b, data[:n]...) -} - -func decodeVarint(b []byte) ([]byte, int64, error) { - v, n := binary.Varint(b) - if n > 0 { - return b[n:], v, nil - } - if n < 0 { - return nil, 0, errors.New("value larger than 64 bits") - } - return nil, 0, errors.New("insufficient bytes to decode value") -} - -func manualEncodeResourceGroupTag(sqlDigest []byte, planDigest []byte) []byte { - buf := make([]byte, 1, len(sqlDigest)+len(planDigest)+8) - buf[0] = 1 // version - if len(sqlDigest) > 0 { - buf = append(buf, 1) // sql digest flag - buf = encodeVarint(buf, int64(len(sqlDigest))) - buf = append(buf, sqlDigest...) - } - if len(planDigest) > 0 { - buf = append(buf, 2) // plan digest flag - buf = encodeVarint(buf, int64(len(planDigest))) - buf = append(buf, planDigest...) - } - return buf -} - -func manualDecodeResourceGroupTag(buf []byte) (sqlDigest []byte, planDigest []byte, err error) { - if len(buf) == 0 { - return nil, nil, errors.New("invalid") - } - if buf[0] != 1 { - return nil, nil, errors.New("invalid") - } - buf = buf[1:] - var l int64 - for len(buf) > 0 { - flag := buf[0] - buf, l, err = decodeVarint(buf[1:]) - if err != nil { - return nil, nil, errors.New("invalid") - } - if len(buf) < int(l) { - return nil, nil, errors.New("invalid") - } - data := make([]byte, l) - copy(data, buf[:l]) - buf = buf[l:] - switch flag { - case 1: // sql_digest - sqlDigest = data - case 2: // plan digest - planDigest = data - default: - return nil, nil, errors.New("invalid") - } - } - return -} - -func BenchmarkResourceGroupManualEncode(b *testing.B) { - digest1 := genDigest("abc") - digest2 := genDigest("abcdefg") - b.ResetTimer() - for i := 0; i < b.N; i++ { - manualEncodeResourceGroupTag(digest1, digest2) - } -} - -func BenchmarkResourceGroupTagPBEncode(b *testing.B) { - digest1 := genDigest("abc") - digest2 := genDigest("abcdefg") - b.ResetTimer() - for i := 0; i < b.N; i++ { - resourceTag := &tipb.ResourceGroupTag{ - SqlDigest: digest1, - PlanDigest: digest2, - } - resourceTag.Marshal() - } -} - -func BenchmarkResourceGroupTagManualDecode(b *testing.B) { - digest1 := genDigest("abc") - digest2 := genDigest("abcdefg") - data := manualEncodeResourceGroupTag(digest1, digest2) - b.ResetTimer() - for i := 0; i < b.N; i++ { - manualDecodeResourceGroupTag(data) - } -} - -func BenchmarkResourceGroupTagPBDecode(b *testing.B) { - digest1 := genDigest("abc") - digest2 := genDigest("abcdefg") - resourceTag := &tipb.ResourceGroupTag{ - SqlDigest: digest1, - PlanDigest: digest2, - } - data, _ := resourceTag.Marshal() - b.ResetTimer() - for i := 0; i < b.N; i++ { - tag := &tipb.ResourceGroupTag{} - tag.Unmarshal(data) - } -} - -func BenchmarkHexDecode(b *testing.B) { - digest1 := genDigest("abc") - b.ResetTimer() - hash := fmt.Sprintf("%x", digest1) - for i := 0; i < b.N; i++ { - hex.DecodeString(hash) - } -} From addd0ef0ff730c5b48b5f5a6d51be8ae56cb8c7c Mon Sep 17 00:00:00 2001 From: crazycs Date: Mon, 24 May 2021 10:33:13 +0800 Subject: [PATCH 07/15] refine code Signed-off-by: crazycs --- executor/analyze.go | 8 ++------ executor/batch_point_get.go | 5 +---- executor/executor.go | 8 +++++++- executor/insert.go | 4 +--- executor/point_get.go | 4 +--- util/resourcegrouptag/resource_group_tag_test.go | 1 - 6 files changed, 12 insertions(+), 18 deletions(-) diff --git a/executor/analyze.go b/executor/analyze.go index 934ec353311f5..09be83cc15a0e 100644 --- a/executor/analyze.go +++ b/executor/analyze.go @@ -1326,9 +1326,7 @@ func (e *AnalyzeFastExec) handleScanTasks(bo *tikv.Backoffer) (keysSize int, err if e.ctx.GetSessionVars().GetReplicaRead().IsFollowerRead() { snapshot.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower) } - if config.GetGlobalConfig().TopStmt.Enable { - snapshot.SetOption(kv.ResourceGroupTag, e.ctx.GetSessionVars().StmtCtx.GetResourceGroupTag()) - } + setResourceGroupTagForSnapshot(e.ctx.GetSessionVars().StmtCtx, snapshot) for _, t := range e.scanTasks { iter, err := snapshot.Iter(kv.Key(t.StartKey), kv.Key(t.EndKey)) if err != nil { @@ -1349,9 +1347,7 @@ func (e *AnalyzeFastExec) handleSampTasks(workID int, step uint32, err *error) { snapshot.SetOption(kv.NotFillCache, true) snapshot.SetOption(kv.IsolationLevel, kv.RC) snapshot.SetOption(kv.Priority, kv.PriorityLow) - if config.GetGlobalConfig().TopStmt.Enable { - snapshot.SetOption(kv.ResourceGroupTag, e.ctx.GetSessionVars().StmtCtx.GetResourceGroupTag()) - } + setResourceGroupTagForSnapshot(e.ctx.GetSessionVars().StmtCtx, snapshot) if e.ctx.GetSessionVars().GetReplicaRead().IsFollowerRead() { snapshot.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower) } diff --git a/executor/batch_point_get.go b/executor/batch_point_get.go index 06bd7409e965e..fb89a70a7bc4b 100644 --- a/executor/batch_point_get.go +++ b/executor/batch_point_get.go @@ -23,7 +23,6 @@ import ( "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/parser/model" "github.com/pingcap/parser/mysql" - "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/ddl/placement" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/sessionctx" @@ -132,9 +131,7 @@ func (e *BatchPointGetExec) Open(context.Context) error { }, }) } - if config.GetGlobalConfig().TopStmt.Enable { - snapshot.SetOption(kv.ResourceGroupTag, stmtCtx.GetResourceGroupTag()) - } + setResourceGroupTagForSnapshot(stmtCtx, snapshot) var batchGetter kv.BatchGetter = snapshot if txn.Valid() { lock := e.tblInfo.Lock diff --git a/executor/executor.go b/executor/executor.go index 72e1c09c4cd07..7f488cbf28ce9 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -16,7 +16,6 @@ package executor import ( "context" "fmt" - "github.com/pingcap/parser" "math" "runtime" "runtime/trace" @@ -30,6 +29,7 @@ import ( "github.com/opentracing/opentracing-go" "github.com/pingcap/errors" "github.com/pingcap/failpoint" + "github.com/pingcap/parser" "github.com/pingcap/parser/ast" "github.com/pingcap/parser/auth" "github.com/pingcap/parser/model" @@ -1799,3 +1799,9 @@ func FillVirtualColumnValue(virtualRetTypes []*types.FieldType, virtualColumnInd } return nil } + +func setResourceGroupTagForSnapshot(sc *stmtctx.StatementContext, snapshot kv.Snapshot) { + if snapshot != nil && config.GetGlobalConfig().TopStmt.Enable { + snapshot.SetOption(kv.ResourceGroupTag, sc.GetResourceGroupTag()) + } +} diff --git a/executor/insert.go b/executor/insert.go index 931fcf6f3fb42..38d15b76ac65a 100644 --- a/executor/insert.go +++ b/executor/insert.go @@ -64,9 +64,7 @@ func (e *InsertExec) exec(ctx context.Context, rows [][]types.Datum) error { if err != nil { return err } - if snapshot := txn.GetSnapshot(); snapshot != nil && config.GetGlobalConfig().TopStmt.Enable { - snapshot.SetOption(kv.ResourceGroupTag, sessVars.StmtCtx.GetResourceGroupTag()) - } + setResourceGroupTagForSnapshot(sessVars.StmtCtx, txn.GetSnapshot()) txnSize := txn.Size() sessVars.StmtCtx.AddRecordRows(uint64(len(rows))) // If you use the IGNORE keyword, duplicate-key error that occurs while executing the INSERT statement are ignored. diff --git a/executor/point_get.go b/executor/point_get.go index c572f6985e885..9de1d8053884d 100644 --- a/executor/point_get.go +++ b/executor/point_get.go @@ -161,9 +161,7 @@ func (e *PointGetExecutor) Open(context.Context) error { }, }) } - if config.GetGlobalConfig().TopStmt.Enable { - e.snapshot.SetOption(kv.ResourceGroupTag, e.ctx.GetSessionVars().StmtCtx.GetResourceGroupTag()) - } + setResourceGroupTagForSnapshot(e.ctx.GetSessionVars().StmtCtx, e.snapshot) return nil } diff --git a/util/resourcegrouptag/resource_group_tag_test.go b/util/resourcegrouptag/resource_group_tag_test.go index 245412dd95c8a..4b9582194ad57 100644 --- a/util/resourcegrouptag/resource_group_tag_test.go +++ b/util/resourcegrouptag/resource_group_tag_test.go @@ -124,5 +124,4 @@ func (s *testUtilsSuite) TestResourceGroupTagEncodingPB(c *C) { c.Assert(err, IsNil) c.Assert(tag.SqlDigest, DeepEquals, digest1) c.Assert(tag.PlanDigest, IsNil) - } From 3f0c2a36e45a2257fbdd72e7fba90ece7c091159 Mon Sep 17 00:00:00 2001 From: crazycs Date: Mon, 24 May 2021 11:07:50 +0800 Subject: [PATCH 08/15] add resource tag for txn commit rpc request Signed-off-by: crazycs --- config/config.go | 12 +++++++++--- distsql/request_builder.go | 2 +- executor/analyze.go | 5 ++--- executor/batch_point_get.go | 2 +- executor/executor.go | 6 +++--- executor/insert.go | 3 +-- executor/point_get.go | 3 +-- executor/replace.go | 1 + executor/update.go | 7 +++++++ store/driver/txn/txn_driver.go | 2 +- store/tikv/2pc.go | 3 +++ store/tikv/cleanup.go | 2 +- store/tikv/commit.go | 2 +- store/tikv/prewrite.go | 2 +- store/tikv/txn.go | 7 +++++++ util/resourcegrouptag/resource_group_tag.go | 5 ++++- 16 files changed, 44 insertions(+), 20 deletions(-) diff --git a/config/config.go b/config/config.go index 10eafeb738458..ab6ff0749f1cd 100644 --- a/config/config.go +++ b/config/config.go @@ -137,7 +137,7 @@ type Config struct { DelayCleanTableLock uint64 `toml:"delay-clean-table-lock" json:"delay-clean-table-lock"` SplitRegionMaxNum uint64 `toml:"split-region-max-num" json:"split-region-max-num"` StmtSummary StmtSummary `toml:"stmt-summary" json:"stmt-summary"` - TopStmt TopStmt `toml:"top-stmt" json:"top-stmt"` + TopStmt TopSQL `toml:"top-sql" json:"top-sql"` // RepairMode indicates that the TiDB is in the repair mode for table meta. RepairMode bool `toml:"repair-mode" json:"repair-mode"` RepairTableList []string `toml:"repair-table-list" json:"repair-table-list"` @@ -528,7 +528,8 @@ type StmtSummary struct { HistorySize int `toml:"history-size" json:"history-size"` } -type TopStmt struct { +// TopSQL is the config for top sql. +type TopSQL struct { // Enable statement summary or not. Enable bool `toml:"enable" json:"enable"` // The refresh interval of statement summary. @@ -666,7 +667,7 @@ var defaultConf = Config{ RefreshInterval: 1800, HistorySize: 24, }, - TopStmt: TopStmt{ + TopStmt: TopSQL{ Enable: true, RefreshInterval: 1, MaxStmtCount: 5000, @@ -958,6 +959,11 @@ func TableLockEnabled() bool { return GetGlobalConfig().EnableTableLock } +// TopSQLEnabled uses to check whether enabled the top SQL feature. +func TopSQLEnabled() bool { + return GetGlobalConfig().TopStmt.Enable +} + // TableLockDelayClean uses to get the time of delay clean table lock. var TableLockDelayClean = func() uint64 { return GetGlobalConfig().DelayCleanTableLock diff --git a/distsql/request_builder.go b/distsql/request_builder.go index 40319c3834a31..99879cf345bac 100644 --- a/distsql/request_builder.go +++ b/distsql/request_builder.go @@ -280,7 +280,7 @@ func (builder *RequestBuilder) SetFromInfoSchema(is infoschema.InfoSchema) *Requ // SetResourceGroupTag sets the request resource group tag. func (builder *RequestBuilder) SetResourceGroupTag(sc *stmtctx.StatementContext) *RequestBuilder { - if config.GetGlobalConfig().TopStmt.Enable { + if config.TopSQLEnabled() { builder.Request.ResourceGroupTag = sc.GetResourceGroupTag() } return builder diff --git a/executor/analyze.go b/executor/analyze.go index 09be83cc15a0e..993cfb6575b5a 100644 --- a/executor/analyze.go +++ b/executor/analyze.go @@ -34,7 +34,6 @@ import ( "github.com/pingcap/parser/model" "github.com/pingcap/parser/mysql" "github.com/pingcap/parser/terror" - "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/distsql" "github.com/pingcap/tidb/domain" "github.com/pingcap/tidb/infoschema" @@ -1326,7 +1325,7 @@ func (e *AnalyzeFastExec) handleScanTasks(bo *tikv.Backoffer) (keysSize int, err if e.ctx.GetSessionVars().GetReplicaRead().IsFollowerRead() { snapshot.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower) } - setResourceGroupTagForSnapshot(e.ctx.GetSessionVars().StmtCtx, snapshot) + setResourceGroupTagForTxn(e.ctx.GetSessionVars().StmtCtx, snapshot) for _, t := range e.scanTasks { iter, err := snapshot.Iter(kv.Key(t.StartKey), kv.Key(t.EndKey)) if err != nil { @@ -1347,7 +1346,7 @@ func (e *AnalyzeFastExec) handleSampTasks(workID int, step uint32, err *error) { snapshot.SetOption(kv.NotFillCache, true) snapshot.SetOption(kv.IsolationLevel, kv.RC) snapshot.SetOption(kv.Priority, kv.PriorityLow) - setResourceGroupTagForSnapshot(e.ctx.GetSessionVars().StmtCtx, snapshot) + setResourceGroupTagForTxn(e.ctx.GetSessionVars().StmtCtx, snapshot) if e.ctx.GetSessionVars().GetReplicaRead().IsFollowerRead() { snapshot.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower) } diff --git a/executor/batch_point_get.go b/executor/batch_point_get.go index fb89a70a7bc4b..bcd7b5bef2008 100644 --- a/executor/batch_point_get.go +++ b/executor/batch_point_get.go @@ -131,7 +131,7 @@ func (e *BatchPointGetExec) Open(context.Context) error { }, }) } - setResourceGroupTagForSnapshot(stmtCtx, snapshot) + setResourceGroupTagForTxn(stmtCtx, snapshot) var batchGetter kv.BatchGetter = snapshot if txn.Valid() { lock := e.tblInfo.Lock diff --git a/executor/executor.go b/executor/executor.go index 7f488cbf28ce9..1d136bac8a2f9 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -977,7 +977,7 @@ func (e *SelectLockExec) Next(ctx context.Context, req *chunk.Chunk) error { func newLockCtx(seVars *variable.SessionVars, lockWaitTime int64) *tikvstore.LockCtx { var planDigest *parser.Digest _, sqlDigest := seVars.StmtCtx.SQLDigest() - if config.GetGlobalConfig().TopStmt.Enable { + if config.TopSQLEnabled() { _, planDigest = seVars.StmtCtx.GetPlanDigest() } return &tikvstore.LockCtx{ @@ -1800,8 +1800,8 @@ func FillVirtualColumnValue(virtualRetTypes []*types.FieldType, virtualColumnInd return nil } -func setResourceGroupTagForSnapshot(sc *stmtctx.StatementContext, snapshot kv.Snapshot) { - if snapshot != nil && config.GetGlobalConfig().TopStmt.Enable { +func setResourceGroupTagForTxn(sc *stmtctx.StatementContext, snapshot kv.Snapshot) { + if snapshot != nil && config.TopSQLEnabled() { snapshot.SetOption(kv.ResourceGroupTag, sc.GetResourceGroupTag()) } } diff --git a/executor/insert.go b/executor/insert.go index 38d15b76ac65a..351f04c2ca5eb 100644 --- a/executor/insert.go +++ b/executor/insert.go @@ -22,7 +22,6 @@ import ( "github.com/opentracing/opentracing-go" "github.com/pingcap/parser/mysql" - "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/table" @@ -64,7 +63,7 @@ func (e *InsertExec) exec(ctx context.Context, rows [][]types.Datum) error { if err != nil { return err } - setResourceGroupTagForSnapshot(sessVars.StmtCtx, txn.GetSnapshot()) + setResourceGroupTagForTxn(sessVars.StmtCtx, txn) txnSize := txn.Size() sessVars.StmtCtx.AddRecordRows(uint64(len(rows))) // If you use the IGNORE keyword, duplicate-key error that occurs while executing the INSERT statement are ignored. diff --git a/executor/point_get.go b/executor/point_get.go index 9de1d8053884d..99b5c52161baa 100644 --- a/executor/point_get.go +++ b/executor/point_get.go @@ -22,7 +22,6 @@ import ( "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/parser/model" "github.com/pingcap/parser/mysql" - "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/ddl" "github.com/pingcap/tidb/ddl/placement" "github.com/pingcap/tidb/distsql" @@ -161,7 +160,7 @@ func (e *PointGetExecutor) Open(context.Context) error { }, }) } - setResourceGroupTagForSnapshot(e.ctx.GetSessionVars().StmtCtx, e.snapshot) + setResourceGroupTagForTxn(e.ctx.GetSessionVars().StmtCtx, e.snapshot) return nil } diff --git a/executor/replace.go b/executor/replace.go index 8f35be4d05dbd..03dc4bfad0543 100644 --- a/executor/replace.go +++ b/executor/replace.go @@ -224,6 +224,7 @@ func (e *ReplaceExec) exec(ctx context.Context, newRows [][]types.Datum) error { defer snapshot.DelOption(kv.CollectRuntimeStats) } } + setResourceGroupTagForTxn(e.ctx.GetSessionVars().StmtCtx, txn) prefetchStart := time.Now() // Use BatchGet to fill cache. // It's an optimization and could be removed without affecting correctness. diff --git a/executor/update.go b/executor/update.go index 7c4b07ab8e6f6..0d74d9a1218c2 100644 --- a/executor/update.go +++ b/executor/update.go @@ -20,6 +20,7 @@ import ( "github.com/pingcap/parser/model" "github.com/pingcap/parser/mysql" + "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/kv" plannercore "github.com/pingcap/tidb/planner/core" @@ -263,6 +264,12 @@ func (e *UpdateExec) updateRows(ctx context.Context) (int, error) { txn.GetSnapshot().SetOption(kv.CollectRuntimeStats, e.stats.SnapshotRuntimeStats) } } + if config.TopSQLEnabled() { + txn, err := e.ctx.Txn(false) + if err == nil { + txn.SetOption(kv.ResourceGroupTag, e.ctx.GetSessionVars().StmtCtx.GetResourceGroupTag()) + } + } for rowIdx := 0; rowIdx < chk.NumRows(); rowIdx++ { chunkRow := chk.GetRow(rowIdx) datumRow := chunkRow.GetDatumRow(fields) diff --git a/store/driver/txn/txn_driver.go b/store/driver/txn/txn_driver.go index 6e66a73c86506..a722557f8fc4e 100644 --- a/store/driver/txn/txn_driver.go +++ b/store/driver/txn/txn_driver.go @@ -171,7 +171,7 @@ func (txn *tikvTxn) SetOption(opt int, val interface{}) { case kv.MatchStoreLabels: txn.KVTxn.GetSnapshot().SetMatchStoreLabels(val.([]*metapb.StoreLabel)) case kv.ResourceGroupTag: - txn.KVTxn.GetSnapshot().SetResourceGroupTag(val.([]byte)) + txn.KVTxn.SetResourceGroupTag(val.([]byte)) } } diff --git a/store/tikv/2pc.go b/store/tikv/2pc.go index c01d97981dd09..a299ba357d3e8 100644 --- a/store/tikv/2pc.go +++ b/store/tikv/2pc.go @@ -107,6 +107,8 @@ type twoPhaseCommitter struct { doingAmend bool binlog BinlogExecutor + + resourceGroupTag []byte } type memBufferMutations struct { @@ -428,6 +430,7 @@ func (c *twoPhaseCommitter) initKeysAndMutations() error { c.lockTTL = txnLockTTL(txn.startTime, size) c.priority = txn.priority.ToPB() c.syncLog = txn.syncLog + c.resourceGroupTag = txn.resourceGroupTag c.setDetail(commitDetail) return nil } diff --git a/store/tikv/cleanup.go b/store/tikv/cleanup.go index 0260d770cdd44..e21c1211af9bf 100644 --- a/store/tikv/cleanup.go +++ b/store/tikv/cleanup.go @@ -40,7 +40,7 @@ func (actionCleanup) handleSingleBatch(c *twoPhaseCommitter, bo *Backoffer, batc req := tikvrpc.NewRequest(tikvrpc.CmdBatchRollback, &pb.BatchRollbackRequest{ Keys: batch.mutations.GetKeys(), StartVersion: c.startTS, - }, pb.Context{Priority: c.priority, SyncLog: c.syncLog}) + }, pb.Context{Priority: c.priority, SyncLog: c.syncLog, ResourceGroupTag: c.resourceGroupTag}) resp, err := c.store.SendReq(bo, req, batch.region, ReadTimeoutShort) if err != nil { return errors.Trace(err) diff --git a/store/tikv/commit.go b/store/tikv/commit.go index 10c60d9f6d4bd..8e876a6f11468 100644 --- a/store/tikv/commit.go +++ b/store/tikv/commit.go @@ -46,7 +46,7 @@ func (actionCommit) handleSingleBatch(c *twoPhaseCommitter, bo *Backoffer, batch StartVersion: c.startTS, Keys: keys, CommitVersion: c.commitTS, - }, pb.Context{Priority: c.priority, SyncLog: c.syncLog}) + }, pb.Context{Priority: c.priority, SyncLog: c.syncLog, ResourceGroupTag: c.resourceGroupTag}) sender := NewRegionRequestSender(c.store.regionCache, c.store.GetTiKVClient()) resp, err := sender.SendReq(bo, req, batch.region, ReadTimeoutShort) diff --git a/store/tikv/prewrite.go b/store/tikv/prewrite.go index 49ddc1525b748..305806c931149 100644 --- a/store/tikv/prewrite.go +++ b/store/tikv/prewrite.go @@ -116,7 +116,7 @@ func (c *twoPhaseCommitter) buildPrewriteRequest(batch batchMutations, txnSize u req.TryOnePc = true } - return tikvrpc.NewRequest(tikvrpc.CmdPrewrite, req, pb.Context{Priority: c.priority, SyncLog: c.syncLog}) + return tikvrpc.NewRequest(tikvrpc.CmdPrewrite, req, pb.Context{Priority: c.priority, SyncLog: c.syncLog, ResourceGroupTag: c.resourceGroupTag}) } func (action actionPrewrite) handleSingleBatch(c *twoPhaseCommitter, bo *Backoffer, batch batchMutations) error { diff --git a/store/tikv/txn.go b/store/tikv/txn.go index 988f6501be553..ae4762d6909f5 100644 --- a/store/tikv/txn.go +++ b/store/tikv/txn.go @@ -133,6 +133,7 @@ type KVTxn struct { causalConsistency bool scope string kvFilter KVFilter + resourceGroupTag []byte } // ExtractStartTs use `option` to get the proper startTS for a transaction @@ -293,6 +294,12 @@ func (txn *KVTxn) SetPriority(pri Priority) { txn.GetSnapshot().SetPriority(pri) } +// SetResourceGroupTag sets the resource tag for both write and read. +func (txn *KVTxn) SetResourceGroupTag(tag []byte) { + txn.resourceGroupTag = tag + txn.GetSnapshot().SetResourceGroupTag(tag) +} + // SetSchemaAmender sets an amender to update mutations after schema change. func (txn *KVTxn) SetSchemaAmender(sa SchemaAmender) { txn.schemaAmender = sa diff --git a/util/resourcegrouptag/resource_group_tag.go b/util/resourcegrouptag/resource_group_tag.go index 525714cbae41e..03150a0393ea4 100644 --- a/util/resourcegrouptag/resource_group_tag.go +++ b/util/resourcegrouptag/resource_group_tag.go @@ -19,7 +19,10 @@ func EncodeResourceGroupTag(sqlDigest, planDigest *parser.Digest) []byte { if planDigest != nil { tag.PlanDigest = planDigest.Bytes() } - b, _ := tag.Marshal() + b, err := tag.Marshal() + if err != nil { + return nil + } return b } From c6b926ca26b5814ab4e110a0da5e8fb5f7af5f0f Mon Sep 17 00:00:00 2001 From: crazycs Date: Mon, 24 May 2021 15:15:07 +0800 Subject: [PATCH 09/15] add test and fix some bug Signed-off-by: crazycs --- config/config.go | 6 +- executor/adapter.go | 11 ++-- executor/executor_test.go | 105 ++++++++++++++++++++++++++++++++ planner/core/encode.go | 2 +- sessionctx/stmtctx/stmtctx.go | 4 +- store/copr/batch_coprocessor.go | 13 ++-- store/copr/coprocessor.go | 13 ++-- store/mockstore/unistore/rpc.go | 9 +++ store/tikv/scan.go | 7 ++- store/tikv/snapshot.go | 14 +++-- 10 files changed, 154 insertions(+), 30 deletions(-) diff --git a/config/config.go b/config/config.go index ab6ff0749f1cd..83490c345dae8 100644 --- a/config/config.go +++ b/config/config.go @@ -137,7 +137,7 @@ type Config struct { DelayCleanTableLock uint64 `toml:"delay-clean-table-lock" json:"delay-clean-table-lock"` SplitRegionMaxNum uint64 `toml:"split-region-max-num" json:"split-region-max-num"` StmtSummary StmtSummary `toml:"stmt-summary" json:"stmt-summary"` - TopStmt TopSQL `toml:"top-sql" json:"top-sql"` + TopSQL TopSQL `toml:"top-sql" json:"top-sql"` // RepairMode indicates that the TiDB is in the repair mode for table meta. RepairMode bool `toml:"repair-mode" json:"repair-mode"` RepairTableList []string `toml:"repair-table-list" json:"repair-table-list"` @@ -667,7 +667,7 @@ var defaultConf = Config{ RefreshInterval: 1800, HistorySize: 24, }, - TopStmt: TopSQL{ + TopSQL: TopSQL{ Enable: true, RefreshInterval: 1, MaxStmtCount: 5000, @@ -961,7 +961,7 @@ func TableLockEnabled() bool { // TopSQLEnabled uses to check whether enabled the top SQL feature. func TopSQLEnabled() bool { - return GetGlobalConfig().TopStmt.Enable + return GetGlobalConfig().TopSQL.Enable } // TableLockDelayClean uses to get the time of delay clean table lock. diff --git a/executor/adapter.go b/executor/adapter.go index 18bd023e55f27..82a50ba7448f6 100644 --- a/executor/adapter.go +++ b/executor/adapter.go @@ -333,6 +333,8 @@ func (a *ExecStmt) Exec(ctx context.Context) (_ sqlexec.RecordSet, err error) { return nil, err } + getPlanDigest(a.Ctx, a.Plan) + if err = e.Open(ctx); err != nil { terror.Call(e.Close) return nil, err @@ -1012,12 +1014,13 @@ func getPlanTree(sctx sessionctx.Context, p plannercore.Plan) string { // getPlanDigest will try to get the select plan tree if the plan is select or the select plan of delete/update/insert statement. func getPlanDigest(sctx sessionctx.Context, p plannercore.Plan) string { - normalized, planDigest := sctx.GetSessionVars().StmtCtx.GetPlanDigest() - if len(normalized) > 0 { - return "" + sc := sctx.GetSessionVars().StmtCtx + normalized, planDigest := sc.GetPlanDigest() + if planDigest != nil { + return planDigest.String() } normalized, planDigest = plannercore.NormalizePlan(p) - sctx.GetSessionVars().StmtCtx.SetPlanDigest(normalized, planDigest) + sc.SetPlanDigest(normalized, planDigest) return planDigest.String() } diff --git a/executor/executor_test.go b/executor/executor_test.go index a796a27326a70..969cbd20a6587 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -32,6 +32,7 @@ import ( . "github.com/pingcap/check" "github.com/pingcap/errors" "github.com/pingcap/failpoint" + "github.com/pingcap/kvproto/pkg/kvrpcpb" pb "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/parser" "github.com/pingcap/parser/auth" @@ -59,6 +60,7 @@ import ( "github.com/pingcap/tidb/statistics" "github.com/pingcap/tidb/store/copr" "github.com/pingcap/tidb/store/mockstore" + "github.com/pingcap/tidb/store/mockstore/unistore" "github.com/pingcap/tidb/store/tikv" "github.com/pingcap/tidb/store/tikv/mockstore/cluster" "github.com/pingcap/tidb/store/tikv/oracle" @@ -147,6 +149,7 @@ var _ = SerialSuites(&testSerialSuite{&baseTestSuite{}}) var _ = SerialSuites(&testStaleTxnSerialSuite{&baseTestSuite{}}) var _ = SerialSuites(&testCoprCache{}) var _ = SerialSuites(&testPrepareSuite{}) +var _ = SerialSuites(&testResourceTagSuite{&baseTestSuite{}}) type testSuite struct{ *baseTestSuite } type testSuiteP1 struct{ *baseTestSuite } @@ -167,6 +170,7 @@ type testCoprCache struct { cls cluster.Cluster } type testPrepareSuite struct{ testData testutil.TestData } +type testResourceTagSuite struct{ *baseTestSuite } type baseTestSuite struct { cluster cluster.Cluster @@ -8295,3 +8299,104 @@ func (s testSerialSuite) TestExprBlackListForEnum(c *C) { rows = tk.MustQuery("desc format='brief' select * from t where b = 1 and a > 'a'").Rows() c.Assert(checkFuncPushDown(rows, "index:idx(b, a)"), IsTrue) } + +func (s *testResourceTagSuite) TestResourceGroupTag(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + tk.MustExec("drop table if exists t;") + tk.MustExec("create table t(a int, b int, unique index idx(a));") + tbInfo := testGetTableByName(c, tk.Se, "test", "t") + + // Enable Top SQL + cfg := config.GetGlobalConfig() + newCfg := *cfg + newCfg.TopSQL.Enable = true + config.StoreGlobalConfig(&newCfg) + + c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/mockstore/unistore/unistoreRPCClientSendHook", `return(true)`), IsNil) + defer failpoint.Disable("github.com/pingcap/tidb/store/mockstore/unistore/unistoreRPCClientSendHook") + + var sqlDigest, planDigest *parser.Digest + unistore.UnistoreRPCClientSendHook = func(req *tikvrpc.Request) { + var startKey []byte + var ctx *kvrpcpb.Context + switch req.Type { + case tikvrpc.CmdGet: + request := req.Get() + startKey = request.Key + ctx = request.Context + case tikvrpc.CmdBatchGet: + request := req.BatchGet() + startKey = request.Keys[0] + ctx = request.Context + case tikvrpc.CmdPrewrite: + request := req.Prewrite() + startKey = request.Mutations[0].Key + ctx = request.Context + case tikvrpc.CmdCommit: + request := req.Commit() + startKey = request.Keys[0] + ctx = request.Context + case tikvrpc.CmdCop: + request := req.Cop() + startKey = request.Ranges[0].Start + ctx = request.Context + case tikvrpc.CmdPessimisticLock: + request := req.PessimisticLock() + startKey = request.PrimaryLock + ctx = request.Context + } + tid := tablecodec.DecodeTableID(startKey) + if tid != tbInfo.Meta().ID { + return + } + if ctx == nil { + return + } + tag := &tipb.ResourceGroupTag{} + err := tag.Unmarshal(ctx.ResourceGroupTag) + c.Assert(err, IsNil) + sqlDigest = parser.NewDigest(tag.SqlDigest) + planDigest = parser.NewDigest(tag.PlanDigest) + } + + resetVars := func() { + sqlDigest = parser.NewDigest(nil) + planDigest = parser.NewDigest(nil) + } + + cases := []struct { + sql string + ignore bool + }{ + {sql: "insert into t values(1,1),(2,2),(3,3)"}, + {sql: "select * from t use index (idx) where a=1"}, + {sql: "select * from t use index (idx) where a in (1,2,3)"}, + {sql: "select * from t use index (idx) where a>1"}, + {sql: "select * from t where b>1"}, + {sql: "begin pessimistic", ignore: true}, + {sql: "insert into t values(4,4)"}, + {sql: "commit", ignore: true}, + } + for _, ca := range cases { + resetVars() + commentf := Commentf("%v", ca.sql) + if strings.HasPrefix(ca.sql, "select") { + tk.MustQuery(ca.sql) + } else { + tk.MustExec(ca.sql) + } + if ca.ignore { + continue + } + _, expectSQLDigest := parser.NormalizeDigest(ca.sql) + c.Assert(sqlDigest.String(), Equals, expectSQLDigest.String(), commentf) + + info := tk.Se.ShowProcess() + c.Assert(info, NotNil) + p, ok := info.Plan.(plannercore.Plan) + c.Assert(ok, IsTrue) + _, expectPlanDigest := plannercore.NormalizePlan(p) + c.Assert(planDigest.String(), Equals, expectPlanDigest.String()) + } +} diff --git a/planner/core/encode.go b/planner/core/encode.go index e10b00d6c2844..8dc6ddeca9473 100644 --- a/planner/core/encode.go +++ b/planner/core/encode.go @@ -123,7 +123,7 @@ type planDigester struct { func NormalizePlan(p Plan) (normalized string, digest *parser.Digest) { selectPlan := getSelectPlan(p) if selectPlan == nil { - return "", nil + return "", parser.NewDigest(nil) } d := digesterPool.Get().(*planDigester) defer digesterPool.Put(d) diff --git a/sessionctx/stmtctx/stmtctx.go b/sessionctx/stmtctx/stmtctx.go index 1ad9058bf1b89..c6333bc47d820 100644 --- a/sessionctx/stmtctx/stmtctx.go +++ b/sessionctx/stmtctx/stmtctx.go @@ -263,7 +263,9 @@ func (sc *StatementContext) GetResourceGroupTag() []byte { // SetPlanDigest sets the normalized plan and plan digest. func (sc *StatementContext) SetPlanDigest(normalized string, planDigest *parser.Digest) { - sc.planNormalized, sc.planDigest = normalized, planDigest + if planDigest != nil { + sc.planNormalized, sc.planDigest = normalized, planDigest + } } // GetEncodedPlan gets the encoded plan, it is used to avoid repeated encode. diff --git a/store/copr/batch_coprocessor.go b/store/copr/batch_coprocessor.go index 6efbf76775186..0f9d73d98eb0c 100644 --- a/store/copr/batch_coprocessor.go +++ b/store/copr/batch_coprocessor.go @@ -507,12 +507,13 @@ func (b *batchCopIterator) handleTaskOnce(ctx context.Context, bo *Backoffer, ta } req := tikvrpc.NewRequest(task.cmdType, &copReq, kvrpcpb.Context{ - IsolationLevel: isolationLevelToPB(b.req.IsolationLevel), - Priority: priorityToPB(b.req.Priority), - NotFillCache: b.req.NotFillCache, - RecordTimeStat: true, - RecordScanStat: true, - TaskId: b.req.TaskID, + IsolationLevel: isolationLevelToPB(b.req.IsolationLevel), + Priority: priorityToPB(b.req.Priority), + NotFillCache: b.req.NotFillCache, + RecordTimeStat: true, + RecordScanStat: true, + TaskId: b.req.TaskID, + ResourceGroupTag: b.req.ResourceGroupTag, }) req.StoreTp = tikvrpc.TiFlash diff --git a/store/copr/coprocessor.go b/store/copr/coprocessor.go index c66c3cda9af35..8834824432bfd 100644 --- a/store/copr/coprocessor.go +++ b/store/copr/coprocessor.go @@ -699,12 +699,13 @@ func (worker *copIteratorWorker) handleTaskOnce(bo *Backoffer, task *copTask, ch } req := tikvrpc.NewReplicaReadRequest(task.cmdType, &copReq, options.GetTiKVReplicaReadType(worker.req.ReplicaRead), &worker.replicaReadSeed, kvrpcpb.Context{ - IsolationLevel: isolationLevelToPB(worker.req.IsolationLevel), - Priority: priorityToPB(worker.req.Priority), - NotFillCache: worker.req.NotFillCache, - RecordTimeStat: true, - RecordScanStat: true, - TaskId: worker.req.TaskID, + IsolationLevel: isolationLevelToPB(worker.req.IsolationLevel), + Priority: priorityToPB(worker.req.Priority), + NotFillCache: worker.req.NotFillCache, + RecordTimeStat: true, + RecordScanStat: true, + TaskId: worker.req.TaskID, + ResourceGroupTag: worker.req.ResourceGroupTag, }) req.StoreTp = getEndPointType(task.storeType) startTime := time.Now() diff --git a/store/mockstore/unistore/rpc.go b/store/mockstore/unistore/rpc.go index 378b6b23b56a4..1cde24dd5ed94 100644 --- a/store/mockstore/unistore/rpc.go +++ b/store/mockstore/unistore/rpc.go @@ -59,6 +59,9 @@ type RPCClient struct { rpcCli Client } +// UnistoreRPCClientSendHook exports for test. +var UnistoreRPCClientSendHook func(*tikvrpc.Request) + // SendRequest sends a request to mock cluster. func (c *RPCClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) { failpoint.Inject("rpcServerBusy", func(val failpoint.Value) { @@ -67,6 +70,12 @@ func (c *RPCClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.R } }) + failpoint.Inject("unistoreRPCClientSendHook", func(val failpoint.Value) { + if val.(bool) && UnistoreRPCClientSendHook != nil { + UnistoreRPCClientSendHook(req) + } + }) + if req.StoreTp == tikvrpc.TiDB { return c.redirectRequestToRPCServer(ctx, addr, req, timeout) } diff --git a/store/tikv/scan.go b/store/tikv/scan.go index 4d5c87161daad..64b9a4728f551 100644 --- a/store/tikv/scan.go +++ b/store/tikv/scan.go @@ -211,9 +211,10 @@ func (s *Scanner) getData(bo *Backoffer) error { } s.snapshot.mu.RLock() req := tikvrpc.NewReplicaReadRequest(tikvrpc.CmdScan, sreq, s.snapshot.mu.replicaRead, &s.snapshot.replicaReadSeed, pb.Context{ - Priority: s.snapshot.priority.ToPB(), - NotFillCache: s.snapshot.notFillCache, - TaskId: s.snapshot.mu.taskID, + Priority: s.snapshot.priority.ToPB(), + NotFillCache: s.snapshot.notFillCache, + TaskId: s.snapshot.mu.taskID, + ResourceGroupTag: s.snapshot.resourceGroupTag, }) s.snapshot.mu.RUnlock() resp, err := sender.SendReq(bo, req, loc.Region, ReadTimeoutMedium) diff --git a/store/tikv/snapshot.go b/store/tikv/snapshot.go index 701a90abf5f09..9828537b7cb79 100644 --- a/store/tikv/snapshot.go +++ b/store/tikv/snapshot.go @@ -312,9 +312,10 @@ func (s *KVSnapshot) batchGetSingleRegion(bo *Backoffer, batch batchKeys, collec Keys: pending, Version: s.version, }, s.mu.replicaRead, &s.replicaReadSeed, pb.Context{ - Priority: s.priority.ToPB(), - NotFillCache: s.notFillCache, - TaskId: s.mu.taskID, + Priority: s.priority.ToPB(), + NotFillCache: s.notFillCache, + TaskId: s.mu.taskID, + ResourceGroupTag: s.resourceGroupTag, }) isStaleness = s.mu.isStaleness matchStoreLabels = s.mu.matchStoreLabels @@ -464,9 +465,10 @@ func (s *KVSnapshot) get(ctx context.Context, bo *Backoffer, k []byte) ([]byte, Key: k, Version: s.version, }, s.mu.replicaRead, &s.replicaReadSeed, pb.Context{ - Priority: s.priority.ToPB(), - NotFillCache: s.notFillCache, - TaskId: s.mu.taskID, + Priority: s.priority.ToPB(), + NotFillCache: s.notFillCache, + TaskId: s.mu.taskID, + ResourceGroupTag: s.resourceGroupTag, }) isStaleness = s.mu.isStaleness matchStoreLabels = s.mu.matchStoreLabels From 81eda6e47249e2946b2b85830c500549f855bf94 Mon Sep 17 00:00:00 2001 From: crazycs Date: Mon, 24 May 2021 21:27:44 +0800 Subject: [PATCH 10/15] add replace test Signed-off-by: crazycs --- executor/executor_test.go | 34 +++++++++++++++++++++++++--------- executor/update.go | 4 ++-- 2 files changed, 27 insertions(+), 11 deletions(-) diff --git a/executor/executor_test.go b/executor/executor_test.go index 969cbd20a6587..ca217d91c11ff 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -8317,6 +8317,7 @@ func (s *testResourceTagSuite) TestResourceGroupTag(c *C) { defer failpoint.Disable("github.com/pingcap/tidb/store/mockstore/unistore/unistoreRPCClientSendHook") var sqlDigest, planDigest *parser.Digest + checkFn := func() {} unistore.UnistoreRPCClientSendHook = func(req *tikvrpc.Request) { var startKey []byte var ctx *kvrpcpb.Context @@ -8358,6 +8359,7 @@ func (s *testResourceTagSuite) TestResourceGroupTag(c *C) { c.Assert(err, IsNil) sqlDigest = parser.NewDigest(tag.SqlDigest) planDigest = parser.NewDigest(tag.PlanDigest) + checkFn() } resetVars := func() { @@ -8377,10 +8379,32 @@ func (s *testResourceTagSuite) TestResourceGroupTag(c *C) { {sql: "begin pessimistic", ignore: true}, {sql: "insert into t values(4,4)"}, {sql: "commit", ignore: true}, + {sql: "update t set a=5,b=5 where a=5"}, + {sql: "replace into t values(6,6)"}, } for _, ca := range cases { resetVars() commentf := Commentf("%v", ca.sql) + + _, expectSQLDigest := parser.NormalizeDigest(ca.sql) + var expectPlanDigest *parser.Digest + checkCnt := 0 + checkFn = func() { + if ca.ignore { + return + } + if expectPlanDigest == nil { + info := tk.Se.ShowProcess() + c.Assert(info, NotNil) + p, ok := info.Plan.(plannercore.Plan) + c.Assert(ok, IsTrue) + _, expectPlanDigest = plannercore.NormalizePlan(p) + } + c.Assert(sqlDigest.String(), Equals, expectSQLDigest.String(), commentf) + c.Assert(planDigest.String(), Equals, expectPlanDigest.String()) + checkCnt++ + } + if strings.HasPrefix(ca.sql, "select") { tk.MustQuery(ca.sql) } else { @@ -8389,14 +8413,6 @@ func (s *testResourceTagSuite) TestResourceGroupTag(c *C) { if ca.ignore { continue } - _, expectSQLDigest := parser.NormalizeDigest(ca.sql) - c.Assert(sqlDigest.String(), Equals, expectSQLDigest.String(), commentf) - - info := tk.Se.ShowProcess() - c.Assert(info, NotNil) - p, ok := info.Plan.(plannercore.Plan) - c.Assert(ok, IsTrue) - _, expectPlanDigest := plannercore.NormalizePlan(p) - c.Assert(planDigest.String(), Equals, expectPlanDigest.String()) + c.Assert(checkCnt > 0, IsTrue, commentf) } } diff --git a/executor/update.go b/executor/update.go index 0d74d9a1218c2..954aa43c8067c 100644 --- a/executor/update.go +++ b/executor/update.go @@ -259,13 +259,13 @@ func (e *UpdateExec) updateRows(ctx context.Context) (int, error) { memUsageOfChk = chk.MemoryUsage() e.memTracker.Consume(memUsageOfChk) if e.collectRuntimeStatsEnabled() { - txn, err := e.ctx.Txn(false) + txn, err := e.ctx.Txn(true) if err == nil && txn.GetSnapshot() != nil { txn.GetSnapshot().SetOption(kv.CollectRuntimeStats, e.stats.SnapshotRuntimeStats) } } if config.TopSQLEnabled() { - txn, err := e.ctx.Txn(false) + txn, err := e.ctx.Txn(true) if err == nil { txn.SetOption(kv.ResourceGroupTag, e.ctx.GetSessionVars().StmtCtx.GetResourceGroupTag()) } From 14124b76c867e7a3c75a963009c6d7eb6d58cd16 Mon Sep 17 00:00:00 2001 From: crazycs Date: Mon, 24 May 2021 21:33:54 +0800 Subject: [PATCH 11/15] fix test Signed-off-by: crazycs --- .../resource_group_tag_test.go | 23 ++----------------- 1 file changed, 2 insertions(+), 21 deletions(-) diff --git a/util/resourcegrouptag/resource_group_tag_test.go b/util/resourcegrouptag/resource_group_tag_test.go index 4b9582194ad57..f5334aacbd17f 100644 --- a/util/resourcegrouptag/resource_group_tag_test.go +++ b/util/resourcegrouptag/resource_group_tag_test.go @@ -15,15 +15,12 @@ package resourcegrouptag import ( "crypto/sha256" - "encoding/binary" - "encoding/hex" - "errors" - "fmt" "math/rand" "testing" . "github.com/pingcap/check" "github.com/pingcap/parser" + "github.com/pingcap/tidb/util/hack" "github.com/pingcap/tipb/go-tipb" ) @@ -75,29 +72,13 @@ func genRandHex(length int) []byte { func genDigest(str string) []byte { hasher := sha256.New() - hasher.Write([]byte(str)) + hasher.Write(hack.Slice(str)) return hasher.Sum(nil) } func (s *testUtilsSuite) TestResourceGroupTagEncodingPB(c *C) { digest1 := genDigest("abc") digest2 := genDigest("abcdefg") - // Test for manualEncode - data := manualEncodeResourceGroupTag(digest1, digest2) - c.Assert(len(data), Equals, 69) - sqlDigest, planDigest, err := manualDecodeResourceGroupTag(data) - c.Assert(err, IsNil) - c.Assert(sqlDigest, DeepEquals, digest1) - c.Assert(planDigest, DeepEquals, digest2) - - // Test for manualEncode sql_digest only - data = manualEncodeResourceGroupTag(digest1, nil) - c.Assert(len(data), Equals, 35) - sqlDigest, planDigest, err = manualDecodeResourceGroupTag(data) - c.Assert(err, IsNil) - c.Assert(sqlDigest, DeepEquals, digest1) - c.Assert(planDigest, IsNil) - // Test for protobuf resourceTag := &tipb.ResourceGroupTag{ SqlDigest: digest1, From 7a689848b32681314deaaad6be2b1faa63cd0d16 Mon Sep 17 00:00:00 2001 From: crazycs Date: Tue, 25 May 2021 11:38:41 +0800 Subject: [PATCH 12/15] update go.mod for parser, tipb Signed-off-by: crazycs --- go.mod | 9 ++------- go.sum | 12 ++++-------- 2 files changed, 6 insertions(+), 15 deletions(-) diff --git a/go.mod b/go.mod index 8d60f8d47602c..d9c91b3faf122 100644 --- a/go.mod +++ b/go.mod @@ -30,7 +30,6 @@ require ( github.com/grpc-ecosystem/go-grpc-middleware v1.1.0 github.com/iancoleman/strcase v0.0.0-20191112232945-16388991a334 github.com/joho/sqltocsv v0.0.0-20210208114054-cb2c3a95fb99 // indirect - github.com/klauspost/cpuid v1.2.1 github.com/kr/text v0.2.0 // indirect github.com/mattn/go-runewidth v0.0.10 // indirect github.com/ngaut/pools v0.0.0-20180318154953-b7bc8c42aac7 @@ -47,10 +46,10 @@ require ( github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 github.com/pingcap/kvproto v0.0.0-20210507054410-a8152f8a876c github.com/pingcap/log v0.0.0-20210317133921-96f4fcab92a4 - github.com/pingcap/parser v0.0.0-20210518053259-92fa6fe07eb6 + github.com/pingcap/parser v0.0.0-20210525032559-c37778aff307 github.com/pingcap/sysutil v0.0.0-20210315073920-cc0985d983a3 github.com/pingcap/tidb-tools v4.0.9-0.20201127090955-2707c97b3853+incompatible - github.com/pingcap/tipb v0.0.0-20210422074242-57dd881b81b1 + github.com/pingcap/tipb v0.0.0-20210525032549-b80be13ddf6c github.com/prometheus/client_golang v1.5.1 github.com/prometheus/client_model v0.2.0 github.com/prometheus/common v0.9.1 @@ -92,7 +91,3 @@ go 1.13 // Fix panic in unit test with go >= 1.14, ref: etcd-io/bbolt#201 https://github.com/etcd-io/bbolt/pull/201 replace go.etcd.io/bbolt => go.etcd.io/bbolt v1.3.5 - -replace github.com/pingcap/tipb => github.com/crazycs520/tipb v0.0.0-20210521015927-2b3fd46a9cb4 - -replace github.com/pingcap/parser => github.com/crazycs520/parser v0.0.0-20210521055836-aac8c026347f diff --git a/go.sum b/go.sum index 16e37153624e9..a5fe95ddc4b10 100644 --- a/go.sum +++ b/go.sum @@ -111,10 +111,6 @@ github.com/corona10/goimagehash v1.0.2/go.mod h1:/l9umBhvcHQXVtQO1V6Gp1yD20STawk github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwcJI5acqYI6dE= github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= github.com/cpuguy83/go-md2man/v2 v2.0.0/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= -github.com/crazycs520/parser v0.0.0-20210521055836-aac8c026347f h1:rysp5oQNInNDCQajyHhskf7hqozsfMS71riVoAwf+H4= -github.com/crazycs520/parser v0.0.0-20210521055836-aac8c026347f/go.mod h1:xZC8I7bug4GJ5KtHhgAikjTfU4kBv1Sbo3Pf1MZ6lVw= -github.com/crazycs520/tipb v0.0.0-20210521015927-2b3fd46a9cb4 h1:kYQX8FiFN02M17hGg0jcsM1/A3L6jTOtd/HO7i12bms= -github.com/crazycs520/tipb v0.0.0-20210521015927-2b3fd46a9cb4/go.mod h1:nsEhnMokcn7MRqd2J60yxpn/ac3ZH8A6GOJ9NslabUo= github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/cznic/golex v0.0.0-20181122101858-9c343928389c/go.mod h1:+bmmJDNmKlhWNG+gwWCkaBoTy39Fs+bzRxVBzoTQbIc= @@ -447,16 +443,16 @@ github.com/pingcap/log v0.0.0-20200511115504-543df19646ad/go.mod h1:4rbK1p9ILyIf github.com/pingcap/log v0.0.0-20201112100606-8f1e84a3abc8/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20210317133921-96f4fcab92a4 h1:ERrF0fTuIOnwfGbt71Ji3DKbOEaP189tjym50u8gpC8= github.com/pingcap/log v0.0.0-20210317133921-96f4fcab92a4/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= -github.com/pingcap/parser v0.0.0-20210518053259-92fa6fe07eb6 h1:wsH3psMH5ksDowsN9VUE9ZqSrX6oF4AYQQfOunkvSfU= -github.com/pingcap/parser v0.0.0-20210518053259-92fa6fe07eb6/go.mod h1:xZC8I7bug4GJ5KtHhgAikjTfU4kBv1Sbo3Pf1MZ6lVw= +github.com/pingcap/parser v0.0.0-20210525032559-c37778aff307 h1:v7SipssMu4X1tVQOe3PIVE73keJNHCFXe4Cza5uNDZ8= +github.com/pingcap/parser v0.0.0-20210525032559-c37778aff307/go.mod h1:xZC8I7bug4GJ5KtHhgAikjTfU4kBv1Sbo3Pf1MZ6lVw= github.com/pingcap/sysutil v0.0.0-20200206130906-2bfa6dc40bcd/go.mod h1:EB/852NMQ+aRKioCpToQ94Wl7fktV+FNnxf3CX/TTXI= github.com/pingcap/sysutil v0.0.0-20210315073920-cc0985d983a3 h1:A9KL9R+lWSVPH8IqUuH1QSTRJ5FGoY1bT2IcfPKsWD8= github.com/pingcap/sysutil v0.0.0-20210315073920-cc0985d983a3/go.mod h1:tckvA041UWP+NqYzrJ3fMgC/Hw9wnmQ/tUkp/JaHly8= github.com/pingcap/tidb-dashboard v0.0.0-20210312062513-eef5d6404638/go.mod h1:OzFN8H0EDMMqeulPhPMw2i2JaiZWOKFQ7zdRPhENNgo= github.com/pingcap/tidb-tools v4.0.9-0.20201127090955-2707c97b3853+incompatible h1:ceznmu/lLseGHP/jKyOa/3u/5H3wtLLLqkH2V3ssSjg= github.com/pingcap/tidb-tools v4.0.9-0.20201127090955-2707c97b3853+incompatible/go.mod h1:XGdcy9+yqlDSEMTpOXnwf3hiTeqrV6MN/u1se9N8yIM= -github.com/pingcap/tipb v0.0.0-20210422074242-57dd881b81b1 h1:Kcp3jIcQrqG+pT1JQ0oWyRncVKQtDgnMFzRt3zJBaBo= -github.com/pingcap/tipb v0.0.0-20210422074242-57dd881b81b1/go.mod h1:nsEhnMokcn7MRqd2J60yxpn/ac3ZH8A6GOJ9NslabUo= +github.com/pingcap/tipb v0.0.0-20210525032549-b80be13ddf6c h1:El3pMBpJHuSkItkHsnBqsaaHzJwFBNDt3Aul98AhREY= +github.com/pingcap/tipb v0.0.0-20210525032549-b80be13ddf6c/go.mod h1:nsEhnMokcn7MRqd2J60yxpn/ac3ZH8A6GOJ9NslabUo= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= From aa98e5a03bf3a79f88c1a126b9ae5b88a5c45a18 Mon Sep 17 00:00:00 2001 From: crazycs Date: Tue, 25 May 2021 12:53:08 +0800 Subject: [PATCH 13/15] fix test Signed-off-by: crazycs --- bindinfo/bind_test.go | 9 +++++---- infoschema/tables_test.go | 4 ++-- planner/core/plan_test.go | 6 +++--- session/session_test.go | 2 +- sessionctx/stmtctx/stmtctx.go | 5 ++++- sessionctx/variable/session_test.go | 2 +- 6 files changed, 16 insertions(+), 12 deletions(-) diff --git a/bindinfo/bind_test.go b/bindinfo/bind_test.go index 22b60187f0a77..4175ddce77eb9 100644 --- a/bindinfo/bind_test.go +++ b/bindinfo/bind_test.go @@ -156,7 +156,8 @@ func normalizeWithDefaultDB(c *C, sql, db string) (string, string) { testParser := parser.New() stmt, err := testParser.ParseOneStmt(sql, "", "") c.Assert(err, IsNil) - return parser.NormalizeDigest(utilparser.RestoreWithDefaultDB(stmt, "test", "")) + normalized, digest := parser.NormalizeDigest(utilparser.RestoreWithDefaultDB(stmt, "test", "")) + return normalized, digest.String() } func (s *testSuite) TestBindParse(c *C) { @@ -182,7 +183,7 @@ func (s *testSuite) TestBindParse(c *C) { c.Check(bindHandle.Size(), Equals, 1) sql, hash := parser.NormalizeDigest("select * from test . t") - bindData := bindHandle.GetBindRecord(hash, sql, "test") + bindData := bindHandle.GetBindRecord(hash.String(), sql, "test") c.Check(bindData, NotNil) c.Check(bindData.OriginalSQL, Equals, "select * from `test` . `t`") bind := bindData.Bindings[0] @@ -656,7 +657,7 @@ func (s *testSuite) TestBindingSymbolList(c *C) { // Normalize sql, hash := parser.NormalizeDigest("select a, b from test . t where a = 1 limit 0, 1") - bindData := s.domain.BindHandle().GetBindRecord(hash, sql, "test") + bindData := s.domain.BindHandle().GetBindRecord(hash.String(), sql, "test") c.Assert(bindData, NotNil) c.Check(bindData.OriginalSQL, Equals, "select `a` , `b` from `test` . `t` where `a` = ? limit ...") bind := bindData.Bindings[0] @@ -776,7 +777,7 @@ func (s *testSuite) TestErrorBind(c *C) { c.Assert(err, IsNil, Commentf("err %v", err)) sql, hash := parser.NormalizeDigest("select * from test . t where i > ?") - bindData := s.domain.BindHandle().GetBindRecord(hash, sql, "test") + bindData := s.domain.BindHandle().GetBindRecord(hash.String(), sql, "test") c.Check(bindData, NotNil) c.Check(bindData.OriginalSQL, Equals, "select * from `test` . `t` where `i` > ?") bind := bindData.Bindings[0] diff --git a/infoschema/tables_test.go b/infoschema/tables_test.go index 2d6506b56d5f4..761bb75fb76ed 100644 --- a/infoschema/tables_test.go +++ b/infoschema/tables_test.go @@ -1517,7 +1517,7 @@ func (s *testTableSuite) TestTrx(c *C) { sm := &mockSessionManager{nil, make([]*txninfo.TxnInfo, 1)} sm.txnInfo[0] = &txninfo.TxnInfo{ StartTS: 424768545227014155, - CurrentSQLDigest: digest, + CurrentSQLDigest: digest.String(), State: txninfo.TxnRunningNormal, BlockStartTime: nil, EntriesCount: 1, @@ -1528,7 +1528,7 @@ func (s *testTableSuite) TestTrx(c *C) { } tk.Se.SetSessionManager(sm) tk.MustQuery("select * from information_schema.TIDB_TRX;").Check( - testkit.Rows("424768545227014155 2021-05-07 12:56:48 " + digest + " Normal 1 19 2 root test"), + testkit.Rows("424768545227014155 2021-05-07 12:56:48 " + digest.String() + " Normal 1 19 2 root test"), ) } diff --git a/planner/core/plan_test.go b/planner/core/plan_test.go index 53f63f25fbc18..4634be3e7f853 100644 --- a/planner/core/plan_test.go +++ b/planner/core/plan_test.go @@ -171,12 +171,12 @@ func (s *testPlanNormalize) TestNormalizedPlanForDiffStore(c *C) { normalizedPlanRows := getPlanRows(normalizedPlan) c.Assert(err, IsNil) s.testData.OnRecord(func() { - output[i].Digest = digest + output[i].Digest = digest.String() output[i].Plan = normalizedPlanRows }) compareStringSlice(c, normalizedPlanRows, output[i].Plan) - c.Assert(digest != lastDigest, IsTrue) - lastDigest = digest + c.Assert(digest.String() != lastDigest, IsTrue) + lastDigest = digest.String() } } diff --git a/session/session_test.go b/session/session_test.go index 9d2d63cb02804..b8d8538193c7d 100644 --- a/session/session_test.go +++ b/session/session_test.go @@ -4390,7 +4390,7 @@ func (s *testTxnStateSuite) TestBasic(c *C) { tk.MustExec("select * from t for update;") info = tk.Se.TxnInfo() _, expectedDigest := parser.NormalizeDigest("select * from t for update;") - c.Assert(info.CurrentSQLDigest, Equals, expectedDigest) + c.Assert(info.CurrentSQLDigest, Equals, expectedDigest.String()) c.Assert(info.State, Equals, txninfo.TxnRunningNormal) c.Assert(info.BlockStartTime, IsNil) // len and size will be covered in TestLenAndSize diff --git a/sessionctx/stmtctx/stmtctx.go b/sessionctx/stmtctx/stmtctx.go index c6333bc47d820..3b726e85bf11d 100644 --- a/sessionctx/stmtctx/stmtctx.go +++ b/sessionctx/stmtctx/stmtctx.go @@ -256,7 +256,10 @@ func (sc *StatementContext) GetResourceGroupTag() []byte { if len(sc.resourceGroupTag) > 0 { return sc.resourceGroupTag } - _, sqlDigest := sc.SQLDigest() + normalized, sqlDigest := sc.SQLDigest() + if len(normalized) == 0 { + return nil + } sc.resourceGroupTag = resourcegrouptag.EncodeResourceGroupTag(sqlDigest, sc.planDigest) return sc.resourceGroupTag } diff --git a/sessionctx/variable/session_test.go b/sessionctx/variable/session_test.go index ee9030056bf05..00b728557d188 100644 --- a/sessionctx/variable/session_test.go +++ b/sessionctx/variable/session_test.go @@ -231,7 +231,7 @@ func (*testSessionSuite) TestSlowLogFormat(c *C) { logItems := &variable.SlowQueryLogItems{ TxnTS: txnTS, SQL: sql, - Digest: digest, + Digest: digest.String(), TimeTotal: costTime, TimeParse: time.Duration(10), TimeCompile: time.Duration(10), From dbecfaf8d83f43951a8e876167d0f73e741baeb1 Mon Sep 17 00:00:00 2001 From: crazycs Date: Tue, 25 May 2021 13:04:09 +0800 Subject: [PATCH 14/15] fix lint and test Signed-off-by: crazycs --- executor/adapter.go | 4 ++-- executor/partition_table_test.go | 8 ++++---- planner/core/plan_test.go | 4 ++-- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/executor/adapter.go b/executor/adapter.go index 4db42710456b2..c5d9b0406602c 100644 --- a/executor/adapter.go +++ b/executor/adapter.go @@ -1015,11 +1015,11 @@ func getPlanTree(sctx sessionctx.Context, p plannercore.Plan) string { // getPlanDigest will try to get the select plan tree if the plan is select or the select plan of delete/update/insert statement. func getPlanDigest(sctx sessionctx.Context, p plannercore.Plan) string { sc := sctx.GetSessionVars().StmtCtx - normalized, planDigest := sc.GetPlanDigest() + _, planDigest := sc.GetPlanDigest() if planDigest != nil { return planDigest.String() } - normalized, planDigest = plannercore.NormalizePlan(p) + normalized, planDigest := plannercore.NormalizePlan(p) sc.SetPlanDigest(normalized, planDigest) return planDigest.String() } diff --git a/executor/partition_table_test.go b/executor/partition_table_test.go index b9823a32a647e..82ec887ce2692 100644 --- a/executor/partition_table_test.go +++ b/executor/partition_table_test.go @@ -122,7 +122,7 @@ func (s *partitionTableSuite) TestPointGetwithRangeAndListPartitionTable(c *C) { tk.MustExec("set @@session.tidb_enable_list_partition = ON") // list partition table - tk.MustExec(`create table tlist(a int, b int, unique index idx_a(a), index idx_b(b)) partition by list(a)( + tk.MustExec(`create table tlist(a int, b int, unique index idx_a(a), index idx_b(b)) partition by list(a)( partition p0 values in (NULL, 1, 2, 3, 4), partition p1 values in (5, 6, 7, 8), partition p2 values in (9, 10, 11, 12));`) @@ -172,15 +172,15 @@ func (s *partitionTableSuite) TestPointGetwithRangeAndListPartitionTable(c *C) { } // test table dual - queryRange1 := fmt.Sprintf("select a from trange1 where a=200") + queryRange1 := "select a from trange1 where a=200" c.Assert(tk.HasPlan(queryRange1, "TableDual"), IsTrue) // check if TableDual is used tk.MustQuery(queryRange1).Check(testkit.Rows()) - queryRange2 := fmt.Sprintf("select a from trange2 where a=200") + queryRange2 := "select a from trange2 where a=200" c.Assert(tk.HasPlan(queryRange2, "TableDual"), IsTrue) // check if TableDual is used tk.MustQuery(queryRange2).Check(testkit.Rows()) - queryList := fmt.Sprintf("select a from tlist where a=200") + queryList := "select a from tlist where a=200" c.Assert(tk.HasPlan(queryList, "TableDual"), IsTrue) // check if TableDual is used tk.MustQuery(queryList).Check(testkit.Rows()) } diff --git a/planner/core/plan_test.go b/planner/core/plan_test.go index 4634be3e7f853..6c29eef90f5ae 100644 --- a/planner/core/plan_test.go +++ b/planner/core/plan_test.go @@ -404,10 +404,10 @@ func testNormalizeDigest(tk *testkit.TestKit, c *C, sql1, sql2 string, isSame bo comment := Commentf("sql1: %v, sql2: %v\n%v !=\n%v\n", sql1, sql2, normalized1, normalized2) if isSame { c.Assert(normalized1, Equals, normalized2, comment) - c.Assert(digest1, Equals, digest2, comment) + c.Assert(digest1.String(), Equals, digest2.String(), comment) } else { c.Assert(normalized1 != normalized2, IsTrue, comment) - c.Assert(digest1 != digest2, IsTrue, comment) + c.Assert(digest1.String() != digest2.String(), IsTrue, comment) } } From d5fd72f0e26d2aac810a722f17fe419ac3479cf0 Mon Sep 17 00:00:00 2001 From: crazycs Date: Tue, 25 May 2021 14:23:04 +0800 Subject: [PATCH 15/15] fix race Signed-off-by: crazycs --- sessionctx/stmtctx/stmtctx.go | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/sessionctx/stmtctx/stmtctx.go b/sessionctx/stmtctx/stmtctx.go index 3b726e85bf11d..ea8bd70b8c0f2 100644 --- a/sessionctx/stmtctx/stmtctx.go +++ b/sessionctx/stmtctx/stmtctx.go @@ -167,7 +167,7 @@ type StatementContext struct { // stmtCache is used to store some statement-related values. stmtCache map[StmtCacheKey]interface{} // resourceGroupTag cache for the current statement resource group tag. - resourceGroupTag []byte + resourceGroupTag atomic.Value } // StmtHints are SessionVars related sql hints. @@ -253,15 +253,17 @@ func (sc *StatementContext) GetPlanDigest() (normalized string, planDigest *pars // GetResourceGroupTag gets the resource group of the statement. func (sc *StatementContext) GetResourceGroupTag() []byte { - if len(sc.resourceGroupTag) > 0 { - return sc.resourceGroupTag + tag, _ := sc.resourceGroupTag.Load().([]byte) + if len(tag) > 0 { + return tag } normalized, sqlDigest := sc.SQLDigest() if len(normalized) == 0 { return nil } - sc.resourceGroupTag = resourcegrouptag.EncodeResourceGroupTag(sqlDigest, sc.planDigest) - return sc.resourceGroupTag + tag = resourcegrouptag.EncodeResourceGroupTag(sqlDigest, sc.planDigest) + sc.resourceGroupTag.Store(tag) + return tag } // SetPlanDigest sets the normalized plan and plan digest.