diff --git a/ddl/ddl_api.go b/ddl/ddl_api.go
index 9b4d3cde8b127..510eeeaf6ecb8 100644
--- a/ddl/ddl_api.go
+++ b/ddl/ddl_api.go
@@ -19,6 +19,7 @@ package ddl
 
 import (
 	"bytes"
+	"context"
 	"fmt"
 	"strings"
 	"time"
@@ -31,6 +32,7 @@ import (
 	"github.com/pingcap/parser/mysql"
 	"github.com/pingcap/tidb/expression"
 	"github.com/pingcap/tidb/infoschema"
+	"github.com/pingcap/tidb/kv"
 	"github.com/pingcap/tidb/meta/autoid"
 	"github.com/pingcap/tidb/sessionctx"
 	"github.com/pingcap/tidb/sessionctx/variable"
@@ -1170,11 +1172,28 @@ func (d *ddl) CreateTable(ctx sessionctx.Context, s *ast.CreateTableStmt) (err e
 	err = d.doDDLJob(ctx, job)
 	if err == nil {
 		// do pre-split and scatter.
-		if tbInfo.ShardRowIDBits > 0 && tbInfo.PreSplitRegions > 0 {
-			if ctx.GetSessionVars().WaitSplitRegionFinish {
-				preSplitTableShardRowIDBitsRegion(d.store, tbInfo, true)
+		sp, ok := d.store.(kv.SplitableStore)
+		if ok && EnableSplitTableRegion {
+			var (
+				preSplit      func()
+				scatterRegion bool
+			)
+			val, err := variable.GetGlobalSystemVar(ctx.GetSessionVars(), variable.TiDBScatterRegion)
+			if err != nil {
+				logutil.Logger(context.Background()).Warn("[ddl] won't scatter region", zap.Error(err))
+			} else {
+				scatterRegion = variable.TiDBOptOn(val)
+			}
+			pi := tbInfo.GetPartitionInfo()
+			if pi != nil {
+				preSplit = func() { splitPartitionTableRegion(sp, pi, scatterRegion) }
+			} else {
+				preSplit = func() { splitTableRegion(sp, tbInfo, scatterRegion) }
+			}
+			if scatterRegion {
+				preSplit()
 			} else {
-				go preSplitTableShardRowIDBitsRegion(d.store, tbInfo, false)
+				go preSplit()
 			}
 		}
 		if tbInfo.AutoIncID > 1 {
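For context on the new CreateTable flow above: read the global `tidb_scatter_region` switch, fall back to no scattering when the read fails, then run the pre-split synchronously when scattering (so the statement can wait for the scatter to finish) and asynchronously otherwise. A minimal, self-contained sketch of that decide-then-dispatch pattern; `readGlobalBool` and the other names here are illustrative stand-ins, not TiDB APIs:

```go
package main

import (
	"errors"
	"fmt"
	"strings"
	"time"
)

// readGlobalBool stands in for variable.GetGlobalSystemVar followed by
// variable.TiDBOptOn; it is not a TiDB API.
func readGlobalBool(name string) (bool, error) {
	if name != "tidb_scatter_region" {
		return false, errors.New("unknown variable")
	}
	val := "1" // pretend this came from the global variable table
	return strings.EqualFold(val, "ON") || val == "1", nil
}

func main() {
	scatterRegion := false
	if v, err := readGlobalBool("tidb_scatter_region"); err != nil {
		// Mirrors the "[ddl] won't scatter region" warning: on error we
		// simply fall back to not scattering.
		fmt.Println("won't scatter region:", err)
	} else {
		scatterRegion = v
	}

	preSplit := func() { fmt.Println("pre-splitting, scatter =", scatterRegion) }
	if scatterRegion {
		preSplit() // synchronous, so the DDL can wait for scattering
	} else {
		go preSplit() // asynchronous, CREATE TABLE returns immediately
		time.Sleep(10 * time.Millisecond)
	}
}
```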
diff --git a/ddl/split_region.go b/ddl/split_region.go
new file mode 100644
index 0000000000000..18b4471fac56a
--- /dev/null
+++ b/ddl/split_region.go
@@ -0,0 +1,123 @@
+// Copyright 2019 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package ddl
+
+import (
+	"context"
+
+	"github.com/pingcap/parser/model"
+	"github.com/pingcap/tidb/kv"
+	"github.com/pingcap/tidb/tablecodec"
+	"github.com/pingcap/tidb/util/logutil"
+	"go.uber.org/zap"
+)
+
+func splitPartitionTableRegion(store kv.SplitableStore, pi *model.PartitionInfo, scatter bool) {
+	// The max partition count is 4096; should we sample and split regions for only some of the partitions?
+	regionIDs := make([]uint64, 0, len(pi.Definitions))
+	for _, def := range pi.Definitions {
+		regionIDs = append(regionIDs, splitRecordRegion(store, def.ID, scatter))
+	}
+	if scatter {
+		waitScatterRegionFinish(store, regionIDs)
+	}
+}
+
+func splitTableRegion(store kv.SplitableStore, tbInfo *model.TableInfo, scatter bool) {
+	regionIDs := make([]uint64, 0, len(tbInfo.Indices)+1)
+	if tbInfo.ShardRowIDBits > 0 && tbInfo.PreSplitRegions > 0 {
+		// Example:
+		// ShardRowIDBits = 5
+		// PreSplitRegions = 3
+		//
+		// Then the table is pre-split into 2^(3-1) = 4 regions.
+		//
+		// In this code:
+		// max = 1 << (tbInfo.ShardRowIDBits - 1) = 1 << (5-1) = 16
+		// step = int64(1 << (tbInfo.ShardRowIDBits - tbInfo.PreSplitRegions)) = 1 << (5-3) = 4
+		//
+		// The split keys are then:
+		// 4 << 59 = 2305843009213693952
+		// 8 << 59 = 4611686018427387904
+		// 12 << 59 = 6917529027641081856
+		//
+		// The ranges of the 4 pre-split regions are:
+		// 0 ~ 2305843009213693952
+		// 2305843009213693952 ~ 4611686018427387904
+		// 4611686018427387904 ~ 6917529027641081856
+		// 6917529027641081856 ~ 9223372036854775807 ( (1 << 63) - 1 )
+		//
+		// The max _tidb_rowid is 9223372036854775807, so it is never negative.
+
+		// Split table region.
+		step := int64(1 << (tbInfo.ShardRowIDBits - tbInfo.PreSplitRegions))
+		// The highest bit is the sign bit, and an allocated _tidb_rowid is always positive.
+		// So we only need to split regions in the positive half of the keyspace.
+		max := int64(1 << (tbInfo.ShardRowIDBits - 1))
+		for p := int64(step); p < max; p += step {
+			recordID := p << (64 - tbInfo.ShardRowIDBits)
+			recordPrefix := tablecodec.GenTableRecordPrefix(tbInfo.ID)
+			key := tablecodec.EncodeRecordKey(recordPrefix, recordID)
+			regionID, err := store.SplitRegion(key, scatter)
+			if err != nil {
+				logutil.Logger(context.Background()).Warn("[ddl] pre split table region failed", zap.Int64("recordID", recordID),
+					zap.Error(err))
+			} else {
+				regionIDs = append(regionIDs, regionID)
+			}
+		}
+	} else {
+		regionIDs = append(regionIDs, splitRecordRegion(store, tbInfo.ID, scatter))
+	}
+	regionIDs = append(regionIDs, splitIndexRegion(store, tbInfo, scatter)...)
+	if scatter {
+		waitScatterRegionFinish(store, regionIDs)
+	}
+}
+
+func splitRecordRegion(store kv.SplitableStore, tableID int64, scatter bool) uint64 {
+	tableStartKey := tablecodec.GenTablePrefix(tableID)
+	regionID, err := store.SplitRegion(tableStartKey, scatter)
+	if err != nil {
+		// It will be automatically split by TiKV later.
+		logutil.Logger(context.Background()).Warn("[ddl] split table region failed", zap.Error(err))
+	}
+	return regionID
+}
+
+func splitIndexRegion(store kv.SplitableStore, tblInfo *model.TableInfo, scatter bool) []uint64 {
+	regionIDs := make([]uint64, 0, len(tblInfo.Indices))
+	for _, idx := range tblInfo.Indices {
+		indexPrefix := tablecodec.EncodeTableIndexPrefix(tblInfo.ID, idx.ID)
+		regionID, err := store.SplitRegion(indexPrefix, scatter)
+		if err != nil {
+			logutil.Logger(context.Background()).Warn("[ddl] pre split table index region failed",
+				zap.Stringer("table", tblInfo.Name),
+				zap.Stringer("index", idx.Name),
+				zap.Error(err))
+		}
+		regionIDs = append(regionIDs, regionID)
+	}
+	return regionIDs
+}
+
+func waitScatterRegionFinish(store kv.SplitableStore, regionIDs []uint64) {
+	for _, regionID := range regionIDs {
+		err := store.WaitScatterRegionFinish(regionID)
+		if err != nil {
+			logutil.Logger(context.Background()).Warn("[ddl] wait scatter region failed", zap.Uint64("regionID", regionID),
+				zap.Error(err))
+		}
+	}
+}
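The arithmetic in the `splitTableRegion` comment block is easy to sanity-check. This standalone snippet reproduces it for ShardRowIDBits = 5 and PreSplitRegions = 3; nothing in it is a TiDB API:

```go
package main

import "fmt"

func main() {
	const shardRowIDBits = 5
	const preSplitRegions = 3

	step := int64(1) << (shardRowIDBits - preSplitRegions) // 1 << (5-3) = 4
	max := int64(1) << (shardRowIDBits - 1)                // 1 << (5-1) = 16

	for p := step; p < max; p += step {
		recordID := p << (64 - shardRowIDBits) // p << 59
		fmt.Printf("%2d << 59 = %d\n", p, recordID)
	}
	// Output:
	//  4 << 59 = 2305843009213693952
	//  8 << 59 = 4611686018427387904
	// 12 << 59 = 6917529027641081856
}
```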
diff --git a/ddl/table.go b/ddl/table.go
index d1d332fc3dacb..1ae046940b54f 100644
--- a/ddl/table.go
+++ b/ddl/table.go
@@ -31,8 +31,6 @@ import (
 	"github.com/pingcap/tidb/table/tables"
 	"github.com/pingcap/tidb/tablecodec"
 	"github.com/pingcap/tidb/util/charset"
-	"github.com/pingcap/tidb/util/logutil"
-	"go.uber.org/zap"
 )
 
 func onCreateTable(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error) {
@@ -72,10 +70,6 @@ func onCreateTable(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error)
 	if err != nil {
 		return ver, errors.Trace(err)
 	}
-	if EnableSplitTableRegion {
-		// TODO: Add restrictions to this operation.
-		go splitTableRegion(d.store, tbInfo.ID)
-	}
 	// Finish this job.
 	job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tbInfo)
 	asyncNotifyEvent(d, &util.Event{Tp: model.ActionCreateTable, TableInfo: tbInfo})
@@ -160,91 +154,6 @@ func onDropTable(t *meta.Meta, job *model.Job) (ver int64, _ error) {
 	return ver, errors.Trace(err)
 }
 
-type splitableStore interface {
-	SplitRegion(splitKey kv.Key) error
-	SplitRegionAndScatter(splitKey kv.Key) (uint64, error)
-	WaitScatterRegionFinish(regionID uint64) error
-}
-
-func splitTableRegion(store kv.Storage, tableID int64) {
-	s, ok := store.(splitableStore)
-	if !ok {
-		return
-	}
-	tableStartKey := tablecodec.GenTablePrefix(tableID)
-	if err := s.SplitRegion(tableStartKey); err != nil {
-		// It will be automatically split by TiKV later.
-		logutil.Logger(ddlLogCtx).Warn("[ddl] split table region failed", zap.Error(err))
-	}
-}
-
-func preSplitTableShardRowIDBitsRegion(store kv.Storage, tblInfo *model.TableInfo, waitTableSplitFinish bool) {
-	s, ok := store.(splitableStore)
-	if !ok {
-		return
-	}
-	regionIDs := make([]uint64, 0, 1<<(tblInfo.PreSplitRegions-1)+len(tblInfo.Indices))
-
-	// Example:
-	// ShardRowIDBits = 5
-	// PreSplitRegions = 3
-	//
-	// then will pre-split 2^(3-1) = 4 regions.
-	//
-	// in this code:
-	// max = 1 << (tblInfo.ShardRowIDBits - 1) = 1 << (5-1) = 16
-	// step := int64(1 << (tblInfo.ShardRowIDBits - tblInfo.PreSplitRegions)) = 1 << (5-3) = 4;
-	//
-	// then split regionID is below:
-	// 4 << 59 = 2305843009213693952
-	// 8 << 59 = 4611686018427387904
-	// 12 << 59 = 6917529027641081856
-	//
-	// The 4 pre-split regions range is below:
-	// 0 ~ 2305843009213693952
-	// 2305843009213693952 ~ 4611686018427387904
-	// 4611686018427387904 ~ 6917529027641081856
-	// 6917529027641081856 ~ 9223372036854775807 ( (1 << 63) - 1 )
-	//
-	// And the max _tidb_rowid is 9223372036854775807, it won't be negative number.
-
-	// Split table region.
-	step := int64(1 << (tblInfo.ShardRowIDBits - tblInfo.PreSplitRegions))
-	// The highest bit is the symbol bit,and alloc _tidb_rowid will always be positive number.
-	// So we only need to split the region for the positive number.
-	max := int64(1 << (tblInfo.ShardRowIDBits - 1))
-	for p := int64(step); p < max; p += step {
-		recordID := p << (64 - tblInfo.ShardRowIDBits)
-		recordPrefix := tablecodec.GenTableRecordPrefix(tblInfo.ID)
-		key := tablecodec.EncodeRecordKey(recordPrefix, recordID)
-		regionID, err := s.SplitRegionAndScatter(key)
-		if err != nil {
-			logutil.Logger(ddlLogCtx).Warn("[ddl] pre split table region failed", zap.Int64("recordID", recordID), zap.Error(err))
-		} else {
-			regionIDs = append(regionIDs, regionID)
-		}
-	}
-	// Split index region.
-	for _, idx := range tblInfo.Indices {
-		indexPrefix := tablecodec.EncodeTableIndexPrefix(tblInfo.ID, idx.ID)
-		regionID, err := s.SplitRegionAndScatter(indexPrefix)
-		if err != nil {
-			logutil.Logger(ddlLogCtx).Warn("[ddl] pre split table index region failed", zap.String("index", idx.Name.L), zap.Error(err))
-		} else {
-			regionIDs = append(regionIDs, regionID)
-		}
-	}
-	if !waitTableSplitFinish {
-		return
-	}
-	for _, regionID := range regionIDs {
-		err := s.WaitScatterRegionFinish(regionID)
-		if err != nil {
-			logutil.Logger(ddlLogCtx).Warn("[ddl] wait scatter region failed", zap.Uint64("regionID", regionID), zap.Error(err))
-		}
-	}
-}
-
 func getTable(store kv.Storage, schemaID int64, tblInfo *model.TableInfo) (table.Table, error) {
 	alloc := autoid.NewAllocator(store, tblInfo.GetDBID(schemaID), tblInfo.IsAutoIncColUnsigned())
 	tbl, err := table.TableFromMeta(alloc, tblInfo)
diff --git a/executor/set_test.go b/executor/set_test.go
index 8bd8d4797ca4b..39776f5fac6db 100644
--- a/executor/set_test.go
+++ b/executor/set_test.go
@@ -267,6 +267,17 @@ func (s *testSuite) TestSetVar(c *C) {
 	tk.MustExec("set tidb_wait_split_region_finish = 0")
 	tk.MustQuery(`select @@session.tidb_wait_split_region_finish;`).Check(testkit.Rows("0"))
 
+	// test for tidb_scatter_region
+	tk.MustQuery(`select @@global.tidb_scatter_region;`).Check(testkit.Rows("0"))
+	tk.MustExec("set global tidb_scatter_region = 1")
+	tk.MustQuery(`select @@global.tidb_scatter_region;`).Check(testkit.Rows("1"))
+	tk.MustExec("set global tidb_scatter_region = 0")
+	tk.MustQuery(`select @@global.tidb_scatter_region;`).Check(testkit.Rows("0"))
+	_, err = tk.Exec("set session tidb_scatter_region = 0")
+	c.Assert(err, NotNil)
+	_, err = tk.Exec(`select @@session.tidb_scatter_region;`)
+	c.Assert(err, NotNil)
+
 	// test for tidb_wait_split_region_timeout
 	tk.MustQuery(`select @@session.tidb_wait_split_region_timeout;`).Check(testkit.Rows(strconv.Itoa(variable.DefWaitSplitRegionTimeout)))
 	tk.MustExec("set tidb_wait_split_region_timeout = 1")
diff --git a/executor/split.go b/executor/split.go
index fd2efbb66b6d8..01e086e2f2b6b 100644
--- a/executor/split.go
+++ b/executor/split.go
@@ -46,15 +46,10 @@ type SplitIndexRegionExec struct {
 	valueLists [][]types.Datum
 }
 
-type splitableStore interface {
-	SplitRegionAndScatter(splitKey kv.Key) (uint64, error)
-	WaitScatterRegionFinish(regionID uint64) error
-}
-
 // Next implements the Executor Next interface.
 func (e *SplitIndexRegionExec) Next(ctx context.Context, _ *chunk.Chunk) error {
 	store := e.ctx.GetStore()
-	s, ok := store.(splitableStore)
+	s, ok := store.(kv.SplitableStore)
 	if !ok {
 		return nil
 	}
@@ -67,7 +62,7 @@ func (e *SplitIndexRegionExec) Next(ctx context.Context, _ *chunk.Chunk) error {
 	defer cancel()
 	regionIDs := make([]uint64, 0, len(splitIdxKeys))
 	for _, idxKey := range splitIdxKeys {
-		regionID, err := s.SplitRegionAndScatter(idxKey)
+		regionID, err := s.SplitRegion(idxKey, true)
 		if err != nil {
 			logutil.Logger(context.Background()).Warn("split table index region failed",
 				zap.String("table", e.tableInfo.Name.L),
@@ -231,7 +226,7 @@ type SplitTableRegionExec struct {
 // Next implements the Executor Next interface.
 func (e *SplitTableRegionExec) Next(ctx context.Context, _ *chunk.Chunk) error {
 	store := e.ctx.GetStore()
-	s, ok := store.(splitableStore)
+	s, ok := store.(kv.SplitableStore)
 	if !ok {
 		return nil
 	}
@@ -245,7 +240,7 @@ func (e *SplitTableRegionExec) Next(ctx context.Context, _ *chunk.Chunk) error {
 
 	regionIDs := make([]uint64, 0, len(splitKeys))
 	for _, key := range splitKeys {
-		regionID, err := s.SplitRegionAndScatter(key)
+		regionID, err := s.SplitRegion(key, true)
 		if err != nil {
 			logutil.Logger(context.Background()).Warn("split table region failed",
 				zap.String("table", e.tableInfo.Name.L),
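Both executors keep the same graceful-degradation guard: when the store does not implement `kv.SplitableStore`, `Next` returns nil and the SPLIT statement becomes a no-op. A small sketch of that guard in isolation; `Key`, `SplitableStore`, and `plainStore` are local stand-ins mirroring the diff, not imports from TiDB:

```go
package main

import "fmt"

// Key and SplitableStore mirror kv.Key and the new kv.SplitableStore.
type Key []byte

type SplitableStore interface {
	SplitRegion(splitKey Key, scatter bool) (regionID uint64, err error)
	WaitScatterRegionFinish(regionID uint64) error
}

// plainStore is a hypothetical storage with no split support.
type plainStore struct{}

func splitIfSupported(store interface{}, key Key) {
	s, ok := store.(SplitableStore)
	if !ok {
		// Mirrors `if !ok { return nil }` in both executors: no error,
		// the SPLIT statement is simply a no-op.
		fmt.Println("store cannot split regions, skipping")
		return
	}
	if _, err := s.SplitRegion(key, true); err != nil {
		fmt.Println("split failed:", err)
	}
}

func main() {
	splitIfSupported(plainStore{}, Key("t\x80"))
}
```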
diff --git a/kv/kv.go b/kv/kv.go
index ebeebc98d862a..601bb6bfbdd92 100644
--- a/kv/kv.go
+++ b/kv/kv.go
@@ -287,3 +287,9 @@ type Iterator interface {
 	Next() error
 	Close()
 }
+
+// SplitableStore is a kv store that supports splitting regions.
+type SplitableStore interface {
+	SplitRegion(splitKey Key, scatter bool) (regionID uint64, err error)
+	WaitScatterRegionFinish(regionID uint64) error
+}
diff --git a/sessionctx/variable/sysvar.go b/sessionctx/variable/sysvar.go
index a6f55ce1390a0..535c1f1ccb7f6 100644
--- a/sessionctx/variable/sysvar.go
+++ b/sessionctx/variable/sysvar.go
@@ -677,6 +677,7 @@ var defaultSysVars = []*SysVar{
 	{ScopeSession, TiDBForcePriority, mysql.Priority2Str[DefTiDBForcePriority]},
 	{ScopeSession, TiDBCheckMb4ValueInUTF8, BoolToIntStr(config.GetGlobalConfig().CheckMb4ValueInUTF8)},
 	{ScopeSession, TiDBSlowQueryFile, ""},
+	{ScopeGlobal, TiDBScatterRegion, BoolToIntStr(DefTiDBScatterRegion)},
 	{ScopeSession, TiDBWaitSplitRegionFinish, BoolToIntStr(DefTiDBWaitSplitRegionFinish)},
 	{ScopeSession, TiDBWaitSplitRegionTimeout, strconv.Itoa(DefWaitSplitRegionTimeout)},
 }
diff --git a/sessionctx/variable/tidb_vars.go b/sessionctx/variable/tidb_vars.go
index 895d78af5621d..4b5d3bb75726c 100644
--- a/sessionctx/variable/tidb_vars.go
+++ b/sessionctx/variable/tidb_vars.go
@@ -203,6 +203,9 @@ const (
 	// It can be: PRIORITY_LOW, PRIORITY_NORMAL, PRIORITY_HIGH
 	TiDBDDLReorgPriority = "tidb_ddl_reorg_priority"
 
+	// TiDBScatterRegion will scatter the regions for DDLs when it is ON.
+	TiDBScatterRegion = "tidb_scatter_region"
+
 	// TiDBWaitSplitRegionFinish defines the split region behaviour is sync or async.
 	TiDBWaitSplitRegionFinish = "tidb_wait_split_region_finish"
 
@@ -264,6 +267,7 @@
 	DefTiDBHashAggPartialConcurrency = 4
 	DefTiDBHashAggFinalConcurrency   = 4
 	DefTiDBForcePriority             = mysql.NoPriority
+	DefTiDBScatterRegion             = false
 	DefTiDBWaitSplitRegionFinish     = true
 	DefWaitSplitRegionTimeout        = 300 // 300s
 )
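One payoff of exporting the interface into `kv` is that tests or other storage engines can supply their own implementation and prove conformance at compile time. A hedged sketch under that assumption; `fakeStore` is hypothetical and not part of this patch:

```go
package kvtest

import "github.com/pingcap/tidb/kv"

// fakeStore hands out monotonically increasing region IDs and pretends
// every scatter finishes immediately.
type fakeStore struct {
	nextRegionID uint64
}

func (s *fakeStore) SplitRegion(splitKey kv.Key, scatter bool) (regionID uint64, err error) {
	s.nextRegionID++
	return s.nextRegionID, nil
}

func (s *fakeStore) WaitScatterRegionFinish(regionID uint64) error {
	return nil
}

// Compile-time proof that fakeStore satisfies the new interface.
var _ kv.SplitableStore = (*fakeStore)(nil)
```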
diff --git a/sessionctx/variable/varsutil.go b/sessionctx/variable/varsutil.go
index 94e5435dd6108..dda8d0dd92376 100644
--- a/sessionctx/variable/varsutil.go
+++ b/sessionctx/variable/varsutil.go
@@ -326,7 +326,7 @@ func ValidateSetSystemVar(vars *SessionVars, name string, value string) (string,
 	case AutocommitVar, TiDBSkipUTF8Check, TiDBOptAggPushDown,
 		TiDBOptInSubqUnFolding, TiDBEnableTablePartition,
 		TiDBBatchInsert, TiDBDisableTxnAutoRetry, TiDBEnableStreaming,
-		TiDBBatchDelete, TiDBCheckMb4ValueInUTF8:
+		TiDBBatchDelete, TiDBCheckMb4ValueInUTF8, TiDBScatterRegion:
 		if strings.EqualFold(value, "ON") || value == "1" ||
 			strings.EqualFold(value, "OFF") || value == "0" {
 			return value, nil
 		}
diff --git a/store/tikv/scan_test.go b/store/tikv/scan_test.go
index c45716c4b0a51..92801cb6d559b 100644
--- a/store/tikv/scan_test.go
+++ b/store/tikv/scan_test.go
@@ -91,12 +91,12 @@ func (s *testScanSuite) TestScan(c *C) {
 	c.Assert(err, IsNil)
 
 	if rowNum > 123 {
-		err = s.store.SplitRegion(encodeKey(s.prefix, s08d("key", 123)))
+		_, err = s.store.SplitRegion(encodeKey(s.prefix, s08d("key", 123)), false)
 		c.Assert(err, IsNil)
 	}
 
 	if rowNum > 456 {
-		err = s.store.SplitRegion(encodeKey(s.prefix, s08d("key", 456)))
+		_, err = s.store.SplitRegion(encodeKey(s.prefix, s08d("key", 456)), false)
 		c.Assert(err, IsNil)
 	}
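For reference, the boolean validation that `TiDBScatterRegion` now shares in `ValidateSetSystemVar` accepts exactly the spellings ON/OFF (case-insensitive) and 1/0. A minimal reproduction; `isValidBool` is our name, not TiDB's:

```go
package main

import (
	"fmt"
	"strings"
)

// isValidBool mirrors the check in ValidateSetSystemVar.
func isValidBool(value string) bool {
	return strings.EqualFold(value, "ON") || value == "1" ||
		strings.EqualFold(value, "OFF") || value == "0"
}

func main() {
	for _, v := range []string{"ON", "off", "1", "0", "2", "yes"} {
		fmt.Printf("%-3s -> valid: %v\n", v, isValidBool(v))
	}
	// "2" and "yes" fail validation, so SET would reject them.
}
```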
diff --git a/store/tikv/split_region.go b/store/tikv/split_region.go
index e47502177c85a..8bc279b7f0733 100644
--- a/store/tikv/split_region.go
+++ b/store/tikv/split_region.go
@@ -18,7 +18,6 @@ import (
 
 	"github.com/pingcap/errors"
 	"github.com/pingcap/kvproto/pkg/kvrpcpb"
-	"github.com/pingcap/kvproto/pkg/metapb"
 	"github.com/pingcap/kvproto/pkg/pdpb"
 	"github.com/pingcap/tidb/kv"
 	"github.com/pingcap/tidb/store/tikv/tikvrpc"
@@ -29,12 +28,7 @@ import (
 
 // SplitRegion splits the region contains splitKey into 2 regions: [start,
 // splitKey) and [splitKey, end).
-func (s *tikvStore) SplitRegion(splitKey kv.Key) error {
-	_, err := s.splitRegion(splitKey)
-	return err
-}
-
-func (s *tikvStore) splitRegion(splitKey kv.Key) (*metapb.Region, error) {
+func (s *tikvStore) SplitRegion(splitKey kv.Key, scatter bool) (regionID uint64, err error) {
 	logutil.Logger(context.Background()).Info("start split region",
 		zap.Binary("at", splitKey))
 	bo := NewBackoffer(context.Background(), splitRegionBackoff)
@@ -49,25 +43,25 @@ func (s *tikvStore) splitRegion(splitKey kv.Key) (*metapb.Region, error) {
 	for {
 		loc, err := s.regionCache.LocateKey(bo, splitKey)
 		if err != nil {
-			return nil, errors.Trace(err)
+			return 0, errors.Trace(err)
 		}
 		if bytes.Equal(splitKey, loc.StartKey) {
 			logutil.Logger(context.Background()).Info("skip split region",
 				zap.Binary("at", splitKey))
-			return nil, nil
+			return 0, nil
 		}
 		res, err := sender.SendReq(bo, req, loc.Region, readTimeoutShort)
 		if err != nil {
-			return nil, errors.Trace(err)
+			return 0, errors.Trace(err)
 		}
 		regionErr, err := res.GetRegionError()
 		if err != nil {
-			return nil, errors.Trace(err)
+			return 0, errors.Trace(err)
 		}
 		if regionErr != nil {
 			err := bo.Backoff(BoRegionMiss, errors.New(regionErr.String()))
 			if err != nil {
-				return nil, errors.Trace(err)
+				return 0, errors.Trace(err)
 			}
 			continue
 		}
@@ -75,7 +69,17 @@ func (s *tikvStore) splitRegion(splitKey kv.Key) (*metapb.Region, error) {
 			zap.Binary("at", splitKey),
 			zap.Stringer("new region left", res.SplitRegion.GetLeft()),
 			zap.Stringer("new region right", res.SplitRegion.GetRight()))
-		return res.SplitRegion.GetLeft(), nil
+		left := res.SplitRegion.GetLeft()
+		if left == nil {
+			return 0, nil
+		}
+		if scatter {
+			err = s.scatterRegion(left.Id)
+			if err != nil {
+				return 0, errors.Trace(err)
+			}
+		}
+		return left.Id, nil
 	}
 }
 
@@ -99,6 +103,7 @@ func (s *tikvStore) scatterRegion(regionID uint64) error {
 	return nil
 }
 
+// WaitScatterRegionFinish implements the SplitableStore interface.
 func (s *tikvStore) WaitScatterRegionFinish(regionID uint64) error {
 	logutil.Logger(context.Background()).Info("wait scatter region",
 		zap.Uint64("regionID", regionID))
@@ -130,18 +135,3 @@ func (s *tikvStore) WaitScatterRegionFinish(regionID uint64) error {
 		}
 	}
 }
-
-func (s *tikvStore) SplitRegionAndScatter(splitKey kv.Key) (uint64, error) {
-	left, err := s.splitRegion(splitKey)
-	if err != nil {
-		return 0, err
-	}
-	if left == nil {
-		return 0, nil
-	}
-	err = s.scatterRegion(left.Id)
-	if err != nil {
-		return 0, err
-	}
-	return left.Id, nil
-}
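Putting the reshaped API together: a caller splits with `scatter` set to true, collects the returned region IDs, and then blocks until scattering finishes. This sketch assumes a TiKV-backed `kv.Storage` and simply mirrors the call sequence established in this diff; it is the shape of the flow, not a runnable end-to-end demo:

```go
package main

import (
	"fmt"

	"github.com/pingcap/tidb/kv"
)

func splitAndScatter(store kv.Storage, keys []kv.Key) {
	s, ok := store.(kv.SplitableStore)
	if !ok {
		return // the store cannot split regions at all
	}
	regionIDs := make([]uint64, 0, len(keys))
	for _, key := range keys {
		// scatter=true makes SplitRegion also ask PD to scatter the new region.
		regionID, err := s.SplitRegion(key, true)
		if err != nil {
			fmt.Println("split failed:", err)
			continue
		}
		if regionID == 0 {
			// Per this diff, SplitRegion returns 0 when the key is already
			// a region boundary, so there is nothing to scatter.
			continue
		}
		regionIDs = append(regionIDs, regionID)
	}
	for _, id := range regionIDs {
		if err := s.WaitScatterRegionFinish(id); err != nil {
			fmt.Println("wait scatter failed:", err)
		}
	}
}

func main() {
	// Wiring up a real TiKV store is out of scope here.
	_ = splitAndScatter
}
```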