Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

planner: fix wrong agg function when agg push down union (#17022) #17327

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 54 additions & 0 deletions cmd/explaintest/r/explain_easy.result
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ id count task operator info
IndexReader_5 10000.00 root index:IndexScan_4
└─IndexScan_4 10000.00 cop[tikv] table:t1, index:c2, range:[NULL,+inf], keep order:false, stats:pseudo
explain select c1 from t2 union select c1 from t2 union all select c1 from t2;
<<<<<<< HEAD
id count task operator info
Union_17 26000.00 root
├─HashAgg_21 16000.00 root group by:c1, funcs:firstrow(join_agg_0)
Expand All @@ -186,6 +187,39 @@ HashAgg_18 24000.00 root group by:c1, funcs:firstrow(join_agg_0)
└─StreamAgg_54 8000.00 root group by:test.t2.c1, funcs:firstrow(test.t2.c1), firstrow(test.t2.c1)
└─IndexReader_64 10000.00 root index:IndexScan_63
└─IndexScan_63 10000.00 cop[tikv] table:t2, index:c1, range:[NULL,+inf], keep order:true, stats:pseudo
=======
id estRows task access object operator info
Union_17 26000.00 root
├─HashAgg_21 16000.00 root group by:Column#10, funcs:firstrow(Column#12)->Column#10
│ └─Union_22 16000.00 root
│ ├─StreamAgg_27 8000.00 root group by:test.t2.c1, funcs:firstrow(test.t2.c1)->Column#12, funcs:firstrow(test.t2.c1)->Column#10
│ │ └─IndexReader_40 10000.00 root index:IndexFullScan_39
│ │ └─IndexFullScan_39 10000.00 cop[tikv] table:t2, index:c1(c1) keep order:true, stats:pseudo
│ └─StreamAgg_45 8000.00 root group by:test.t2.c1, funcs:firstrow(test.t2.c1)->Column#12, funcs:firstrow(test.t2.c1)->Column#10
│ └─IndexReader_58 10000.00 root index:IndexFullScan_57
│ └─IndexFullScan_57 10000.00 cop[tikv] table:t2, index:c1(c1) keep order:true, stats:pseudo
└─IndexReader_63 10000.00 root index:IndexFullScan_62
└─IndexFullScan_62 10000.00 cop[tikv] table:t2, index:c1(c1) keep order:false, stats:pseudo
explain select c1 from t2 union all select c1 from t2 union select c1 from t2;
id estRows task access object operator info
HashAgg_18 24000.00 root group by:Column#10, funcs:firstrow(Column#11)->Column#10
└─Union_19 24000.00 root
├─StreamAgg_24 8000.00 root group by:test.t2.c1, funcs:firstrow(test.t2.c1)->Column#11, funcs:firstrow(test.t2.c1)->Column#10
│ └─IndexReader_37 10000.00 root index:IndexFullScan_36
│ └─IndexFullScan_36 10000.00 cop[tikv] table:t2, index:c1(c1) keep order:true, stats:pseudo
├─StreamAgg_42 8000.00 root group by:test.t2.c1, funcs:firstrow(test.t2.c1)->Column#11, funcs:firstrow(test.t2.c1)->Column#10
│ └─IndexReader_55 10000.00 root index:IndexFullScan_54
│ └─IndexFullScan_54 10000.00 cop[tikv] table:t2, index:c1(c1) keep order:true, stats:pseudo
└─StreamAgg_60 8000.00 root group by:test.t2.c1, funcs:firstrow(test.t2.c1)->Column#11, funcs:firstrow(test.t2.c1)->Column#10
└─IndexReader_73 10000.00 root index:IndexFullScan_72
└─IndexFullScan_72 10000.00 cop[tikv] table:t2, index:c1(c1) keep order:true, stats:pseudo
select * from information_schema.tidb_indexes where table_name='t4';
TABLE_SCHEMA TABLE_NAME NON_UNIQUE KEY_NAME SEQ_IN_INDEX COLUMN_NAME SUB_PART INDEX_COMMENT Expression INDEX_ID
test t4 0 PRIMARY 1 a NULL NULL 0
test t4 1 idx 1 a NULL NULL 1
test t4 1 idx 2 b NULL NULL 1
test t4 1 expr_idx 1 NULL NULL (`a` + `b` + 1) 2
>>>>>>> b248783... planner: fix wrong agg function when agg push down union (#17022)
explain select count(1) from (select count(1) from (select * from t1 where c3 = 100) k) k2;
id count task operator info
StreamAgg_13 1.00 root funcs:count(1)
Expand Down Expand Up @@ -663,6 +697,7 @@ Sort_6 10000.00 root test.t.a:asc, test.t.b:asc
drop table if exists t;
set @@session.tidb_opt_insubq_to_join_and_agg=1;
explain SELECT 0 AS a FROM dual UNION SELECT 1 AS a FROM dual ORDER BY a;
<<<<<<< HEAD
id count task operator info
Sort_13 2.00 root a:asc
└─HashAgg_17 2.00 root group by:a, funcs:firstrow(join_agg_0)
Expand All @@ -683,6 +718,25 @@ HashAgg_15 2.00 root group by:a, funcs:firstrow(join_agg_0)
└─StreamAgg_26 1.00 root group by:a, funcs:firstrow(a), firstrow(a)
└─Projection_31 1.00 root 1
└─TableDual_32 1.00 root rows:1
=======
id estRows task access object operator info
Sort_13 2.00 root Column#3
└─HashAgg_17 2.00 root group by:Column#3, funcs:firstrow(Column#6)->Column#3
└─Union_18 2.00 root
├─HashAgg_19 1.00 root group by:1, funcs:firstrow(0)->Column#6, funcs:firstrow(0)->Column#3
│ └─TableDual_22 1.00 root rows:1
└─HashAgg_25 1.00 root group by:1, funcs:firstrow(1)->Column#6, funcs:firstrow(1)->Column#3
└─TableDual_28 1.00 root rows:1
explain SELECT 0 AS a FROM dual UNION (SELECT 1 AS a FROM dual ORDER BY a);
id estRows task access object operator info
HashAgg_15 2.00 root group by:Column#3, funcs:firstrow(Column#6)->Column#3
└─Union_16 2.00 root
├─HashAgg_17 1.00 root group by:1, funcs:firstrow(0)->Column#6, funcs:firstrow(0)->Column#3
│ └─TableDual_20 1.00 root rows:1
└─StreamAgg_27 1.00 root group by:Column#1, funcs:firstrow(Column#1)->Column#6, funcs:firstrow(Column#1)->Column#3
└─Projection_32 1.00 root 1->Column#1
└─TableDual_33 1.00 root rows:1
>>>>>>> b248783... planner: fix wrong agg function when agg push down union (#17022)
create table t (i int key, j int, unique key (i, j));
begin;
insert into t values (1, 1);
Expand Down
47 changes: 47 additions & 0 deletions executor/aggregate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -656,7 +656,54 @@ func (s *testSuite1) TestOnlyFullGroupBy(c *C) {
c.Assert(terror.ErrorEqual(err, plannercore.ErrAmbiguous), IsTrue, Commentf("err %v", err))
}

<<<<<<< HEAD
func (s *testSuite1) TestIssue13652(c *C) {
=======
func (s *testSuiteAgg) TestIssue16279(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("set sql_mode = 'ONLY_FULL_GROUP_BY'")
tk.MustExec("drop table if exists s")
tk.MustExec("create table s(a int);")
tk.MustQuery("select count(a) , date_format(a, '%Y-%m-%d') from s group by date_format(a, '%Y-%m-%d');")
tk.MustQuery("select count(a) , date_format(a, '%Y-%m-%d') as xx from s group by date_format(a, '%Y-%m-%d');")
tk.MustQuery("select count(a) , date_format(a, '%Y-%m-%d') as xx from s group by xx")
}

func (s *testSuiteAgg) TestAggPushDownPartitionTable(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t1")
tk.MustExec(`CREATE TABLE t1 (
a int(11) DEFAULT NULL,
b tinyint(4) NOT NULL,
PRIMARY KEY (b)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin
PARTITION BY RANGE ( b ) (
PARTITION p0 VALUES LESS THAN (10),
PARTITION p1 VALUES LESS THAN (20),
PARTITION p2 VALUES LESS THAN (30),
PARTITION p3 VALUES LESS THAN (40),
PARTITION p4 VALUES LESS THAN (MAXVALUE)
)`)
tk.MustExec("insert into t1 values (0, 0), (1, 1), (1, 2), (1, 3), (2, 4), (2, 5), (2, 6), (3, 7), (3, 10), (3, 11), (12, 12), (12, 13), (14, 14), (14, 15), (20, 20), (20, 21), (20, 22), (23, 23), (23, 24), (23, 25), (31, 30), (31, 31), (31, 32), (33, 33), (33, 34), (33, 35), (36, 36), (80, 80), (90, 90), (100, 100)")
tk.MustExec("set @@tidb_opt_agg_push_down = 1")
tk.MustQuery("select /*+ AGG_TO_COP() */ sum(a), sum(b) from t1 where a < 40 group by a").Sort().Check(testkit.Rows(
"0 0",
"24 25",
"28 29",
"3 6",
"36 36",
"6 15",
"60 63",
"69 72",
"9 28",
"93 93",
"99 102"))
}

func (s *testSuiteAgg) TestIssue13652(c *C) {
>>>>>>> b248783... planner: fix wrong agg function when agg push down union (#17022)
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("set sql_mode = 'ONLY_FULL_GROUP_BY'")
Expand Down
23 changes: 20 additions & 3 deletions planner/core/rule_aggregation_push_down.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ func (a *aggregationPushDownSolver) makeNewAgg(ctx sessionctx.Context, aggFuncs

// pushAggCrossUnion will try to push the agg down to the union. If the new aggregation's group-by columns doesn't contain unique key.
// We will return the new aggregation. Otherwise we will transform the aggregation to projection.
func (a *aggregationPushDownSolver) pushAggCrossUnion(agg *LogicalAggregation, unionSchema *expression.Schema, unionChild LogicalPlan) LogicalPlan {
func (a *aggregationPushDownSolver) pushAggCrossUnion(agg *LogicalAggregation, unionSchema *expression.Schema, unionChild LogicalPlan) (LogicalPlan, error) {
ctx := agg.ctx
newAgg := LogicalAggregation{
AggFuncs: make([]*aggregation.AggFuncDesc, 0, len(agg.AggFuncs)),
Expand All @@ -319,6 +319,12 @@ func (a *aggregationPushDownSolver) pushAggCrossUnion(agg *LogicalAggregation, u
for _, gbyExpr := range agg.GroupByItems {
newExpr := expression.ColumnSubstitute(gbyExpr, unionSchema, expression.Column2Exprs(unionChild.Schema().Columns))
newAgg.GroupByItems = append(newAgg.GroupByItems, newExpr)
// TODO: if there is a duplicated first_row function, we can delete it.
firstRow, err := aggregation.NewAggFuncDesc(agg.ctx, ast.AggFuncFirstRow, []expression.Expression{gbyExpr}, false)
if err != nil {
return nil, err
}
newAgg.AggFuncs = append(newAgg.AggFuncs, firstRow)
}
newAgg.collectGroupByColumns()
tmpSchema := expression.NewSchema(newAgg.groupByCols...)
Expand All @@ -327,13 +333,21 @@ func (a *aggregationPushDownSolver) pushAggCrossUnion(agg *LogicalAggregation, u
// this will cause error during executor phase.
for _, key := range unionChild.Schema().Keys {
if tmpSchema.ColumnsIndices(key) != nil {
<<<<<<< HEAD
proj := a.convertAggToProj(newAgg)
proj.SetChildren(unionChild)
return proj
=======
if ok, proj := ConvertAggToProj(newAgg, newAgg.schema); ok {
proj.SetChildren(unionChild)
return proj, nil
}
break
>>>>>>> b248783... planner: fix wrong agg function when agg push down union (#17022)
}
}
newAgg.SetChildren(unionChild)
return newAgg
return newAgg, nil
}

func (a *aggregationPushDownSolver) optimize(ctx context.Context, p LogicalPlan) (LogicalPlan, error) {
Expand Down Expand Up @@ -412,7 +426,10 @@ func (a *aggregationPushDownSolver) aggPushDown(p LogicalPlan) (_ LogicalPlan, e
}
newChildren := make([]LogicalPlan, 0, len(union.children))
for _, child := range union.children {
newChild := a.pushAggCrossUnion(pushedAgg, union.Schema(), child)
newChild, err := a.pushAggCrossUnion(pushedAgg, union.Schema(), child)
if err != nil {
return p, err
}
newChildren = append(newChildren, newChild)
}
union.SetSchema(expression.NewSchema(newChildren[0].Schema().Columns...))
Expand Down
Loading