Skip to content

Commit

Permalink
Merge branch 'master' into bb7133/fix_31585
Browse files Browse the repository at this point in the history
  • Loading branch information
xhebox authored Aug 23, 2022
2 parents cab4d09 + cfd4ddd commit ef78378
Show file tree
Hide file tree
Showing 62 changed files with 1,576 additions and 328 deletions.
10 changes: 4 additions & 6 deletions build/nogo_config.json
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@
"ddl/backfilling.go": "ddl/backfilling.go",
"ddl/column.go": "ddl/column.go",
"ddl/index.go": "ddl/index.go",
"ddl/lightning/": "ddl/lightning/",
"server/conn.go": "server/conn.go",
"server/conn_stmt.go": "server/conn_stmt.go",
"server/conn_test.go": "server/conn_test.go",
Expand Down Expand Up @@ -293,11 +294,11 @@
"ddl/backfilling.go": "ddl/backfilling.go",
"ddl/column.go": "ddl/column.go",
"ddl/index.go": "ddl/index.go",
"ddl/lightning/": "ddl/lightning/",
"expression/builtin_cast.go": "expression/builtin_cast code",
"server/conn.go": "server/conn.go",
"server/conn_stmt.go": "server/conn_stmt.go",
"server/conn_test.go": "server/conn_test.go",
"ddl/index.go": "ddl/index code",
"planner/core/rule_partition_eliminate.go": "planner/core/rule_partition_eliminate code",
"distsql/": "ignore distsql code",
"dumpling/export": "dumpling/export code",
Expand Down Expand Up @@ -650,15 +651,12 @@
"only_files": {
"types/json/binary_functions.go": "types/json/binary_functions.go",
"types/json/binary_test.go": "types/json/binary_test.go",
"ddl/backfilling.go": "ddl/backfilling.go",
"ddl/column.go": "ddl/column.go",
"ddl/index.go": "ddl/index.go",
"ddl/": "enable to ddl",
"expression/builtin_cast.go": "enable expression/builtin_cast.go",
"planner/core/plan.go": "planner/core/plan.go",
"server/conn.go": "server/conn.go",
"server/conn_stmt.go": "server/conn_stmt.go",
"server/conn_test.go": "server/conn_test.go",
"ddl/": "enable to ddl"
"server/conn_test.go": "server/conn_test.go"
}
},
"SA2000": {
Expand Down
2 changes: 2 additions & 0 deletions cmd/explaintest/r/clustered_index.result
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
set @@tidb_enable_outer_join_reorder=true;
drop database if exists with_cluster_index;
create database with_cluster_index;
drop database if exists wout_cluster_index;
Expand Down Expand Up @@ -127,3 +128,4 @@ StreamAgg_17 1.00 root funcs:count(Column#9)->Column#7
└─IndexReader_18 1.00 root index:StreamAgg_9
└─StreamAgg_9 1.00 cop[tikv] funcs:count(1)->Column#9
└─IndexRangeScan_16 109.70 cop[tikv] table:tbl_0, index:idx_3(col_0) range:[803163,+inf], keep order:false
set @@tidb_enable_outer_join_reorder=false;
25 changes: 25 additions & 0 deletions cmd/explaintest/r/explain_complex.result
Original file line number Diff line number Diff line change
Expand Up @@ -261,3 +261,28 @@ Sort 1.00 root test.org_department.left_value
└─TableReader(Probe) 9.99 root data:Selection
└─Selection 9.99 cop[tikv] eq(test.org_employee_position.status, 1000), not(isnull(test.org_employee_position.position_id))
└─TableFullScan 10000.00 cop[tikv] table:ep keep order:false, stats:pseudo
create table test.Tab_A (id int primary key,bid int,cid int,name varchar(20),type varchar(20),num int,amt decimal(11,2));
create table test.Tab_B (id int primary key,name varchar(20));
create table test.Tab_C (id int primary key,name varchar(20),amt decimal(11,2));
insert into test.Tab_A values(2,2,2,'A01','01',112,111);
insert into test.Tab_A values(4,4,4,'A02','02',112,111);
insert into test.Tab_B values(2,'B01');
insert into test.Tab_B values(4,'B02');
insert into test.Tab_C values(2,'C01',22);
insert into test.Tab_C values(4,'C01',5);
explain select Tab_A.name AAA,Tab_B.name BBB,Tab_A.amt Aamt, Tab_C.amt Bamt,IFNULL(Tab_C.amt, 0) FROM Tab_A left join Tab_B on Tab_A.bid=Tab_B.id left join Tab_C on Tab_A.cid=Tab_C.id and Tab_A.type='01' where Tab_A.num=112;
id estRows task access object operator info
Projection_8 15.62 root test.tab_a.name, test.tab_b.name, test.tab_a.amt, test.tab_c.amt, ifnull(test.tab_c.amt, 0)->Column#13
└─IndexJoin_13 15.62 root left outer join, inner:TableReader_10, outer key:test.tab_a.cid, inner key:test.tab_c.id, equal cond:eq(test.tab_a.cid, test.tab_c.id), left cond:eq(test.tab_a.type, "01")
├─IndexJoin_24(Build) 12.50 root left outer join, inner:TableReader_21, outer key:test.tab_a.bid, inner key:test.tab_b.id, equal cond:eq(test.tab_a.bid, test.tab_b.id)
│ ├─TableReader_33(Build) 10.00 root data:Selection_32
│ │ └─Selection_32 10.00 cop[tikv] eq(test.tab_a.num, 112)
│ │ └─TableFullScan_31 10000.00 cop[tikv] table:Tab_A keep order:false, stats:pseudo
│ └─TableReader_21(Probe) 1.00 root data:TableRangeScan_20
│ └─TableRangeScan_20 1.00 cop[tikv] table:Tab_B range: decided by [test.tab_a.bid], keep order:false, stats:pseudo
└─TableReader_10(Probe) 1.00 root data:TableRangeScan_9
└─TableRangeScan_9 1.00 cop[tikv] table:Tab_C range: decided by [test.tab_a.cid], keep order:false, stats:pseudo
select Tab_A.name AAA,Tab_B.name BBB,Tab_A.amt Aamt, Tab_C.amt Bamt,IFNULL(Tab_C.amt, 0) FROM Tab_A left join Tab_B on Tab_A.bid=Tab_B.id left join Tab_C on Tab_A.cid=Tab_C.id and Tab_A.type='01' where Tab_A.num=112;
AAA BBB Aamt Bamt IFNULL(Tab_C.amt, 0)
A01 B01 111.00 22.00 22.00
A02 B02 111.00 NULL 0
2 changes: 2 additions & 0 deletions cmd/explaintest/r/select.result
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
set @@tidb_enable_outer_join_reorder=true;
DROP TABLE IF EXISTS t;
CREATE TABLE t (
c1 int,
Expand Down Expand Up @@ -654,3 +655,4 @@ create table t3(a char(10), primary key (a));
insert into t3 values ('a');
select * from t3 where a > 0x80;
Error 1105: Cannot convert string '\x80' from binary to utf8mb4
set @@tidb_enable_outer_join_reorder=false;
2 changes: 2 additions & 0 deletions cmd/explaintest/t/clustered_index.test
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
set @@tidb_enable_outer_join_reorder=true;
drop database if exists with_cluster_index;
create database with_cluster_index;
drop database if exists wout_cluster_index;
Expand Down Expand Up @@ -53,3 +54,4 @@ explain select count(*) from wout_cluster_index.tbl_0 where col_0 <= 0 ;

explain select count(*) from with_cluster_index.tbl_0 where col_0 >= 803163 ;
explain select count(*) from wout_cluster_index.tbl_0 where col_0 >= 803163 ;
set @@tidb_enable_outer_join_reorder=false;
16 changes: 16 additions & 0 deletions cmd/explaintest/t/explain_complex.test
Original file line number Diff line number Diff line change
Expand Up @@ -174,3 +174,19 @@ CREATE TABLE org_position (
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

explain format = 'brief' SELECT d.id, d.ctx, d.name, d.left_value, d.right_value, d.depth, d.leader_id, d.status, d.created_on, d.updated_on FROM org_department AS d LEFT JOIN org_position AS p ON p.department_id = d.id AND p.status = 1000 LEFT JOIN org_employee_position AS ep ON ep.position_id = p.id AND ep.status = 1000 WHERE (d.ctx = 1 AND (ep.user_id = 62 OR d.id = 20 OR d.id = 20) AND d.status = 1000) GROUP BY d.id ORDER BY d.left_value;

create table test.Tab_A (id int primary key,bid int,cid int,name varchar(20),type varchar(20),num int,amt decimal(11,2));
create table test.Tab_B (id int primary key,name varchar(20));
create table test.Tab_C (id int primary key,name varchar(20),amt decimal(11,2));

insert into test.Tab_A values(2,2,2,'A01','01',112,111);
insert into test.Tab_A values(4,4,4,'A02','02',112,111);
insert into test.Tab_B values(2,'B01');
insert into test.Tab_B values(4,'B02');
insert into test.Tab_C values(2,'C01',22);
insert into test.Tab_C values(4,'C01',5);

explain select Tab_A.name AAA,Tab_B.name BBB,Tab_A.amt Aamt, Tab_C.amt Bamt,IFNULL(Tab_C.amt, 0) FROM Tab_A left join Tab_B on Tab_A.bid=Tab_B.id left join Tab_C on Tab_A.cid=Tab_C.id and Tab_A.type='01' where Tab_A.num=112;

select Tab_A.name AAA,Tab_B.name BBB,Tab_A.amt Aamt, Tab_C.amt Bamt,IFNULL(Tab_C.amt, 0) FROM Tab_A left join Tab_B on Tab_A.bid=Tab_B.id left join Tab_C on Tab_A.cid=Tab_C.id and Tab_A.type='01' where Tab_A.num=112;

2 changes: 2 additions & 0 deletions cmd/explaintest/t/select.test
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
set @@tidb_enable_outer_join_reorder=true;
DROP TABLE IF EXISTS t;

CREATE TABLE t (
Expand Down Expand Up @@ -279,3 +280,4 @@ create table t3(a char(10), primary key (a));
insert into t3 values ('a');
--error 1105
select * from t3 where a > 0x80;
set @@tidb_enable_outer_join_reorder=false;
4 changes: 4 additions & 0 deletions ddl/db_partition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2119,6 +2119,10 @@ func TestAlterTableExchangePartition(t *testing.T) {
tk.MustExec("alter table e18 exchange partition p0 with table e17")
tk.MustQuery("select * /*+ read_from_storage(tiflash[e18]) */ from e18").Check(testkit.Rows("1"))
tk.MustQuery("select * /*+ read_from_storage(tiflash[e17]) */ from e17").Check(testkit.Rows("2"))

tk.MustExec("create table e19 (a int) partition by hash(a) partitions 1")
tk.MustExec("create temporary table e20 (a int)")
tk.MustGetErrCode("alter table e19 exchange partition p0 with table e20", errno.ErrPartitionExchangeTempTable)
}

func TestExchangePartitionTableCompatiable(t *testing.T) {
Expand Down
18 changes: 18 additions & 0 deletions ddl/lightning/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "lightning",
srcs = ["mem_root.go"],
importpath = "github.com/pingcap/tidb/ddl/lightning",
visibility = ["//visibility:public"],
)

go_test(
name = "lightning_test",
srcs = ["mem_root_test.go"],
flaky = True,
deps = [
":lightning",
"@com_github_stretchr_testify//require",
],
)
115 changes: 115 additions & 0 deletions ddl/lightning/mem_root.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package lightning

import (
"sync"
)

// MemRoot is used to track the memory usage for the lightning backfill process.
type MemRoot interface {
Consume(size int64)
Release(size int64)
TestConsume(size int64) bool
ConsumeWithTag(tag string, size int64)
ReleaseWithTag(tag string)

SetMaxMemoryQuota(quota int64)
MaxMemoryQuota() int64
CurrentUsage() int64
CurrentUsageWithTag(tag string) int64
}

// memRootImpl is an implementation of MemRoot.
type memRootImpl struct {
maxLimit int64
currUsage int64
structSize map[string]int64
mu sync.RWMutex
}

// NewMemRootImpl creates a new memRootImpl.
func NewMemRootImpl(maxQuota int64) *memRootImpl {
return &memRootImpl{
maxLimit: maxQuota,
currUsage: 0,
structSize: make(map[string]int64, 10),
}
}

// SetMaxMemoryQuota implements MemRoot.
func (m *memRootImpl) SetMaxMemoryQuota(maxQuota int64) {
m.mu.Lock()
defer m.mu.Unlock()
m.maxLimit = maxQuota
}

// MaxMemoryQuota implements MemRoot.
func (m *memRootImpl) MaxMemoryQuota() int64 {
m.mu.RLock()
defer m.mu.RUnlock()
return m.maxLimit
}

// CurrentUsage implements MemRoot.
func (m *memRootImpl) CurrentUsage() int64 {
m.mu.RLock()
defer m.mu.RUnlock()
return m.currUsage
}

// CurrentUsageWithTag implements MemRoot.
func (m *memRootImpl) CurrentUsageWithTag(tag string) int64 {
m.mu.RLock()
defer m.mu.RUnlock()
return m.structSize[tag]
}

// Consume implements MemRoot.
func (m *memRootImpl) Consume(size int64) {
m.mu.Lock()
defer m.mu.Unlock()
m.currUsage += size
}

// Release implements MemRoot.
func (m *memRootImpl) Release(size int64) {
m.mu.Lock()
defer m.mu.Unlock()
m.currUsage -= size
}

// ConsumeWithTag implements MemRoot.
func (m *memRootImpl) ConsumeWithTag(tag string, size int64) {
m.mu.Lock()
defer m.mu.Unlock()
m.currUsage += size
m.structSize[tag] = size
}

// TestConsume implements MemRoot.
func (m *memRootImpl) TestConsume(size int64) bool {
m.mu.RLock()
defer m.mu.RUnlock()
return m.currUsage+size <= m.maxLimit
}

// ReleaseWithTag implements MemRoot.
func (m *memRootImpl) ReleaseWithTag(tag string) {
m.mu.Lock()
defer m.mu.Unlock()
m.currUsage -= m.structSize[tag]
delete(m.structSize, tag)
}
60 changes: 60 additions & 0 deletions ddl/lightning/mem_root_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package lightning_test

import (
"testing"

"github.com/pingcap/tidb/ddl/lightning"
"github.com/stretchr/testify/require"
)

func TestMemoryRoot(t *testing.T) {
memRoot := lightning.MemRoot(lightning.NewMemRootImpl(1024))
require.Equal(t, int64(1024), memRoot.MaxMemoryQuota())
require.Equal(t, int64(0), memRoot.CurrentUsage())

require.True(t, memRoot.TestConsume(1023))
require.True(t, memRoot.TestConsume(1024))
require.False(t, memRoot.TestConsume(1025))

memRoot.Consume(512)
require.Equal(t, int64(512), memRoot.CurrentUsage())
require.True(t, memRoot.TestConsume(512))
require.False(t, memRoot.TestConsume(513))
require.Equal(t, int64(1024), memRoot.MaxMemoryQuota())

memRoot.Release(10)
require.Equal(t, int64(502), memRoot.CurrentUsage())
require.Equal(t, int64(1024), memRoot.MaxMemoryQuota())
memRoot.SetMaxMemoryQuota(512)
require.False(t, memRoot.TestConsume(20)) // 502+20 > 512
memRoot.Release(502)

require.Equal(t, int64(0), memRoot.CurrentUsage())
memRoot.SetMaxMemoryQuota(1024)
memRoot.ConsumeWithTag("a", 512)
memRoot.ConsumeWithTag("b", 512)
require.Equal(t, int64(1024), memRoot.CurrentUsage())
require.False(t, memRoot.TestConsume(1))
memRoot.ReleaseWithTag("a")
require.Equal(t, int64(512), memRoot.CurrentUsage())

memRoot.ReleaseWithTag("a") // Double release.
require.Equal(t, int64(512), memRoot.CurrentUsage())
require.True(t, memRoot.TestConsume(10))
memRoot.Consume(10) // Mix usage of tag and non-tag.
require.Equal(t, int64(522), memRoot.CurrentUsage())
}
4 changes: 0 additions & 4 deletions ddl/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,10 +243,6 @@ func alterTablePartitionBundles(t *meta.Meta, tblInfo *model.TableInfo, addingDe
p.Definitions = append(tblInfo.Partition.Definitions, addingDefinitions...)
tblInfo.Partition = &p

if tblInfo.TiFlashReplica != nil && tblInfo.TiFlashReplica.Count > 0 && tableHasPlacementSettings(tblInfo) {
return nil, errors.Trace(dbterror.ErrIncompatibleTiFlashAndPlacement)
}

// bundle for table should be recomputed because it includes some default configs for partitions
tblBundle, err := placement.NewTableBundle(t, tblInfo)
if err != nil {
Expand Down
16 changes: 0 additions & 16 deletions ddl/placement_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -417,19 +417,3 @@ func checkPlacementPolicyNotUsedByTable(tblInfo *model.TableInfo, policy *model.

return nil
}

func tableHasPlacementSettings(tblInfo *model.TableInfo) bool {
if tblInfo.PlacementPolicyRef != nil {
return true
}

if tblInfo.Partition != nil {
for _, def := range tblInfo.Partition.Definitions {
if def.PlacementPolicyRef != nil {
return true
}
}
}

return false
}
Loading

0 comments on commit ef78378

Please sign in to comment.