From 3246a21641432ba18ad709db7e09d9eb0134a868 Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Tue, 28 Feb 2023 21:57:09 +0800 Subject: [PATCH 01/13] executor: fix revoke USAGE (#41774) (#41780) close pingcap/tidb#41773 --- executor/grant.go | 13 +++---------- executor/revoke.go | 3 +++ executor/revoke_test.go | 16 ++++++++++++++++ 3 files changed, 22 insertions(+), 10 deletions(-) diff --git a/executor/grant.go b/executor/grant.go index 7db7cc2227913..b22755e39f8b3 100644 --- a/executor/grant.go +++ b/executor/grant.go @@ -444,6 +444,9 @@ func (e *GrantExec) grantLevelPriv(priv *ast.PrivElem, user *ast.UserSpec, inter if priv.Priv == mysql.ExtendedPriv { return e.grantDynamicPriv(priv.Name, user, internalSession) } + if priv.Priv == mysql.UsagePriv { + return nil + } switch e.Level.Level { case ast.GrantLevelGlobal: return e.grantGlobalLevel(priv, user, internalSession) @@ -481,10 +484,6 @@ func (e *GrantExec) grantDynamicPriv(privName string, user *ast.UserSpec, intern // grantGlobalLevel manipulates mysql.user table. func (e *GrantExec) grantGlobalLevel(priv *ast.PrivElem, user *ast.UserSpec, internalSession sessionctx.Context) error { - if priv.Priv == 0 || priv.Priv == mysql.UsagePriv { - return nil - } - sql := new(strings.Builder) sqlexec.MustFormatSQL(sql, `UPDATE %n.%n SET `, mysql.SystemDB, mysql.UserTable) err := composeGlobalPrivUpdate(sql, priv.Priv, "Y") @@ -499,9 +498,6 @@ func (e *GrantExec) grantGlobalLevel(priv *ast.PrivElem, user *ast.UserSpec, int // grantDBLevel manipulates mysql.db table. func (e *GrantExec) grantDBLevel(priv *ast.PrivElem, user *ast.UserSpec, internalSession sessionctx.Context) error { - if priv.Priv == mysql.UsagePriv { - return nil - } for _, v := range mysql.StaticGlobalOnlyPrivs { if v == priv.Priv { return ErrWrongUsage.GenWithStackByArgs("DB GRANT", "GLOBAL PRIVILEGES") @@ -534,9 +530,6 @@ func (e *GrantExec) grantDBLevel(priv *ast.PrivElem, user *ast.UserSpec, interna // grantTableLevel manipulates mysql.tables_priv table. func (e *GrantExec) grantTableLevel(priv *ast.PrivElem, user *ast.UserSpec, internalSession sessionctx.Context) error { - if priv.Priv == mysql.UsagePriv { - return nil - } dbName := e.Level.DBName if len(dbName) == 0 { dbName = e.ctx.GetSessionVars().CurrentDB diff --git a/executor/revoke.go b/executor/revoke.go index bfa17011dd874..3b36e36e78b94 100644 --- a/executor/revoke.go +++ b/executor/revoke.go @@ -178,6 +178,9 @@ func (e *RevokeExec) revokeOneUser(internalSession sessionctx.Context, user, hos } func (e *RevokeExec) revokePriv(internalSession sessionctx.Context, priv *ast.PrivElem, user, host string) error { + if priv.Priv == mysql.UsagePriv { + return nil + } switch e.Level.Level { case ast.GrantLevelGlobal: return e.revokeGlobalPriv(internalSession, priv, user, host) diff --git a/executor/revoke_test.go b/executor/revoke_test.go index b974016f3eec9..5ecd4264a7f3d 100644 --- a/executor/revoke_test.go +++ b/executor/revoke_test.go @@ -235,3 +235,19 @@ func TestRevokeOnNonExistTable(t *testing.T) { tk.MustExec("DROP TABLE t1;") tk.MustExec("REVOKE ALTER ON d1.t1 FROM issue28533;") } + +// Check https://github.com/pingcap/tidb/issues/41773. +func TestIssue41773(t *testing.T) { + store, clean := testkit.CreateMockStore(t) + defer clean() + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("create table if not exists xx (id int)") + tk.MustExec("CREATE USER 't1234'@'%' IDENTIFIED BY 'sNGNQo12fEHe0n3vU';") + tk.MustExec("GRANT USAGE ON * TO 't1234'@'%';") + tk.MustExec("GRANT USAGE ON test.* TO 't1234'@'%';") + tk.MustExec("GRANT USAGE ON test.xx TO 't1234'@'%';") + tk.MustExec("REVOKE USAGE ON * FROM 't1234'@'%';") + tk.MustExec("REVOKE USAGE ON test.* FROM 't1234'@'%';") + tk.MustExec("REVOKE USAGE ON test.xx FROM 't1234'@'%';") +} From 17718a7c5d35b1119f2c271ba112fc10db653f69 Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Wed, 1 Mar 2023 15:59:09 +0800 Subject: [PATCH 02/13] types: clip to zero when convert negative decimal to uint (#41791) (#41817) close pingcap/tidb#41736 --- types/convert.go | 5 ++--- types/convert_test.go | 13 +++++++++++-- 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/types/convert.go b/types/convert.go index 9433f26dbefc8..8431ba2bb6638 100644 --- a/types/convert.go +++ b/types/convert.go @@ -247,7 +247,7 @@ func convertDecimalStrToUint(sc *stmtctx.StatementContext, str string, upperBoun if intStr == "" { intStr = "0" } - if sc.ShouldClipToZero() && intStr[0] == '-' { + if intStr[0] == '-' { return 0, overflow(str, tp) } @@ -256,8 +256,7 @@ func convertDecimalStrToUint(sc *stmtctx.StatementContext, str string, upperBoun round++ } - upperBound -= round - upperStr := strconv.FormatUint(upperBound, 10) + upperStr := strconv.FormatUint(upperBound-round, 10) if len(intStr) > len(upperStr) || (len(intStr) == len(upperStr) && intStr > upperStr) { return upperBound, overflow(str, tp) diff --git a/types/convert_test.go b/types/convert_test.go index b6c37e2488ac8..6bdd1001158cf 100644 --- a/types/convert_test.go +++ b/types/convert_test.go @@ -1275,8 +1275,9 @@ func TestConvertDecimalStrToUint(t *testing.T) { {"9223372036854775807.4999", 9223372036854775807, true}, {"18446744073709551614.55", 18446744073709551615, true}, {"18446744073709551615.344", 18446744073709551615, true}, - {"18446744073709551615.544", 0, false}, + {"18446744073709551615.544", 18446744073709551615, false}, {"-111.111", 0, false}, + {"-10000000000000000000.0", 0, false}, } for _, ca := range cases { result, err := convertDecimalStrToUint(&stmtctx.StatementContext{}, ca.input, math.MaxUint64, 0) @@ -1284,7 +1285,15 @@ func TestConvertDecimalStrToUint(t *testing.T) { require.Error(t, err) } else { require.NoError(t, err) - require.Equal(t, ca.result, result) } + require.Equal(t, ca.result, result, "input=%v", ca.input) } + + result, err := convertDecimalStrToUint(&stmtctx.StatementContext{}, "-99.0", math.MaxUint8, 0) + require.Error(t, err) + require.Equal(t, uint64(0), result) + + result, err = convertDecimalStrToUint(&stmtctx.StatementContext{}, "-100.0", math.MaxUint8, 0) + require.Error(t, err) + require.Equal(t, uint64(0), result) } From c7c468610d7f97b9e0e919d7bbb32ed3379fdab3 Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Mon, 6 Mar 2023 17:21:11 +0800 Subject: [PATCH 03/13] sysvar: introduce variable tidb_enable_inl_join_inner_multi_pattern (#41319) (#41326) ref pingcap/tidb#40505 --- executor/index_advise_test.go | 133 +++++++++++++++++++++++ planner/core/exhaust_physical_plans.go | 141 ++++++++++++++++++++----- sessionctx/variable/session.go | 3 + sessionctx/variable/sysvar.go | 9 ++ sessionctx/variable/tidb_vars.go | 3 + 5 files changed, 263 insertions(+), 26 deletions(-) diff --git a/executor/index_advise_test.go b/executor/index_advise_test.go index 5371ecd051bc1..b0f93bd2b48ad 100644 --- a/executor/index_advise_test.go +++ b/executor/index_advise_test.go @@ -69,3 +69,136 @@ func TestIndexAdvise(t *testing.T) { require.Equal(t, uint64(4), ia.MaxIndexNum.PerTable) require.Equal(t, uint64(5), ia.MaxIndexNum.PerDB) } + +func TestIndexJoinProjPattern(t *testing.T) { + store, clean := testkit.CreateMockStore(t) + defer clean() + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec(`create table t1( +pnbrn_cnaps varchar(5) not null, +new_accno varchar(18) not null, +primary key(pnbrn_cnaps,new_accno) nonclustered +);`) + tk.MustExec(`create table t2( +pnbrn_cnaps varchar(5) not null, +txn_accno varchar(18) not null, +txn_dt date not null, +yn_frz varchar(1) default null +);`) + tk.MustExec(`insert into t1(pnbrn_cnaps,new_accno) values ("40001","123")`) + tk.MustExec(`insert into t2(pnbrn_cnaps, txn_accno, txn_dt, yn_frz) values ("40001","123","20221201","0");`) + + sql := `update +/*+ inl_join(a) */ +t2 b, +( +select t1.pnbrn_cnaps, +t1.new_accno +from t1 +where t1.pnbrn_cnaps = '40001' +) a +set b.yn_frz = '1' +where b.txn_dt = str_to_date('20221201', '%Y%m%d') +and b.pnbrn_cnaps = a.pnbrn_cnaps +and b.txn_accno = a.new_accno;` + rows := [][]interface{}{ + {"Update_8"}, + {"└─IndexJoin_13"}, + {" ├─TableReader_23(Build)"}, + {" │ └─Selection_22"}, + {" │ └─TableFullScan_21"}, + {" └─IndexReader_12(Probe)"}, + {" └─Selection_11"}, + {" └─IndexRangeScan_10"}, + } + tk.MustExec("set @@session.tidb_enable_inl_join_inner_multi_pattern='ON'") + tk.MustQuery("explain "+sql).CheckAt([]int{0}, rows) + rows = [][]interface{}{ + {"Update_8"}, + {"└─HashJoin_10"}, + {" ├─IndexReader_17(Build)"}, + {" │ └─IndexRangeScan_16"}, + {" └─TableReader_14(Probe)"}, + {" └─Selection_13"}, + {" └─TableFullScan_12"}, + } + tk.MustExec("set @@session.tidb_enable_inl_join_inner_multi_pattern='OFF'") + tk.MustQuery("explain "+sql).CheckAt([]int{0}, rows) + + tk.MustExec("set @@session.tidb_enable_inl_join_inner_multi_pattern='ON'") + tk.MustExec(sql) + tk.MustQuery("select yn_frz from t2").Check(testkit.Rows("1")) +} + +func TestIndexJoinSelPattern(t *testing.T) { + store, clean := testkit.CreateMockStore(t) + defer clean() + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec(` create table tbl_miss( +id bigint(20) unsigned not null +,txn_dt date default null +,perip_sys_uuid varchar(32) not null +,rvrs_idr varchar(1) not null +,primary key(id) clustered +,key idx1 (txn_dt, perip_sys_uuid, rvrs_idr) +); +`) + tk.MustExec(`insert into tbl_miss (id,txn_dt,perip_sys_uuid,rvrs_idr) values (1,"20221201","123","1");`) + tk.MustExec(`create table tbl_src( +txn_dt date default null +,uuid varchar(32) not null +,rvrs_idr char(1) +,expd_inf varchar(5000) +,primary key(uuid,rvrs_idr) nonclustered +); +`) + tk.MustExec(`insert into tbl_src (txn_dt,uuid,rvrs_idr) values ("20221201","123","1");`) + sql := `select /*+ use_index(mis,) inl_join(src) */ + * + from tbl_miss mis + ,tbl_src src + where src.txn_dt >= str_to_date('20221201', '%Y%m%d') + and mis.id between 1 and 10000 + and mis.perip_sys_uuid = src.uuid + and mis.rvrs_idr = src.rvrs_idr + and mis.txn_dt = src.txn_dt + and ( + case when isnull(src.expd_inf) = 1 then '' + else + substr(concat_ws('',src.expd_inf,'~~'), + instr(concat_ws('',src.expd_inf,'~~'),'~~a4') + 4, + instr(substr(concat_ws('',src.expd_inf,'~~'), + instr(concat_ws('',src.expd_inf,'~~'),'~~a4') + 4, length(concat_ws('',src.expd_inf,'~~'))),'~~') -1) + end + ) != '01';` + rows := [][]interface{}{ + {"HashJoin_9"}, + {"├─TableReader_12(Build)"}, + {"│ └─Selection_11"}, + {"│ └─TableRangeScan_10"}, + {"└─Selection_13(Probe)"}, + {" └─TableReader_16"}, + {" └─Selection_15"}, + {" └─TableFullScan_14"}, + } + tk.MustExec("set @@session.tidb_enable_inl_join_inner_multi_pattern='OFF'") + tk.MustQuery("explain "+sql).CheckAt([]int{0}, rows) + rows = [][]interface{}{ + {"IndexJoin_12"}, + {"├─TableReader_23(Build)"}, + {"│ └─Selection_22"}, + {"│ └─TableRangeScan_21"}, + {"└─IndexLookUp_11(Probe)"}, + {" ├─IndexRangeScan_8(Build)"}, + {" └─Selection_10(Probe)"}, + {" └─TableRowIDScan_9"}, + } + tk.MustExec("set @@session.tidb_enable_inl_join_inner_multi_pattern='ON'") + tk.MustQuery("explain "+sql).CheckAt([]int{0}, rows) + tk.MustExec("set @@session.tidb_enable_inl_join_inner_multi_pattern='ON'") + tk.MustQuery(sql).Check(testkit.Rows("1 2022-12-01 123 1 2022-12-01 123 1 ")) + tk.MustExec("set @@session.tidb_enable_inl_join_inner_multi_pattern='OFF'") + tk.MustQuery(sql).Check(testkit.Rows("1 2022-12-01 123 1 2022-12-01 123 1 ")) +} diff --git a/planner/core/exhaust_physical_plans.go b/planner/core/exhaust_physical_plans.go index fecce00ced731..9d7653d29a277 100644 --- a/planner/core/exhaust_physical_plans.go +++ b/planner/core/exhaust_physical_plans.go @@ -685,33 +685,77 @@ func (p *LogicalJoin) getIndexJoinByOuterIdx(prop *property.PhysicalProperty, ou } else { innerJoinKeys, outerJoinKeys, _, _ = p.GetJoinKeys() } - ds, isDataSource := innerChild.(*DataSource) - us, isUnionScan := innerChild.(*LogicalUnionScan) - if (!isDataSource && !isUnionScan) || (isDataSource && ds.preferStoreType&preferTiFlash != 0) { + innerChildWrapper := p.extractIndexJoinInnerChildPattern(innerChild) + if innerChildWrapper == nil { return nil } - if isUnionScan { - // The child of union scan may be union all for partition table. - ds, isDataSource = us.Children()[0].(*DataSource) + var avgInnerRowCnt float64 + if outerChild.statsInfo().RowCount > 0 { + avgInnerRowCnt = p.equalCondOutCnt / outerChild.statsInfo().RowCount + } + joins = p.buildIndexJoinInner2TableScan(prop, innerChildWrapper, innerJoinKeys, outerJoinKeys, outerIdx, avgInnerRowCnt) + if joins != nil { + return + } + return p.buildIndexJoinInner2IndexScan(prop, innerChildWrapper, innerJoinKeys, outerJoinKeys, outerIdx, avgInnerRowCnt) +} + +type indexJoinInnerChildWrapper struct { + ds *DataSource + us *LogicalUnionScan + proj *LogicalProjection + sel *LogicalSelection +} + +func (p *LogicalJoin) extractIndexJoinInnerChildPattern(innerChild LogicalPlan) *indexJoinInnerChildWrapper { + wrapper := &indexJoinInnerChildWrapper{} + switch child := innerChild.(type) { + case *DataSource: + wrapper.ds = child + case *LogicalUnionScan: + wrapper.us = child + ds, isDataSource := wrapper.us.Children()[0].(*DataSource) if !isDataSource { return nil } + wrapper.ds = ds // If one of the union scan children is a TiFlash table, then we can't choose index join. - for _, child := range us.Children() { + for _, child := range wrapper.us.Children() { if ds, ok := child.(*DataSource); ok && ds.preferStoreType&preferTiFlash != 0 { return nil } } + case *LogicalProjection: + if !p.ctx.GetSessionVars().EnableINLJoinInnerMultiPattern { + return nil + } + // For now, we only allow proj with all Column expression can be the inner side of index join + for _, expr := range child.Exprs { + if _, ok := expr.(*expression.Column); !ok { + return nil + } + } + wrapper.proj = child + ds, isDataSource := wrapper.proj.Children()[0].(*DataSource) + if !isDataSource { + return nil + } + wrapper.ds = ds + case *LogicalSelection: + if !p.ctx.GetSessionVars().EnableINLJoinInnerMultiPattern { + return nil + } + wrapper.sel = child + ds, isDataSource := wrapper.sel.Children()[0].(*DataSource) + if !isDataSource { + return nil + } + wrapper.ds = ds } - var avgInnerRowCnt float64 - if outerChild.statsInfo().RowCount > 0 { - avgInnerRowCnt = p.equalCondOutCnt / outerChild.statsInfo().RowCount - } - joins = p.buildIndexJoinInner2TableScan(prop, ds, innerJoinKeys, outerJoinKeys, outerIdx, us, avgInnerRowCnt) - if joins != nil { - return + if wrapper.ds == nil || wrapper.ds.preferStoreType&preferTiFlash != 0 { + return nil } - return p.buildIndexJoinInner2IndexScan(prop, ds, innerJoinKeys, outerJoinKeys, outerIdx, us, avgInnerRowCnt) + return wrapper } func (p *LogicalJoin) getIndexJoinBuildHelper(ds *DataSource, innerJoinKeys []*expression.Column, checkPathValid func(path *util.AccessPath) bool, outerJoinKeys []*expression.Column) (*indexJoinBuildHelper, []int) { @@ -751,8 +795,10 @@ func (p *LogicalJoin) getIndexJoinBuildHelper(ds *DataSource, innerJoinKeys []*e // fetched from the inner side for every tuple from the outer side. This will be // promised to be no worse than building IndexScan as the inner child. func (p *LogicalJoin) buildIndexJoinInner2TableScan( - prop *property.PhysicalProperty, ds *DataSource, innerJoinKeys, outerJoinKeys []*expression.Column, - outerIdx int, us *LogicalUnionScan, avgInnerRowCnt float64) (joins []PhysicalPlan) { + prop *property.PhysicalProperty, wrapper *indexJoinInnerChildWrapper, innerJoinKeys, outerJoinKeys []*expression.Column, + outerIdx int, avgInnerRowCnt float64) (joins []PhysicalPlan) { + ds := wrapper.ds + us := wrapper.us var tblPath *util.AccessPath for _, path := range ds.possibleAccessPaths { if path.IsTablePath() && path.StoreType == kv.TiKV { @@ -773,13 +819,13 @@ func (p *LogicalJoin) buildIndexJoinInner2TableScan( if helper == nil { return nil } - innerTask = p.constructInnerTableScanTask(ds, helper.chosenRanges.Range(), outerJoinKeys, us, false, false, avgInnerRowCnt) + innerTask = p.constructInnerTableScanTask(wrapper, helper.chosenRanges.Range(), outerJoinKeys, false, false, avgInnerRowCnt) // The index merge join's inner plan is different from index join, so we // should construct another inner plan for it. // Because we can't keep order for union scan, if there is a union scan in inner task, // we can't construct index merge join. if us == nil { - innerTask2 = p.constructInnerTableScanTask(ds, helper.chosenRanges.Range(), outerJoinKeys, us, true, !prop.IsSortItemEmpty() && prop.SortItems[0].Desc, avgInnerRowCnt) + innerTask2 = p.constructInnerTableScanTask(wrapper, helper.chosenRanges.Range(), outerJoinKeys, true, !prop.IsSortItemEmpty() && prop.SortItems[0].Desc, avgInnerRowCnt) } ranges = helper.chosenRanges } else { @@ -803,13 +849,13 @@ func (p *LogicalJoin) buildIndexJoinInner2TableScan( return nil } ranges := ranger.FullIntRange(mysql.HasUnsignedFlag(pkCol.RetType.GetFlag())) - innerTask = p.constructInnerTableScanTask(ds, ranges, outerJoinKeys, us, false, false, avgInnerRowCnt) + innerTask = p.constructInnerTableScanTask(wrapper, ranges, outerJoinKeys, false, false, avgInnerRowCnt) // The index merge join's inner plan is different from index join, so we // should construct another inner plan for it. // Because we can't keep order for union scan, if there is a union scan in inner task, // we can't construct index merge join. if us == nil { - innerTask2 = p.constructInnerTableScanTask(ds, ranges, outerJoinKeys, us, true, !prop.IsSortItemEmpty() && prop.SortItems[0].Desc, avgInnerRowCnt) + innerTask2 = p.constructInnerTableScanTask(wrapper, ranges, outerJoinKeys, true, !prop.IsSortItemEmpty() && prop.SortItems[0].Desc, avgInnerRowCnt) } } var ( @@ -837,8 +883,10 @@ func (p *LogicalJoin) buildIndexJoinInner2TableScan( } func (p *LogicalJoin) buildIndexJoinInner2IndexScan( - prop *property.PhysicalProperty, ds *DataSource, innerJoinKeys, outerJoinKeys []*expression.Column, - outerIdx int, us *LogicalUnionScan, avgInnerRowCnt float64) (joins []PhysicalPlan) { + prop *property.PhysicalProperty, wrapper *indexJoinInnerChildWrapper, innerJoinKeys, outerJoinKeys []*expression.Column, + outerIdx int, avgInnerRowCnt float64) (joins []PhysicalPlan) { + ds := wrapper.ds + us := wrapper.us helper, keyOff2IdxOff := p.getIndexJoinBuildHelper(ds, innerJoinKeys, func(path *util.AccessPath) bool { return !path.IsTablePath() }, outerJoinKeys) if helper == nil { return nil @@ -925,14 +973,14 @@ func (ijHelper *indexJoinBuildHelper) buildRangeDecidedByInformation(idxCols []* // constructInnerTableScanTask is specially used to construct the inner plan for PhysicalIndexJoin. func (p *LogicalJoin) constructInnerTableScanTask( - ds *DataSource, + wrapper *indexJoinInnerChildWrapper, ranges ranger.Ranges, outerJoinKeys []*expression.Column, - us *LogicalUnionScan, keepOrder bool, desc bool, rowCount float64, ) task { + ds := wrapper.ds // If `ds.tableInfo.GetPartitionInfo() != nil`, // it means the data source is a partition table reader. // If the inner task need to keep order, the partition table reader can't satisfy it. @@ -997,10 +1045,51 @@ func (p *LogicalJoin) constructInnerTableScanTask( ts.addPushedDownSelection(copTask, selStats) t := copTask.convertToRootTask(ds.ctx) reader := t.p - t.p = p.constructInnerUnionScan(us, reader) + t.p = p.constructInnerByWrapper(wrapper, reader) return t } +func (p *LogicalJoin) constructInnerByWrapper(wrapper *indexJoinInnerChildWrapper, child PhysicalPlan) PhysicalPlan { + if !p.ctx.GetSessionVars().EnableINLJoinInnerMultiPattern { + if wrapper.us != nil { + return p.constructInnerUnionScan(wrapper.us, child) + } + return child + } + if wrapper.us != nil { + return p.constructInnerUnionScan(wrapper.us, child) + } else if wrapper.proj != nil { + return p.constructInnerProj(wrapper.proj, child) + } else if wrapper.sel != nil { + return p.constructInnerSel(wrapper.sel, child) + } + return child +} + +func (p *LogicalJoin) constructInnerSel(sel *LogicalSelection, child PhysicalPlan) PhysicalPlan { + if sel == nil { + return child + } + physicalSel := PhysicalSelection{ + Conditions: sel.Conditions, + }.Init(sel.ctx, sel.stats, sel.blockOffset, nil) + physicalSel.SetChildren(child) + return physicalSel +} + +func (p *LogicalJoin) constructInnerProj(proj *LogicalProjection, child PhysicalPlan) PhysicalPlan { + if proj == nil { + return child + } + physicalProj := PhysicalProjection{ + Exprs: proj.Exprs, + CalculateNoDelay: proj.CalculateNoDelay, + AvoidColumnEvaluator: proj.AvoidColumnEvaluator, + }.Init(proj.ctx, proj.stats, proj.blockOffset, nil) + physicalProj.SetChildren(child) + return physicalProj +} + func (p *LogicalJoin) constructInnerUnionScan(us *LogicalUnionScan, reader PhysicalPlan) PhysicalPlan { if us == nil { return reader diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index f480365321b94..49cb29bd07cf6 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -1055,6 +1055,9 @@ type SessionVars struct { // When it is false, ANALYZE reads the latest data. // When it is true, ANALYZE reads data on the snapshot at the beginning of ANALYZE. EnableAnalyzeSnapshot bool + + // EnableINLJoinInnerMultiPattern indicates whether enable multi pattern for index join inner side + EnableINLJoinInnerMultiPattern bool } // InitStatementContext initializes a StatementContext, the object is reused to reduce allocation. diff --git a/sessionctx/variable/sysvar.go b/sessionctx/variable/sysvar.go index 1a1944f7abd01..c242df95612f1 100644 --- a/sessionctx/variable/sysvar.go +++ b/sessionctx/variable/sysvar.go @@ -1584,6 +1584,15 @@ var defaultSysVars = []*SysVar{ s.EnableAnalyzeSnapshot = TiDBOptOn(val) return nil }}, + {Scope: ScopeGlobal | ScopeSession, Name: TiDBEnableINLJoinInnerMultiPattern, Value: BoolToOnOff(false), Type: TypeBool, + SetSession: func(s *SessionVars, val string) error { + s.EnableINLJoinInnerMultiPattern = TiDBOptOn(val) + return nil + }, + GetSession: func(s *SessionVars) (string, error) { + return BoolToOnOff(s.EnableINLJoinInnerMultiPattern), nil + }, + }, } // FeedbackProbability points to the FeedbackProbability in statistics package. diff --git a/sessionctx/variable/tidb_vars.go b/sessionctx/variable/tidb_vars.go index 59ac2dc6a1b6c..679fd26e0ba4b 100644 --- a/sessionctx/variable/tidb_vars.go +++ b/sessionctx/variable/tidb_vars.go @@ -652,6 +652,9 @@ const ( // When set to false, ANALYZE reads the latest data. // When set to true, ANALYZE reads data on the snapshot at the beginning of ANALYZE. TiDBEnableAnalyzeSnapshot = "tidb_enable_analyze_snapshot" + + // TiDBEnableINLJoinInnerMultiPattern indicates whether enable multi pattern for inner side of inl join + TiDBEnableINLJoinInnerMultiPattern = "tidb_enable_inl_join_inner_multi_pattern" ) // TiDB vars that have only global scope From f0a045837da0a84ed2ecd9878e3492db3bdfb931 Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Tue, 14 Mar 2023 12:04:38 +0800 Subject: [PATCH 04/13] ddl: fix double/float data not being truncated for column modification operations when the number of decimal places is reduced. (#41555) (#41592) close pingcap/tidb#41281 --- ddl/column.go | 3 ++- ddl/column_type_change_test.go | 6 +++++- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/ddl/column.go b/ddl/column.go index 852b17d2e26c0..fd67f027a65b8 100644 --- a/ddl/column.go +++ b/ddl/column.go @@ -658,7 +658,8 @@ func needChangeColumnData(oldCol, newCol *model.ColumnInfo) bool { toUnsigned := mysql.HasUnsignedFlag(newCol.GetFlag()) originUnsigned := mysql.HasUnsignedFlag(oldCol.GetFlag()) needTruncationOrToggleSign := func() bool { - return (newCol.GetFlen() > 0 && newCol.GetFlen() < oldCol.GetFlen()) || (toUnsigned != originUnsigned) + return (newCol.GetFlen() > 0 && (newCol.GetFlen() < oldCol.GetFlen() || newCol.GetDecimal() < oldCol.GetDecimal())) || + (toUnsigned != originUnsigned) } // Ignore the potential max display length represented by integer's flen, use default flen instead. defaultOldColFlen, _ := mysql.GetDefaultFieldLengthAndDecimal(oldCol.GetType()) diff --git a/ddl/column_type_change_test.go b/ddl/column_type_change_test.go index 2be20d158afca..2d17f5133d79d 100644 --- a/ddl/column_type_change_test.go +++ b/ddl/column_type_change_test.go @@ -2366,11 +2366,15 @@ func TestColumnTypeChangeBetweenFloatAndDouble(t *testing.T) { prepare := func(createTableStmt string) { tk.MustExec("drop table if exists t;") tk.MustExec(createTableStmt) - tk.MustExec("insert into t values (36.4), (24.1);") + tk.MustExec("insert into t values (36.43), (24.1);") } prepare("create table t (a float(6,2));") tk.MustExec("alter table t modify a double(6,2)") + tk.MustQuery("select a from t;").Check(testkit.Rows("36.43", "24.1")) + + prepare("create table t (a float(6,2));") + tk.MustExec("alter table t modify a float(6,1)") tk.MustQuery("select a from t;").Check(testkit.Rows("36.4", "24.1")) prepare("create table t (a double(6,2));") From 51eab73cf1ee901e269bd69ce62eecbfcdd8bdef Mon Sep 17 00:00:00 2001 From: fzzf678 <108643977+fzzf678@users.noreply.github.com> Date: Tue, 14 Mar 2023 14:34:38 +0800 Subject: [PATCH 05/13] planner: skip plan cache if the plan contains Shuffle operators (#42168) close pingcap/tidb#38335 --- planner/core/common_plans.go | 23 +++++++++++++++++++++++ planner/core/prepare_test.go | 26 ++++++++++++++++++++++++++ 2 files changed, 49 insertions(+) diff --git a/planner/core/common_plans.go b/planner/core/common_plans.go index 2f874bdbe2214..55ab4b98edf54 100644 --- a/planner/core/common_plans.go +++ b/planner/core/common_plans.go @@ -588,6 +588,10 @@ REBUILD: if containTableDual(p) && varsNum > 0 { stmtCtx.SkipPlanCache = true } + // plan has shuffle operator will not be cached. + if !stmtCtx.SkipPlanCache && containShuffleOperator(p) { + stmtCtx.SkipPlanCache = true + } if prepared.UseCache && !stmtCtx.SkipPlanCache && !ignorePlanCache { // rebuild key to exclude kv.TiFlash when stmt is not read only if _, isolationReadContainTiFlash := sessVars.IsolationReadEngines[kv.TiFlash]; isolationReadContainTiFlash && !IsReadOnly(stmt, sessVars) { @@ -637,6 +641,25 @@ func containTableDual(p Plan) bool { return childContainTableDual } +func containShuffleOperator(p Plan) bool { + if _, isShuffle := p.(*PhysicalShuffle); isShuffle { + return true + } + if _, isShuffleRecv := p.(*PhysicalShuffleReceiverStub); isShuffleRecv { + return true + } + physicalPlan, ok := p.(PhysicalPlan) + if !ok { + return false + } + for _, child := range physicalPlan.Children() { + if containShuffleOperator(child) { + return true + } + } + return false +} + // tryCachePointPlan will try to cache point execution plan, there may be some // short paths for these executions, currently "point select" and "point update" func (e *Execute) tryCachePointPlan(ctx context.Context, sctx sessionctx.Context, diff --git a/planner/core/prepare_test.go b/planner/core/prepare_test.go index 71dd3cde99752..66c6b14c0aa54 100644 --- a/planner/core/prepare_test.go +++ b/planner/core/prepare_test.go @@ -2945,3 +2945,29 @@ func TestIssue37901(t *testing.T) { tk.MustExec(`execute st1 using @t`) tk.MustQuery(`select count(*) from t4`).Check(testkit.Rows("2")) } + +func TestIssue38335(t *testing.T) { + store, clean := testkit.CreateMockStore(t) + defer clean() + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec(`CREATE TABLE PK_LP9463 ( + COL1 mediumint NOT NULL DEFAULT '77' COMMENT 'NUMERIC PK', + COL2 varchar(20) COLLATE utf8mb4_bin DEFAULT NULL, + COL4 datetime DEFAULT NULL, + COL3 bigint DEFAULT NULL, + COL5 float DEFAULT NULL, + PRIMARY KEY (COL1))`) + tk.MustExec(` +INSERT INTO PK_LP9463 VALUES (-7415279,'笚綷想摻癫梒偆荈湩窐曋繾鏫蘌憬稁渣½隨苆','1001-11-02 05:11:33',-3745331437675076296,-3.21618e38), +(-7153863,'鯷氤衡椻闍饑堀鱟垩啵緬氂哨笂序鉲秼摀巽茊','6800-06-20 23:39:12',-7871155140266310321,-3.04829e38), +(77,'娥藨潰眤徕菗柢礥蕶浠嶲憅榩椻鍙鑜堋ᛀ暵氎','4473-09-13 01:18:59',4076508026242316746,-1.9525e38), +(16614,'阖旕雐盬皪豧篣哙舄糗悄蟊鯴瞶珧赺潴嶽簤彉','2745-12-29 00:29:06',-4242415439257105874,2.71063e37)`) + tk.MustExec(`prepare stmt from 'SELECT *, rank() OVER (PARTITION BY col2 ORDER BY COL1) FROM PK_LP9463 WHERE col1 != ? AND col1 < ?'`) + tk.MustExec(`set @a=-8414766051197, @b=-8388608`) + tk.MustExec(`execute stmt using @a,@b`) + tk.MustExec(`set @a=16614, @b=16614`) + rows := tk.MustQuery(`execute stmt using @a,@b`).Sort() + tk.MustQuery(`select @@last_plan_from_cache`).Check(testkit.Rows("0")) + tk.MustQuery(`SELECT *, rank() OVER (PARTITION BY col2 ORDER BY COL1) FROM PK_LP9463 WHERE col1 != 16614 and col1 < 16614`).Sort().Check(rows.Rows()) +} From e4e3678cf6ee2d737c52647b1c73de493b43e1c7 Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Tue, 21 Mar 2023 10:38:41 +0800 Subject: [PATCH 06/13] planner: fix plan cache rebuild range error (#42220) (#42381) close pingcap/tidb#42150 --- planner/core/prepare_test.go | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/planner/core/prepare_test.go b/planner/core/prepare_test.go index 66c6b14c0aa54..899cd16f29649 100644 --- a/planner/core/prepare_test.go +++ b/planner/core/prepare_test.go @@ -2971,3 +2971,23 @@ INSERT INTO PK_LP9463 VALUES (-7415279,'笚綷想摻癫梒偆荈湩窐曋繾鏫 tk.MustQuery(`select @@last_plan_from_cache`).Check(testkit.Rows("0")) tk.MustQuery(`SELECT *, rank() OVER (PARTITION BY col2 ORDER BY COL1) FROM PK_LP9463 WHERE col1 != 16614 and col1 < 16614`).Sort().Check(rows.Rows()) } + +func TestIssue42150(t *testing.T) { + store, clean := testkit.CreateMockStore(t) + defer clean() + tk := testkit.NewTestKit(t, store) + tk.MustExec(`use test`) + tk.MustExec("drop table if exists t1, t2") + tk.MustExec("CREATE TABLE `t1` (`c_int` int(11) NOT NULL, `c_str` varchar(40) CHARACTER SET utf8 COLLATE utf8_bin DEFAULT NULL, `c_datetime` datetime DEFAULT NULL, `c_timestamp` timestamp NULL DEFAULT NULL, `c_double` double DEFAULT NULL, `c_decimal` decimal(12,6) DEFAULT NULL, `c_enum` enum('blue','green','red','yellow','white','orange','purple') NOT NULL, PRIMARY KEY (`c_int`,`c_enum`) /*T![clustered_index] CLUSTERED */, KEY `c_decimal` (`c_decimal`), UNIQUE KEY `c_datetime` (`c_datetime`), UNIQUE KEY `c_timestamp` (`c_timestamp`)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;") + tk.MustExec("create table t (a int, b int, primary key(a), key(b))") + + tk.MustExec("prepare stmt from 'select c_enum from t1'") + tk.MustExec("execute stmt;") + tk.MustExec("execute stmt;") + tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("1")) + + tk.MustExec("prepare st from 'select a from t use index(b)'") + tk.MustExec("execute st") + tk.MustExec("execute st") + tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("1")) +} From 6120e8fd6ed46cee5f3626ceea76bec32f581c2c Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Fri, 24 Mar 2023 09:20:42 +0800 Subject: [PATCH 07/13] *: Update the UniqueID of partition expression columns in LIST partitioning (#42193) (#42222) ref pingcap/tidb#42135 --- planner/core/partition_prune.go | 2 +- planner/core/partition_pruner_test.go | 37 +++++++++++++++++++ planner/core/rule_partition_processor.go | 47 +++++++++++++----------- table/tables/partition.go | 23 ++++++++++++ 4 files changed, 87 insertions(+), 22 deletions(-) diff --git a/planner/core/partition_prune.go b/planner/core/partition_prune.go index 3ab266340829d..2dec9c62d19e2 100644 --- a/planner/core/partition_prune.go +++ b/planner/core/partition_prune.go @@ -40,7 +40,7 @@ func PartitionPruning(ctx sessionctx.Context, tbl table.PartitionedTable, conds ret := s.convertToIntSlice(rangeOr, pi, partitionNames) return ret, nil case model.PartitionTypeList: - return s.pruneListPartition(ctx, tbl, partitionNames, conds) + return s.pruneListPartition(ctx, tbl, partitionNames, conds, columns) } return []int{FullRange}, nil } diff --git a/planner/core/partition_pruner_test.go b/planner/core/partition_pruner_test.go index 6fdf04ea5a873..59d44cdfdf9de 100644 --- a/planner/core/partition_pruner_test.go +++ b/planner/core/partition_pruner_test.go @@ -681,6 +681,43 @@ func TestRangePartitionPredicatePruner(t *testing.T) { } } +func TestIssue42135(t *testing.T) { + store, clean := testkit.CreateMockStore(t) + defer clean() + tk := testkit.NewTestKit(t, store) + tk.MustExec(`create database issue42135`) + tk.MustExec(`use issue42135`) + tk.MustExec("CREATE TABLE `tx1` (`ID` varchar(13), `a` varchar(13), `b` varchar(4000), `ltype` int(5) NOT NULL)") + tk.MustExec("CREATE TABLE `tx2` (`ID` varchar(13), `rid` varchar(12), `a` varchar(9), `b` varchar(8), `c` longtext, `d` varchar(12), `ltype` int(5) NOT NULL) PARTITION BY LIST (`ltype`) (PARTITION `p1` VALUES IN (501), PARTITION `p2` VALUES IN (502))") + tk.MustExec("insert into tx1 values(1,1,1,501)") + tk.MustExec("insert into tx2 values(1,1,1,1,1,1,501)") + tk.MustExec(`analyze table tx1`) + tk.MustExec(`analyze table tx2`) + tk.MustQuery(`select * from tx1 inner join tx2 on tx1.ID=tx2.ID and tx1.ltype=tx2.ltype where tx2.rid='1'`).Check(testkit.Rows("1 1 1 501 1 1 1 1 1 1 501")) + tk.MustQuery(`explain format='brief' select * from tx1 inner join tx2 on tx1.ID=tx2.ID and tx1.ltype=tx2.ltype where tx2.rid='1'`).Check(testkit.Rows(""+ + "HashJoin 1.00 root inner join, equal:[eq(issue42135.tx1.id, issue42135.tx2.id) eq(issue42135.tx1.ltype, issue42135.tx2.ltype)]", + `├─TableReader(Build) 1.00 root partition:all data:Selection`, + `│ └─Selection 1.00 cop[tikv] eq(issue42135.tx2.rid, "1"), not(isnull(issue42135.tx2.id))`, + `│ └─TableFullScan 1.00 cop[tikv] table:tx2 keep order:false`, + `└─TableReader(Probe) 1.00 root data:Selection`, + ` └─Selection 1.00 cop[tikv] not(isnull(issue42135.tx1.id))`, + ` └─TableFullScan 1.00 cop[tikv] table:tx1 keep order:false`)) + + tk.MustExec(`drop table tx2`) + tk.MustExec("CREATE TABLE `tx2` (`ID` varchar(13), `rid` varchar(12), `a` varchar(9), `b` varchar(8), `c` longtext, `d` varchar(12), `ltype` int(5) NOT NULL) PARTITION BY LIST COLUMNS (`ltype`,d) (PARTITION `p1` VALUES IN ((501,1)), PARTITION `p2` VALUES IN ((502,1)))") + tk.MustExec("insert into tx2 values(1,1,1,1,1,1,501)") + tk.MustExec(`analyze table tx2`) + tk.MustQuery(`select * from tx1 inner join tx2 on tx1.ID=tx2.ID and tx1.ltype=tx2.ltype where tx2.rid='1'`).Check(testkit.Rows("1 1 1 501 1 1 1 1 1 1 501")) + tk.MustQuery(`explain format='brief' select * from tx1 inner join tx2 on tx1.ID=tx2.ID and tx1.ltype=tx2.ltype where tx2.rid='1'`).Check(testkit.Rows(""+ + "HashJoin 1.00 root inner join, equal:[eq(issue42135.tx1.id, issue42135.tx2.id) eq(issue42135.tx1.ltype, issue42135.tx2.ltype)]", + "├─TableReader(Build) 1.00 root partition:all data:Selection", + "│ └─Selection 1.00 cop[tikv] eq(issue42135.tx2.rid, \"1\"), not(isnull(issue42135.tx2.id))", + "│ └─TableFullScan 1.00 cop[tikv] table:tx2 keep order:false", + "└─TableReader(Probe) 1.00 root data:Selection", + " └─Selection 1.00 cop[tikv] not(isnull(issue42135.tx1.id))", + " └─TableFullScan 1.00 cop[tikv] table:tx1 keep order:false")) +} + func TestHashPartitionPruning(t *testing.T) { store, clean := testkit.CreateMockStore(t) defer clean() diff --git a/planner/core/rule_partition_processor.go b/planner/core/rule_partition_processor.go index 3ab21cee1403a..07eb87681904d 100644 --- a/planner/core/rule_partition_processor.go +++ b/planner/core/rule_partition_processor.go @@ -359,21 +359,30 @@ func (s *partitionProcessor) processHashPartition(ds *DataSource, pi *model.Part // listPartitionPruner uses to prune partition for list partition. type listPartitionPruner struct { *partitionProcessor - ctx sessionctx.Context - pi *model.PartitionInfo - partitionNames []model.CIStr - colIDToUniqueID map[int64]int64 - fullRange map[int]struct{} - listPrune *tables.ForListPruning + ctx sessionctx.Context + pi *model.PartitionInfo + partitionNames []model.CIStr + fullRange map[int]struct{} + listPrune *tables.ForListPruning } func newListPartitionPruner(ctx sessionctx.Context, tbl table.Table, partitionNames []model.CIStr, - s *partitionProcessor, conds []expression.Expression, pruneList *tables.ForListPruning) *listPartitionPruner { - colIDToUniqueID := make(map[int64]int64) - for _, cond := range conds { - condCols := expression.ExtractColumns(cond) - for _, c := range condCols { - colIDToUniqueID[c.ID] = c.UniqueID + s *partitionProcessor, conds []expression.Expression, pruneList *tables.ForListPruning, columns []*expression.Column) *listPartitionPruner { + pruneList = pruneList.Clone() + for i := range pruneList.PruneExprCols { + for j := range columns { + if columns[j].ID == pruneList.PruneExprCols[i].ID { + pruneList.PruneExprCols[i].UniqueID = columns[j].UniqueID + break + } + } + } + for i := range pruneList.ColPrunes { + for j := range columns { + if columns[j].ID == pruneList.ColPrunes[i].ExprCol.ID { + pruneList.ColPrunes[i].ExprCol.UniqueID = columns[j].UniqueID + break + } } } fullRange := make(map[int]struct{}) @@ -383,7 +392,6 @@ func newListPartitionPruner(ctx sessionctx.Context, tbl table.Table, partitionNa ctx: ctx, pi: tbl.Meta().Partition, partitionNames: partitionNames, - colIDToUniqueID: colIDToUniqueID, fullRange: fullRange, listPrune: pruneList, } @@ -526,9 +534,6 @@ func (l *listPartitionPruner) detachCondAndBuildRange(conds []expression.Express colLen := make([]int, 0, len(exprCols)) for _, c := range exprCols { c = c.Clone().(*expression.Column) - if uniqueID, ok := l.colIDToUniqueID[c.ID]; ok { - c.UniqueID = uniqueID - } cols = append(cols, c) colLen = append(colLen, types.UnspecifiedLength) } @@ -594,14 +599,14 @@ func (l *listPartitionPruner) findUsedListPartitions(conds []expression.Expressi } func (s *partitionProcessor) findUsedListPartitions(ctx sessionctx.Context, tbl table.Table, partitionNames []model.CIStr, - conds []expression.Expression) ([]int, error) { + conds []expression.Expression, columns []*expression.Column) ([]int, error) { pi := tbl.Meta().Partition partExpr, err := tbl.(partitionTable).PartitionExpr() if err != nil { return nil, err } - listPruner := newListPartitionPruner(ctx, tbl, partitionNames, s, conds, partExpr.ForListPruning) + listPruner := newListPartitionPruner(ctx, tbl, partitionNames, s, conds, partExpr.ForListPruning, columns) var used map[int]struct{} if partExpr.ForListPruning.ColPrunes == nil { used, err = listPruner.findUsedListPartitions(conds) @@ -624,8 +629,8 @@ func (s *partitionProcessor) findUsedListPartitions(ctx sessionctx.Context, tbl } func (s *partitionProcessor) pruneListPartition(ctx sessionctx.Context, tbl table.Table, partitionNames []model.CIStr, - conds []expression.Expression) ([]int, error) { - used, err := s.findUsedListPartitions(ctx, tbl, partitionNames, conds) + conds []expression.Expression, columns []*expression.Column) ([]int, error) { + used, err := s.findUsedListPartitions(ctx, tbl, partitionNames, conds, columns) if err != nil { return nil, err } @@ -872,7 +877,7 @@ func (s *partitionProcessor) processRangePartition(ds *DataSource, pi *model.Par } func (s *partitionProcessor) processListPartition(ds *DataSource, pi *model.PartitionInfo, opt *logicalOptimizeOp) (LogicalPlan, error) { - used, err := s.pruneListPartition(ds.SCtx(), ds.table, ds.partitionNames, ds.allConds) + used, err := s.pruneListPartition(ds.SCtx(), ds.table, ds.partitionNames, ds.allConds, ds.TblCols) if err != nil { return nil, err } diff --git a/table/tables/partition.go b/table/tables/partition.go index 5c961094740f4..e68c82a72771e 100644 --- a/table/tables/partition.go +++ b/table/tables/partition.go @@ -624,6 +624,29 @@ func generateListPartitionExpr(ctx sessionctx.Context, tblInfo *model.TableInfo, return ret, nil } +// Clone a copy of ForListPruning +func (lp *ForListPruning) Clone() *ForListPruning { + ret := *lp + if ret.LocateExpr != nil { + ret.LocateExpr = lp.LocateExpr.Clone() + } + if ret.PruneExpr != nil { + ret.PruneExpr = lp.PruneExpr.Clone() + } + ret.PruneExprCols = make([]*expression.Column, 0, len(lp.PruneExprCols)) + for i := range lp.PruneExprCols { + c := lp.PruneExprCols[i].Clone().(*expression.Column) + ret.PruneExprCols = append(ret.PruneExprCols, c) + } + ret.ColPrunes = make([]*ForListColumnPruning, 0, len(lp.ColPrunes)) + for i := range lp.ColPrunes { + l := *lp.ColPrunes[i] + l.ExprCol = l.ExprCol.Clone().(*expression.Column) + ret.ColPrunes = append(ret.ColPrunes, &l) + } + return &ret +} + func (lp *ForListPruning) buildListPruner(ctx sessionctx.Context, tblInfo *model.TableInfo, exprCols []*expression.Column, columns []*expression.Column, names types.NameSlice) error { pi := tblInfo.GetPartitionInfo() From 0ac10138876eebb9685ac493581515fb4b99dc26 Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Fri, 24 Mar 2023 16:42:43 +0800 Subject: [PATCH 08/13] util/execdetails: fix potential risks of fatal error concurrent map read and map write (#42019) (#42088) close pingcap/tidb#39076 --- util/execdetails/execdetails.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/util/execdetails/execdetails.go b/util/execdetails/execdetails.go index ab33b37fff610..633a3ffe5cb4d 100644 --- a/util/execdetails/execdetails.go +++ b/util/execdetails/execdetails.go @@ -568,6 +568,8 @@ func NewRuntimeStatsColl(reuse *RuntimeStatsColl) *RuntimeStatsColl { if reuse != nil { // Reuse map is cheaper than create a new map object. // Go compiler optimize this cleanup code pattern to a clearmap() function. + reuse.mu.Lock() + defer reuse.mu.Unlock() for k := range reuse.rootStats { delete(reuse.rootStats, k) } From 6c1e555795d051c985d6e0bdf7cd1be3e0ac24a7 Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Tue, 28 Mar 2023 11:08:55 +0800 Subject: [PATCH 09/13] executor: Fix tidb crash on index merge reader (#40904) (#40981) close pingcap/tidb#40877 --- executor/index_merge_reader.go | 24 ++++++++++++++++-------- executor/index_merge_reader_test.go | 21 +++++++++++++++++++++ 2 files changed, 37 insertions(+), 8 deletions(-) diff --git a/executor/index_merge_reader.go b/executor/index_merge_reader.go index 939826edd7170..fabf68c1fe36d 100644 --- a/executor/index_merge_reader.go +++ b/executor/index_merge_reader.go @@ -590,8 +590,13 @@ func (e *IndexMergeReaderExecutor) startIndexMergeTableScanWorker(ctx context.Co defer trace.StartRegion(ctx, "IndexMergeTableScanWorker").End() var task *lookupTableTask util.WithRecovery( - func() { task = worker.pickAndExecTask(ctx1) }, - worker.handlePickAndExecTaskPanic(ctx1, task), + // Note we use the address of `task` as the argument of both `pickAndExecTask` and `handlePickAndExecTaskPanic` + // because `task` is expected to be assigned in `pickAndExecTask`, and this assignment should also be visible + // in `handlePickAndExecTaskPanic` since it will get `doneCh` from `task`. Golang always pass argument by value, + // so if we don't use the address of `task` as the argument, the assignment to `task` in `pickAndExecTask` is + // not visible in `handlePickAndExecTaskPanic` + func() { worker.pickAndExecTask(ctx1, &task) }, + worker.handlePickAndExecTaskPanic(ctx1, &task), ) cancel() e.tblWorkerWg.Done() @@ -895,12 +900,12 @@ type indexMergeTableScanWorker struct { memTracker *memory.Tracker } -func (w *indexMergeTableScanWorker) pickAndExecTask(ctx context.Context) (task *lookupTableTask) { +func (w *indexMergeTableScanWorker) pickAndExecTask(ctx context.Context, task **lookupTableTask) { var ok bool for { waitStart := time.Now() select { - case task, ok = <-w.workCh: + case *task, ok = <-w.workCh: if !ok { return } @@ -908,17 +913,18 @@ func (w *indexMergeTableScanWorker) pickAndExecTask(ctx context.Context) (task * return } execStart := time.Now() - err := w.executeTask(ctx, task) + err := w.executeTask(ctx, *task) if w.stats != nil { atomic.AddInt64(&w.stats.WaitTime, int64(execStart.Sub(waitStart))) atomic.AddInt64(&w.stats.FetchRow, int64(time.Since(execStart))) atomic.AddInt64(&w.stats.TableTaskNum, 1) } - task.doneCh <- err + failpoint.Inject("testIndexMergePickAndExecTaskPanic", nil) + (*task).doneCh <- err } } -func (w *indexMergeTableScanWorker) handlePickAndExecTaskPanic(ctx context.Context, task *lookupTableTask) func(r interface{}) { +func (w *indexMergeTableScanWorker) handlePickAndExecTaskPanic(ctx context.Context, task **lookupTableTask) func(r interface{}) { return func(r interface{}) { if r == nil { return @@ -926,7 +932,9 @@ func (w *indexMergeTableScanWorker) handlePickAndExecTaskPanic(ctx context.Conte err4Panic := errors.Errorf("panic in IndexMergeReaderExecutor indexMergeTableWorker: %v", r) logutil.Logger(ctx).Error(err4Panic.Error()) - task.doneCh <- err4Panic + if *task != nil { + (*task).doneCh <- err4Panic + } } } diff --git a/executor/index_merge_reader_test.go b/executor/index_merge_reader_test.go index c09d5429dcb5a..9d06eb3d70b68 100644 --- a/executor/index_merge_reader_test.go +++ b/executor/index_merge_reader_test.go @@ -23,6 +23,7 @@ import ( "testing" "time" + "github.com/pingcap/failpoint" "github.com/pingcap/tidb/planner/core" "github.com/pingcap/tidb/testkit" "github.com/pingcap/tidb/util" @@ -51,6 +52,26 @@ func TestSingleTableRead(t *testing.T) { tk.MustQuery("select /*+ use_index_merge(t1, t1a, t1b) */ sum(a) from t1 where a < 2 or b > 4").Check(testkit.Rows("6")) } +func TestIndexMergePickAndExecTaskPanic(t *testing.T) { + store, clean := testkit.CreateMockStore(t) + defer clean() + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("drop table if exists t1, t2") + tk.MustExec("create table t1(id int primary key, a int, b int, c int, d int)") + tk.MustExec("create index t1a on t1(a)") + tk.MustExec("create index t1b on t1(b)") + tk.MustExec("insert into t1 values(1,1,1,1,1),(2,2,2,2,2),(3,3,3,3,3),(4,4,4,4,4),(5,5,5,5,5)") + tk.MustQuery("select /*+ use_index_merge(t1, primary, t1a) */ * from t1 where id < 2 or a > 4 order by id").Check(testkit.Rows("1 1 1 1 1", + "5 5 5 5 5")) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/testIndexMergePickAndExecTaskPanic", "panic(\"pickAndExecTaskPanic\")")) + defer func() { + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/testIndexMergePickAndExecTaskPanic")) + }() + err := tk.QueryToErr("select /*+ use_index_merge(t1, primary, t1a) */ * from t1 where id < 2 or a > 4 order by id") + require.Contains(t, err.Error(), "pickAndExecTaskPanic") +} + func TestJoin(t *testing.T) { store, clean := testkit.CreateMockStore(t) defer clean() From 135aafd6b7b3ba47c15224bb8f0d81a45530a265 Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Tue, 28 Mar 2023 12:24:54 +0800 Subject: [PATCH 10/13] planner: process over-optimization and skip plan-cache in some cases to avoid disaster plans (#38537) (#40213) close pingcap/tidb#38533 --- executor/explainfor_test.go | 2 +- planner/core/prepare_test.go | 46 ++++++++++++++++++++++++++++-------- util/ranger/detacher.go | 4 ++-- 3 files changed, 39 insertions(+), 13 deletions(-) diff --git a/executor/explainfor_test.go b/executor/explainfor_test.go index fc16cd93eea3f..7ceeb0eb5330a 100644 --- a/executor/explainfor_test.go +++ b/executor/explainfor_test.go @@ -949,7 +949,7 @@ func TestIndexMerge4PlanCache(t *testing.T) { tk.MustExec("set @a=9, @b=10, @c=11;") tk.MustQuery("execute stmt using @a, @a;").Check(testkit.Rows("10 10 10")) tk.MustQuery("execute stmt using @a, @c;").Check(testkit.Rows("10 10 10", "11 11 11")) - tk.MustQuery("select @@last_plan_from_cache;").Check(testkit.Rows("1")) + tk.MustQuery("select @@last_plan_from_cache;").Check(testkit.Rows("0")) // a>=9 and a<=9 --> a=9 tk.MustQuery("execute stmt using @c, @a;").Check(testkit.Rows("10 10 10")) tk.MustQuery("select @@last_plan_from_cache;").Check(testkit.Rows("1")) diff --git a/planner/core/prepare_test.go b/planner/core/prepare_test.go index 899cd16f29649..946730b7e81e5 100644 --- a/planner/core/prepare_test.go +++ b/planner/core/prepare_test.go @@ -36,6 +36,7 @@ import ( "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/testkit" "github.com/pingcap/tidb/types" + "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/hint" "github.com/pingcap/tidb/util/kvcache" "github.com/prometheus/client_golang/prometheus" @@ -1583,7 +1584,7 @@ func TestIssue29303(t *testing.T) { tk.MustQuery(`execute stmt using @a,@b,@c,@d`).Check(testkit.Rows()) tk.MustExec(`set @a="龂", @b="龂", @c="龂", @d="龂"`) tk.MustQuery(`execute stmt using @a,@b,@c,@d`).Check(testkit.Rows("� 龂 � 龂")) - tk.MustQuery(`select @@last_plan_from_cache`).Check(testkit.Rows("0")) + tk.MustQuery(`select @@last_plan_from_cache`).Check(testkit.Rows("1")) } func TestIssue34725(t *testing.T) { @@ -2218,11 +2219,10 @@ func TestPlanCachePointGetAndTableDual(t *testing.T) { tk.MustExec("insert into t1 values('0000','7777',1)") tk.MustExec("prepare s1 from 'select * from t1 where c1=? and c2>=? and c2<=?'") tk.MustExec("set @a1='0000', @b1='9999'") - // IndexLookup plan would be built, we should cache it. tk.MustQuery("execute s1 using @a1, @b1, @b1").Check(testkit.Rows()) tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0")) tk.MustQuery("execute s1 using @a1, @a1, @b1").Check(testkit.Rows("0000 7777 1")) - tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("1")) + tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0")) // c2>=9999 and c2<=9999 --> c2=9999 tk.MustExec("create table t2(c1 bigint(20) primary key, c2 varchar(20))") tk.MustExec("insert into t2 values(1,'7777')") @@ -2238,17 +2238,15 @@ func TestPlanCachePointGetAndTableDual(t *testing.T) { tk.MustExec("insert into t3 values(2,1,1)") tk.MustExec("prepare s3 from 'select /*+ use_index_merge(t3) */ * from t3 where (c1 >= ? and c1 <= ?) or c2 > 1'") tk.MustExec("set @a3=1,@b3=3") - // TableReader plan would be built, we should cache it. tk.MustQuery("execute s3 using @a3,@a3").Check(testkit.Rows()) tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0")) tk.MustQuery("execute s3 using @a3,@b3").Check(testkit.Rows("2 1 1")) - tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("1")) + tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0")) // c1>=1 and c1<=1 --> c1==1 tk.MustExec("prepare s3 from 'select /*+ use_index_merge(t3) */ * from t3 where (c1 >= ? and c1 <= ?) or c2 > 1'") tk.MustExec("set @a3=1,@b3=3") - // TableReader plan would be built, we should cache it. tk.MustQuery("execute s3 using @b3,@a3").Check(testkit.Rows()) - tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("1")) + tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0")) tk.MustQuery("execute s3 using @a3,@b3").Check(testkit.Rows("2 1 1")) tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("1")) @@ -2259,7 +2257,7 @@ func TestPlanCachePointGetAndTableDual(t *testing.T) { tk.MustQuery("execute s4 using @a4,@a4").Check(testkit.Rows()) tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0")) tk.MustQuery("execute s4 using @a4,@b4").Check(testkit.Rows("2 1 1")) - tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("1")) + tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0")) // c1>=3 and c1<=3 --> c1=3 tk.MustExec("prepare s4 from 'select /*+ use_index_merge(t4) */ * from t4 where (c1 >= ? and c1 <= ?) or c2 > 1'") tk.MustExec("set @a4=1,@b4=3") @@ -2337,7 +2335,35 @@ func TestIssue23671(t *testing.T) { tk.MustQuery("execute s1 using @a, @b, @c").Check(testkit.Rows("1 1")) tk.MustExec("set @a=1, @b=1, @c=10") tk.MustQuery("execute s1 using @a, @b, @c").Check(testkit.Rows("1 1", "2 2")) - tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("1")) + tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0")) // b>=1 and b<=1 --> b=1 +} + +func TestIssue38533(t *testing.T) { + store, clean := testkit.CreateMockStore(t) + defer clean() + orgEnable := core.PreparedPlanCacheEnabled() + defer core.SetPreparedPlanCache(orgEnable) + core.SetPreparedPlanCache(true) + se, err := session.CreateSession4TestWithOpt(store, &session.Opt{ + PreparedPlanCache: kvcache.NewSimpleLRUCache(100, 0.1, math.MaxUint64), + }) + require.NoError(t, err) + tk := testkit.NewTestKitWithSession(t, store, se) + + tk.MustExec("use test") + tk.MustExec("create table t (a int, key (a))") + tk.MustExec(`prepare st from "select /*+ use_index(t, a) */ a from t where a=? and a=?"`) + tk.MustExec(`set @a=1`) + tk.MustExec(`execute st using @a, @a`) + tkProcess := tk.Session().ShowProcess() + ps := []*util.ProcessInfo{tkProcess} + tk.Session().SetSessionManager(&testkit.MockSessionManager{PS: ps}) + plan := tk.MustQuery(fmt.Sprintf("explain for connection %d", tkProcess.ID)).Rows() + require.True(t, strings.Contains(plan[1][0].(string), "RangeScan")) // range-scan instead of full-scan + + tk.MustExec(`execute st using @a, @a`) + tk.MustExec(`execute st using @a, @a`) + tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0")) } func TestIssue29296(t *testing.T) { @@ -2877,7 +2903,7 @@ func TestCachedTable(t *testing.T) { // IndexLookup tk.MustQuery("execute indexLookup using @a, @b").Check(testkit.Rows("2")) require.True(t, lastReadFromCache(tk)) - tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("1")) + tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0")) // b>1 and b<3 --> b=2 // PointGet tk.MustQuery("execute pointGet using @a").Check(testkit.Rows("1")) diff --git a/util/ranger/detacher.go b/util/ranger/detacher.go index b94b4aff6e1e1..eb9186828269c 100644 --- a/util/ranger/detacher.go +++ b/util/ranger/detacher.go @@ -578,8 +578,8 @@ func ExtractEqAndInCondition(sctx sessionctx.Context, conditions []expression.Ex columnValues[i] = &valueInfo{mutable: true} } if expression.MaybeOverOptimized4PlanCache(sctx, conditions) { - // TODO: optimize it more elaborately, e.g. return [2 3, 2 3] as accesses for 'where a = 2 and b = 3 and c >= ? and c <= ?' - return nil, conditions, nil, nil, false + // `a=@x and a=@y` --> `a=@x if @x==@y` + sctx.GetSessionVars().StmtCtx.SkipPlanCache = true } } } From 1288a5e382587127fd98c669b28a1dee3b3f3bf6 Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Tue, 28 Mar 2023 15:46:55 +0800 Subject: [PATCH 11/13] session: use TxnCtx.InfoSchema, no matter vars.InTxn() or not (#42027) (#42276) close pingcap/tidb#41622, close pingcap/tidb#42003 --- executor/stale_txn_test.go | 9 +++++++-- session/session.go | 2 +- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/executor/stale_txn_test.go b/executor/stale_txn_test.go index 9899fb0e1c8fd..674c48e1d9d30 100644 --- a/executor/stale_txn_test.go +++ b/executor/stale_txn_test.go @@ -530,7 +530,9 @@ func TestStalenessTransactionSchemaVer(t *testing.T) { schemaVer1 := tk.Session().GetInfoSchema().SchemaMetaVersion() time1 := time.Now() + time.Sleep(100 * time.Millisecond) tk.MustExec("alter table t add c int") + time.Sleep(300 * time.Millisecond) // confirm schema changed time.Sleep(time.Millisecond * 20) @@ -544,12 +546,12 @@ func TestStalenessTransactionSchemaVer(t *testing.T) { // schema changed back to the newest tk.MustExec("commit") - time.Sleep(time.Millisecond * 20) + tk.Session().PrepareStmt("select 1") require.Equal(t, schemaVer2, tk.Session().GetInfoSchema().SchemaMetaVersion()) // select does not affect the infoschema tk.MustExec(fmt.Sprintf(`SELECT * from t AS OF TIMESTAMP '%s'`, time1.Format("2006-1-2 15:04:05.000"))) - time.Sleep(time.Millisecond * 20) + tk.Session().PrepareStmt("select 1") require.Equal(t, schemaVer2, tk.Session().GetInfoSchema().SchemaMetaVersion()) } @@ -945,7 +947,9 @@ func TestSetTransactionInfoSchema(t *testing.T) { schemaVer1 := tk.Session().GetInfoSchema().SchemaMetaVersion() time1 := time.Now() + time.Sleep(100 * time.Millisecond) tk.MustExec("alter table t add c int") + time.Sleep(300 * time.Millisecond) // confirm schema changed schemaVer2 := tk.Session().GetInfoSchema().SchemaMetaVersion() @@ -964,6 +968,7 @@ func TestSetTransactionInfoSchema(t *testing.T) { tk.MustExec("begin;") require.Equal(t, schemaVer2, tk.Session().GetInfoSchema().SchemaMetaVersion()) tk.MustExec("commit") + tk.Session().PrepareStmt("select 1") require.Equal(t, schemaVer3, tk.Session().GetInfoSchema().SchemaMetaVersion()) } diff --git a/session/session.go b/session/session.go index f0484876a3c90..f6336eb530ab9 100644 --- a/session/session.go +++ b/session/session.go @@ -3454,7 +3454,7 @@ func (s *session) GetInfoSchema() sessionctx.InfoschemaMetaVersion { if snap, ok := vars.SnapshotInfoschema.(infoschema.InfoSchema); ok { logutil.BgLogger().Info("use snapshot schema", zap.Uint64("conn", vars.ConnectionID), zap.Int64("schemaVersion", snap.SchemaMetaVersion())) is = snap - } else if vars.TxnCtx != nil && vars.InTxn() { + } else if vars.TxnCtx != nil { if tmp, ok := vars.TxnCtx.InfoSchema.(infoschema.InfoSchema); ok { is = tmp } From 9a875bdd9256546d3de45ff486aac1f1524866fe Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Wed, 29 Mar 2023 09:38:54 +0800 Subject: [PATCH 12/13] planner: skip plan-cache for prepared queries with `INT in (Decimals...)` (#40312) (#40690) close pingcap/tidb#40224 --- executor/prepared_test.go | 38 +++++++++++++++++++++++++++++ planner/core/expression_rewriter.go | 12 +++------ 2 files changed, 41 insertions(+), 9 deletions(-) diff --git a/executor/prepared_test.go b/executor/prepared_test.go index 33001c1db3979..ae40bccf618e0 100644 --- a/executor/prepared_test.go +++ b/executor/prepared_test.go @@ -184,6 +184,44 @@ func TestPreparedNullParam(t *testing.T) { } } +func TestIssue40224(t *testing.T) { + store, clean := testkit.CreateMockStore(t) + defer clean() + + orgEnable := plannercore.PreparedPlanCacheEnabled() + defer func() { + plannercore.SetPreparedPlanCache(orgEnable) + }() + plannercore.SetPreparedPlanCache(true) + tk := testkit.NewTestKit(t, store) + + tk.MustExec("use test") + tk.MustExec("create table t (a int, key(a))") + tk.MustExec("prepare st from 'select a from t where a in (?, ?)'") + tk.MustExec("set @a=1.0, @b=2.0") + tk.MustExec("execute st using @a, @b") + tk.MustExec("execute st using @a, @b") + tkProcess := tk.Session().ShowProcess() + ps := []*util.ProcessInfo{tkProcess} + tk.Session().SetSessionManager(&testkit.MockSessionManager{PS: ps}) + tk.MustQuery(fmt.Sprintf("explain for connection %d", tkProcess.ID)).CheckAt([]int{0}, + [][]interface{}{ + {"IndexReader_6"}, + {"└─IndexRangeScan_5"}, // range scan not full scan + }) + + tk.MustExec("set @a=1, @b=2") + tk.MustExec("execute st using @a, @b") + tk.MustExec("execute st using @a, @b") + tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("1")) // cacheable for INT + tk.MustExec("execute st using @a, @b") + tk.MustQuery(fmt.Sprintf("explain for connection %d", tkProcess.ID)).CheckAt([]int{0}, + [][]interface{}{ + {"IndexReader_6"}, + {"└─IndexRangeScan_5"}, // range scan not full scan + }) +} + func TestIssue29850(t *testing.T) { store, clean := testkit.CreateMockStore(t) defer clean() diff --git a/planner/core/expression_rewriter.go b/planner/core/expression_rewriter.go index 3026574d8e424..ed8c83a5c4d1f 100644 --- a/planner/core/expression_rewriter.go +++ b/planner/core/expression_rewriter.go @@ -1496,16 +1496,10 @@ func (er *expressionRewriter) inToExpression(lLen int, not bool, tp *types.Field if c, ok := args[i].(*expression.Constant); ok { var isExceptional bool if expression.MaybeOverOptimized4PlanCache(er.sctx, []expression.Expression{c}) { - if c.GetType().EvalType() == types.ETString { - // To keep the result be compatible with MySQL, refine `int non-constant str constant` - // here and skip this refine operation in all other cases for safety. - er.sctx.GetSessionVars().StmtCtx.SkipPlanCache = true - expression.RemoveMutableConst(er.sctx, []expression.Expression{c}) - } else { - continue + if c.GetType().EvalType() == types.ETInt { + continue // no need to refine it } - } else if er.sctx.GetSessionVars().StmtCtx.SkipPlanCache { - // We should remove the mutable constant for correctness, because its value may be changed. + er.sctx.GetSessionVars().StmtCtx.SkipPlanCache = true expression.RemoveMutableConst(er.sctx, []expression.Expression{c}) } args[i], isExceptional = expression.RefineComparedConstant(er.sctx, *leftFt, c, opcode.EQ) From 3012080282c0a65d615eae20671ba89c7a595dd0 Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Wed, 29 Mar 2023 11:00:54 +0800 Subject: [PATCH 13/13] executor: fix IndexMerge handle panic logic (#41036) (#41062) close pingcap/tidb#41047 --- executor/index_merge_reader.go | 209 ++++++++++++++++++++-------- executor/index_merge_reader_test.go | 79 +++++++++++ 2 files changed, 227 insertions(+), 61 deletions(-) diff --git a/executor/index_merge_reader.go b/executor/index_merge_reader.go index fabf68c1fe36d..ce2351d135a21 100644 --- a/executor/index_merge_reader.go +++ b/executor/index_merge_reader.go @@ -49,6 +49,13 @@ var ( _ Executor = &IndexMergeReaderExecutor{} ) +const ( + partialIndexWorkerType = "IndexMergePartialIndexWorker" + partialTableWorkerType = "IndexMergePartialTableWorker" + processWorkerType = "IndexMergeProcessWorker" + tableScanWorkerType = "IndexMergeTableScanWorker" +) + // IndexMergeReaderExecutor accesses a table with multiple index/table scan. // There are three types of workers: // 1. partialTableWorker/partialIndexWorker, which are used to fetch the handles @@ -87,10 +94,10 @@ type IndexMergeReaderExecutor struct { // All fields above are immutable. - tblWorkerWg sync.WaitGroup - idxWorkerWg sync.WaitGroup - processWokerWg sync.WaitGroup - finished chan struct{} + tblWorkerWg sync.WaitGroup + idxWorkerWg sync.WaitGroup + processWorkerWg sync.WaitGroup + finished chan struct{} workerStarted bool keyRanges [][]kv.KeyRange @@ -245,20 +252,26 @@ func (e *IndexMergeReaderExecutor) startIndexMergeProcessWorker(ctx context.Cont indexMerge: e, stats: e.stats, } - e.processWokerWg.Add(1) + e.processWorkerWg.Add(1) go func() { defer trace.StartRegion(ctx, "IndexMergeProcessWorker").End() util.WithRecovery( func() { idxMergeProcessWorker.fetchLoop(ctx, fetch, workCh, e.resultCh, e.finished) }, - idxMergeProcessWorker.handleLoopFetcherPanic(ctx, e.resultCh), + handleWorkerPanic(ctx, e.finished, e.resultCh, processWorkerType), ) - e.processWokerWg.Done() + e.processWorkerWg.Done() }() } func (e *IndexMergeReaderExecutor) startPartialIndexWorker(ctx context.Context, exitCh <-chan struct{}, fetchCh chan<- *lookupTableTask, workID int) error { + failpoint.Inject("testIndexMergeResultChCloseEarly", func(_ failpoint.Value) { + // Wait for processWorker to close resultCh. + time.Sleep(2) + // Should use fetchCh instead of resultCh to send error. + syncErr(ctx, e.finished, fetchCh, errors.New("testIndexMergeResultChCloseEarly")) + }) if e.runtimeStats != nil { collExec := true e.dagPBs[workID].CollectExecutionSummaries = &collExec @@ -282,6 +295,17 @@ func (e *IndexMergeReaderExecutor) startPartialIndexWorker(ctx context.Context, defer e.idxWorkerWg.Done() util.WithRecovery( func() { + failpoint.Inject("testIndexMergePanicPartialIndexWorker", nil) + failpoint.Inject("mockSleepBeforeStartTableReader", func(_ failpoint.Value) { + select { + case <-ctx.Done(): + failpoint.Return() + case <-e.finished: + failpoint.Return() + case <-exitCh: + failpoint.Return() + } + }) worker := &partialIndexWorker{ stats: e.stats, idxID: e.getPartitalPlanID(workID), @@ -289,13 +313,14 @@ func (e *IndexMergeReaderExecutor) startPartialIndexWorker(ctx context.Context, batchSize: e.maxChunkSize, maxBatchSize: e.ctx.GetSessionVars().IndexLookupSize, maxChunkSize: e.maxChunkSize, + memTracker: e.memTracker, } if e.isCorColInPartialFilters[workID] { // We got correlated column, so need to refresh Selection operator. var err error if e.dagPBs[workID].Executors, _, err = constructDistExec(e.ctx, e.partialPlans[workID]); err != nil { - worker.syncErr(e.resultCh, err) + syncErr(ctx, e.finished, fetchCh, err) return } } @@ -323,12 +348,12 @@ func (e *IndexMergeReaderExecutor) startPartialIndexWorker(ctx context.Context, // init kvReq and worker for this partition kvReq, err := builder.SetKeyRanges(keyRange).Build() if err != nil { - worker.syncErr(e.resultCh, err) + syncErr(ctx, e.finished, fetchCh, err) return } result, err := distsql.SelectWithRuntimeStats(ctx, e.ctx, kvReq, e.handleCols.GetFieldsTypes(), e.feedbacks[workID], getPhysicalPlanIDs(e.partialPlans[workID]), e.getPartitalPlanID(workID)) if err != nil { - worker.syncErr(e.resultCh, err) + syncErr(ctx, e.finished, fetchCh, err) return } worker.batchSize = e.maxChunkSize @@ -341,7 +366,7 @@ func (e *IndexMergeReaderExecutor) startPartialIndexWorker(ctx context.Context, // fetch all data from this partition ctx1, cancel := context.WithCancel(ctx) - _, fetchErr := worker.fetchHandles(ctx1, result, exitCh, fetchCh, e.resultCh, e.finished, e.handleCols) + _, fetchErr := worker.fetchHandles(ctx1, result, exitCh, fetchCh, e.finished, e.handleCols) if fetchErr != nil { // this error is synced in fetchHandles(), don't sync it again e.feedbacks[workID].Invalidate() } @@ -355,7 +380,7 @@ func (e *IndexMergeReaderExecutor) startPartialIndexWorker(ctx context.Context, } } }, - e.handleHandlesFetcherPanic(ctx, e.resultCh, "partialIndexWorker"), + handleWorkerPanic(ctx, e.finished, fetchCh, partialIndexWorkerType), ) }() @@ -379,6 +404,17 @@ func (e *IndexMergeReaderExecutor) startPartialTableWorker(ctx context.Context, defer e.idxWorkerWg.Done() util.WithRecovery( func() { + failpoint.Inject("testIndexMergePanicPartialTableWorker", nil) + failpoint.Inject("mockSleepBeforeStartTableReader", func(_ failpoint.Value) { + select { + case <-ctx.Done(): + failpoint.Return() + case <-e.finished: + failpoint.Return() + case <-exitCh: + failpoint.Return() + } + }) var err error partialTableReader := &TableReaderExecutor{ baseExecutor: newBaseExecutor(e.ctx, ts.Schema(), e.getPartitalPlanID(workID)), @@ -399,11 +435,12 @@ func (e *IndexMergeReaderExecutor) startPartialTableWorker(ctx context.Context, maxBatchSize: e.ctx.GetSessionVars().IndexLookupSize, maxChunkSize: e.maxChunkSize, tableReader: partialTableReader, + memTracker: e.memTracker, } if e.isCorColInPartialFilters[workID] { if e.dagPBs[workID].Executors, _, err = constructDistExec(e.ctx, e.partialPlans[workID]); err != nil { - worker.syncErr(e.resultCh, err) + syncErr(ctx, e.finished, fetchCh, err) return } partialTableReader.dagPB = e.dagPBs[workID] @@ -421,7 +458,7 @@ func (e *IndexMergeReaderExecutor) startPartialTableWorker(ctx context.Context, partialTableReader.table = tbl if err = partialTableReader.Open(ctx); err != nil { logutil.Logger(ctx).Error("open Select result failed:", zap.Error(err)) - worker.syncErr(e.resultCh, err) + syncErr(ctx, e.finished, fetchCh, err) break } worker.batchSize = e.maxChunkSize @@ -434,7 +471,7 @@ func (e *IndexMergeReaderExecutor) startPartialTableWorker(ctx context.Context, // fetch all handles from this table ctx1, cancel := context.WithCancel(ctx) - _, fetchErr := worker.fetchHandles(ctx1, exitCh, fetchCh, e.resultCh, e.finished, e.handleCols) + _, fetchErr := worker.fetchHandles(ctx1, exitCh, fetchCh, e.finished, e.handleCols) if fetchErr != nil { // this error is synced in fetchHandles, so don't sync it again e.feedbacks[workID].Invalidate() } @@ -450,7 +487,7 @@ func (e *IndexMergeReaderExecutor) startPartialTableWorker(ctx context.Context, } } }, - e.handleHandlesFetcherPanic(ctx, e.resultCh, "partialTableWorker"), + handleWorkerPanic(ctx, e.finished, fetchCh, partialTableWorkerType), ) }() return nil @@ -487,17 +524,10 @@ type partialTableWorker struct { maxChunkSize int tableReader Executor partition table.PhysicalTable // it indicates if this worker is accessing a particular partition table + memTracker *memory.Tracker } -func (w *partialTableWorker) syncErr(resultCh chan<- *lookupTableTask, err error) { - doneCh := make(chan error, 1) - doneCh <- err - resultCh <- &lookupTableTask{ - doneCh: doneCh, - } -} - -func (w *partialTableWorker) fetchHandles(ctx context.Context, exitCh <-chan struct{}, fetchCh chan<- *lookupTableTask, resultCh chan<- *lookupTableTask, +func (w *partialTableWorker) fetchHandles(ctx context.Context, exitCh <-chan struct{}, fetchCh chan<- *lookupTableTask, finished <-chan struct{}, handleCols plannercore.HandleCols) (count int64, err error) { chk := chunk.NewChunkWithCapacity(retTypes(w.tableReader), w.maxChunkSize) var basic *execdetails.BasicRuntimeStats @@ -508,7 +538,7 @@ func (w *partialTableWorker) fetchHandles(ctx context.Context, exitCh <-chan str start := time.Now() handles, retChunk, err := w.extractTaskHandles(ctx, chk, handleCols) if err != nil { - w.syncErr(resultCh, err) + syncErr(ctx, finished, fetchCh, err) return count, err } if len(handles) == 0 { @@ -537,6 +567,8 @@ func (w *partialTableWorker) fetchHandles(ctx context.Context, exitCh <-chan str func (w *partialTableWorker) extractTaskHandles(ctx context.Context, chk *chunk.Chunk, handleCols plannercore.HandleCols) ( handles []kv.Handle, retChk *chunk.Chunk, err error) { handles = make([]kv.Handle, 0, w.batchSize) + var memUsage int64 + defer w.memTracker.Consume(-memUsage) for len(handles) < w.batchSize { chk.SetRequiredRows(w.batchSize-len(handles), w.maxChunkSize) err = errors.Trace(w.tableReader.Next(ctx, chk)) @@ -544,8 +576,14 @@ func (w *partialTableWorker) extractTaskHandles(ctx context.Context, chk *chunk. return handles, nil, err } if chk.NumRows() == 0 { + failpoint.Inject("testIndexMergeErrorPartialTableWorker", func(v failpoint.Value) { + failpoint.Return(handles, nil, errors.New(v.(string))) + }) return handles, retChk, nil } + memDelta := chk.MemoryUsage() + memUsage += memDelta + w.memTracker.Consume(memDelta) for i := 0; i < chk.NumRows(); i++ { handle, err := handleCols.BuildHandle(chk.GetRow(i)) if err != nil { @@ -590,13 +628,13 @@ func (e *IndexMergeReaderExecutor) startIndexMergeTableScanWorker(ctx context.Co defer trace.StartRegion(ctx, "IndexMergeTableScanWorker").End() var task *lookupTableTask util.WithRecovery( - // Note we use the address of `task` as the argument of both `pickAndExecTask` and `handlePickAndExecTaskPanic` + // Note we use the address of `task` as the argument of both `pickAndExecTask` and `handleTableScanWorkerPanic` // because `task` is expected to be assigned in `pickAndExecTask`, and this assignment should also be visible - // in `handlePickAndExecTaskPanic` since it will get `doneCh` from `task`. Golang always pass argument by value, + // in `handleTableScanWorkerPanic` since it will get `doneCh` from `task`. Golang always pass argument by value, // so if we don't use the address of `task` as the argument, the assignment to `task` in `pickAndExecTask` is - // not visible in `handlePickAndExecTaskPanic` + // not visible in `handleTableScanWorkerPanic` func() { worker.pickAndExecTask(ctx1, &task) }, - worker.handlePickAndExecTaskPanic(ctx1, &task), + worker.handleTableScanWorkerPanic(ctx1, e.finished, &task, tableScanWorkerType), ) cancel() e.tblWorkerWg.Done() @@ -681,19 +719,32 @@ func (e *IndexMergeReaderExecutor) getResultTask() (*lookupTableTask, error) { return e.resultCurr, nil } -func (e *IndexMergeReaderExecutor) handleHandlesFetcherPanic(ctx context.Context, resultCh chan<- *lookupTableTask, worker string) func(r interface{}) { +func handleWorkerPanic(ctx context.Context, finished <-chan struct{}, ch chan<- *lookupTableTask, worker string) func(r interface{}) { return func(r interface{}) { + if worker == processWorkerType { + // There is only one processWorker, so it's safe to close here. + // No need to worry about "close on closed channel" error. + defer close(ch) + } if r == nil { return } - err4Panic := errors.Errorf("panic in IndexMergeReaderExecutor %s: %v", worker, r) + err4Panic := errors.Errorf("%s: %v", worker, r) logutil.Logger(ctx).Error(err4Panic.Error()) doneCh := make(chan error, 1) doneCh <- err4Panic - resultCh <- &lookupTableTask{ + task := &lookupTableTask{ doneCh: doneCh, } + select { + case <-ctx.Done(): + return + case <-finished: + return + case ch <- task: + return + } } } @@ -703,9 +754,9 @@ func (e *IndexMergeReaderExecutor) Close() error { return nil } close(e.finished) - e.processWokerWg.Wait() e.tblWorkerWg.Wait() e.idxWorkerWg.Wait() + e.processWorkerWg.Wait() e.finished = nil e.workerStarted = false // TODO: how to store e.feedbacks @@ -719,17 +770,32 @@ type indexMergeProcessWorker struct { func (w *indexMergeProcessWorker) fetchLoop(ctx context.Context, fetchCh <-chan *lookupTableTask, workCh chan<- *lookupTableTask, resultCh chan<- *lookupTableTask, finished <-chan struct{}) { - defer func() { - close(workCh) - close(resultCh) - }() + failpoint.Inject("testIndexMergeResultChCloseEarly", func(_ failpoint.Value) { + failpoint.Return() + }) + memTracker := memory.NewTracker(w.indexMerge.id, -1) + memTracker.AttachTo(w.indexMerge.memTracker) + defer memTracker.Detach() + defer close(workCh) + failpoint.Inject("testIndexMergePanicProcessWorkerUnion", nil) distinctHandles := make(map[int64]*kv.HandleMap) for task := range fetchCh { + select { + case err := <-task.doneCh: + // If got error from partialIndexWorker/partialTableWorker, stop processing. + if err != nil { + syncErr(ctx, finished, resultCh, err) + return + } + default: + } start := time.Now() handles := task.handles fhs := make([]kv.Handle, 0, 8) + memTracker.Consume(int64(cap(task.handles) * 8)) + var tblID int64 if w.indexMerge.partitionTableMode { tblID = getPhysicalTableID(task.partitionTable) @@ -770,22 +836,6 @@ func (w *indexMergeProcessWorker) fetchLoop(ctx context.Context, fetchCh <-chan } } -func (w *indexMergeProcessWorker) handleLoopFetcherPanic(ctx context.Context, resultCh chan<- *lookupTableTask) func(r interface{}) { - return func(r interface{}) { - if r == nil { - return - } - - err4Panic := errors.Errorf("panic in IndexMergeReaderExecutor indexMergeTableWorker: %v", r) - logutil.Logger(ctx).Error(err4Panic.Error()) - doneCh := make(chan error, 1) - doneCh <- err4Panic - resultCh <- &lookupTableTask{ - doneCh: doneCh, - } - } -} - type partialIndexWorker struct { stats *IndexMergeRuntimeStat sc sessionctx.Context @@ -794,14 +844,26 @@ type partialIndexWorker struct { maxBatchSize int maxChunkSize int partition table.PhysicalTable // it indicates if this worker is accessing a particular partition table + memTracker *memory.Tracker } -func (w *partialIndexWorker) syncErr(resultCh chan<- *lookupTableTask, err error) { +func syncErr(ctx context.Context, finished <-chan struct{}, errCh chan<- *lookupTableTask, err error) { + logutil.BgLogger().Error("IndexMergeReaderExecutor.syncErr", zap.Error(err)) doneCh := make(chan error, 1) doneCh <- err - resultCh <- &lookupTableTask{ + task := &lookupTableTask{ doneCh: doneCh, } + + // ctx.Done and finished is to avoid write channel is stuck. + select { + case <-ctx.Done(): + return + case <-finished: + return + case errCh <- task: + return + } } func (w *partialIndexWorker) fetchHandles( @@ -809,7 +871,6 @@ func (w *partialIndexWorker) fetchHandles( result distsql.SelectResult, exitCh <-chan struct{}, fetchCh chan<- *lookupTableTask, - resultCh chan<- *lookupTableTask, finished <-chan struct{}, handleCols plannercore.HandleCols) (count int64, err error) { chk := chunk.NewChunkWithCapacity(handleCols.GetFieldsTypes(), w.maxChunkSize) @@ -824,7 +885,7 @@ func (w *partialIndexWorker) fetchHandles( start := time.Now() handles, retChunk, err := w.extractTaskHandles(ctx, chk, result, handleCols) if err != nil { - w.syncErr(resultCh, err) + syncErr(ctx, finished, fetchCh, err) return count, err } if len(handles) == 0 { @@ -853,6 +914,8 @@ func (w *partialIndexWorker) fetchHandles( func (w *partialIndexWorker) extractTaskHandles(ctx context.Context, chk *chunk.Chunk, idxResult distsql.SelectResult, handleCols plannercore.HandleCols) ( handles []kv.Handle, retChk *chunk.Chunk, err error) { handles = make([]kv.Handle, 0, w.batchSize) + var memUsage int64 + defer w.memTracker.Consume(-memUsage) for len(handles) < w.batchSize { chk.SetRequiredRows(w.batchSize-len(handles), w.maxChunkSize) err = errors.Trace(idxResult.Next(ctx, chk)) @@ -860,8 +923,14 @@ func (w *partialIndexWorker) extractTaskHandles(ctx context.Context, chk *chunk. return handles, nil, err } if chk.NumRows() == 0 { + failpoint.Inject("testIndexMergeErrorPartialIndexWorker", func(v failpoint.Value) { + failpoint.Return(handles, nil, errors.New(v.(string))) + }) return handles, retChk, nil } + memDelta := chk.MemoryUsage() + memUsage += memDelta + w.memTracker.Consume(memDelta) for i := 0; i < chk.NumRows(); i++ { handle, err := handleCols.BuildHandleFromIndexRow(chk.GetRow(i)) if err != nil { @@ -912,6 +981,17 @@ func (w *indexMergeTableScanWorker) pickAndExecTask(ctx context.Context, task ** case <-w.finished: return } + // Make sure panic failpoint is after fetch task from workCh. + // Otherwise cannot send error to task.doneCh. + failpoint.Inject("testIndexMergePanicTableScanWorker", nil) + failpoint.Inject("mockSleepBeforeStartTableReader", func(_ failpoint.Value) { + select { + case <-ctx.Done(): + failpoint.Return() + case <-w.finished: + failpoint.Return() + } + }) execStart := time.Now() err := w.executeTask(ctx, *task) if w.stats != nil { @@ -924,16 +1004,23 @@ func (w *indexMergeTableScanWorker) pickAndExecTask(ctx context.Context, task ** } } -func (w *indexMergeTableScanWorker) handlePickAndExecTaskPanic(ctx context.Context, task **lookupTableTask) func(r interface{}) { +func (w *indexMergeTableScanWorker) handleTableScanWorkerPanic(ctx context.Context, finished <-chan struct{}, task **lookupTableTask, worker string) func(r interface{}) { return func(r interface{}) { if r == nil { return } - err4Panic := errors.Errorf("panic in IndexMergeReaderExecutor indexMergeTableWorker: %v", r) + err4Panic := errors.Errorf("%s: %v", worker, r) logutil.Logger(ctx).Error(err4Panic.Error()) if *task != nil { - (*task).doneCh <- err4Panic + select { + case <-ctx.Done(): + return + case <-finished: + return + case (*task).doneCh <- err4Panic: + return + } } } } diff --git a/executor/index_merge_reader_test.go b/executor/index_merge_reader_test.go index 9d06eb3d70b68..81dcc8155a7c7 100644 --- a/executor/index_merge_reader_test.go +++ b/executor/index_merge_reader_test.go @@ -605,3 +605,82 @@ func TestPessimisticLockOnPartitionForIndexMerge(t *testing.T) { // TODO: add support for index merge reader in dynamic tidb_partition_prune_mode } + +func TestIndexMergePanic(t *testing.T) { + store, clean := testkit.CreateMockStore(t) + defer clean() + tk := testkit.NewTestKit(t, store) + + tk.MustExec("use test") + tk.MustExec("drop table if exists t1") + tk.MustExec("create table t1(c1 int, c2 bigint, c3 bigint, primary key(c1), key(c2), key(c3));") + tk.MustExec("insert into t1 values(1, 1, 1), (100, 100, 100)") + + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/testIndexMergeResultChCloseEarly", "return(true)")) + tk.MustExec("select /*+ use_index_merge(t1, primary, c2, c3) */ c1 from t1 where c1 < 100 or c2 < 100") + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/testIndexMergeResultChCloseEarly")) + + tk.MustExec("use test") + tk.MustExec("drop table if exists t1") + tk.MustExec("create table t1(c1 int, c2 bigint, c3 bigint, primary key(c1), key(c2), key(c3)) partition by hash(c1) partitions 10;") + insertStr := "insert into t1 values(0, 0, 0)" + for i := 1; i < 1000; i++ { + insertStr += fmt.Sprintf(", (%d, %d, %d)", i, i, i) + } + tk.MustExec(insertStr) + tk.MustExec("analyze table t1;") + tk.MustExec("set tidb_partition_prune_mode = 'dynamic'") + + minV := 200 + maxV := 1000 + runSQL := func(fp string) { + var sql string + v1 := rand.Intn(maxV-minV) + minV + v2 := rand.Intn(maxV-minV) + minV + sql = fmt.Sprintf("select /*+ use_index_merge(t1) */ c1 from t1 where c1 < %d or c2 < %d;", v1, v2) + res := tk.MustQuery("explain " + sql).Rows() + require.Contains(t, res[1][0], "IndexMerge") + err := tk.QueryToErr(sql) + require.Contains(t, err.Error(), fp) + } + + packagePath := "github.com/pingcap/tidb/executor/" + panicFPPaths := []string{ + packagePath + "testIndexMergePanicPartialIndexWorker", + packagePath + "testIndexMergePanicPartialTableWorker", + + packagePath + "testIndexMergePanicProcessWorkerUnion", + + packagePath + "testIndexMergePanicTableScanWorker", + } + for _, fp := range panicFPPaths { + fmt.Println("handling failpoint: ", fp) + if !strings.Contains(fp, "testIndexMergePanicTableScanWorker") { + // When mockSleepBeforeStartTableReader is enabled, will not read real data. This is to avoid leaking goroutines in coprocessor. + // But should disable mockSleepBeforeStartTableReader for testIndexMergePanicTableScanWorker. + // Because finalTableScanWorker need task.doneCh to pass error, so need partialIndexWorker/partialTableWorker runs normally. + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/mockSleepBeforeStartTableReader", "return(1000)")) + } + for i := 0; i < 1000; i++ { + require.NoError(t, failpoint.Enable(fp, fmt.Sprintf(`panic("%s")`, fp))) + runSQL(fp) + require.NoError(t, failpoint.Disable(fp)) + } + if !strings.Contains(fp, "testIndexMergePanicTableScanWorker") { + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/mockSleepBeforeStartTableReader")) + } + } + + errFPPaths := []string{ + packagePath + "testIndexMergeErrorPartialIndexWorker", + packagePath + "testIndexMergeErrorPartialTableWorker", + } + for _, fp := range errFPPaths { + fmt.Println("handling failpoint: ", fp) + require.NoError(t, failpoint.Enable(fp, fmt.Sprintf(`return("%s")`, fp))) + for i := 0; i < 100; i++ { + runSQL(fp) + } + require.NoError(t, failpoint.Disable(fp)) + } +}