Merge branch 'master' into planner-support-leading-hint
qw4990 authored May 17, 2022
2 parents 3ddd5c9 + 683ba09 commit 19f8993
Showing 29 changed files with 1,575 additions and 1,183 deletions.
10 changes: 5 additions & 5 deletions br/pkg/lightning/backend/backend.go
@@ -152,7 +152,7 @@ type AbstractBackend interface {
// ImportEngine imports engine data to the backend. If it returns ErrDuplicateDetected,
// it means duplicate data was detected. In this situation, all data in the engine must be imported.
// It's safe to reset or clean up this engine.
ImportEngine(ctx context.Context, engineUUID uuid.UUID, regionSplitSize int64) error
ImportEngine(ctx context.Context, engineUUID uuid.UUID, regionSplitSize, regionSplitKeys int64) error

CleanupEngine(ctx context.Context, engineUUID uuid.UUID) error

@@ -315,7 +315,7 @@ func (be Backend) CheckDiskQuota(quota int64) (
// into the target and then reset the engine to empty. This method will not
// close the engine. Make sure the engine is flushed manually before calling
// this method.
func (be Backend) UnsafeImportAndReset(ctx context.Context, engineUUID uuid.UUID, regionSplitSize int64) error {
func (be Backend) UnsafeImportAndReset(ctx context.Context, engineUUID uuid.UUID, regionSplitSize, regionSplitKeys int64) error {
// DO NOT call be.abstract.CloseEngine()! The engine should still be writable after
// calling UnsafeImportAndReset().
closedEngine := ClosedEngine{
@@ -325,7 +325,7 @@ func (be Backend) UnsafeImportAndReset(ctx context.Context, engineUUID uuid.UUID
uuid: engineUUID,
},
}
if err := closedEngine.Import(ctx, regionSplitSize); err != nil {
if err := closedEngine.Import(ctx, regionSplitSize, regionSplitKeys); err != nil {
return err
}
return be.abstract.ResetEngine(ctx, engineUUID)
@@ -445,12 +445,12 @@ func (en engine) unsafeClose(ctx context.Context, cfg *EngineConfig) (*ClosedEng
}

// Import the data written to the engine into the target.
func (engine *ClosedEngine) Import(ctx context.Context, regionSplitSize int64) error {
func (engine *ClosedEngine) Import(ctx context.Context, regionSplitSize, regionSplitKeys int64) error {
var err error

for i := 0; i < importMaxRetryTimes; i++ {
task := engine.logger.With(zap.Int("retryCnt", i)).Begin(zap.InfoLevel, "import")
err = engine.backend.ImportEngine(ctx, engine.uuid, regionSplitSize)
err = engine.backend.ImportEngine(ctx, engine.uuid, regionSplitSize, regionSplitKeys)
if !common.IsRetryableError(err) {
task.End(zap.ErrorLevel, err)
return err
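With the widened signature, every import call site now passes both thresholds explicitly. A minimal, hypothetical call sketch (the constant values below are illustrative defaults, not values taken from this patch):

    // Both thresholds travel together now: bytes per region and keys per region.
    const (
    	regionSplitSize int64 = 96 << 20  // 96 MiB, illustrative
    	regionSplitKeys int64 = 1_280_000 // keys, illustrative
    )
    if err := closedEngine.Import(ctx, regionSplitSize, regionSplitKeys); err != nil {
    	return err
    }
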
18 changes: 9 additions & 9 deletions br/pkg/lightning/backend/backend_test.go
@@ -54,7 +54,7 @@ func TestOpenCloseImportCleanUpEngine(t *testing.T) {
Return(nil).
After(openCall)
importCall := s.mockBackend.EXPECT().
ImportEngine(ctx, engineUUID, gomock.Any()).
ImportEngine(ctx, engineUUID, gomock.Any(), gomock.Any()).
Return(nil).
After(closeCall)
s.mockBackend.EXPECT().
@@ -66,7 +66,7 @@ func TestOpenCloseImportCleanUpEngine(t *testing.T) {
require.NoError(t, err)
closedEngine, err := engine.Close(ctx, nil)
require.NoError(t, err)
err = closedEngine.Import(ctx, 1)
err = closedEngine.Import(ctx, 1, 1)
require.NoError(t, err)
err = closedEngine.Cleanup(ctx)
require.NoError(t, err)
@@ -250,12 +250,12 @@ func TestImportFailedNoRetry(t *testing.T) {

s.mockBackend.EXPECT().CloseEngine(ctx, nil, gomock.Any()).Return(nil)
s.mockBackend.EXPECT().
ImportEngine(ctx, gomock.Any(), gomock.Any()).
ImportEngine(ctx, gomock.Any(), gomock.Any(), gomock.Any()).
Return(errors.Annotate(context.Canceled, "fake unrecoverable import error"))

closedEngine, err := s.backend.UnsafeCloseEngine(ctx, nil, "`db`.`table`", 1)
require.NoError(t, err)
err = closedEngine.Import(ctx, 1)
err = closedEngine.Import(ctx, 1, 1)
require.Error(t, err)
require.Regexp(t, "^fake unrecoverable import error", err.Error())
}
@@ -268,14 +268,14 @@ func TestImportFailedWithRetry(t *testing.T) {

s.mockBackend.EXPECT().CloseEngine(ctx, nil, gomock.Any()).Return(nil)
s.mockBackend.EXPECT().
ImportEngine(ctx, gomock.Any(), gomock.Any()).
ImportEngine(ctx, gomock.Any(), gomock.Any(), gomock.Any()).
Return(errors.Annotate(driver.ErrBadConn, "fake recoverable import error")).
MinTimes(2)
s.mockBackend.EXPECT().RetryImportDelay().Return(time.Duration(0)).AnyTimes()

closedEngine, err := s.backend.UnsafeCloseEngine(ctx, nil, "`db`.`table`", 1)
require.NoError(t, err)
err = closedEngine.Import(ctx, 1)
err = closedEngine.Import(ctx, 1, 1)
require.Error(t, err)
require.Contains(t, err.Error(), "fake recoverable import error")
}
@@ -288,16 +288,16 @@ func TestImportFailedRecovered(t *testing.T) {

s.mockBackend.EXPECT().CloseEngine(ctx, nil, gomock.Any()).Return(nil)
s.mockBackend.EXPECT().
ImportEngine(ctx, gomock.Any(), gomock.Any()).
ImportEngine(ctx, gomock.Any(), gomock.Any(), gomock.Any()).
Return(gmysql.ErrInvalidConn)
s.mockBackend.EXPECT().
ImportEngine(ctx, gomock.Any(), gomock.Any()).
ImportEngine(ctx, gomock.Any(), gomock.Any(), gomock.Any()).
Return(nil)
s.mockBackend.EXPECT().RetryImportDelay().Return(time.Duration(0)).AnyTimes()

closedEngine, err := s.backend.UnsafeCloseEngine(ctx, nil, "`db`.`table`", 1)
require.NoError(t, err)
err = closedEngine.Import(ctx, 1)
err = closedEngine.Import(ctx, 1, 1)
require.NoError(t, err)
}

59 changes: 50 additions & 9 deletions br/pkg/lightning/backend/local/local.go
@@ -88,10 +88,6 @@ const (
gRPCKeepAliveTimeout = 5 * time.Minute
gRPCBackOffMaxDelay = 10 * time.Minute

// See: https://github.com/tikv/tikv/blob/e030a0aae9622f3774df89c62f21b2171a72a69e/etc/config-template.toml#L360
// lower the max-key-count to avoid tikv trigger region auto split
regionMaxKeyCount = 1_280_000
defaultRegionSplitSize = 96 * units.MiB
// The max ranges count in a batch to split and scatter.
maxBatchSplitRanges = 4096

@@ -823,7 +819,7 @@ func (local *local) WriteToTiKV(
// if region-split-size <= 96 MiB, we bump the threshold a bit to avoid too many split retries,
// because the range properties are not 100% accurate
regionMaxSize := regionSplitSize
if regionSplitSize <= defaultRegionSplitSize {
if regionSplitSize <= int64(config.SplitRegionSize) {
regionMaxSize = regionSplitSize * 4 / 3
}
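For example, with the default 96 MiB split size the write path tolerates regions up to 96 MiB × 4⁄3 = 128 MiB before it retries a split; a split size configured above the default is used as-is.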

@@ -1328,7 +1324,7 @@ func (local *local) writeAndIngestByRanges(ctx context.Context, engine *Engine,
return allErr
}

func (local *local) ImportEngine(ctx context.Context, engineUUID uuid.UUID, regionSplitSize int64) error {
func (local *local) ImportEngine(ctx context.Context, engineUUID uuid.UUID, regionSplitSize, regionSplitKeys int64) error {
lf := local.lockEngine(engineUUID, importMutexStateImport)
if lf == nil {
// skip if engine not exist. See the comment of `CloseEngine` for more detail.
@@ -1342,9 +1338,16 @@ func (local *local) ImportEngine(ctx context.Context, engineUUID uuid.UUID, regi
log.L().Info("engine contains no kv, skip import", zap.Stringer("engine", engineUUID))
return nil
}
regionSplitKeys := int64(regionMaxKeyCount)
if regionSplitSize > defaultRegionSplitSize {
regionSplitKeys = int64(float64(regionSplitSize) / float64(defaultRegionSplitSize) * float64(regionMaxKeyCount))
kvRegionSplitSize, kvRegionSplitKeys, err := getRegionSplitSizeKeys(ctx, local.pdCtl.GetPDClient(), local.tls)
if err == nil {
if kvRegionSplitSize > regionSplitSize {
regionSplitSize = kvRegionSplitSize
}
if kvRegionSplitKeys > regionSplitKeys {
regionSplitKeys = kvRegionSplitKeys
}
} else {
log.L().Warn("fail to get region split keys and size", zap.Error(err))
}

// split sorted file into range by 96MB size per file
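A minimal standalone sketch of the new threshold reconciliation above (the helper name and shape are illustrative, not part of the patch): the caller's thresholds are only ever raised to the cluster's values, never lowered, and a failed cluster query leaves them untouched.

    // pickSplitThresholds keeps whichever thresholds are larger: the ones the
    // caller configured or the ones reported by the TiKV cluster. When the
    // cluster query failed, the configured values are returned unchanged.
    func pickSplitThresholds(splitSize, splitKeys, kvSplitSize, kvSplitKeys int64, kvErr error) (int64, int64) {
    	if kvErr != nil {
    		return splitSize, splitKeys
    	}
    	if kvSplitSize > splitSize {
    		splitSize = kvSplitSize
    	}
    	if kvSplitKeys > splitKeys {
    		splitKeys = kvSplitKeys
    	}
    	return splitSize, splitKeys
    }
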
@@ -1842,3 +1845,41 @@ func (local *local) EngineFileSizes() (res []backend.EngineFileSize) {
})
return
}

func getSplitConfFromStore(ctx context.Context, host string, tls *common.TLS) (int64, int64, error) {
var (
nested struct {
Coprocessor struct {
RegionSplitSize string `json:"region-split-size"`
RegionSplitKeys int64 `json:"region-split-keys"`
} `json:"coprocessor"`
}
)
if err := tls.WithHost(host).GetJSON(ctx, "/config", &nested); err != nil {
return 0, 0, errors.Trace(err)
}
splitSize, err := units.FromHumanSize(nested.Coprocessor.RegionSplitSize)
if err != nil {
return 0, 0, errors.Trace(err)
}

return splitSize, nested.Coprocessor.RegionSplitKeys, nil
}

func getRegionSplitSizeKeys(ctx context.Context, cli pd.Client, tls *common.TLS) (int64, int64, error) {
stores, err := cli.GetAllStores(ctx, pd.WithExcludeTombstone())
if err != nil {
return 0, 0, err
}
for _, store := range stores {
if store.StatusAddress == "" || version.IsTiFlash(store) {
continue
}
regionSplitSize, regionSplitKeys, err := getSplitConfFromStore(ctx, store.StatusAddress, tls)
if err == nil {
return regionSplitSize, regionSplitKeys, nil
}
log.L().Warn("get region split size and keys failed", zap.Error(err), zap.String("store", store.StatusAddress))
}
return 0, 0, errors.New("get region split size and keys failed")
}
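getSplitConfFromStore only reads the coprocessor section of a TiKV status-address /config response. A self-contained sketch of the parsing it performs, against an illustrative payload (the "96MB" and 960000 values are placeholders chosen for the example, not read from this commit):

    package main

    import (
    	"encoding/json"
    	"fmt"

    	"github.com/docker/go-units"
    )

    func main() {
    	// Trimmed-down payload of the shape TiKV serves on its status port; values are illustrative.
    	payload := []byte(`{"coprocessor":{"region-split-size":"96MB","region-split-keys":960000}}`)
    	var nested struct {
    		Coprocessor struct {
    			RegionSplitSize string `json:"region-split-size"`
    			RegionSplitKeys int64  `json:"region-split-keys"`
    		} `json:"coprocessor"`
    	}
    	if err := json.Unmarshal(payload, &nested); err != nil {
    		panic(err)
    	}
    	// Same conversion the patch uses: a human-readable size string to bytes.
    	splitSize, err := units.FromHumanSize(nested.Coprocessor.RegionSplitSize)
    	if err != nil {
    		panic(err)
    	}
    	fmt.Println(splitSize, nested.Coprocessor.RegionSplitKeys) // 96000000 960000
    }
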
2 changes: 1 addition & 1 deletion br/pkg/lightning/backend/noop/noop.go
@@ -79,7 +79,7 @@ func (b noopBackend) CloseEngine(ctx context.Context, cfg *backend.EngineConfig,
return nil
}

func (b noopBackend) ImportEngine(ctx context.Context, engineUUID uuid.UUID, regionSplitSize int64) error {
func (b noopBackend) ImportEngine(ctx context.Context, engineUUID uuid.UUID, regionSplitSize, regionSplitKeys int64) error {
return nil
}

2 changes: 1 addition & 1 deletion br/pkg/lightning/backend/tidb/tidb.go
@@ -432,7 +432,7 @@ func (be *tidbBackend) ResolveDuplicateRows(ctx context.Context, tbl table.Table
return nil
}

func (be *tidbBackend) ImportEngine(context.Context, uuid.UUID, int64) error {
func (be *tidbBackend) ImportEngine(context.Context, uuid.UUID, int64, int64) error {
return nil
}

1 change: 1 addition & 0 deletions br/pkg/lightning/config/config.go
@@ -523,6 +523,7 @@ type TikvImporter struct {
MaxKVPairs int `toml:"max-kv-pairs" json:"max-kv-pairs"`
SendKVPairs int `toml:"send-kv-pairs" json:"send-kv-pairs"`
RegionSplitSize ByteSize `toml:"region-split-size" json:"region-split-size"`
RegionSplitKeys int `toml:"region-split-keys" json:"region-split-keys"`
SortedKVDir string `toml:"sorted-kv-dir" json:"sorted-kv-dir"`
DiskQuota ByteSize `toml:"disk-quota" json:"disk-quota"`
RangeConcurrency int `toml:"range-concurrency" json:"range-concurrency"`
7 changes: 5 additions & 2 deletions br/pkg/lightning/config/const.go
@@ -20,9 +20,12 @@ import (

const (
// mydumper
ReadBlockSize ByteSize = 64 * units.KiB
MaxRegionSize ByteSize = 256 * units.MiB
ReadBlockSize ByteSize = 64 * units.KiB
MaxRegionSize ByteSize = 256 * units.MiB
// See: https://github.com/tikv/tikv/blob/e030a0aae9622f3774df89c62f21b2171a72a69e/etc/config-template.toml#L360
// lower the max-key-count to avoid triggering TiKV's automatic region split
SplitRegionSize ByteSize = 96 * units.MiB
SplitRegionKeys int = 1_280_000
MaxSplitRegionSizeRatio int = 10

BufferSizeScale = 5
2 changes: 1 addition & 1 deletion br/pkg/lightning/restore/restore.go
@@ -1807,7 +1807,7 @@ func (rc *Controller) enforceDiskQuota(ctx context.Context) {
var importErr error
for _, engine := range largeEngines {
// Use a larger split region size to avoid splitting the same region many times.
if err := rc.backend.UnsafeImportAndReset(ctx, engine, int64(config.SplitRegionSize)*int64(config.MaxSplitRegionSizeRatio)); err != nil {
if err := rc.backend.UnsafeImportAndReset(ctx, engine, int64(config.SplitRegionSize)*int64(config.MaxSplitRegionSizeRatio), int64(config.SplitRegionKeys)*int64(config.MaxSplitRegionSizeRatio)); err != nil {
importErr = multierr.Append(importErr, err)
}
}
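With the constants added to const.go, the disk-quota path above asks for ten times the normal thresholds: 96 MiB × 10 = 960 MiB per region and 1,280,000 × 10 = 12,800,000 keys per region, so the same region is not split over and over during quota-triggered imports.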
11 changes: 10 additions & 1 deletion br/pkg/lightning/restore/table_restore.go
@@ -921,6 +921,8 @@ func (tr *TableRestore) importKV(
) error {
task := closedEngine.Logger().Begin(zap.InfoLevel, "import and cleanup engine")
regionSplitSize := int64(rc.cfg.TikvImporter.RegionSplitSize)
regionSplitKeys := int64(rc.cfg.TikvImporter.RegionSplitKeys)

if regionSplitSize == 0 && rc.taskMgr != nil {
regionSplitSize = int64(config.SplitRegionSize)
if err := rc.taskMgr.CheckTasksExclusively(ctx, func(tasks []taskMeta) ([]taskMeta, error) {
@@ -932,7 +934,14 @@
return errors.Trace(err)
}
}
err := closedEngine.Import(ctx, regionSplitSize)
if regionSplitKeys == 0 {
if regionSplitSize > int64(config.SplitRegionSize) {
regionSplitKeys = int64(float64(regionSplitSize) / float64(config.SplitRegionSize) * float64(config.SplitRegionKeys))
} else {
regionSplitKeys = int64(config.SplitRegionKeys)
}
}
err := closedEngine.Import(ctx, regionSplitSize, regionSplitKeys)
saveCpErr := rc.saveStatusCheckpoint(ctx, tr.tableName, engineID, err, checkpoints.CheckpointStatusImported)
// Don't clean up when save checkpoint failed, because we will verifyLocalFile and import engine again after restart.
if err == nil && saveCpErr == nil {
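A minimal sketch of the key-threshold fallback above, with the const.go defaults inlined (the helper name is illustrative, not part of the patch): when region-split-keys is left unset, the key count is scaled by the same factor by which the byte size exceeds the 96 MiB default, e.g. 192 MiB yields 2,560,000 keys, while 96 MiB or less falls back to 1,280,000.

    const (
    	defaultSplitSizeMiB int64 = 96        // config.SplitRegionSize
    	defaultSplitKeys    int64 = 1_280_000 // config.SplitRegionKeys
    )

    // deriveSplitKeys mirrors the fallback in importKV: scale the default key
    // count by how far the configured split size exceeds the 96 MiB default.
    func deriveSplitKeys(splitSizeMiB int64) int64 {
    	if splitSizeMiB > defaultSplitSizeMiB {
    		return int64(float64(splitSizeMiB) / float64(defaultSplitSizeMiB) * float64(defaultSplitKeys))
    	}
    	return defaultSplitKeys
    }
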
4 changes: 2 additions & 2 deletions br/pkg/lightning/restore/table_restore_test.go
@@ -831,7 +831,7 @@ func (s *tableRestoreSuite) TestImportKVSuccess() {
CloseEngine(ctx, nil, engineUUID).
Return(nil)
mockBackend.EXPECT().
ImportEngine(ctx, engineUUID, gomock.Any()).
ImportEngine(ctx, engineUUID, gomock.Any(), gomock.Any()).
Return(nil)
mockBackend.EXPECT().
CleanupEngine(ctx, engineUUID).
@@ -866,7 +866,7 @@ func (s *tableRestoreSuite) TestImportKVFailure() {
CloseEngine(ctx, nil, engineUUID).
Return(nil)
mockBackend.EXPECT().
ImportEngine(ctx, engineUUID, gomock.Any()).
ImportEngine(ctx, engineUUID, gomock.Any(), gomock.Any()).
Return(errors.Annotate(context.Canceled, "fake import error"))

closedEngine, err := importer.UnsafeCloseEngineWithUUID(ctx, nil, "tag", engineUUID)
Expand Down
8 changes: 4 additions & 4 deletions br/pkg/mock/backend.go

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion dumpling/export/metrics.go
@@ -68,7 +68,7 @@ func InitMetricsVector(labels prometheus.Labels) {
Namespace: "dumpling",
Subsystem: "write",
Name: "receive_chunk_duration_time",
Help: "Bucketed histogram of write time (s) of files",
Help: "Bucketed histogram of receiving time (s) of chunks",
Buckets: prometheus.ExponentialBuckets(0.00005, 2, 20),
}, labelNames)
errorCount = prometheus.NewCounterVec(
2 changes: 1 addition & 1 deletion executor/set_test.go
@@ -594,7 +594,7 @@ func TestSetVar(t *testing.T) {
tk.MustQuery("show global variables like 'tidb_ignore_prepared_cache_close_stmt'").Check(testkit.Rows("tidb_ignore_prepared_cache_close_stmt OFF"))

// test for tidb_enable_new_cost_interface
tk.MustQuery("select @@global.tidb_enable_new_cost_interface").Check(testkit.Rows("0")) // default value is 0
tk.MustQuery("select @@global.tidb_enable_new_cost_interface").Check(testkit.Rows("1")) // default value is 1
tk.MustExec("set global tidb_enable_new_cost_interface=1")
tk.MustQuery("select @@global.tidb_enable_new_cost_interface").Check(testkit.Rows("1"))
tk.MustQuery("show global variables like 'tidb_enable_new_cost_interface'").Check(testkit.Rows()) // hidden
9 changes: 2 additions & 7 deletions executor/show_stats.go
@@ -426,15 +426,10 @@ func (e *ShowExec) fetchShowStatsHealthy() {
}

func (e *ShowExec) appendTableForStatsHealthy(dbName, tblName, partitionName string, statsTbl *statistics.Table) {
if statsTbl.Pseudo {
healthy, ok := statsTbl.GetStatsHealthy()
if !ok {
return
}
var healthy int64
if statsTbl.ModifyCount < statsTbl.Count {
healthy = int64((1.0 - float64(statsTbl.ModifyCount)/float64(statsTbl.Count)) * 100.0)
} else if statsTbl.ModifyCount == 0 {
healthy = 100
}
e.appendRow([]interface{}{
dbName,
tblName,
