*: fix static pruning partition table in disaggregated tiflash mode #40238

Merged
merged 28 commits into from
Jan 6, 2023
Changes from all commits
Commits
28 commits
cd17496
planner: refine planner code in disaggregated tiflash mode
guo-shaoge Dec 10, 2022
1f79552
better refine(consider task type)
guo-shaoge Dec 10, 2022
fd77409
fix planner(Selection); fix gcworker
guo-shaoge Dec 11, 2022
3d94b23
refine case and failpoint
guo-shaoge Dec 11, 2022
e266256
delete unnecessary var
guo-shaoge Dec 11, 2022
cd3fcaf
fix lint
guo-shaoge Dec 11, 2022
8f32c37
Merge branch 'master' into disaggregate_planner_error
guo-shaoge Dec 12, 2022
e298492
Merge branch 'master' of github.com:pingcap/tidb into disaggregate_pl…
guo-shaoge Dec 12, 2022
a1f7042
fix unit-test case (window)
guo-shaoge Dec 12, 2022
b1b4657
Merge branch 'disaggregate_planner_error' of github.com:guo-shaoge/ti…
guo-shaoge Dec 12, 2022
e8fd626
Merge branch 'master' of github.com:pingcap/tidb into disaggregate_pl…
guo-shaoge Dec 12, 2022
2d626f5
*: fix static pruning in disaggregated tiflash
guo-shaoge Dec 17, 2022
3534420
Merge branch 'master' of github.com:pingcap/tidb into disaggregated_s…
guo-shaoge Dec 29, 2022
2f86b3c
fix fmt
guo-shaoge Dec 29, 2022
e5e7137
add case
guo-shaoge Dec 29, 2022
1b2a9d2
update comment; fix bazel lint
guo-shaoge Dec 29, 2022
78d0646
update case
guo-shaoge Dec 29, 2022
160fd90
fix lint
guo-shaoge Dec 29, 2022
ecefc09
Merge branch 'master' of github.com:pingcap/tidb into disaggregated_s…
guo-shaoge Jan 2, 2023
7784593
Merge branch 'master' into disaggregated_static_part_error
guo-shaoge Jan 4, 2023
b939df4
fix var declaration
guo-shaoge Jan 4, 2023
3087e81
Revert "fix var declaration"
guo-shaoge Jan 4, 2023
e5d1f9a
Merge branch 'disaggregated_static_part_error' of github.com:guo-shao…
guo-shaoge Jan 4, 2023
212f34d
Merge branch 'master' into disaggregated_static_part_error
guo-shaoge Jan 4, 2023
806b83d
Merge branch 'master' into disaggregated_static_part_error
ti-chi-bot Jan 6, 2023
6cf1b6f
Merge branch 'master' into disaggregated_static_part_error
ti-chi-bot Jan 6, 2023
9c00062
Merge branch 'master' into disaggregated_static_part_error
ti-chi-bot Jan 6, 2023
34d9453
Merge branch 'master' into disaggregated_static_part_error
ti-chi-bot Jan 6, 2023
6 changes: 3 additions & 3 deletions domain/domain.go
@@ -1427,7 +1427,7 @@ func (do *Domain) LoadSysVarCacheLoop(ctx sessionctx.Context) error {
// WatchTiFlashComputeNodeChange create a routine to watch if the topology of tiflash_compute node is changed.
// TODO: tiflashComputeNodeKey is not put to etcd yet(finish this when AutoScaler is done)
//
// store cache will only be invalidated every 30 seconds.
// store cache will only be invalidated every n seconds.
func (do *Domain) WatchTiFlashComputeNodeChange() error {
var watchCh clientv3.WatchChan
if do.etcdClient != nil {
@@ -1468,8 +1468,8 @@ func (do *Domain) WatchTiFlashComputeNodeChange() error {
case tikv.Storage:
logCount++
s.GetRegionCache().InvalidateTiFlashComputeStores()
if logCount == 60 {
// Print log every 60*duration seconds.
if logCount == 6 {
// Print log every 6*duration seconds.
logutil.BgLogger().Debug("tiflash_compute store cache invalidated, will update on next query", zap.Bool("watched", watched))
logCount = 0
}
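For reference, a minimal sketch of the counter-based throttling pattern the watch loop above relies on: the store cache is invalidated on every watch event, but the debug log is emitted only once every `logEvery` events (with roughly 30-second events and `logEvery = 6`, about one log line every 3 minutes). The function and variable names here are illustrative, not the actual `domain` package API.

```go
package main

import "fmt"

// throttledInvalidate invalidates the store cache on every event but logs only
// once every logEvery events — the same pattern as the watch loop above.
func throttledInvalidate(events <-chan struct{}, logEvery int, invalidate func()) {
	logCount := 0
	for range events {
		invalidate()
		logCount++
		if logCount == logEvery {
			fmt.Println("tiflash_compute store cache invalidated, will update on next query")
			logCount = 0
		}
	}
}

func main() {
	events := make(chan struct{}, 14)
	for i := 0; i < 14; i++ {
		events <- struct{}{}
	}
	close(events)
	// 14 events with logEvery = 6 prints the message twice.
	throttledInvalidate(events, 6, func() { /* invalidate region cache here */ })
}
```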
4 changes: 3 additions & 1 deletion executor/mpp_gather.go
@@ -81,7 +81,9 @@ func (e *MPPGather) appendMPPDispatchReq(pf *plannercore.Fragment) error {
for _, mppTask := range pf.ExchangeSender.Tasks {
if mppTask.PartitionTableIDs != nil {
err = updateExecutorTableID(context.Background(), dagReq.RootExecutor, true, mppTask.PartitionTableIDs)
} else {
} else if !mppTask.IsDisaggregatedTiFlashStaticPrune {
// If IsDisaggregatedTiFlashStaticPrune is true, this TableScan is under a PartitionUnion,
// so the tableID in the TableScan is already the physical table ID of the partition and does not need to be updated again.
err = updateExecutorTableID(context.Background(), dagReq.RootExecutor, true, []int64{mppTask.TableID})
}
if err != nil {
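A hedged sketch of the dispatch-time decision above (the `task` struct and `tableIDsForRewrite` helper are illustrative stand-ins, not the executor's real API): dynamic pruning rewrites the TableScan with the pruned partition IDs, a static-pruned scan under disaggregated TiFlash already carries its physical partition ID and is left untouched, and everything else is rewritten with the logical table ID.

```go
package main

import "fmt"

// task mirrors the kv.MPPTask fields that matter for the rewrite decision.
type task struct {
	TableID                           int64
	PartitionTableIDs                 []int64
	IsDisaggregatedTiFlashStaticPrune bool
}

// tableIDsForRewrite returns the table IDs that should be patched into the
// TableScan executors of the DAG request, or nil when no rewrite is needed.
func tableIDsForRewrite(t task) []int64 {
	switch {
	case t.PartitionTableIDs != nil:
		// Dynamic pruning: one scan serves several partitions.
		return t.PartitionTableIDs
	case t.IsDisaggregatedTiFlashStaticPrune:
		// Static pruning under disaggregated TiFlash: the scan is under a
		// PartitionUnion and already holds the physical partition ID.
		return nil
	default:
		return []int64{t.TableID}
	}
}

func main() {
	fmt.Println(tableIDsForRewrite(task{TableID: 100, PartitionTableIDs: []int64{101, 102}}))
	fmt.Println(tableIDsForRewrite(task{TableID: 101, IsDisaggregatedTiFlashStaticPrune: true}))
	fmt.Println(tableIDsForRewrite(task{TableID: 100}))
}
```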
24 changes: 24 additions & 0 deletions executor/tiflashtest/tiflash_test.go
@@ -1308,4 +1308,28 @@ func TestDisaggregatedTiFlashQuery(t *testing.T) {
failpoint.Enable("github.com/pingcap/tidb/planner/core/testDisaggregatedTiFlashQuery", fmt.Sprintf("return(%s)", needCheckTiFlashComputeNode))
defer failpoint.Disable("github.com/pingcap/tidb/planner/core/testDisaggregatedTiFlashQuery")
tk.MustExec("explain select max( tbl_1.col_1 ) as r0 , sum( tbl_1.col_1 ) as r1 , sum( tbl_1.col_8 ) as r2 from tbl_1 where tbl_1.col_8 != 68 or tbl_1.col_3 between null and 939 order by r0,r1,r2;")

tk.MustExec("set @@tidb_partition_prune_mode = 'static';")
tk.MustExec("set @@session.tidb_isolation_read_engines=\"tiflash\"")
tk.MustExec("create table t1(c1 int, c2 int) partition by hash(c1) partitions 3")
tk.MustExec("insert into t1 values(1, 1), (2, 2), (3, 3)")
tk.MustExec("alter table t1 set tiflash replica 1")
tb = external.GetTableByName(t, tk, "test", "t1")
err = domain.GetDomain(tk.Session()).DDL().UpdateTableReplicaInfo(tk.Session(), tb.Meta().ID, true)
require.NoError(t, err)
tk.MustQuery("explain select * from t1 where c1 < 2").Check(testkit.Rows(
"PartitionUnion_10 9970.00 root ",
"├─TableReader_15 3323.33 root data:ExchangeSender_14",
"│ └─ExchangeSender_14 3323.33 mpp[tiflash] ExchangeType: PassThrough",
"│ └─Selection_13 3323.33 mpp[tiflash] lt(test.t1.c1, 2)",
"│ └─TableFullScan_12 10000.00 mpp[tiflash] table:t1, partition:p0 keep order:false, stats:pseudo",
"├─TableReader_19 3323.33 root data:ExchangeSender_18",
"│ └─ExchangeSender_18 3323.33 mpp[tiflash] ExchangeType: PassThrough",
"│ └─Selection_17 3323.33 mpp[tiflash] lt(test.t1.c1, 2)",
"│ └─TableFullScan_16 10000.00 mpp[tiflash] table:t1, partition:p1 keep order:false, stats:pseudo",
"└─TableReader_23 3323.33 root data:ExchangeSender_22",
" └─ExchangeSender_22 3323.33 mpp[tiflash] ExchangeType: PassThrough",
" └─Selection_21 3323.33 mpp[tiflash] lt(test.t1.c1, 2)",
" └─TableFullScan_20 10000.00 mpp[tiflash] table:t1, partition:p2 keep order:false, stats:pseudo"))
// tk.MustQuery("select * from t1 where c1 < 2").Check(testkit.Rows("1 1"))
}
3 changes: 2 additions & 1 deletion kv/mpp.go
@@ -42,7 +42,8 @@ type MPPTask struct {
MppQueryID MPPQueryID
TableID int64 // physical table id

PartitionTableIDs []int64
PartitionTableIDs []int64
IsDisaggregatedTiFlashStaticPrune bool
}

// ToPB generates the pb structure.
3 changes: 2 additions & 1 deletion planner/core/find_best_task.go
@@ -2001,8 +2001,9 @@ func (ds *DataSource) convertToTableScan(prop *property.PhysicalProperty, candid
if ts.KeepOrder {
return invalidTask, nil
}
if prop.MPPPartitionTp != property.AnyType || ts.isPartition {
if prop.MPPPartitionTp != property.AnyType || (ts.isPartition && !canMppConvertToRootForDisaggregatedTiFlash) {
// If ts is a single partition, the partition table uses static-only pruning, so we should not choose MPP execution.
// But in disaggregated TiFlash mode only MPP is available, so we add ExchangeSender and ExchangeReceiver above the TableScan for static-pruning partition tables.
ds.SCtx().GetSessionVars().RaiseWarningWhenMPPEnforced("MPP mode may be blocked because table `" + ds.tableInfo.Name.O + "`is a partition table which is not supported when `@@tidb_partition_prune_mode=static`.")
return invalidTask, nil
}
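To make the guard above easier to follow, here is a minimal sketch that folds `canMppConvertToRootForDisaggregatedTiFlash` into a single `disaggregatedTiFlash` boolean; the type and function names are illustrative, not the planner's real API. A single-partition (static-pruned) scan blocks MPP unless disaggregated TiFlash can wrap the plan with an exchange pair and convert it to a root task.

```go
package main

import "fmt"

// mppPartitionType stands in for property.MPPPartitionTp; anyType mirrors property.AnyType.
type mppPartitionType int

const (
	anyType  mppPartitionType = 0
	hashType mppPartitionType = 1
)

// canUseMPPForScan mirrors the guard above: a static-pruned (single-partition)
// scan blocks MPP unless we run in disaggregated TiFlash mode, where the plan
// can be wrapped with ExchangeSender/ExchangeReceiver and converted to a root task.
func canUseMPPForScan(partitionTp mppPartitionType, isSinglePartitionScan, disaggregatedTiFlash bool) bool {
	if partitionTp != anyType {
		return false
	}
	if isSinglePartitionScan && !disaggregatedTiFlash {
		return false
	}
	return true
}

func main() {
	fmt.Println(canUseMPPForScan(anyType, true, false))  // false: static prune, non-disaggregated
	fmt.Println(canUseMPPForScan(anyType, true, true))   // true: static prune, disaggregated TiFlash
	fmt.Println(canUseMPPForScan(hashType, false, true)) // false: a specific partition property is required
}
```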
45 changes: 31 additions & 14 deletions planner/core/fragment.go
@@ -21,6 +21,7 @@ import (
"unsafe"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/distsql"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/infoschema"
@@ -375,17 +376,30 @@ func (e *mppTaskGenerator) constructMPPTasksImpl(ctx context.Context, ts *Physic
var allPartitionsIDs []int64
var err error
splitedRanges, _ := distsql.SplitRangesAcrossInt64Boundary(ts.Ranges, false, false, ts.Table.IsCommonHandle)
// True when:
// 0. It is disaggregated TiFlash, because non-disaggregated TiFlash does not use MPP for static pruning.
// 1. It is a partition table.
// 2. Dynamic pruning is not used.
var isDisaggregatedTiFlashStaticPrune bool
if ts.Table.GetPartitionInfo() != nil {
isDisaggregatedTiFlashStaticPrune = config.GetGlobalConfig().DisaggregatedTiFlash &&
!e.ctx.GetSessionVars().StmtCtx.UseDynamicPartitionPrune()

tmp, _ := e.is.TableByID(ts.Table.ID)
tbl := tmp.(table.PartitionedTable)
var partitions []table.PhysicalTable
partitions, err = partitionPruning(e.ctx, tbl, ts.PartitionInfo.PruningConds, ts.PartitionInfo.PartitionNames, ts.PartitionInfo.Columns, ts.PartitionInfo.ColumnNames)
if err != nil {
return nil, errors.Trace(err)
if !isDisaggregatedTiFlashStaticPrune {
var partitions []table.PhysicalTable
partitions, err = partitionPruning(e.ctx, tbl, ts.PartitionInfo.PruningConds, ts.PartitionInfo.PartitionNames, ts.PartitionInfo.Columns, ts.PartitionInfo.ColumnNames)
if err != nil {
return nil, errors.Trace(err)
}
req, allPartitionsIDs, err = e.constructMPPBuildTaskReqForPartitionedTable(ts, splitedRanges, partitions)
} else {
singlePartTbl := tbl.GetPartition(ts.physicalTableID)
req, err = e.constructMPPBuildTaskForNonPartitionTable(singlePartTbl.GetPhysicalID(), ts.Table.IsCommonHandle, splitedRanges)
}
req, allPartitionsIDs, err = e.constructMPPBuildTaskReqForPartitionedTable(ts, splitedRanges, partitions)
} else {
req, err = e.constructMPPBuildTaskForNonPartitionTable(ts, splitedRanges)
req, err = e.constructMPPBuildTaskForNonPartitionTable(ts.Table.ID, ts.Table.IsCommonHandle, splitedRanges)
}
if err != nil {
return nil, errors.Trace(err)
@@ -403,12 +417,15 @@ func (e *mppTaskGenerator) constructMPPTasksImpl(ctx context.Context, ts *Physic

tasks := make([]*kv.MPPTask, 0, len(metas))
for _, meta := range metas {
task := &kv.MPPTask{Meta: meta,
ID: AllocMPPTaskID(e.ctx),
StartTs: e.startTS,
MppQueryID: e.mppQueryID,
TableID: ts.Table.ID,
PartitionTableIDs: allPartitionsIDs}
task := &kv.MPPTask{
Meta: meta,
ID: AllocMPPTaskID(e.ctx),
StartTs: e.startTS,
MppQueryID: e.mppQueryID,
TableID: ts.Table.ID,
PartitionTableIDs: allPartitionsIDs,
IsDisaggregatedTiFlashStaticPrune: isDisaggregatedTiFlashStaticPrune,
}
tasks = append(tasks, task)
}
return tasks, nil
@@ -435,8 +452,8 @@ func (e *mppTaskGenerator) constructMPPBuildTaskReqForPartitionedTable(ts *Physi
return &kv.MPPBuildTasksRequest{PartitionIDAndRanges: partitionIDAndRanges}, allPartitionsIDs, nil
}

func (e *mppTaskGenerator) constructMPPBuildTaskForNonPartitionTable(ts *PhysicalTableScan, splitedRanges []*ranger.Range) (*kv.MPPBuildTasksRequest, error) {
kvRanges, err := distsql.TableHandleRangesToKVRanges(e.ctx.GetSessionVars().StmtCtx, []int64{ts.Table.ID}, ts.Table.IsCommonHandle, splitedRanges, nil)
func (e *mppTaskGenerator) constructMPPBuildTaskForNonPartitionTable(tid int64, isCommonHandle bool, splitedRanges []*ranger.Range) (*kv.MPPBuildTasksRequest, error) {
kvRanges, err := distsql.TableHandleRangesToKVRanges(e.ctx.GetSessionVars().StmtCtx, []int64{tid}, isCommonHandle, splitedRanges, nil)
if err != nil {
return nil, errors.Trace(err)
}
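Putting the fragment.go change together, a hedged sketch of how the build-task request is chosen (the helper and field names are hypothetical, not the real planner API): dynamic pruning runs partition pruning and builds a per-partition request, static pruning under disaggregated TiFlash treats the already-resolved physical partition like a plain table, and a non-partitioned table simply uses its own table ID.

```go
package main

import "fmt"

// buildReq is a stand-in for kv.MPPBuildTasksRequest; it only records which
// physical table IDs the request would cover.
type buildReq struct {
	tableIDs []int64
}

// chooseBuildReq sketches the branching added in constructMPPTasksImpl.
// prunedPartitionIDs plays the role of partitionPruning's result and
// physicalTableID is the single partition chosen by static pruning.
func chooseBuildReq(isPartitioned, disaggregatedTiFlash, dynamicPrune bool,
	tableID, physicalTableID int64, prunedPartitionIDs []int64) (buildReq, bool) {
	if !isPartitioned {
		return buildReq{tableIDs: []int64{tableID}}, false
	}
	staticPruneInDisaggregated := disaggregatedTiFlash && !dynamicPrune
	if staticPruneInDisaggregated {
		// The TableScan sits under a PartitionUnion and already points at one
		// physical partition, so it is handled like a non-partitioned table.
		return buildReq{tableIDs: []int64{physicalTableID}}, true
	}
	// Dynamic pruning: build one request covering all pruned partitions.
	return buildReq{tableIDs: prunedPartitionIDs}, false
}

func main() {
	req, staticPrune := chooseBuildReq(true, true, false, 100, 103, nil)
	fmt.Println(req.tableIDs, staticPrune) // [103] true
	req, staticPrune = chooseBuildReq(true, false, true, 100, 0, []int64{101, 102, 103})
	fmt.Println(req.tableIDs, staticPrune) // [101 102 103] false
	req, staticPrune = chooseBuildReq(false, false, false, 100, 0, nil)
	fmt.Println(req.tableIDs, staticPrune) // [100] false
}
```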