Skip to content

Commit

Permalink
planner: support reader operators cost detail (#36963)
Browse files Browse the repository at this point in the history
ref #36962
  • Loading branch information
Yisaer authored Aug 9, 2022
1 parent a849cc2 commit 7e20de6
Show file tree
Hide file tree
Showing 4 changed files with 233 additions and 47 deletions.
6 changes: 3 additions & 3 deletions planner/core/find_best_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -2080,7 +2080,7 @@ func (ds *DataSource) convertToPointGet(prop *property.PhysicalProperty, candida
}

func (ds *DataSource) convertToBatchPointGet(prop *property.PhysicalProperty,
candidate *candidatePath, hashPartColName *ast.ColumnName, _ *physicalOptimizeOp) (task task) {
candidate *candidatePath, hashPartColName *ast.ColumnName, opt *physicalOptimizeOp) (task task) {
if !prop.IsSortItemEmpty() && !candidate.isMatchProp {
return invalidTask
}
Expand Down Expand Up @@ -2111,7 +2111,7 @@ func (ds *DataSource) convertToBatchPointGet(prop *property.PhysicalProperty,
batchPointGetPlan.Handles = append(batchPointGetPlan.Handles, kv.IntHandle(ran.LowVal[0].GetInt64()))
}
batchPointGetPlan.accessCols = ds.TblCols
cost = batchPointGetPlan.GetCost()
cost = batchPointGetPlan.GetCost(opt)
// Add filter condition to table plan now.
if len(candidate.path.TableFilters) > 0 {
sessVars := ds.ctx.GetSessionVars()
Expand Down Expand Up @@ -2139,7 +2139,7 @@ func (ds *DataSource) convertToBatchPointGet(prop *property.PhysicalProperty,
} else {
batchPointGetPlan.accessCols = ds.TblCols
}
cost = batchPointGetPlan.GetCost()
cost = batchPointGetPlan.GetCost(opt)
// Add index condition to table plan now.
if len(candidate.path.IndexFilters)+len(candidate.path.TableFilters) > 0 {
sessVars := ds.ctx.GetSessionVars()
Expand Down
87 changes: 58 additions & 29 deletions planner/core/plan_cost.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,21 +266,28 @@ func (p *PhysicalIndexReader) GetPlanCost(_ property.TaskType, option *PlanCostO
if p.planCostInit && !hasCostFlag(costFlag, CostFlagRecalculate) {
return p.planCost, nil
}
var rowCount, rowSize, netFactor, indexPlanCost, netSeekCost float64
sqlScanConcurrency := p.ctx.GetSessionVars().DistSQLScanConcurrency()
// child's cost
childCost, err := p.indexPlan.GetPlanCost(property.CopSingleReadTaskType, option)
if err != nil {
return 0, err
}
p.planCost = childCost
indexPlanCost = childCost
p.planCost = indexPlanCost
// net I/O cost: rows * row-size * net-factor
tblStats := getTblStats(p.indexPlan)
rowSize := tblStats.GetAvgRowSize(p.ctx, p.indexPlan.Schema().Columns, true, false)
p.planCost += getCardinality(p.indexPlan, costFlag) * rowSize * getTableNetFactor(p.indexPlan)
rowSize = tblStats.GetAvgRowSize(p.ctx, p.indexPlan.Schema().Columns, true, false)
rowCount = getCardinality(p.indexPlan, costFlag)
netFactor = getTableNetFactor(p.indexPlan)
p.planCost += rowCount * rowSize * netFactor
// net seek cost
p.planCost += estimateNetSeekCost(p.indexPlan)
netSeekCost = estimateNetSeekCost(p.indexPlan)
p.planCost += netSeekCost
// consider concurrency
p.planCost /= float64(p.ctx.GetSessionVars().DistSQLScanConcurrency())
p.planCost /= float64(sqlScanConcurrency)

setPhysicalIndexReaderCostDetail(p, option.tracer, rowCount, rowSize, netFactor, netSeekCost, indexPlanCost, sqlScanConcurrency)
p.planCostInit = true
return p.planCost, nil
}
Expand All @@ -300,21 +307,27 @@ func (p *PhysicalTableReader) GetPlanCost(_ property.TaskType, option *PlanCostO
}
p.planCost = 0
netFactor := getTableNetFactor(p.tablePlan)
switch p.StoreType {
var rowCount, rowSize, netSeekCost, tableCost float64
sqlScanConcurrency := p.ctx.GetSessionVars().DistSQLScanConcurrency()
storeType := p.StoreType
switch storeType {
case kv.TiKV:
// child's cost
childCost, err := p.tablePlan.GetPlanCost(property.CopSingleReadTaskType, option)
if err != nil {
return 0, err
}
tableCost = childCost
p.planCost = childCost
// net I/O cost: rows * row-size * net-factor
rowSize := getTblStats(p.tablePlan).GetAvgRowSize(p.ctx, p.tablePlan.Schema().Columns, false, false)
p.planCost += getCardinality(p.tablePlan, costFlag) * rowSize * netFactor
rowSize = getTblStats(p.tablePlan).GetAvgRowSize(p.ctx, p.tablePlan.Schema().Columns, false, false)
rowCount = getCardinality(p.tablePlan, costFlag)
p.planCost += rowCount * rowSize * netFactor
// net seek cost
p.planCost += estimateNetSeekCost(p.tablePlan)
netSeekCost = estimateNetSeekCost(p.tablePlan)
p.planCost += netSeekCost
// consider concurrency
p.planCost /= float64(p.ctx.GetSessionVars().DistSQLScanConcurrency())
p.planCost /= float64(sqlScanConcurrency)
case kv.TiFlash:
var concurrency, rowSize, seekCost float64
_, isMPP := p.tablePlan.(*PhysicalExchangeSender)
Expand Down Expand Up @@ -357,6 +370,9 @@ func (p *PhysicalTableReader) GetPlanCost(_ property.TaskType, option *PlanCostO
p.planCost /= 1000000000
}
}
setPhysicalTableReaderCostDetail(p, option.tracer,
rowCount, rowSize, netFactor, netSeekCost, tableCost,
sqlScanConcurrency, storeType)
p.planCostInit = true
return p.planCost, nil
}
Expand Down Expand Up @@ -428,15 +444,18 @@ func (p *PhysicalTableScan) GetPlanCost(taskType property.TaskType, option *Plan
}

var selfCost float64
switch p.ctx.GetSessionVars().CostModelVersion {
var rowCount, rowSize, scanFactor float64
costModelVersion := p.ctx.GetSessionVars().CostModelVersion
switch costModelVersion {
case modelVer1: // scan cost: rows * row-size * scan-factor
scanFactor := p.ctx.GetSessionVars().GetScanFactor(p.Table)
scanFactor = p.ctx.GetSessionVars().GetScanFactor(p.Table)
if p.Desc && p.prop != nil && p.prop.ExpectedCnt >= smallScanThreshold {
scanFactor = p.ctx.GetSessionVars().GetDescScanFactor(p.Table)
}
selfCost = getCardinality(p, costFlag) * p.getScanRowSize() * scanFactor
rowCount = getCardinality(p, costFlag)
rowSize = p.getScanRowSize()
selfCost = rowCount * rowSize * scanFactor
case modelVer2: // scan cost: rows * log2(row-size) * scan-factor
var scanFactor float64
switch taskType {
case property.MppTaskType: // use a dedicated scan-factor for TiFlash
// no need to distinguish `Scan` and `DescScan` for TiFlash for now
Expand All @@ -448,16 +467,17 @@ func (p *PhysicalTableScan) GetPlanCost(taskType property.TaskType, option *Plan
}
}
// the formula `log(rowSize)` is based on experiment results
rowSize := math.Max(p.getScanRowSize(), 2.0) // to guarantee logRowSize >= 1
rowSize = math.Max(p.getScanRowSize(), 2.0) // to guarantee logRowSize >= 1
logRowSize := math.Log2(rowSize)
selfCost = getCardinality(p, costFlag) * logRowSize * scanFactor
rowCount = getCardinality(p, costFlag)
selfCost = rowCount * logRowSize * scanFactor

// give TiFlash a start-up cost so that the optimizer prefers TiKV for small table scans.
if p.StoreType == kv.TiFlash {
selfCost += 2000 * logRowSize * scanFactor
}
}

setPhysicalTableOrIndexScanCostDetail(p, option.tracer, rowCount, rowSize, scanFactor, costModelVersion)
p.planCost = selfCost
p.planCostInit = true
return p.planCost, nil
Expand All @@ -471,23 +491,28 @@ func (p *PhysicalIndexScan) GetPlanCost(_ property.TaskType, option *PlanCostOpt
}

var selfCost float64
switch p.ctx.GetSessionVars().CostModelVersion {
var rowCount, rowSize, scanFactor float64
costModelVersion := p.ctx.GetSessionVars().CostModelVersion
switch costModelVersion {
case modelVer1: // scan cost: rows * row-size * scan-factor
scanFactor := p.ctx.GetSessionVars().GetScanFactor(p.Table)
scanFactor = p.ctx.GetSessionVars().GetScanFactor(p.Table)
if p.Desc && p.prop != nil && p.prop.ExpectedCnt >= smallScanThreshold {
scanFactor = p.ctx.GetSessionVars().GetDescScanFactor(p.Table)
}
selfCost = getCardinality(p, costFlag) * p.getScanRowSize() * scanFactor
rowCount = getCardinality(p, costFlag)
rowSize = p.getScanRowSize()
selfCost = rowCount * rowSize * scanFactor
case modelVer2:
scanFactor := p.ctx.GetSessionVars().GetScanFactor(p.Table)
scanFactor = p.ctx.GetSessionVars().GetScanFactor(p.Table)
if p.Desc {
scanFactor = p.ctx.GetSessionVars().GetDescScanFactor(p.Table)
}
rowSize := math.Max(p.getScanRowSize(), 2.0)
rowCount = getCardinality(p, costFlag)
rowSize = math.Max(p.getScanRowSize(), 2.0)
logRowSize := math.Log2(rowSize)
selfCost = getCardinality(p, costFlag) * logRowSize * scanFactor
selfCost = rowCount * logRowSize * scanFactor
}

setPhysicalTableOrIndexScanCostDetail(p, option.tracer, rowCount, rowSize, scanFactor, costModelVersion)
p.planCost = selfCost
p.planCostInit = true
return p.planCost, nil
Expand Down Expand Up @@ -1173,7 +1198,7 @@ func (p *PhysicalTopN) GetPlanCost(taskType property.TaskType, option *PlanCostO
}

// GetCost returns the cost of the BatchPointGetPlan.
func (p *BatchPointGetPlan) GetCost() float64 {
func (p *BatchPointGetPlan) GetCost(opt *physicalOptimizeOp) float64 {
cols := p.accessCols
if cols == nil {
return 0 // the cost of BatchGet generated in fast plan optimization is always 0
Expand All @@ -1188,9 +1213,13 @@ func (p *BatchPointGetPlan) GetCost() float64 {
rowCount = float64(len(p.IndexValues))
rowSize = p.stats.HistColl.GetIndexAvgRowSize(p.ctx, cols, p.IndexInfo.Unique)
}
cost += rowCount * rowSize * sessVars.GetNetworkFactor(p.TblInfo)
cost += rowCount * sessVars.GetSeekFactor(p.TblInfo)
cost /= float64(sessVars.DistSQLScanConcurrency())
networkFactor := sessVars.GetNetworkFactor(p.TblInfo)
seekFactor := sessVars.GetSeekFactor(p.TblInfo)
scanConcurrency := sessVars.DistSQLScanConcurrency()
cost += rowCount * rowSize * networkFactor
cost += rowCount * seekFactor
cost /= float64(scanConcurrency)
setBatchPointGetPlanCostDetail(p, opt, rowCount, rowSize, networkFactor, seekFactor, scanConcurrency)
return cost
}

Expand All @@ -1200,7 +1229,7 @@ func (p *BatchPointGetPlan) GetPlanCost(_ property.TaskType, option *PlanCostOpt
if p.planCostInit && !hasCostFlag(costFlag, CostFlagRecalculate) {
return p.planCost, nil
}
p.planCost = p.GetCost()
p.planCost = p.GetCost(option.tracer)
p.planCostInit = true
return p.planCost, nil
}
Expand Down
93 changes: 93 additions & 0 deletions planner/core/plan_cost_detail.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,32 @@ package core
import (
"fmt"

"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/util/tracing"
)

// Labels used as parameter names (AddParam keys) and as placeholders in the
// formula descriptions of physical plan cost details.
const (
// RowCountLbl is the label for the row count.
RowCountLbl = "rowCount"
// RowSizeLbl is the label for the row size.
RowSizeLbl = "rowSize"

// NetworkFactorLbl is the label for the network factor.
NetworkFactorLbl = "networkFactor"
// SeekFactorLbl is the label for the seek factor.
SeekFactorLbl = "seekFactor"
// ScanFactorLbl is the label for the scan factor.
ScanFactorLbl = "scanFactor"

// ScanConcurrencyLbl is the label for the SQL scan concurrency.
ScanConcurrencyLbl = "scanConcurrency"

// NetSeekCostLbl is the label for the net seek cost.
NetSeekCostLbl = "netSeekCost"
// TablePlanCostLbl is the label for the cost of the table plan.
TablePlanCostLbl = "tablePlanCost"
// IndexPlanCostLbl is the label for the cost of the index plan.
IndexPlanCostLbl = "indexPlanCost"
)

func setPointGetPlanCostDetail(p *PointGetPlan, opt *physicalOptimizeOp,
Expand All @@ -41,3 +57,80 @@ func setPointGetPlanCostDetail(p *PointGetPlan, opt *physicalOptimizeOp,
SetDesc(fmt.Sprintf("%s*%s+%s", RowSizeLbl, NetworkFactorLbl, SeekFactorLbl))
opt.appendPlanCostDetail(detail)
}

// setBatchPointGetPlanCostDetail builds a cost detail for a BatchPointGetPlan
// and appends it to opt. It is a no-op when opt is nil.
//
// The recorded parameters are the row count, row size, network factor, seek
// factor and scan concurrency; the description is the formula
// (rowCount*rowSize*networkFactor+rowCount*seekFactor)/scanConcurrency
// spelled with the parameter labels.
func setBatchPointGetPlanCostDetail(p *BatchPointGetPlan, opt *physicalOptimizeOp,
	rowCount, rowSize, networkFactor, seekFactor float64, scanConcurrency int) {
	// Nothing to record without an optimizer trace.
	if opt == nil {
		return
	}

	// Human-readable formula, expressed in terms of the parameter labels.
	formula := fmt.Sprintf("(%s*%s*%s+%s*%s)/%s",
		RowCountLbl, RowSizeLbl, NetworkFactorLbl, RowCountLbl, SeekFactorLbl, ScanConcurrencyLbl)

	costDetail := tracing.NewPhysicalPlanCostDetail(p.ID(), p.TP())
	costDetail.
		AddParam(RowCountLbl, rowCount).
		AddParam(RowSizeLbl, rowSize).
		AddParam(NetworkFactorLbl, networkFactor).
		AddParam(SeekFactorLbl, seekFactor).
		AddParam(ScanConcurrencyLbl, scanConcurrency).
		SetDesc(formula)
	opt.appendPlanCostDetail(costDetail)
}

// setPhysicalTableOrIndexScanCostDetail builds a cost detail for a table scan
// or index scan and appends it to opt.
//
// It is a no-op when opt is nil or when p is neither a *PhysicalTableScan nor
// a *PhysicalIndexScan. The description depends on costModelVersion:
// modelVer1 yields rowCount*rowSize*scanFactor, any other version yields
// rowCount*log2(rowSize)*scanFactor.
func setPhysicalTableOrIndexScanCostDetail(p PhysicalPlan, opt *physicalOptimizeOp,
	rowCount, rowSize, scanFactor float64, costModelVersion int) {
	// Nothing to record without an optimizer trace.
	if opt == nil {
		return
	}
	// Only scan operators are handled here.
	switch p.(type) {
	case *PhysicalTableScan, *PhysicalIndexScan:
	default:
		return
	}

	// Pick the formula text that matches the cost model in use.
	var formula string
	switch costModelVersion {
	case modelVer1:
		formula = fmt.Sprintf("%s*%s*%s", RowCountLbl, RowSizeLbl, ScanFactorLbl)
	default:
		formula = fmt.Sprintf("%s*log2(%s)*%s", RowCountLbl, RowSizeLbl, ScanFactorLbl)
	}

	costDetail := tracing.NewPhysicalPlanCostDetail(p.ID(), p.TP())
	costDetail.
		AddParam(RowCountLbl, rowCount).
		AddParam(RowSizeLbl, rowSize).
		AddParam(ScanFactorLbl, scanFactor)
	costDetail.SetDesc(formula)
	opt.appendPlanCostDetail(costDetail)
}

// setPhysicalTableReaderCostDetail builds a cost detail for a
// PhysicalTableReader and appends it to opt.
//
// It is a no-op when opt is nil or when storeType is not kv.TiKV. The
// description is the formula
// (tablePlanCost+rowCount*rowSize*networkFactor+netSeekCost)/scanConcurrency
// spelled with the parameter labels.
func setPhysicalTableReaderCostDetail(p *PhysicalTableReader, opt *physicalOptimizeOp,
	rowCount, rowSize, networkFactor, netSeekCost, tablePlanCost float64,
	scanConcurrency int, storeType kv.StoreType) {
	// Nothing to record without an optimizer trace.
	if opt == nil {
		return
	}
	// The tracer does not support non-TiKV plans yet.
	if storeType != kv.TiKV {
		return
	}

	// Human-readable formula, expressed in terms of the parameter labels.
	formula := fmt.Sprintf("(%s+%s*%s*%s+%s)/%s", TablePlanCostLbl,
		RowCountLbl, RowSizeLbl, NetworkFactorLbl, NetSeekCostLbl, ScanConcurrencyLbl)

	costDetail := tracing.NewPhysicalPlanCostDetail(p.ID(), p.TP())
	costDetail.
		AddParam(RowCountLbl, rowCount).
		AddParam(RowSizeLbl, rowSize).
		AddParam(NetworkFactorLbl, networkFactor).
		AddParam(NetSeekCostLbl, netSeekCost).
		AddParam(TablePlanCostLbl, tablePlanCost).
		AddParam(ScanConcurrencyLbl, scanConcurrency)
	costDetail.SetDesc(formula)
	opt.appendPlanCostDetail(costDetail)
}

// setPhysicalIndexReaderCostDetail builds a cost detail for a
// PhysicalIndexReader and appends it to opt. It is a no-op when opt is nil.
//
// The description is the formula
// (indexPlanCost+rowCount*rowSize*networkFactor+netSeekCost)/scanConcurrency
// spelled with the parameter labels.
func setPhysicalIndexReaderCostDetail(p *PhysicalIndexReader, opt *physicalOptimizeOp,
	rowCount, rowSize, networkFactor, netSeekCost, indexPlanCost float64,
	scanConcurrency int) {
	// Nothing to record without an optimizer trace.
	if opt == nil {
		return
	}

	// Human-readable formula, expressed in terms of the parameter labels.
	formula := fmt.Sprintf("(%s+%s*%s*%s+%s)/%s", IndexPlanCostLbl,
		RowCountLbl, RowSizeLbl, NetworkFactorLbl, NetSeekCostLbl, ScanConcurrencyLbl)

	costDetail := tracing.NewPhysicalPlanCostDetail(p.ID(), p.TP())
	costDetail.
		AddParam(RowCountLbl, rowCount).
		AddParam(RowSizeLbl, rowSize).
		AddParam(NetworkFactorLbl, networkFactor).
		AddParam(NetSeekCostLbl, netSeekCost).
		AddParam(IndexPlanCostLbl, indexPlanCost).
		AddParam(ScanConcurrencyLbl, scanConcurrency)
	costDetail.SetDesc(formula)
	opt.appendPlanCostDetail(costDetail)
}
Loading

0 comments on commit 7e20de6

Please sign in to comment.