Skip to content

Commit

Permalink
planner, util: move FastIntSet to util (#47723)
Browse files Browse the repository at this point in the history
  • Loading branch information
winoros authored Oct 17, 2023
1 parent aebf22d commit 204d780
Show file tree
Hide file tree
Showing 13 changed files with 195 additions and 168 deletions.
2 changes: 1 addition & 1 deletion pkg/expression/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@ go_library(
"//pkg/parser/opcode",
"//pkg/parser/terror",
"//pkg/parser/types",
"//pkg/planner/funcdep",
"//pkg/privilege",
"//pkg/sessionctx",
"//pkg/sessionctx/stmtctx",
Expand All @@ -98,6 +97,7 @@ go_library(
"//pkg/util/encrypt",
"//pkg/util/generatedexpr",
"//pkg/util/hack",
"//pkg/util/intset",
"//pkg/util/logutil",
"//pkg/util/mathutil",
"//pkg/util/mock",
Expand Down
26 changes: 13 additions & 13 deletions pkg/expression/grouping_sets.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ import (

"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/parser/mysql"
fd "github.com/pingcap/tidb/pkg/planner/funcdep"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/stmtctx"
"github.com/pingcap/tidb/pkg/util/intset"
"github.com/pingcap/tidb/pkg/util/size"
"github.com/pingcap/tipb/go-tipb"
)
Expand Down Expand Up @@ -178,7 +178,7 @@ func (gss GroupingSets) TargetOne(normalAggArgs []Expression) int {
return 0
}
// for other normal agg args like: count(a), count(a+b), count(not(a is null)) and so on.
normalAggArgsIDSet := fd.NewFastIntSet()
normalAggArgsIDSet := intset.NewFastIntSet()
for _, one := range columnInNormalAggArgs {
normalAggArgsIDSet.Insert(int(one.UniqueID))
}
Expand All @@ -202,7 +202,7 @@ func (gss GroupingSets) TargetOne(normalAggArgs []Expression) int {
func (gss GroupingSets) NeedCloneColumn() bool {
// for grouping sets like: {<a,c>},{<c>} / {<a,c>},{<b,c>}
// the column c should be copied one more time here, otherwise it will be filled with null values and not visible for the other grouping set again.
setIDs := make([]*fd.FastIntSet, 0, len(gss))
setIDs := make([]*intset.FastIntSet, 0, len(gss))
for _, groupingSet := range gss {
setIDs = append(setIDs, groupingSet.AllColIDs())
}
Expand Down Expand Up @@ -231,8 +231,8 @@ func (gs GroupingSet) IsEmpty() bool {
}

// AllColIDs collects the unique IDs of all the grouping columns (assuming that every grouping expression is a single column).
func (gs GroupingSet) AllColIDs() *fd.FastIntSet {
res := fd.NewFastIntSet()
func (gs GroupingSet) AllColIDs() *intset.FastIntSet {
res := intset.NewFastIntSet()
for _, groupingExprs := range gs {
// on the condition that every grouping expression is single column.
// eg: group by a, b, c
Expand Down Expand Up @@ -313,8 +313,8 @@ func (gss GroupingSets) IsEmpty() bool {
}

// AllSetsColIDs collects the column IDs of all the grouping sets into a single fast int set.
func (gss GroupingSets) AllSetsColIDs() *fd.FastIntSet {
res := fd.NewFastIntSet()
func (gss GroupingSets) AllSetsColIDs() *intset.FastIntSet {
res := intset.NewFastIntSet()
for _, groupingSet := range gss {
res.UnionWith(*groupingSet.AllColIDs())
}
Expand Down Expand Up @@ -361,8 +361,8 @@ func (g GroupingExprs) IsEmpty() bool {

// SubSetOf is used to do the logical computation of subset between two grouping expressions.
func (g GroupingExprs) SubSetOf(other GroupingExprs) bool {
oldOne := fd.NewFastIntSet()
newOne := fd.NewFastIntSet()
oldOne := intset.NewFastIntSet()
newOne := intset.NewFastIntSet()
for _, one := range g {
oldOne.Insert(int(one.(*Column).UniqueID))
}
Expand All @@ -373,8 +373,8 @@ func (g GroupingExprs) SubSetOf(other GroupingExprs) bool {
}

// IDSet is used to collect column ids inside grouping expressions into a fast int set.
func (g GroupingExprs) IDSet() *fd.FastIntSet {
res := fd.NewFastIntSet()
func (g GroupingExprs) IDSet() *intset.FastIntSet {
res := intset.NewFastIntSet()
for _, one := range g {
res.Insert(int(one.(*Column).UniqueID))
}
Expand Down Expand Up @@ -493,7 +493,7 @@ func AdjustNullabilityFromGroupingSets(gss GroupingSets, schema *Schema) {
// set, so it won't be filled with null value at any time, the nullable change is unnecessary.
groupingIDs := gss.AllSetsColIDs()
// cache each grouping set's ID set to avoid fetching them multiple times below.
groupingIDsSlice := make([]*fd.FastIntSet, 0, len(gss))
groupingIDsSlice := make([]*intset.FastIntSet, 0, len(gss))
for _, oneGroupingSet := range gss {
groupingIDsSlice = append(groupingIDsSlice, oneGroupingSet.AllColIDs())
}
Expand Down Expand Up @@ -570,7 +570,7 @@ func (gss GroupingSets) DistinctSize() (int, []uint64, map[int]map[uint64]struct
func (gss GroupingSets) DistinctSizeWithThreshold(N int) (int, []uint64, map[int]map[uint64]struct{}) {
// all the group by item are col, deduplicate from id-set.
distinctGroupingIDsPos := make([]int, 0, len(gss))
originGroupingIDsSlice := make([]*fd.FastIntSet, 0, len(gss))
originGroupingIDsSlice := make([]*intset.FastIntSet, 0, len(gss))

for _, oneGroupingSet := range gss {
curIDs := oneGroupingSet.AllColIDs()
Expand Down
1 change: 1 addition & 0 deletions pkg/planner/core/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ go_library(
"//pkg/util/hack",
"//pkg/util/hint",
"//pkg/util/intest",
"//pkg/util/intset",
"//pkg/util/kvcache",
"//pkg/util/logutil",
"//pkg/util/mathutil",
Expand Down
19 changes: 10 additions & 9 deletions pkg/planner/core/logical_plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ import (
"github.com/pingcap/tidb/pkg/util/dbterror"
"github.com/pingcap/tidb/pkg/util/hack"
"github.com/pingcap/tidb/pkg/util/hint"
"github.com/pingcap/tidb/pkg/util/intset"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/pingcap/tidb/pkg/util/plancodec"
"github.com/pingcap/tidb/pkg/util/set"
Expand Down Expand Up @@ -1809,7 +1810,7 @@ func (b *PlanBuilder) buildProjection(ctx context.Context, p LogicalPlan, fields
if fields[offset].AuxiliaryColInAgg {
continue
}
item := fd.NewFastIntSet()
item := intset.NewFastIntSet()
switch x := expr.(type) {
case *expression.Column:
item.Insert(int(x.UniqueID))
Expand Down Expand Up @@ -1851,7 +1852,7 @@ func (b *PlanBuilder) buildProjection(ctx context.Context, p LogicalPlan, fields
baseCols := expression.ExtractColumns(expr)
errShowCol := baseCols[0]
for _, col := range baseCols {
colSet := fd.NewFastIntSet(int(col.UniqueID))
colSet := intset.NewFastIntSet(int(col.UniqueID))
if !colSet.SubsetOf(strictClosure) {
errShowCol = col
break
Expand All @@ -1876,7 +1877,7 @@ func (b *PlanBuilder) buildProjection(ctx context.Context, p LogicalPlan, fields
}
if fds.GroupByCols.Only1Zero() {
// maxOneRow is delayed from agg's ExtractFD logic since some details listed in it.
projectionUniqueIDs := fd.NewFastIntSet()
projectionUniqueIDs := intset.NewFastIntSet()
for _, expr := range proj.Exprs {
switch x := expr.(type) {
case *expression.Column:
Expand Down Expand Up @@ -5317,15 +5318,15 @@ func (ds *DataSource) ExtractFD() *fd.FDSet {
// Once allConds is not nil, build it again.
if ds.fdSet == nil || ds.allConds != nil {
fds := &fd.FDSet{HashCodeToUniqueID: make(map[string]int)}
allCols := fd.NewFastIntSet()
allCols := intset.NewFastIntSet()
// should use the column's unique ID avoiding fdSet conflict.
for _, col := range ds.TblCols {
// todo: change it to int64
allCols.Insert(int(col.UniqueID))
}
// int pk doesn't store its index column in indexInfo.
if ds.tableInfo.PKIsHandle {
keyCols := fd.NewFastIntSet()
keyCols := intset.NewFastIntSet()
for _, col := range ds.TblCols {
if mysql.HasPriKeyFlag(col.RetType.GetFlag()) {
keyCols.Insert(int(col.UniqueID))
Expand All @@ -5351,7 +5352,7 @@ func (ds *DataSource) ExtractFD() *fd.FDSet {
}
// other indices including common handle.
for _, idx := range ds.tableInfo.Indices {
keyCols := fd.NewFastIntSet()
keyCols := intset.NewFastIntSet()
allColIsNotNull := true
if ds.isForUpdateRead && changed {
latestIndex, ok := latestIndexes[idx.ID]
Expand Down Expand Up @@ -5410,14 +5411,14 @@ func (ds *DataSource) ExtractFD() *fd.FDSet {
// the generated column is sequentially dependent on the forward column.
// a int, b int as (a+1), c int as (b+1), here we can build the strict FD down:
// {a} -> {b}, {b} -> {c}, put the maintenance of the dependencies between generated columns to the FD graph.
notNullCols := fd.NewFastIntSet()
notNullCols := intset.NewFastIntSet()
for _, col := range ds.TblCols {
if col.VirtualExpr != nil {
dependencies := fd.NewFastIntSet()
dependencies := intset.NewFastIntSet()
dependencies.Insert(int(col.UniqueID))
// dig out just for 1 level.
directBaseCol := expression.ExtractColumns(col.VirtualExpr)
determinant := fd.NewFastIntSet()
determinant := intset.NewFastIntSet()
for _, col := range directBaseCol {
determinant.Insert(int(col.UniqueID))
}
Expand Down
55 changes: 28 additions & 27 deletions pkg/planner/core/logical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/pingcap/tidb/pkg/statistics"
"github.com/pingcap/tidb/pkg/table"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util/intset"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/pingcap/tidb/pkg/util/ranger"
"github.com/pingcap/tidb/pkg/util/size"
Expand Down Expand Up @@ -298,7 +299,7 @@ func (p *LogicalJoin) extractFDForOuterJoin(filtersFromApply []expression.Expres
outerFD, innerFD := p.children[0].ExtractFD(), p.children[1].ExtractFD()
innerCondition := p.RightConditions
outerCondition := p.LeftConditions
outerCols, innerCols := fd.NewFastIntSet(), fd.NewFastIntSet()
outerCols, innerCols := intset.NewFastIntSet(), intset.NewFastIntSet()
for _, col := range p.children[0].Schema().Columns {
outerCols.Insert(int(col.UniqueID))
}
Expand Down Expand Up @@ -326,7 +327,7 @@ func (p *LogicalJoin) extractFDForOuterJoin(filtersFromApply []expression.Expres
equivUniqueIDs := extractEquivalenceCols(allConds, p.SCtx(), filterFD)

filterFD.AddConstants(constUniqueIDs)
equivOuterUniqueIDs := fd.NewFastIntSet()
equivOuterUniqueIDs := intset.NewFastIntSet()
equivAcrossNum := 0
for _, equiv := range equivUniqueIDs {
filterFD.AddEquivalence(equiv[0], equiv[1])
Expand Down Expand Up @@ -354,7 +355,7 @@ func (p *LogicalJoin) extractFDForOuterJoin(filtersFromApply []expression.Expres
// other condition may contain right side cols, it doesn't affect the judgement of intersection of non-left-equiv cols.
outConditionCols = append(outConditionCols, expression.ExtractColumnsFromExpressions(nil, p.OtherConditions, nil)...)
}
outerConditionUniqueIDs := fd.NewFastIntSet()
outerConditionUniqueIDs := intset.NewFastIntSet()
for _, col := range outConditionCols {
outerConditionUniqueIDs.Insert(int(col.UniqueID))
}
Expand Down Expand Up @@ -857,8 +858,8 @@ func (p *LogicalProjection) ExtractFD() *fd.FDSet {
// basically extract the children's fdSet.
fds := p.logicalSchemaProducer.ExtractFD()
// collect the output columns' unique ID.
outputColsUniqueIDs := fd.NewFastIntSet()
notnullColsUniqueIDs := fd.NewFastIntSet()
outputColsUniqueIDs := intset.NewFastIntSet()
notnullColsUniqueIDs := intset.NewFastIntSet()
outputColsUniqueIDsArray := make([]int, 0, len(p.Schema().Columns))
// here schema extended columns may contain expr, const and column allocated with uniqueID.
for _, one := range p.Schema().Columns {
Expand All @@ -885,7 +886,7 @@ func (p *LogicalProjection) ExtractFD() *fd.FDSet {
constantUniqueID = outputColsUniqueIDsArray[idx]
fds.RegisterUniqueID(string(x.HashCode(p.SCtx().GetSessionVars().StmtCtx)), constantUniqueID)
}
fds.AddConstants(fd.NewFastIntSet(constantUniqueID))
fds.AddConstants(intset.NewFastIntSet(constantUniqueID))
case *expression.ScalarFunction:
// t1(a,b,c), t2(m,n)
// select a, (select c+n from t2 where m=b) from t1;
Expand All @@ -908,9 +909,9 @@ func (p *LogicalProjection) ExtractFD() *fd.FDSet {
} else {
// since the scalar's hash code has been registered before, the equivalence exists between the unique ID
// allocated by phase of building-projection-for-scalar and that of previous registered unique ID.
fds.AddEquivalence(fd.NewFastIntSet(scalarUniqueID), fd.NewFastIntSet(outputColsUniqueIDsArray[idx]))
fds.AddEquivalence(intset.NewFastIntSet(scalarUniqueID), intset.NewFastIntSet(outputColsUniqueIDsArray[idx]))
}
determinants := fd.NewFastIntSet()
determinants := intset.NewFastIntSet()
extractedColumns := expression.ExtractColumns(x)
extractedCorColumns := expression.ExtractCorColumns(x)
for _, one := range extractedColumns {
Expand All @@ -927,7 +928,7 @@ func (p *LogicalProjection) ExtractFD() *fd.FDSet {
if notnull || determinants.SubsetOf(fds.NotNullCols) {
notnullColsUniqueIDs.Insert(scalarUniqueID)
}
fds.AddStrictFunctionalDependency(determinants, fd.NewFastIntSet(scalarUniqueID))
fds.AddStrictFunctionalDependency(determinants, intset.NewFastIntSet(scalarUniqueID))
}
}

Expand Down Expand Up @@ -1013,10 +1014,10 @@ func (la *LogicalAggregation) ExtractFD() *fd.FDSet {
// basically extract the children's fdSet.
fds := la.logicalSchemaProducer.ExtractFD()
// collect the output columns' unique ID.
outputColsUniqueIDs := fd.NewFastIntSet()
notnullColsUniqueIDs := fd.NewFastIntSet()
groupByColsUniqueIDs := fd.NewFastIntSet()
groupByColsOutputCols := fd.NewFastIntSet()
outputColsUniqueIDs := intset.NewFastIntSet()
notnullColsUniqueIDs := intset.NewFastIntSet()
groupByColsUniqueIDs := intset.NewFastIntSet()
groupByColsOutputCols := intset.NewFastIntSet()
// Since the aggregation is built ahead of the projection, the latter will reuse the column with the UniqueID allocated in aggregation
// via aggMapper, so we don't need to maintain the <aggDes, UniqueID> mapping in the FDSet as is done for expressions; just treat
// it as a normal column.
Expand Down Expand Up @@ -1051,7 +1052,7 @@ func (la *LogicalAggregation) ExtractFD() *fd.FDSet {
fds.RegisterUniqueID(hashCode, scalarUniqueID)
groupByColsUniqueIDs.Insert(scalarUniqueID)
}
determinants := fd.NewFastIntSet()
determinants := intset.NewFastIntSet()
extractedColumns := expression.ExtractColumns(x)
extractedCorColumns := expression.ExtractCorColumns(x)
for _, one := range extractedColumns {
Expand All @@ -1066,7 +1067,7 @@ func (la *LogicalAggregation) ExtractFD() *fd.FDSet {
if notnull || determinants.SubsetOf(fds.NotNullCols) {
notnullColsUniqueIDs.Insert(scalarUniqueID)
}
fds.AddStrictFunctionalDependency(determinants, fd.NewFastIntSet(scalarUniqueID))
fds.AddStrictFunctionalDependency(determinants, intset.NewFastIntSet(scalarUniqueID))
}
}

Expand All @@ -1078,7 +1079,7 @@ func (la *LogicalAggregation) ExtractFD() *fd.FDSet {
//
// and since any_value will NOT be pushed down to agg schema, which means every firstRow aggDes in the agg logical operator
// is meaningless to build the FD with. Let's only store the non-firstRow FD down: {group by items} ~~> {real aggDes}
realAggFuncUniqueID := fd.NewFastIntSet()
realAggFuncUniqueID := intset.NewFastIntSet()
for i, aggDes := range la.AggFuncs {
if aggDes.Name != "firstrow" {
realAggFuncUniqueID.Insert(int(la.schema.Columns[i].UniqueID))
Expand All @@ -1095,7 +1096,7 @@ func (la *LogicalAggregation) ExtractFD() *fd.FDSet {
// unique ID 0 is used only here.
groupByColsUniqueIDs.Insert(0)
for i, ok := realAggFuncUniqueID.Next(0); ok; i, ok = realAggFuncUniqueID.Next(i + 1) {
fds.AddStrictFunctionalDependency(groupByColsUniqueIDs, fd.NewFastIntSet(i))
fds.AddStrictFunctionalDependency(groupByColsUniqueIDs, intset.NewFastIntSet(i))
}
} else {
// eliminating input columns that are un-projected.
Expand All @@ -1107,7 +1108,7 @@ func (la *LogicalAggregation) ExtractFD() *fd.FDSet {
// 1: it can always distinguish and group the all-null/part-null group column rows.
// 2: the rows with all/part null group column are unique row after group operation.
// 3: there won't be two same group key with different agg values, so strict FD secured.
fds.AddStrictFunctionalDependency(groupByColsUniqueIDs, fd.NewFastIntSet(i))
fds.AddStrictFunctionalDependency(groupByColsUniqueIDs, intset.NewFastIntSet(i))
}

// agg funcDes has already been tagged with the not-null flag when building the aggregation.
Expand Down Expand Up @@ -1211,7 +1212,7 @@ type LogicalSelection struct {
Conditions []expression.Expression
}

func extractNotNullFromConds(conditions []expression.Expression, p LogicalPlan) fd.FastIntSet {
func extractNotNullFromConds(conditions []expression.Expression, p LogicalPlan) intset.FastIntSet {
// extract the column NOT NULL rejection characteristic from selection condition.
// Only CNF is considered; DNF is not meaningful here (because a given condition's evaluation may not take effect).
//
Expand All @@ -1224,7 +1225,7 @@ func extractNotNullFromConds(conditions []expression.Expression, p LogicalPlan)
// 2: `b` must be null since only `NULL is NULL` is evaluated as true.
//
// As a result, `a` will be extracted as a not-null column to enrich the FDSet.
notnullColsUniqueIDs := fd.NewFastIntSet()
notnullColsUniqueIDs := intset.NewFastIntSet()
for _, condition := range conditions {
var cols []*expression.Column
cols = expression.ExtractColumnsFromExpressions(cols, []expression.Expression{condition}, nil)
Expand All @@ -1237,13 +1238,13 @@ func extractNotNullFromConds(conditions []expression.Expression, p LogicalPlan)
return notnullColsUniqueIDs
}

func extractConstantCols(conditions []expression.Expression, sctx sessionctx.Context, fds *fd.FDSet) fd.FastIntSet {
func extractConstantCols(conditions []expression.Expression, sctx sessionctx.Context, fds *fd.FDSet) intset.FastIntSet {
// extract constant cols
// eg: where a=1 and b is null and (1+c)=5.
// TODO: Some columns can only be determined to be constant from multiple constraints (e.g. x <= 1 AND x >= 1)
var (
constObjs []expression.Expression
constUniqueIDs = fd.NewFastIntSet()
constUniqueIDs = intset.NewFastIntSet()
)
constObjs = expression.ExtractConstantEqColumnsOrScalar(sctx, constObjs, conditions)
for _, constObj := range constObjs {
Expand All @@ -1264,10 +1265,10 @@ func extractConstantCols(conditions []expression.Expression, sctx sessionctx.Con
return constUniqueIDs
}

func extractEquivalenceCols(conditions []expression.Expression, sctx sessionctx.Context, fds *fd.FDSet) [][]fd.FastIntSet {
func extractEquivalenceCols(conditions []expression.Expression, sctx sessionctx.Context, fds *fd.FDSet) [][]intset.FastIntSet {
var equivObjsPair [][]expression.Expression
equivObjsPair = expression.ExtractEquivalenceColumns(equivObjsPair, conditions)
equivUniqueIDs := make([][]fd.FastIntSet, 0, len(equivObjsPair))
equivUniqueIDs := make([][]intset.FastIntSet, 0, len(equivObjsPair))
for _, equivObjPair := range equivObjsPair {
// lhs of equivalence.
var (
Expand Down Expand Up @@ -1301,7 +1302,7 @@ func extractEquivalenceCols(conditions []expression.Expression, sctx sessionctx.
rhsUniqueID = scalarUniqueID
}
}
equivUniqueIDs = append(equivUniqueIDs, []fd.FastIntSet{fd.NewFastIntSet(lhsUniqueID), fd.NewFastIntSet(rhsUniqueID)})
equivUniqueIDs = append(equivUniqueIDs, []intset.FastIntSet{intset.NewFastIntSet(lhsUniqueID), intset.NewFastIntSet(rhsUniqueID)})
}
return equivUniqueIDs
}
Expand All @@ -1311,8 +1312,8 @@ func (p *LogicalSelection) ExtractFD() *fd.FDSet {
// basically extract the children's fdSet.
fds := p.baseLogicalPlan.ExtractFD()
// collect the output columns' unique ID.
outputColsUniqueIDs := fd.NewFastIntSet()
notnullColsUniqueIDs := fd.NewFastIntSet()
outputColsUniqueIDs := intset.NewFastIntSet()
notnullColsUniqueIDs := intset.NewFastIntSet()
// eg: select t2.a, count(t2.b) from t1 join t2 using (a) where t1.a = 1
// join's schema will miss t2.a while join.full schema has. since selection
// itself doesn't contain schema, extracting schema should tell them apart.
Expand Down
Loading

0 comments on commit 204d780

Please sign in to comment.