Skip to content

Commit

Permalink
Merge branch 'release-5.0' into release-5.0-b07942836fa9
Browse files Browse the repository at this point in the history
  • Loading branch information
qw4990 authored Jun 22, 2021
2 parents 4099df9 + 9030fbb commit cc7d3ae
Show file tree
Hide file tree
Showing 46 changed files with 1,929 additions and 919 deletions.
21 changes: 11 additions & 10 deletions cmd/explaintest/r/explain_easy.result
Original file line number Diff line number Diff line change
Expand Up @@ -195,31 +195,32 @@ test t4 1 expr_idx 1 NULL NULL (`a` + `b` + 1) 2 YES NO
explain format = 'brief' select count(1) from (select count(1) from (select * from t1 where c3 = 100) k) k2;
id estRows task access object operator info
StreamAgg 1.00 root funcs:count(1)->Column#5
└─StreamAgg 1.00 root funcs:firstrow(Column#9)->Column#7
└─StreamAgg 1.00 root funcs:count(Column#9)->Column#7
└─TableReader 1.00 root data:StreamAgg
└─StreamAgg 1.00 cop[tikv] funcs:firstrow(1)->Column#9
└─StreamAgg 1.00 cop[tikv] funcs:count(1)->Column#9
└─Selection 10.00 cop[tikv] eq(test.t1.c3, 100)
└─TableFullScan 10000.00 cop[tikv] table:t1 keep order:false, stats:pseudo
explain format = 'brief' select 1 from (select count(c2), count(c3) from t1) k;
id estRows task access object operator info
Projection 1.00 root 1->Column#6
└─StreamAgg 1.00 root funcs:firstrow(Column#14)->Column#9
└─StreamAgg 1.00 root funcs:count(Column#14)->Column#9
└─TableReader 1.00 root data:StreamAgg
└─StreamAgg 1.00 cop[tikv] funcs:firstrow(1)->Column#14
└─StreamAgg 1.00 cop[tikv] funcs:count(1)->Column#14
└─TableFullScan 10000.00 cop[tikv] table:t1 keep order:false, stats:pseudo
explain format = 'brief' select count(1) from (select max(c2), count(c3) as m from t1) k;
id estRows task access object operator info
StreamAgg 1.00 root funcs:count(1)->Column#6
└─StreamAgg 1.00 root funcs:firstrow(Column#13)->Column#8
└─StreamAgg 1.00 root funcs:count(Column#13)->Column#8
└─TableReader 1.00 root data:StreamAgg
└─StreamAgg 1.00 cop[tikv] funcs:firstrow(1)->Column#13
└─StreamAgg 1.00 cop[tikv] funcs:count(1)->Column#13
└─TableFullScan 10000.00 cop[tikv] table:t1 keep order:false, stats:pseudo
explain format = 'brief' select count(1) from (select count(c2) from t1 group by c3) k;
id estRows task access object operator info
StreamAgg 1.00 root funcs:count(1)->Column#5
└─HashAgg 8000.00 root group by:test.t1.c3, funcs:firstrow(1)->Column#7
└─TableReader 10000.00 root data:TableFullScan
└─TableFullScan 10000.00 cop[tikv] table:t1 keep order:false, stats:pseudo
└─HashAgg 8000.00 root group by:test.t1.c3, funcs:count(Column#9)->Column#7
└─TableReader 8000.00 root data:HashAgg
└─HashAgg 8000.00 cop[tikv] group by:test.t1.c3, funcs:count(1)->Column#9
└─TableFullScan 10000.00 cop[tikv] table:t1 keep order:false, stats:pseudo
set @@session.tidb_opt_insubq_to_join_and_agg=0;
explain format = 'brief' select sum(t1.c1 in (select c1 from t2)) from t1;
id estRows task access object operator info
Expand Down Expand Up @@ -499,7 +500,7 @@ PRIMARY KEY (`id`)
explain format = 'brief' SELECT COUNT(1) FROM (SELECT COALESCE(b.region_name, '不详') region_name, SUM(a.registration_num) registration_num FROM (SELECT stat_date, show_date, region_id, 0 registration_num FROM test01 WHERE period = 1 AND stat_date >= 20191202 AND stat_date <= 20191202 UNION ALL SELECT stat_date, show_date, region_id, registration_num registration_num FROM test01 WHERE period = 1 AND stat_date >= 20191202 AND stat_date <= 20191202) a LEFT JOIN test02 b ON a.region_id = b.id WHERE registration_num > 0 AND a.stat_date >= '20191202' AND a.stat_date <= '20191202' GROUP BY a.stat_date , a.show_date , COALESCE(b.region_name, '不详') ) JLS;
id estRows task access object operator info
StreamAgg 1.00 root funcs:count(1)->Column#22
└─HashAgg 8000.00 root group by:Column#32, Column#33, Column#34, funcs:firstrow(1)->Column#31
└─HashAgg 8000.00 root group by:Column#32, Column#33, Column#34, funcs:count(1)->Column#31
└─Projection 10000.01 root Column#14, Column#15, coalesce(test.test02.region_name, 不详)->Column#34
└─HashJoin 10000.01 root left outer join, equal:[eq(Column#16, test.test02.id)]
├─TableReader(Build) 10000.00 root data:TableFullScan
Expand Down
52 changes: 41 additions & 11 deletions distsql/select_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,13 +295,6 @@ func (r *selectResult) updateCopRuntimeStats(ctx context.Context, copStats *copr
if r.rootPlanID <= 0 || r.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl == nil || callee == "" {
return
}
if len(r.selectResp.GetExecutionSummaries()) != len(r.copPlanIDs) {
logutil.Logger(ctx).Error("invalid cop task execution summaries length",
zap.Int("expected", len(r.copPlanIDs)),
zap.Int("received", len(r.selectResp.GetExecutionSummaries())))

return
}
if r.stats == nil {
id := r.rootPlanID
r.stats = &selectResultRuntimeStats{
Expand All @@ -316,12 +309,49 @@ func (r *selectResult) updateCopRuntimeStats(ctx context.Context, copStats *copr
r.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RecordScanDetail(r.copPlanIDs[len(r.copPlanIDs)-1], r.storeType.Name(), copStats.ScanDetail)
}

for i, detail := range r.selectResp.GetExecutionSummaries() {
// If hasExecutor is true, it means the summary is returned from TiFlash.
hasExecutor := false
for _, detail := range r.selectResp.GetExecutionSummaries() {
if detail != nil && detail.TimeProcessedNs != nil &&
detail.NumProducedRows != nil && detail.NumIterations != nil {
planID := r.copPlanIDs[i]
r.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.
RecordOneCopTask(planID, r.storeType.Name(), callee, detail)
if detail.ExecutorId != nil {
hasExecutor = true
}
break
}
}
if hasExecutor {
var recorededPlanIDs = make(map[int]int)
for i, detail := range r.selectResp.GetExecutionSummaries() {
if detail != nil && detail.TimeProcessedNs != nil &&
detail.NumProducedRows != nil && detail.NumIterations != nil {
planID := r.copPlanIDs[i]
recorededPlanIDs[r.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.
RecordOneCopTask(planID, r.storeType.Name(), callee, detail)] = 0
}
}
num := uint64(0)
dummySummary := &tipb.ExecutorExecutionSummary{TimeProcessedNs: &num, NumProducedRows: &num, NumIterations: &num, ExecutorId: nil}
for _, planID := range r.copPlanIDs {
if _, ok := recorededPlanIDs[planID]; !ok {
r.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RecordOneCopTask(planID, r.storeType.Name(), callee, dummySummary)
}
}
} else {
// For cop task cases, we still need this protection.
if len(r.selectResp.GetExecutionSummaries()) != len(r.copPlanIDs) {
logutil.Logger(ctx).Error("invalid cop task execution summaries length",
zap.Int("expected", len(r.copPlanIDs)),
zap.Int("received", len(r.selectResp.GetExecutionSummaries())))
return
}
for i, detail := range r.selectResp.GetExecutionSummaries() {
if detail != nil && detail.TimeProcessedNs != nil &&
detail.NumProducedRows != nil && detail.NumIterations != nil {
planID := r.copPlanIDs[i]
r.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.
RecordOneCopTask(planID, r.storeType.Name(), callee, detail)
}
}
}
}
Expand Down
7 changes: 4 additions & 3 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3056,9 +3056,10 @@ func (s *testSerialSuite) TestTiDBLastTxnInfoCommitMode(c *C) {
c.Assert(rows[0][1], Equals, "false")
c.Assert(rows[0][2], Equals, "false")

config.UpdateGlobal(func(conf *config.Config) {
conf.TiKVClient.AsyncCommit.SafeWindow = 0
})
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/invalidMaxCommitTS", "return"), IsNil)
defer func() {
c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/invalidMaxCommitTS"), IsNil)
}()

tk.MustExec("set @@tidb_enable_async_commit = 1")
tk.MustExec("set @@tidb_enable_1pc = 0")
Expand Down
4 changes: 4 additions & 0 deletions executor/insert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/collate"
"github.com/pingcap/tidb/util/execdetails"
"github.com/pingcap/tidb/util/israce"
"github.com/pingcap/tidb/util/testkit"
"github.com/pingcap/tidb/util/testutil"
)
Expand Down Expand Up @@ -246,6 +247,9 @@ func (s *testSuite10) TestPaddingCommonHandle(c *C) {
}

func (s *testSuite2) TestInsertReorgDelete(c *C) {
if israce.RaceEnabled {
c.Skip("exhaustive types test, skip race test")
}
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")

Expand Down
24 changes: 10 additions & 14 deletions executor/mpp_gather.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,20 +50,20 @@ type MPPGather struct {
respIter distsql.SelectResult
}

func (e *MPPGather) appendMPPDispatchReq(pf *plannercore.Fragment, tasks []*kv.MPPTask, isRoot bool) error {
func (e *MPPGather) appendMPPDispatchReq(pf *plannercore.Fragment) error {
dagReq, _, err := constructDAGReq(e.ctx, []plannercore.PhysicalPlan{pf.ExchangeSender}, kv.TiFlash)
if err != nil {
return errors.Trace(err)
}
for i := range pf.ExchangeSender.Schema().Columns {
dagReq.OutputOffsets = append(dagReq.OutputOffsets, uint32(i))
}
if !isRoot {
if !pf.IsRoot {
dagReq.EncodeType = tipb.EncodeType_TypeCHBlock
} else {
dagReq.EncodeType = tipb.EncodeType_TypeChunk
}
for _, mppTask := range tasks {
for _, mppTask := range pf.ExchangeSender.Tasks {
err := updateExecutorTableID(context.Background(), dagReq.RootExecutor, mppTask.TableID, true)
if err != nil {
return errors.Trace(err)
Expand All @@ -77,20 +77,14 @@ func (e *MPPGather) appendMPPDispatchReq(pf *plannercore.Fragment, tasks []*kv.M
Data: pbData,
Meta: mppTask.Meta,
ID: mppTask.ID,
IsRoot: isRoot,
IsRoot: pf.IsRoot,
Timeout: 10,
SchemaVar: e.is.SchemaMetaVersion(),
StartTs: e.startTS,
State: kv.MppTaskReady,
}
e.mppReqs = append(e.mppReqs, req)
}
for _, r := range pf.ExchangeReceivers {
err = e.appendMPPDispatchReq(r.GetExchangeSender().Fragment, r.Tasks, false)
if err != nil {
return errors.Trace(err)
}
}
return nil
}

Expand All @@ -108,13 +102,15 @@ func (e *MPPGather) Open(ctx context.Context) (err error) {
// TODO: Move the construct tasks logic to planner, so we can see the explain results.
sender := e.originalPlan.(*plannercore.PhysicalExchangeSender)
planIDs := collectPlanIDS(e.originalPlan, nil)
rootTasks, err := plannercore.GenerateRootMPPTasks(e.ctx, e.startTS, sender, e.is)
frags, err := plannercore.GenerateRootMPPTasks(e.ctx, e.startTS, sender, e.is)
if err != nil {
return errors.Trace(err)
}
err = e.appendMPPDispatchReq(sender.Fragment, rootTasks, true)
if err != nil {
return errors.Trace(err)
for _, frag := range frags {
err = e.appendMPPDispatchReq(frag)
if err != nil {
return errors.Trace(err)
}
}
failpoint.Inject("checkTotalMPPTasks", func(val failpoint.Value) {
if val.(int) != len(e.mppReqs) {
Expand Down
11 changes: 10 additions & 1 deletion executor/parallel_apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ type ParallelNestedLoopApplyExec struct {
// fields about concurrency control
concurrency int
started uint32
drained uint32 // drained == true indicates there is no more data
freeChkCh chan *chunk.Chunk
resultChkCh chan result
outerRowCh chan outerRow
Expand Down Expand Up @@ -132,6 +133,11 @@ func (e *ParallelNestedLoopApplyExec) Open(ctx context.Context) error {

// Next implements the Executor interface.
func (e *ParallelNestedLoopApplyExec) Next(ctx context.Context, req *chunk.Chunk) (err error) {
if atomic.LoadUint32(&e.drained) == 1 {
req.Reset()
return nil
}

if atomic.CompareAndSwapUint32(&e.started, 0, 1) {
e.workerWg.Add(1)
go e.outerWorker(ctx)
Expand All @@ -149,6 +155,7 @@ func (e *ParallelNestedLoopApplyExec) Next(ctx context.Context, req *chunk.Chunk
}
if result.chk == nil { // no more data
req.Reset()
atomic.StoreUint32(&e.drained, 1)
return nil
}
req.SwapColumns(result.chk)
Expand All @@ -159,12 +166,14 @@ func (e *ParallelNestedLoopApplyExec) Next(ctx context.Context, req *chunk.Chunk
// Close implements the Executor interface.
func (e *ParallelNestedLoopApplyExec) Close() error {
e.memTracker = nil
err := e.outerExec.Close()
if atomic.LoadUint32(&e.started) == 1 {
close(e.exit)
e.notifyWg.Wait()
e.started = 0
}
// Wait all workers to finish before Close() is called.
// Otherwise we may got data race.
err := e.outerExec.Close()

if e.runtimeStats != nil {
runtimeStats := newJoinRuntimeStats()
Expand Down
11 changes: 11 additions & 0 deletions executor/parallel_apply_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -597,3 +597,14 @@ func (s *testSuite) TestApplyGoroutinePanic(c *C) {
c.Assert(failpoint.Disable(panicPath), IsNil)
}
}

func (s *testSuite) TestIssue24930(c *C) {
tk := testkit.NewTestKitWithInit(c, s.store)
tk.MustExec("set tidb_enable_parallel_apply=true")
tk.MustExec("drop table if exists t1, t2")
tk.MustExec("create table t1(a int)")
tk.MustExec("create table t2(a int)")
tk.MustQuery(`select case when t1.a is null
then (select t2.a from t2 where t2.a = t1.a limit 1) else t1.a end a
from t1 where t1.a=1 order by a limit 1`).Check(testkit.Rows()) // can return an empty result instead of hanging forever
}
14 changes: 14 additions & 0 deletions executor/point_get_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,20 @@ func (s *testPointGetSuite) TestPointGetDataTooLong(c *C) {
tk.MustExec("drop table if exists PK_1389;")
}

// issue #25320
func (s *testPointGetSuite) TestDistinctPlan(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists test_distinct;")
tk.MustExec(`CREATE TABLE test_distinct (
id bigint(18) NOT NULL COMMENT '主键',
b bigint(18) NOT NULL COMMENT '用户ID',
PRIMARY KEY (id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;`)
tk.MustExec("insert into test_distinct values (123456789101112131,223456789101112131),(123456789101112132,223456789101112131);")
tk.MustQuery("select distinct b from test_distinct where id in (123456789101112131,123456789101112132);").Check(testkit.Rows("223456789101112131"))
}

func (s *testPointGetSuite) TestPointGetCharPK(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec(`use test;`)
Expand Down
Loading

0 comments on commit cc7d3ae

Please sign in to comment.