executor: implement disk-based sort (Part 2) #14279

Merged
merged 56 commits into master from sort_part2 on Feb 7, 2020
Changes from 50 commits
Commits
56 commits
063f0a4
merge sort
wshwsh12 Dec 30, 2019
d823392
split partition
wshwsh12 Dec 30, 2019
aca5777
mem track
wshwsh12 Dec 30, 2019
ef80a75
comments
wshwsh12 Dec 30, 2019
b22f83b
address heap comments
wshwsh12 Dec 31, 2019
2ccadc6
add action
wshwsh12 Dec 22, 2019
f6c4ebe
add comments
wshwsh12 Dec 31, 2019
d83e9af
address comments
wshwsh12 Dec 31, 2019
525d779
add some test
wshwsh12 Dec 31, 2019
c461ec7
Merge branch 'master' into sort_part2
wshwsh12 Dec 31, 2019
19ec95a
remove useless field
wshwsh12 Dec 31, 2019
76af1d8
address comments
wshwsh12 Dec 31, 2019
556a858
fix
wshwsh12 Dec 31, 2019
90e8081
address comments
wshwsh12 Jan 2, 2020
6483d74
fix
wshwsh12 Jan 2, 2020
bd11fd7
optimize
wshwsh12 Jan 2, 2020
7af4946
fix
wshwsh12 Jan 2, 2020
3773023
add comments
wshwsh12 Jan 2, 2020
86f3f83
address comments
wshwsh12 Jan 2, 2020
da8d986
Merge remote-tracking branch 'upstream/master' into sort_part2
wshwsh12 Jan 6, 2020
0a481cb
row container
wshwsh12 Jan 7, 2020
cc5bed2
address comments
wshwsh12 Jan 7, 2020
26f072f
temp
wshwsh12 Jan 7, 2020
704bf10
address comments
wshwsh12 Jan 8, 2020
1e6e992
fix
wshwsh12 Jan 8, 2020
ed0b163
fix
wshwsh12 Jan 8, 2020
2684c06
fix
wshwsh12 Jan 8, 2020
ab3ad50
Merge remote-tracking branch 'upstream/master' into sort_part2
wshwsh12 Jan 14, 2020
934f4ce
Merge remote-tracking branch 'upstream/master' into sort_part2
wshwsh12 Jan 17, 2020
45d8a16
address comments
wshwsh12 Jan 19, 2020
0179417
Merge remote-tracking branch 'upstream/master' into sort_part2
wshwsh12 Jan 19, 2020
50ecaf1
address comments 1
wshwsh12 Jan 19, 2020
0e65c36
address comments 2
wshwsh12 Jan 19, 2020
21c6203
fix
wshwsh12 Jan 19, 2020
831aae7
s/chunk /rowcontainer
wshwsh12 Jan 20, 2020
a184a1a
try 1
wshwsh12 Jan 20, 2020
25d2f90
Merge remote-tracking branch 'upstream/master' into sort_part2
wshwsh12 Jan 31, 2020
e971c49
Merge branch 'sort_benchmark' into sort_part2
wshwsh12 Jan 31, 2020
704d86f
address
wshwsh12 Jan 31, 2020
3890061
fix regression
wshwsh12 Feb 2, 2020
8517e11
fix
wshwsh12 Feb 2, 2020
07d604e
fix
wshwsh12 Feb 2, 2020
d50b58f
fix
wshwsh12 Feb 3, 2020
f99fd4c
Merge branch 'master' into sort_part2
wshwsh12 Feb 3, 2020
777da74
address comments
wshwsh12 Feb 4, 2020
11ac747
fix ci
wshwsh12 Feb 4, 2020
476560e
Merge branch 'master' into sort_part2
sre-bot Feb 5, 2020
9e37ebb
address
wshwsh12 Feb 5, 2020
06e5ef6
address
wshwsh12 Feb 5, 2020
dae9cdf
add ut
wshwsh12 Feb 5, 2020
2aeb4a6
Merge branch 'master' into sort_part2
sre-bot Feb 6, 2020
36ecd38
fix rowcontainer data race
wshwsh12 Feb 6, 2020
09b7253
fix rowcontainer data race
wshwsh12 Feb 6, 2020
f85c0f5
Merge branch 'master' into sort_part2
sre-bot Feb 6, 2020
31a0909
Merge branch 'master' into sort_part2
wshwsh12 Feb 6, 2020
3f62898
Merge branch 'master' into sort_part2
sre-bot Feb 7, 2020
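
Note: the portion of the diff captured below only covers the benchmark and test files. As a rough mental model of what the commit messages ("split partition", "merge sort", "row container", "mem track") point at — an illustrative sketch only, not code taken from this PR — a disk-based sort accumulates rows into an in-memory partition, sorts and spills a partition when the memory quota is hit, and finally merges all sorted partitions with a k-way merge:

// Illustrative sketch (not the PR's code): external sort as
// "sorted partitions + k-way merge". A fixed partition size stands in
// for spilling whenever the memory quota is exceeded.
package main

import (
	"container/heap"
	"fmt"
	"sort"
)

// partition is one sorted run of rows (a stand-in for a spilled row container).
type partition []int

// mergeItem tracks the next unread row of one partition during the merge.
type mergeItem struct {
	part partition
	pos  int
}

type mergeHeap []mergeItem

func (h mergeHeap) Len() int            { return len(h) }
func (h mergeHeap) Less(i, j int) bool  { return h[i].part[h[i].pos] < h[j].part[h[j].pos] }
func (h mergeHeap) Swap(i, j int)       { h[i], h[j] = h[j], h[i] }
func (h *mergeHeap) Push(x interface{}) { *h = append(*h, x.(mergeItem)) }
func (h *mergeHeap) Pop() interface{} {
	old := *h
	x := old[len(old)-1]
	*h = old[:len(old)-1]
	return x
}

// externalSort splits the input into fixed-size partitions (mimicking a spill
// each time the quota is reached), sorts each partition, then merges them.
func externalSort(rows []int, partSize int) []int {
	var parts []partition
	for len(rows) > 0 {
		n := partSize
		if n > len(rows) {
			n = len(rows)
		}
		p := append(partition(nil), rows[:n]...)
		sort.Ints(p) // each partition is sorted before it is "spilled"
		parts = append(parts, p)
		rows = rows[n:]
	}

	// Seed a min-heap with the first row of every partition.
	h := &mergeHeap{}
	for _, p := range parts {
		if len(p) > 0 {
			*h = append(*h, mergeItem{part: p})
		}
	}
	heap.Init(h)

	// Repeatedly pop the globally smallest row and advance its partition.
	var out []int
	for h.Len() > 0 {
		top := (*h)[0]
		out = append(out, top.part[top.pos])
		top.pos++
		if top.pos < len(top.part) {
			(*h)[0] = top
			heap.Fix(h, 0)
		} else {
			heap.Pop(h)
		}
	}
	return out
}

func main() {
	fmt.Println(externalSort([]int{5, 3, 8, 1, 9, 2, 7, 4, 6, 0}, 4))
}
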
170 changes: 140 additions & 30 deletions executor/benchmark_test.go
@@ -15,6 +15,7 @@ package executor

import (
"context"
"encoding/base64"
"fmt"
"math/rand"
"sort"
@@ -46,13 +47,12 @@ var (
)

type mockDataSourceParameters struct {
schema *expression.Schema
genDataFunc func(row int, typ *types.FieldType) interface{}
ndvs []int // number of distinct values on columns[i] and zero represents no limit
orders []bool // columns[i] should be ordered if orders[i] is true
rows int // number of rows the DataSource should output
ctx sessionctx.Context
isRawDataSmall bool // false: rawData, true: rawDataSmall
schema *expression.Schema
genDataFunc func(row int, typ *types.FieldType) interface{}
ndvs []int // number of distinct values on columns[i] and zero represents no limit
orders []bool // columns[i] should be ordered if orders[i] is true
rows int // number of rows the DataSource should output
ctx sessionctx.Context
}

type mockDataSource struct {
@@ -154,10 +154,9 @@ func (mds *mockDataSource) randDatum(typ *types.FieldType) interface{} {
case mysql.TypeDouble:
return rand.Float64()
case mysql.TypeVarString:
if mds.p.isRawDataSmall {
return rawDataSmall
}
return rawData
buff := make([]byte, 10)
rand.Read(buff)
return base64.RawURLEncoding.EncodeToString(buff)
default:
panic("not implement")
}
@@ -508,18 +507,14 @@ type windowTestCase struct {
concurrency int
dataSourceSorted bool
ctx sessionctx.Context
rawDataSmall string
}

var rawData = strings.Repeat("x", 5*1024)
var rawDataSmall = strings.Repeat("x", 16)

func (a windowTestCase) columns() []*expression.Column {
rawDataTp := new(types.FieldType)
types.DefaultTypeForValue(rawData, rawDataTp)
return []*expression.Column{
{Index: 0, RetType: types.NewFieldType(mysql.TypeDouble)},
{Index: 1, RetType: types.NewFieldType(mysql.TypeLonglong)},
{Index: 2, RetType: rawDataTp},
{Index: 2, RetType: types.NewFieldType(mysql.TypeVarString)},
{Index: 3, RetType: types.NewFieldType(mysql.TypeLonglong)},
}
}
@@ -533,7 +528,7 @@ func defaultWindowTestCase() *windowTestCase {
ctx := mock.NewContext()
ctx.GetSessionVars().InitChunkSize = variable.DefInitChunkSize
ctx.GetSessionVars().MaxChunkSize = variable.DefMaxChunkSize
return &windowTestCase{ast.WindowFuncRowNumber, 1, nil, 1000, 10000000, 1, true, ctx}
return &windowTestCase{ast.WindowFuncRowNumber, 1, nil, 1000, 10000000, 1, true, ctx, strings.Repeat("x", 16)}
}

func benchmarkWindowExecWithCase(b *testing.B, casTest *windowTestCase) {
@@ -544,12 +539,21 @@ func benchmarkWindowExecWithCase(b *testing.B, casTest *windowTestCase) {

cols := casTest.columns()
dataSource := buildMockDataSource(mockDataSourceParameters{
schema: expression.NewSchema(cols...),
ndvs: []int{0, casTest.ndv, 0, 0},
orders: []bool{false, casTest.dataSourceSorted, false, false},
rows: casTest.rows,
ctx: casTest.ctx,
isRawDataSmall: true,
schema: expression.NewSchema(cols...),
ndvs: []int{0, casTest.ndv, 0, 0},
orders: []bool{false, casTest.dataSourceSorted, false, false},
rows: casTest.rows,
ctx: casTest.ctx,
genDataFunc: func(row int, typ *types.FieldType) interface{} {
switch typ.Tp {
case mysql.TypeLong, mysql.TypeLonglong:
return int64(row)
case mysql.TypeVarString:
return casTest.rawDataSmall
default:
panic("not implement")
}
},
})

b.ResetTimer()
@@ -679,6 +683,7 @@ type hashJoinTestCase struct {
joinType core.JoinType
disk bool
useOuterToBuild bool
rawData string
}

func (tc hashJoinTestCase) columns() []*expression.Column {
@@ -702,7 +707,7 @@ func defaultHashJoinTestCase(cols []*types.FieldType, joinType core.JoinType, us
ctx.GetSessionVars().StmtCtx.MemTracker = memory.NewTracker(nil, -1)
ctx.GetSessionVars().StmtCtx.DiskTracker = disk.NewTracker(nil, -1)
ctx.GetSessionVars().IndexLookupJoinConcurrency = 4
tc := &hashJoinTestCase{rows: 100000, concurrency: 4, ctx: ctx, keyIdx: []int{0, 1}}
tc := &hashJoinTestCase{rows: 100000, concurrency: 4, ctx: ctx, keyIdx: []int{0, 1}, rawData: strings.Repeat("x", 5*1024)}
tc.cols = cols
tc.useOuterToBuild = useOuterToBuild
tc.joinType = joinType
@@ -762,7 +767,7 @@ func benchmarkHashJoinExecWithCase(b *testing.B, casTest *hashJoinTestCase) {
case mysql.TypeLong, mysql.TypeLonglong:
return int64(row)
case mysql.TypeVarString:
return rawData
return casTest.rawData
case mysql.TypeDouble:
return float64(row)
default:
@@ -915,7 +920,7 @@ func benchmarkBuildHashTableForList(b *testing.B, casTest *hashJoinTestCase) {
case mysql.TypeLong, mysql.TypeLonglong:
return int64(row)
case mysql.TypeVarString:
return rawData
return casTest.rawData
default:
panic("not implement")
}
@@ -994,6 +999,7 @@ type indexJoinTestCase struct {
innerJoinKeyIdx []int
innerIdx []int
needOuterSort bool
rawData string
}

func (tc indexJoinTestCase) columns() []*expression.Column {
Expand All @@ -1019,6 +1025,7 @@ func defaultIndexJoinTestCase() *indexJoinTestCase {
outerJoinKeyIdx: []int{0, 1},
innerJoinKeyIdx: []int{0, 1},
innerIdx: []int{0, 1},
rawData: strings.Repeat("x", 5*1024),
}
return tc
}
@@ -1039,7 +1046,7 @@ func (tc indexJoinTestCase) getMockDataSourceOptByRows(rows int) mockDataSourceP
case mysql.TypeDouble:
return float64(row)
case mysql.TypeVarString:
return rawData
return tc.rawData
default:
panic("not implement")
}
@@ -1316,6 +1323,7 @@ func newMergeJoinBenchmark(numOuterRows, numInnerDup, numInnerRedundant int) (tc
outerJoinKeyIdx: []int{0, 1},
innerJoinKeyIdx: []int{0, 1},
innerIdx: []int{0, 1},
rawData: strings.Repeat("x", 5*1024),
}
tc = &mergeJoinTestCase{*itc}
outerOpt := mockDataSourceParameters{
@@ -1329,7 +1337,7 @@ func newMergeJoinBenchmark(numOuterRows, numInnerDup, numInnerRedundant int) (tc
case mysql.TypeDouble:
return float64(row)
case mysql.TypeVarString:
return rawData
return tc.rawData
default:
panic("not implement")
}
@@ -1348,7 +1356,7 @@ func newMergeJoinBenchmark(numOuterRows, numInnerDup, numInnerRedundant int) (tc
case mysql.TypeDouble:
return float64(row)
case mysql.TypeVarString:
return rawData
return tc.rawData
default:
panic("not implement")
}
@@ -1443,3 +1451,105 @@ func BenchmarkMergeJoinExec(b *testing.B) {
})
}
}

type sortCase struct {
rows int
orderByIdx []int
ndvs []int
ctx sessionctx.Context
}

func (tc sortCase) columns() []*expression.Column {
return []*expression.Column{
{Index: 0, RetType: types.NewFieldType(mysql.TypeLonglong)},
{Index: 1, RetType: types.NewFieldType(mysql.TypeLonglong)},
}
}

func (tc sortCase) String() string {
return fmt.Sprintf("(rows:%v, orderBy:%v, ndvs: %v)", tc.rows, tc.orderByIdx, tc.ndvs)
}

func defaultSortTestCase() *sortCase {
ctx := mock.NewContext()
ctx.GetSessionVars().InitChunkSize = variable.DefInitChunkSize
ctx.GetSessionVars().MaxChunkSize = variable.DefMaxChunkSize
ctx.GetSessionVars().StmtCtx.MemTracker = memory.NewTracker(nil, -1)
tc := &sortCase{rows: 300000, orderByIdx: []int{0, 1}, ndvs: []int{0, 0}, ctx: ctx}
return tc
}

func benchmarkSortExec(b *testing.B, cas *sortCase) {
opt := mockDataSourceParameters{
schema: expression.NewSchema(cas.columns()...),
rows: cas.rows,
ctx: cas.ctx,
ndvs: cas.ndvs,
}
dataSource := buildMockDataSource(opt)
exec := &SortExec{
baseExecutor: newBaseExecutor(cas.ctx, dataSource.schema, stringutil.StringerStr("sort"), dataSource),
ByItems: make([]*core.ByItems, 0, len(cas.orderByIdx)),
schema: dataSource.schema,
}
for _, idx := range cas.orderByIdx {
exec.ByItems = append(exec.ByItems, &core.ByItems{Expr: cas.columns()[idx]})
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
b.StopTimer()
tmpCtx := context.Background()
chk := newFirstChunk(exec)
dataSource.prepareChunks()

b.StartTimer()
if err := exec.Open(tmpCtx); err != nil {
b.Fatal(err)
}
for {
if err := exec.Next(tmpCtx, chk); err != nil {
b.Fatal(err)
}
if chk.NumRows() == 0 {
break
}
}

if err := exec.Close(); err != nil {
b.Fatal(err)
}
b.StopTimer()
}
}

func BenchmarkSortExec(b *testing.B) {
b.ReportAllocs()
cas := defaultSortTestCase()
// all random data
cas.ndvs = []int{0, 0}
cas.orderByIdx = []int{0, 1}
b.Run(fmt.Sprintf("%v", cas), func(b *testing.B) {
benchmarkSortExec(b, cas)
})

ndvs := []int{1, 10000}
for _, ndv := range ndvs {
cas.ndvs = []int{ndv, 0}
cas.orderByIdx = []int{0, 1}
b.Run(fmt.Sprintf("%v", cas), func(b *testing.B) {
benchmarkSortExec(b, cas)
})

cas.ndvs = []int{ndv, 0}
cas.orderByIdx = []int{0}
b.Run(fmt.Sprintf("%v", cas), func(b *testing.B) {
benchmarkSortExec(b, cas)
})

cas.ndvs = []int{ndv, 0}
cas.orderByIdx = []int{1}
b.Run(fmt.Sprintf("%v", cas), func(b *testing.B) {
benchmarkSortExec(b, cas)
})
}
}
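
For reference (not part of the PR itself), the new sort benchmarks follow the standard Go testing conventions, so from a TiDB checkout they can be run with something like: go test -run XX -bench BenchmarkSortExec -benchmem ./executor/ — where -run XX skips the regular unit tests and -benchmem reports per-iteration allocations.
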
91 changes: 91 additions & 0 deletions executor/executor_pkg_test.go
@@ -20,12 +20,17 @@ import (
"github.com/pingcap/parser/ast"
"github.com/pingcap/parser/auth"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tidb/util/mock"
"github.com/pingcap/tidb/util/ranger"
"github.com/pingcap/tidb/util/stringutil"
)

var _ = Suite(&testExecSuite{})
@@ -234,3 +239,89 @@ func assertEqualStrings(c *C, got []field, expect []string) {
c.Assert(string(got[i].str), Equals, expect[i])
}
}

func (s *testExecSuite) TestSortSpillDisk(c *C) {
originCfg := config.GetGlobalConfig()
newConf := *originCfg
newConf.OOMUseTmpStorage = true
newConf.MemQuotaQuery = 1
config.StoreGlobalConfig(&newConf)
defer config.StoreGlobalConfig(originCfg)

ctx := mock.NewContext()
ctx.GetSessionVars().InitChunkSize = variable.DefMaxChunkSize
ctx.GetSessionVars().MaxChunkSize = variable.DefMaxChunkSize
ctx.GetSessionVars().StmtCtx.MemTracker = memory.NewTracker(nil, -1)
cas := &sortCase{rows: 2048, orderByIdx: []int{0, 1}, ndvs: []int{0, 0}, ctx: ctx}
opt := mockDataSourceParameters{
schema: expression.NewSchema(cas.columns()...),
rows: cas.rows,
ctx: cas.ctx,
ndvs: cas.ndvs,
}
dataSource := buildMockDataSource(opt)
exec := &SortExec{
baseExecutor: newBaseExecutor(cas.ctx, dataSource.schema, stringutil.StringerStr("sort"), dataSource),
ByItems: make([]*core.ByItems, 0, len(cas.orderByIdx)),
schema: dataSource.schema,
}
for _, idx := range cas.orderByIdx {
exec.ByItems = append(exec.ByItems, &core.ByItems{Expr: cas.columns()[idx]})
}
tmpCtx := context.Background()
chk := newFirstChunk(exec)
dataSource.prepareChunks()
err := exec.Open(tmpCtx)
c.Assert(err, IsNil)
for {
err = exec.Next(tmpCtx, chk)
c.Assert(err, IsNil)
if chk.NumRows() == 0 {
break
}
}
// Test only 1 partition and all data in memory.
c.Assert(len(exec.partitionList), Equals, 1)
c.Assert(exec.partitionList[0].AlreadySpilled(), Equals, false)
c.Assert(exec.partitionList[0].NumRow(), Equals, 2048)
err = exec.Close()
c.Assert(err, IsNil)

ctx.GetSessionVars().StmtCtx.MemTracker = memory.NewTracker(nil, 1)
dataSource.prepareChunks()
err = exec.Open(tmpCtx)
c.Assert(err, IsNil)
for {
err = exec.Next(tmpCtx, chk)
c.Assert(err, IsNil)
if chk.NumRows() == 0 {
break
}
}
// Test 2 partitions and all data in disk.
c.Assert(len(exec.partitionList), Equals, 2)
c.Assert(exec.partitionList[0].AlreadySpilled(), Equals, true)
c.Assert(exec.partitionList[1].AlreadySpilled(), Equals, true)
c.Assert(exec.partitionList[0].NumRow(), Equals, 1024)
c.Assert(exec.partitionList[1].NumRow(), Equals, 1024)
err = exec.Close()
c.Assert(err, IsNil)

ctx.GetSessionVars().StmtCtx.MemTracker = memory.NewTracker(nil, 24000)
dataSource.prepareChunks()
err = exec.Open(tmpCtx)
c.Assert(err, IsNil)
for {
err = exec.Next(tmpCtx, chk)
c.Assert(err, IsNil)
if chk.NumRows() == 0 {
break
}
}
// Test only 1 partition but spill disk.
c.Assert(len(exec.partitionList), Equals, 1)
c.Assert(exec.partitionList[0].AlreadySpilled(), Equals, true)
c.Assert(exec.partitionList[0].NumRow(), Equals, 2048)
err = exec.Close()
c.Assert(err, IsNil)
}
}
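
A note on the numbers asserted above, assuming the default max chunk size of 1024 rows: the 2048-row input arrives as two chunks. With the 1-byte quota each chunk apparently triggers its own spill, giving two on-disk partitions of 1024 rows each, while with the 24000-byte quota the first chunk fits in memory and the second pushes the single partition over the limit, so one partition ends up holding all 2048 rows but spilled to disk.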