Skip to content


Merge branch 'master' of into wait-ts…
Browse files Browse the repository at this point in the history
  • Loading branch information
TonsnakeLin committed Dec 13, 2022
2 parents 1324e27 + 509fe6d commit 37ebf08
Show file tree
Hide file tree
Showing 112 changed files with 13,852 additions and 10,657 deletions.
105 changes: 0 additions & 105 deletions bindinfo/bind_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1388,108 +1388,3 @@ func TestDropBindBySQLDigest(t *testing.T) {
tk.MustGetErrMsg(fmt.Sprintf("drop binding for sql digest '%s'", "1"), "can't find any binding for '1'")
tk.MustGetErrMsg(fmt.Sprintf("drop binding for sql digest '%s'", ""), "sql digest is empty")

func TestCreateBindingFromHistory(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
require.NoError(t, tk.Session().Auth(&auth.UserIdentity{Username: "root", Hostname: "%"}, nil, nil))

tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t1(id int primary key, a int, b int, key(a))")
tk.MustExec("create table t2(id int primary key, a int, b int, key(a))")

var testCases = []struct {
sqls []string
hint string
sqls: []string{
"select %s * from t1, t2 where =",
"select %s * from test.t1, t2 where =",
"select %s * from test.t1, test.t2 where =",
"select %s * from t1, test.t2 where =",
hint: "/*+ merge_join(t1, t2) */",
sqls: []string{
"select %s * from t1 where a = 1",
"select %s * from test.t1 where a = 1",
hint: "/*+ ignore_index(t, a) */",

for _, testCase := range testCases {
for _, bind := range testCase.sqls {
bindSQL := fmt.Sprintf(bind, testCase.hint)
planDigest := tk.MustQuery(fmt.Sprintf("select plan_digest from information_schema.statements_summary where query_sample_text = '%s'", bindSQL)).Rows()
tk.MustExec(fmt.Sprintf("create session binding from history using plan digest '%s'", planDigest[0][0]))
showRes := tk.MustQuery("show bindings").Rows()
require.Equal(t, len(showRes), 1)
require.Equal(t, planDigest[0][0], showRes[0][10])
for _, sql := range testCase.sqls {
tk.MustExec(fmt.Sprintf(sql, ""))
tk.MustQuery("select @@last_plan_from_binding").Check(testkit.Rows("1"))
showRes := tk.MustQuery("show bindings").Rows()
require.Equal(t, len(showRes), 1)
tk.MustExec(fmt.Sprintf("drop binding for sql digest '%s'", showRes[0][9]))

// exception cases
tk.MustGetErrMsg(fmt.Sprintf("create binding from history using plan digest '%s'", "1"), "can't find any plans for '1'")
tk.MustGetErrMsg(fmt.Sprintf("create binding from history using plan digest '%s'", ""), "plan digest is empty")
tk.MustExec("create binding for select * from t1, t2 where = using select /*+ merge_join(t1, t2) */ * from t1, t2 where =")
showRes := tk.MustQuery("show bindings").Rows()
require.Equal(t, showRes[0][10], "") // plan digest should be nil by create for

func TestCreateBindingForPrepareFromHistory(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
require.NoError(t, tk.Session().Auth(&auth.UserIdentity{Username: "root", Hostname: "%"}, nil, nil))

tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(id int primary key, a int, key(a))")

tk.MustExec("prepare stmt from 'select /*+ ignore_index(t,a) */ * from t where a = ?'")
tk.MustExec("set @a = 1")
tk.MustExec("execute stmt using @a")
planDigest := tk.MustQuery(fmt.Sprintf("select plan_digest from information_schema.statements_summary where query_sample_text = '%s'", "select /*+ ignore_index(t,a) */ * from t where a = ? [arguments: 1]")).Rows()
showRes := tk.MustQuery("show bindings").Rows()
require.Equal(t, len(showRes), 0)
tk.MustExec(fmt.Sprintf("create binding from history using plan digest '%s'", planDigest[0][0]))
showRes = tk.MustQuery("show bindings").Rows()
require.Equal(t, len(showRes), 1)
require.Equal(t, planDigest[0][0], showRes[0][10])
tk.MustExec("execute stmt using @a")
tk.MustQuery("select @@last_plan_from_binding").Check(testkit.Rows("1"))

func TestErrorCasesCreateBindingFromHistory(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
require.NoError(t, tk.Session().Auth(&auth.UserIdentity{Username: "root", Hostname: "%"}, nil, nil))

tk.MustExec("use test")
tk.MustExec("drop table if exists t1, t2, t3")
tk.MustExec("create table t1(id int)")
tk.MustExec("create table t2(id int)")
tk.MustExec("create table t3(id int)")

sql := "select * from t1 where in (select id from t2)"
planDigest := tk.MustQuery(fmt.Sprintf("select plan_digest from information_schema.statements_summary where query_sample_text = '%s'", sql)).Rows()
tk.MustGetErrMsg(fmt.Sprintf("create binding from history using plan digest '%s'", planDigest[0][0]), "can't create binding for query with sub query")

sql = "select * from t1, t2, t3 where = and ="
planDigest = tk.MustQuery(fmt.Sprintf("select plan_digest from information_schema.statements_summary where query_sample_text = '%s'", sql)).Rows()
tk.MustGetErrMsg(fmt.Sprintf("create binding from history using plan digest '%s'", planDigest[0][0]), "can't create binding for query with more than two table join")
5 changes: 2 additions & 3 deletions br/pkg/storage/memstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
Expand Down Expand Up @@ -70,7 +69,7 @@ func TestMemStoreBasic(t *testing.T) {
require.Nil(t, err)
r2, err := store.Open(ctx, "/hello.txt")
require.Nil(t, err)
fileContent, err = ioutil.ReadAll(r)
fileContent, err = io.ReadAll(r)
require.Nil(t, err)
require.True(t, bytes.Equal([]byte("hello world 3"), fileContent))
require.Nil(t, r.Close())
Expand All @@ -83,7 +82,7 @@ func TestMemStoreBasic(t *testing.T) {

_, err = r2.Seek(5, io.SeekStart)
require.Nil(t, err)
fileContent, err = ioutil.ReadAll(r2)
fileContent, err = io.ReadAll(r2)
require.Nil(t, err)
require.True(t, bytes.Equal([]byte(" world 3"), fileContent))

Expand Down
6 changes: 3 additions & 3 deletions cmd/explaintest/r/index_merge.result
Original file line number Diff line number Diff line change
Expand Up @@ -455,9 +455,9 @@ c1 c2 c3
///// MEMORY Table
explain select count(c1) from (select /*+ use_index_merge(t_alias), stream_agg() */ count(1) c1 from information_schema.statements_summary where sum_latency >= 0 or max_latency >= 0 order by 1) dt;
id estRows task access object operator info
StreamAgg_10 1.00 root funcs:count(Column#93)->Column#94
└─Sort_11 1.00 root Column#93
└─StreamAgg_14 1.00 root funcs:count(1)->Column#93
StreamAgg_10 1.00 root funcs:count(Column#96)->Column#97
└─Sort_11 1.00 root Column#96
└─StreamAgg_14 1.00 root funcs:count(1)->Column#96
└─MemTableScan_18 10000.00 root table:STATEMENTS_SUMMARY
show warnings;
Level Code Message
Expand Down
3 changes: 0 additions & 3 deletions ddl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,6 @@ go_test(
Expand Down Expand Up @@ -234,7 +233,6 @@ go_test(
Expand All @@ -256,7 +254,6 @@ go_test(
Expand Down
5 changes: 2 additions & 3 deletions ddl/backfilling.go
Original file line number Diff line number Diff line change
Expand Up @@ -626,8 +626,7 @@ func loadDDLReorgVars(ctx context.Context, sessPool *sessionPool) error {
return ddlutil.LoadDDLReorgVars(ctx, sCtx)

func makeupDecodeColMap(sessCtx sessionctx.Context, t table.Table) (map[int64]decoder.Column, error) {
dbName := model.NewCIStr(sessCtx.GetSessionVars().CurrentDB)
func makeupDecodeColMap(sessCtx sessionctx.Context, dbName model.CIStr, t table.Table) (map[int64]decoder.Column, error) {
writableColInfos := make([]*model.ColumnInfo, 0, len(t.WritableCols()))
for _, col := range t.WritableCols() {
writableColInfos = append(writableColInfos, col.ColumnInfo)
Expand Down Expand Up @@ -860,7 +859,7 @@ func (dc *ddlCtx) writePhysicalTableRecord(sessPool *sessionPool, t table.Physic

startKey, endKey := reorgInfo.StartKey, reorgInfo.EndKey
sessCtx := newContext(
decodeColMap, err := makeupDecodeColMap(sessCtx, t)
decodeColMap, err := makeupDecodeColMap(sessCtx, reorgInfo.dbInfo.Name, t)
if err != nil {
return errors.Trace(err)
Expand Down
6 changes: 5 additions & 1 deletion ddl/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -807,7 +807,11 @@ func doReorgWorkForModifyColumn(w *worker, d *ddlCtx, t *meta.Meta, job *model.J
oldCol, changingCol *model.ColumnInfo, changingIdxs []*model.IndexInfo) (done bool, ver int64, err error) {
job.ReorgMeta.ReorgTp = model.ReorgTypeTxn
rh := newReorgHandler(t, w.sess, w.concurrentDDL)
reorgInfo, err := getReorgInfo(d.jobContext(job), d, rh, job, tbl, BuildElements(changingCol, changingIdxs), false)
dbInfo, err := t.GetDatabase(job.SchemaID)
if err != nil {
return false, ver, errors.Trace(err)
reorgInfo, err := getReorgInfo(d.jobContext(job), d, rh, job, dbInfo, tbl, BuildElements(changingCol, changingIdxs), false)
if err != nil || reorgInfo.first {
// If we run reorg firstly, we should update the job snapshot version
// and then run the reorg next time.
Expand Down
8 changes: 8 additions & 0 deletions ddl/db_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1712,6 +1712,14 @@ func TestCreateExpressionIndex(t *testing.T) {
require.NoError(t, checkErr)
tk.MustExec("admin check table t")
tk.MustQuery("select * from t order by a, b").Check(testkit.Rows("0 9", "0 11", "0 11", "1 7", "2 7", "5 7", "8 8", "10 10", "10 10"))

tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(name varchar(20))")
tk.MustExec("insert into t values ('Abc'), ('Bcd'), ('abc')")
tk.MustExec("create index idx on test.t((lower(")
tk.MustExec("admin check table t")

func TestCreateUniqueExpressionIndex(t *testing.T) {
Expand Down
6 changes: 5 additions & 1 deletion ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -870,7 +870,11 @@ func runReorgJobAndHandleErr(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job,
tbl table.Table, indexInfo *model.IndexInfo, mergingTmpIdx bool) (done bool, ver int64, err error) {
elements := []*meta.Element{{ID: indexInfo.ID, TypeKey: meta.IndexElementKey}}
rh := newReorgHandler(t, w.sess, w.concurrentDDL)
reorgInfo, err := getReorgInfo(d.jobContext(job), d, rh, job, tbl, elements, mergingTmpIdx)
dbInfo, err := t.GetDatabase(job.SchemaID)
if err != nil {
return false, ver, errors.Trace(err)
reorgInfo, err := getReorgInfo(d.jobContext(job), d, rh, job, dbInfo, tbl, elements, mergingTmpIdx)
if err != nil || reorgInfo.first {
// If we run reorg firstly, we should update the job snapshot version
// and then run the reorg next time.
Expand Down
4 changes: 2 additions & 2 deletions ddl/multi_schema_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -719,8 +719,8 @@ func TestMultiSchemaChangeAddDropIndexes(t *testing.T) {
tk.MustExec("drop table if exists t;")
tk.MustExec("create table t (a int, b int, c int, index (a), index(b), index(c));")
tk.MustExec("insert into t values (1, 2, 3);")
tk.MustExec("alter table t add index aa(a), drop index a, add index cc(c), drop index b, drop index c, add index bb(b);")
tk.MustQuery("select * from t use index(aa, bb, cc);").Check(testkit.Rows("1 2 3"))
tk.MustExec("alter table t add index xa(a), drop index a, add index xc(c), drop index b, drop index c, add index xb(b);")
tk.MustQuery("select * from t use index(xa, xb, xc);").Check(testkit.Rows("1 2 3"))
tk.MustGetErrCode("select * from t use index(a);", errno.ErrKeyDoesNotExist)
tk.MustGetErrCode("select * from t use index(b);", errno.ErrKeyDoesNotExist)
tk.MustGetErrCode("select * from t use index(c);", errno.ErrKeyDoesNotExist)
Expand Down
6 changes: 5 additions & 1 deletion ddl/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -1743,6 +1743,10 @@ func (w *worker) onDropTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (
if err != nil {
return ver, errors.Trace(err)
dbInfo, err := t.GetDatabase(job.SchemaID)
if err != nil {
return ver, errors.Trace(err)
// If table has global indexes, we need reorg to clean up them.
if pt, ok := tbl.(table.PartitionedTable); ok && hasGlobalIndex(tblInfo) {
// Build elements for compatible with modify column type. elements will not be used when reorganizing.
Expand All @@ -1753,7 +1757,7 @@ func (w *worker) onDropTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (
rh := newReorgHandler(t, w.sess, w.concurrentDDL)
reorgInfo, err := getReorgInfoFromPartitions(d.jobContext(job), d, rh, job, tbl, physicalTableIDs, elements)
reorgInfo, err := getReorgInfoFromPartitions(d.jobContext(job), d, rh, job, dbInfo, tbl, physicalTableIDs, elements)

if err != nil || reorgInfo.first {
// If we run reorg firstly, we should update the job snapshot version
Expand Down
2 changes: 1 addition & 1 deletion ddl/placement_sql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -588,7 +588,7 @@ func checkTiflashReplicaSet(t *testing.T, do *domain.Domain, db, tb string, cnt

CheckPlacementRule(infosync.GetMockTiFlash(), *infosync.MakeNewRule(tbl.Meta().ID, 1, nil))
infosync.GetMockTiFlash().CheckPlacementRule(*infosync.MakeNewRule(tbl.Meta().ID, 1, nil))
require.NotNil(t, tiflashReplica)
require.Equal(t, cnt, tiflashReplica.Count)
Expand Down
7 changes: 5 additions & 2 deletions ddl/reorg.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,7 @@ type reorgInfo struct {
// PhysicalTableID is used to trace the current partition we are handling.
// If the table is not partitioned, PhysicalTableID would be TableID.
PhysicalTableID int64
dbInfo *model.DBInfo
elements []*meta.Element
currElement *meta.Element
Expand Down Expand Up @@ -585,7 +586,7 @@ func getValidCurrentVersion(store kv.Storage) (ver kv.Version, err error) {
return ver, nil

func getReorgInfo(ctx *JobContext, d *ddlCtx, rh *reorgHandler, job *model.Job,
func getReorgInfo(ctx *JobContext, d *ddlCtx, rh *reorgHandler, job *model.Job, dbInfo *model.DBInfo,
tbl table.Table, elements []*meta.Element, mergingTmpIdx bool) (*reorgInfo, error) {
var (
element *meta.Element
Expand Down Expand Up @@ -685,11 +686,12 @@ func getReorgInfo(ctx *JobContext, d *ddlCtx, rh *reorgHandler, job *model.Job,
info.currElement = element
info.elements = elements
info.mergingTmpIdx = mergingTmpIdx
info.dbInfo = dbInfo

return &info, nil

func getReorgInfoFromPartitions(ctx *JobContext, d *ddlCtx, rh *reorgHandler, job *model.Job, tbl table.Table, partitionIDs []int64, elements []*meta.Element) (*reorgInfo, error) {
func getReorgInfoFromPartitions(ctx *JobContext, d *ddlCtx, rh *reorgHandler, job *model.Job, dbInfo *model.DBInfo, tbl table.Table, partitionIDs []int64, elements []*meta.Element) (*reorgInfo, error) {
var (
element *meta.Element
start kv.Key
Expand Down Expand Up @@ -745,6 +747,7 @@ func getReorgInfoFromPartitions(ctx *JobContext, d *ddlCtx, rh *reorgHandler, jo
info.PhysicalTableID = pid
info.currElement = element
info.elements = elements
info.dbInfo = dbInfo

return &info, nil
Expand Down
35 changes: 35 additions & 0 deletions ddl/tiflashtest/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
load("@io_bazel_rules_go//go:def.bzl", "go_test")

name = "tiflashtest_test",
srcs = [
flaky = True,
deps = [

0 comments on commit 37ebf08

Please sign in to comment.