diff --git a/br/pkg/lightning/common/storage_unix.go b/br/pkg/lightning/common/storage_unix.go
index ba22e92354ceb..7e602cbe58eec 100644
--- a/br/pkg/lightning/common/storage_unix.go
+++ b/br/pkg/lightning/common/storage_unix.go
@@ -23,13 +23,18 @@ import (
 	"syscall"
 
 	"github.com/pingcap/errors"
+	"github.com/pingcap/failpoint"
 	"golang.org/x/sys/unix"
 )
 
 // GetStorageSize gets storage's capacity and available size
 func GetStorageSize(dir string) (size StorageSize, err error) {
-	var stat unix.Statfs_t
+	failpoint.Inject("GetStorageSize", func(val failpoint.Value) {
+		injectedSize := val.(int)
+		failpoint.Return(StorageSize{Capacity: uint64(injectedSize), Available: uint64(injectedSize)}, nil)
+	})
 
+	var stat unix.Statfs_t
 	err = unix.Statfs(dir, &stat)
 	if err != nil {
 		return size, errors.Annotatef(err, "cannot get disk capacity at %s", dir)
diff --git a/br/pkg/lightning/common/storage_windows.go b/br/pkg/lightning/common/storage_windows.go
index 21a2398ad66c3..a95e8f8eeebfc 100644
--- a/br/pkg/lightning/common/storage_windows.go
+++ b/br/pkg/lightning/common/storage_windows.go
@@ -23,6 +23,7 @@ import (
 	"unsafe"
 
 	"github.com/pingcap/errors"
+	"github.com/pingcap/failpoint"
 )
 
 var (
@@ -32,6 +33,10 @@ var (
 
 // GetStorageSize gets storage's capacity and available size
 func GetStorageSize(dir string) (size StorageSize, err error) {
+	failpoint.Inject("GetStorageSize", func(val failpoint.Value) {
+		injectedSize := val.(int)
+		failpoint.Return(StorageSize{Capacity: uint64(injectedSize), Available: uint64(injectedSize)}, nil)
+	})
 	r, _, e := getDiskFreeSpaceExW.Call(
 		uintptr(unsafe.Pointer(syscall.StringToUTF16Ptr(dir))),
 		uintptr(unsafe.Pointer(&size.Available)),
diff --git a/br/pkg/lightning/config/config.go b/br/pkg/lightning/config/config.go
index 8f2e6f2dfa9ac..c13c17b3efb1d 100644
--- a/br/pkg/lightning/config/config.go
+++ b/br/pkg/lightning/config/config.go
@@ -519,6 +519,7 @@ type TikvImporter struct {
 	DiskQuota           ByteSize                     `toml:"disk-quota" json:"disk-quota"`
 	RangeConcurrency    int                          `toml:"range-concurrency" json:"range-concurrency"`
 	DuplicateResolution DuplicateResolutionAlgorithm `toml:"duplicate-resolution" json:"duplicate-resolution"`
+	IncrementalImport   bool                         `toml:"incremental-import" json:"incremental-import"`
 
 	EngineMemCacheSize      ByteSize `toml:"engine-mem-cache-size" json:"engine-mem-cache-size"`
 	LocalWriterMemCacheSize ByteSize `toml:"local-writer-mem-cache-size" json:"local-writer-mem-cache-size"`
diff --git a/br/pkg/lightning/restore/check_info.go b/br/pkg/lightning/restore/check_info.go
index 1b86ee482f362..72b660bd7559a 100644
--- a/br/pkg/lightning/restore/check_info.go
+++ b/br/pkg/lightning/restore/check_info.go
@@ -17,6 +17,7 @@ package restore
 import (
 	"bytes"
 	"context"
+	"database/sql"
 	"fmt"
 	"io"
 	"path/filepath"
@@ -24,10 +25,15 @@ import (
 	"sort"
 	"strconv"
 	"strings"
+	"sync"
 
 	"github.com/docker/go-units"
 	"github.com/pingcap/errors"
 	"github.com/pingcap/failpoint"
+	"go.uber.org/zap"
+	"golang.org/x/sync/errgroup"
+	"modernc.org/mathutil"
+
 	"github.com/pingcap/kvproto/pkg/metapb"
 	"github.com/pingcap/tidb/br/pkg/lightning/backend"
 	"github.com/pingcap/tidb/br/pkg/lightning/backend/kv"
@@ -38,15 +44,13 @@ import (
 	"github.com/pingcap/tidb/br/pkg/lightning/mydump"
 	"github.com/pingcap/tidb/br/pkg/lightning/verification"
 	"github.com/pingcap/tidb/br/pkg/storage"
+	"github.com/pingcap/tidb/br/pkg/utils"
 	"github.com/pingcap/tidb/br/pkg/version"
 	"github.com/pingcap/tidb/parser/model"
 	"github.com/pingcap/tidb/parser/mysql"
 	"github.com/pingcap/tidb/table/tables"
 	"github.com/tikv/pd/server/api"
pdconfig "github.com/tikv/pd/server/config" - - "go.uber.org/zap" - "modernc.org/mathutil" ) const ( @@ -454,33 +458,31 @@ func (rc *Controller) localResource(sourceSize int64) error { if err != nil { return errors.Trace(err) } - localAvailable := storageSize.Available + localAvailable := int64(storageSize.Available) var message string var passed bool switch { - case localAvailable > uint64(sourceSize): + case localAvailable > sourceSize: message = fmt.Sprintf("local disk resources are rich, estimate sorted data size %s, local available is %s", units.BytesSize(float64(sourceSize)), units.BytesSize(float64(localAvailable))) passed = true + case int64(rc.cfg.TikvImporter.DiskQuota) > localAvailable: + message = fmt.Sprintf("local disk space may not enough to finish import, estimate sorted data size is %s,"+ + " but local available is %s, please set `tikv-importer.disk-quota` to a smaller value than %s"+ + " or change `mydumper.sorted-kv-dir` to another disk with enough space to finish imports", + units.BytesSize(float64(sourceSize)), + units.BytesSize(float64(localAvailable)), units.BytesSize(float64(localAvailable))) + passed = false + log.L().Error(message) default: - if int64(rc.cfg.TikvImporter.DiskQuota) > int64(localAvailable) { - message = fmt.Sprintf("local disk space may not enough to finish import"+ - "estimate sorted data size is %s, but local available is %s,"+ - "you need a smaller number for tikv-importer.disk-quota (%s) to finish imports", - units.BytesSize(float64(sourceSize)), - units.BytesSize(float64(localAvailable)), units.BytesSize(float64(rc.cfg.TikvImporter.DiskQuota))) - passed = false - log.L().Error(message) - } else { - message = fmt.Sprintf("local disk space may not enough to finish import, "+ - "estimate sorted data size is %s, but local available is %s,"+ - "we will use disk-quota (size: %s) to finish imports, which may slow down import", - units.BytesSize(float64(sourceSize)), - units.BytesSize(float64(localAvailable)), units.BytesSize(float64(rc.cfg.TikvImporter.DiskQuota))) - passed = true - log.L().Warn(message) - } + message = fmt.Sprintf("local disk space may not enough to finish import, "+ + "estimate sorted data size is %s, but local available is %s,"+ + "we will use disk-quota (size: %s) to finish imports, which may slow down import", + units.BytesSize(float64(sourceSize)), + units.BytesSize(float64(localAvailable)), units.BytesSize(float64(rc.cfg.TikvImporter.DiskQuota))) + passed = true + log.L().Warn(message) } rc.checkTemplate.Collect(Critical, passed, message) return nil @@ -870,3 +872,84 @@ outloop: log.L().Info("Sample source data", zap.String("table", tableMeta.Name), zap.Float64("IndexRatio", tableMeta.IndexRatio), zap.Bool("IsSourceOrder", tableMeta.IsRowOrdered)) return nil } + +func (rc *Controller) checkTableEmpty(ctx context.Context) error { + if rc.cfg.TikvImporter.Backend == config.BackendTiDB || rc.cfg.TikvImporter.IncrementalImport { + return nil + } + db, _ := rc.tidbGlue.GetDB() + + tableCount := 0 + for _, db := range rc.dbMetas { + tableCount += len(db.Tables) + } + + var lock sync.Mutex + tableNames := make([]string, 0) + concurrency := utils.MinInt(tableCount, rc.cfg.App.RegionConcurrency) + ch := make(chan string, concurrency) + eg, gCtx := errgroup.WithContext(ctx) + for i := 0; i < concurrency; i++ { + eg.Go(func() error { + for tblName := range ch { + // skip tables that have checkpoint + if rc.cfg.Checkpoint.Enable { + _, err := rc.checkpointsDB.Get(gCtx, tblName) + switch { + case err == nil: + continue + case 
+					default:
+						return errors.Trace(err)
+					}
+				}
+
+				hasData, err1 := tableContainsData(gCtx, db, tblName)
+				if err1 != nil {
+					return err1
+				}
+				if hasData {
+					lock.Lock()
+					tableNames = append(tableNames, tblName)
+					lock.Unlock()
+				}
+			}
+			return nil
+		})
+	}
+	for _, db := range rc.dbMetas {
+		for _, tbl := range db.Tables {
+			ch <- common.UniqueTable(tbl.DB, tbl.Name)
+		}
+	}
+	close(ch)
+	if err := eg.Wait(); err != nil {
+		if common.IsContextCanceledError(err) {
+			return nil
+		}
+		return errors.Trace(err)
+	}
+
+	if len(tableNames) > 0 {
+		// sort the names of the tables that failed the check
+		sort.Strings(tableNames)
+		msg := fmt.Sprintf("table(s) [%s] are not empty", strings.Join(tableNames, ", "))
+		rc.checkTemplate.Collect(Critical, false, msg)
+	}
+	return nil
+}
+
+func tableContainsData(ctx context.Context, db utils.QueryExecutor, tableName string) (bool, error) {
+	query := "select 1 from " + tableName + " limit 1"
+	var dump int
+	err := db.QueryRowContext(ctx, query).Scan(&dump)
+
+	switch {
+	case err == sql.ErrNoRows:
+		return false, nil
+	case err != nil:
+		return false, errors.Trace(err)
+	default:
+		return true, nil
+	}
+}
diff --git a/br/pkg/lightning/restore/check_info_test.go b/br/pkg/lightning/restore/check_info_test.go
new file mode 100644
index 0000000000000..4426f3bc68be9
--- /dev/null
+++ b/br/pkg/lightning/restore/check_info_test.go
@@ -0,0 +1,223 @@
+// Copyright 2021 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package restore
+
+import (
+	"context"
+	"database/sql"
+	"path/filepath"
+
+	"github.com/DATA-DOG/go-sqlmock"
+	. "github.com/pingcap/check"
"github.com/pingcap/check" + "github.com/pingcap/failpoint" + "github.com/pingcap/tidb/parser/mysql" + + "github.com/pingcap/tidb/br/pkg/lightning/checkpoints" + "github.com/pingcap/tidb/br/pkg/lightning/config" + "github.com/pingcap/tidb/br/pkg/lightning/glue" + "github.com/pingcap/tidb/br/pkg/lightning/mydump" + "github.com/pingcap/tidb/br/pkg/lightning/worker" + "github.com/pingcap/tidb/br/pkg/storage" +) + +var _ = Suite(&checkInfoSuite{}) + +type checkInfoSuite struct{} + +func (s *checkInfoSuite) TestCheckTableEmpty(c *C) { + dir := c.MkDir() + cfg := config.NewConfig() + cfg.Checkpoint.Enable = false + dbMetas := []*mydump.MDDatabaseMeta{ + { + Name: "test1", + Tables: []*mydump.MDTableMeta{ + { + DB: "test1", + Name: "tbl1", + }, + { + DB: "test1", + Name: "tbl2", + }, + }, + }, + { + Name: "test2", + Tables: []*mydump.MDTableMeta{ + { + DB: "test2", + Name: "tbl1", + }, + }, + }, + } + + rc := &Controller{ + cfg: cfg, + dbMetas: dbMetas, + checkpointsDB: checkpoints.NewNullCheckpointsDB(), + } + + ctx := context.Background() + + // test tidb will do nothing + rc.cfg.TikvImporter.Backend = config.BackendTiDB + err := rc.checkTableEmpty(ctx) + c.Assert(err, IsNil) + + // test incremental mode + rc.cfg.TikvImporter.Backend = config.BackendLocal + rc.cfg.TikvImporter.IncrementalImport = true + err = rc.checkTableEmpty(ctx) + c.Assert(err, IsNil) + + rc.cfg.TikvImporter.IncrementalImport = false + db, mock, err := sqlmock.New() + c.Assert(err, IsNil) + mock.MatchExpectationsInOrder(false) + rc.tidbGlue = glue.NewExternalTiDBGlue(db, mysql.ModeNone) + mock.ExpectQuery("select 1 from `test1`.`tbl1` limit 1"). + WillReturnRows(sqlmock.NewRows([]string{""}).RowError(0, sql.ErrNoRows)) + mock.ExpectQuery("select 1 from `test1`.`tbl2` limit 1"). + WillReturnRows(sqlmock.NewRows([]string{""}).RowError(0, sql.ErrNoRows)) + mock.ExpectQuery("select 1 from `test2`.`tbl1` limit 1"). + WillReturnRows(sqlmock.NewRows([]string{""}).RowError(0, sql.ErrNoRows)) + // not error, need not to init check template + err = rc.checkTableEmpty(ctx) + c.Assert(err, IsNil) + c.Assert(mock.ExpectationsWereMet(), IsNil) + + // single table contains data + db, mock, err = sqlmock.New() + c.Assert(err, IsNil) + rc.tidbGlue = glue.NewExternalTiDBGlue(db, mysql.ModeNone) + mock.MatchExpectationsInOrder(false) + mock.ExpectQuery("select 1 from `test1`.`tbl1` limit 1"). + WillReturnRows(sqlmock.NewRows([]string{""}).RowError(0, sql.ErrNoRows)) + mock.ExpectQuery("select 1 from `test1`.`tbl2` limit 1"). + WillReturnRows(sqlmock.NewRows([]string{""}).RowError(0, sql.ErrNoRows)) + mock.ExpectQuery("select 1 from `test2`.`tbl1` limit 1"). + WillReturnRows(sqlmock.NewRows([]string{""}).AddRow(1)) + rc.checkTemplate = NewSimpleTemplate() + err = rc.checkTableEmpty(ctx) + c.Assert(err, IsNil) + c.Assert(mock.ExpectationsWereMet(), IsNil) + tmpl := rc.checkTemplate.(*SimpleTemplate) + c.Assert(len(tmpl.criticalMsgs), Equals, 1) + c.Assert(tmpl.criticalMsgs[0], Matches, "table\\(s\\) \\[`test2`.`tbl1`\\] are not empty") + + // multi tables contains data + db, mock, err = sqlmock.New() + c.Assert(err, IsNil) + rc.tidbGlue = glue.NewExternalTiDBGlue(db, mysql.ModeNone) + mock.MatchExpectationsInOrder(false) + mock.ExpectQuery("select 1 from `test1`.`tbl1` limit 1"). + WillReturnRows(sqlmock.NewRows([]string{""}).AddRow(1)) + mock.ExpectQuery("select 1 from `test1`.`tbl2` limit 1"). + WillReturnRows(sqlmock.NewRows([]string{""}).RowError(0, sql.ErrNoRows)) + mock.ExpectQuery("select 1 from `test2`.`tbl1` limit 1"). 
+		WillReturnRows(sqlmock.NewRows([]string{""}).AddRow(1))
+	rc.checkTemplate = NewSimpleTemplate()
+	err = rc.checkTableEmpty(ctx)
+	c.Assert(err, IsNil)
+	c.Assert(mock.ExpectationsWereMet(), IsNil)
+	tmpl = rc.checkTemplate.(*SimpleTemplate)
+	c.Assert(len(tmpl.criticalMsgs), Equals, 1)
+	c.Assert(tmpl.criticalMsgs[0], Matches, "table\\(s\\) \\[`test1`.`tbl1`, `test2`.`tbl1`\\] are not empty")
+
+	// init checkpoint with only two of the three tables
+	dbInfos := map[string]*checkpoints.TidbDBInfo{
+		"test1": {
+			Name: "test1",
+			Tables: map[string]*checkpoints.TidbTableInfo{
+				"tbl1": {
+					Name: "tbl1",
+				},
+			},
+		},
+		"test2": {
+			Name: "test2",
+			Tables: map[string]*checkpoints.TidbTableInfo{
+				"tbl1": {
+					Name: "tbl1",
+				},
+			},
+		},
+	}
+	rc.cfg.Checkpoint.Enable = true
+	rc.checkpointsDB = checkpoints.NewFileCheckpointsDB(filepath.Join(dir, "cp.pb"))
+	err = rc.checkpointsDB.Initialize(ctx, cfg, dbInfos)
+	c.Check(err, IsNil)
+	db, mock, err = sqlmock.New()
+	c.Assert(err, IsNil)
+	rc.tidbGlue = glue.NewExternalTiDBGlue(db, mysql.ModeNone)
+	// only the table that is not in the checkpoint needs to be checked
+	mock.ExpectQuery("select 1 from `test1`.`tbl2` limit 1").
+		WillReturnRows(sqlmock.NewRows([]string{""}).RowError(0, sql.ErrNoRows))
+	err = rc.checkTableEmpty(ctx)
+	c.Assert(err, IsNil)
+	c.Assert(mock.ExpectationsWereMet(), IsNil)
+}
+
+func (s *checkInfoSuite) TestLocalResource(c *C) {
+	dir := c.MkDir()
+	mockStore, err := storage.NewLocalStorage(dir)
+	c.Assert(err, IsNil)
+
+	err = failpoint.Enable("github.com/pingcap/tidb/br/pkg/lightning/common/GetStorageSize", "return(2048)")
+	c.Assert(err, IsNil)
+	defer func() {
+		_ = failpoint.Disable("github.com/pingcap/tidb/br/pkg/lightning/common/GetStorageSize")
+	}()
+
+	cfg := config.NewConfig()
+	cfg.Mydumper.SourceDir = dir
+	cfg.TikvImporter.SortedKVDir = dir
+	cfg.TikvImporter.Backend = "local"
+	rc := &Controller{
+		cfg:       cfg,
+		store:     mockStore,
+		ioWorkers: worker.NewPool(context.Background(), 1, "io"),
+	}
+
+	// 1. source-size is smaller than disk-size, so no error is triggered
+	rc.checkTemplate = NewSimpleTemplate()
+	err = rc.localResource(1000)
+	c.Assert(err, IsNil)
+	tmpl := rc.checkTemplate.(*SimpleTemplate)
+	c.Assert(tmpl.warnFailedCount, Equals, 1)
+	c.Assert(tmpl.criticalFailedCount, Equals, 0)
+	c.Assert(tmpl.normalMsgs[1], Matches, "local disk resources are rich, estimate sorted data size 1000B, local available is 2KiB")
+
+	// 2. source-size is bigger than disk-size, and the default disk-quota triggers a critical error
+	rc.checkTemplate = NewSimpleTemplate()
+	err = rc.localResource(4096)
+	c.Assert(err, IsNil)
+	tmpl = rc.checkTemplate.(*SimpleTemplate)
+	c.Assert(tmpl.warnFailedCount, Equals, 1)
+	c.Assert(tmpl.criticalFailedCount, Equals, 1)
+	c.Assert(tmpl.criticalMsgs[0], Matches, "local disk space may not be enough to finish import, estimated sorted data size is 4KiB, but local available is 2KiB, please set `tikv-importer.disk-quota` to a smaller value than 2KiB or change `mydumper.sorted-kv-dir` to another disk with enough space to finish imports")
+
+	// 3. source-size is bigger than disk-size, and a valid disk-quota triggers a warning
+	rc.checkTemplate = NewSimpleTemplate()
+	rc.cfg.TikvImporter.DiskQuota = config.ByteSize(1024)
+	err = rc.localResource(4096)
+	c.Assert(err, IsNil)
+	tmpl = rc.checkTemplate.(*SimpleTemplate)
+	c.Assert(tmpl.warnFailedCount, Equals, 1)
+	c.Assert(tmpl.criticalFailedCount, Equals, 0)
+	c.Assert(tmpl.normalMsgs[1], Matches, "local disk space may not be enough to finish import, estimated sorted data size is 4KiB, but local available is 2KiB, we will use disk-quota \\(size: 1KiB\\) to finish imports, which may slow down import")
+}
diff --git a/br/pkg/lightning/restore/check_template.go b/br/pkg/lightning/restore/check_template.go
index 3fb8c22904caa..f38e23aa00f8e 100644
--- a/br/pkg/lightning/restore/check_template.go
+++ b/br/pkg/lightning/restore/check_template.go
@@ -51,7 +51,8 @@ type SimpleTemplate struct {
 	count               int
 	warnFailedCount     int
 	criticalFailedCount int
-	failedMsg           []string
+	normalMsgs          []string // only used in unit tests for now
+	criticalMsgs        []string
 	t                   table.Writer
 }
 
@@ -65,16 +66,12 @@ func NewSimpleTemplate() Template {
 		{Name: "Passed", WidthMax: 6},
 	})
 	return &SimpleTemplate{
-		0,
-		0,
-		0,
-		make([]string, 0),
-		t,
+		t: t,
 	}
 }
 
 func (c *SimpleTemplate) FailedMsg() string {
-	return strings.Join(c.failedMsg, ";\n")
+	return strings.Join(c.criticalMsgs, ";\n")
 }
 
 func (c *SimpleTemplate) Collect(t CheckType, passed bool, msg string) {
@@ -87,7 +84,11 @@ func (c *SimpleTemplate) Collect(t CheckType, passed bool, msg string) {
 			c.warnFailedCount++
 		}
 	}
-	c.failedMsg = append(c.failedMsg, msg)
+	if !passed && t == Critical {
+		c.criticalMsgs = append(c.criticalMsgs, msg)
+	} else {
+		c.normalMsgs = append(c.normalMsgs, msg)
+	}
 	c.t.AppendRow(table.Row{c.count, msg, t, passed})
 	c.t.AppendSeparator()
 }
@@ -108,7 +109,7 @@ func (c *SimpleTemplate) FailedCount(t CheckType) int {
 
 func (c *SimpleTemplate) Output() string {
 	c.t.SetAllowedRowLength(170)
-	c.t.SetRowPainter(table.RowPainter(func(row table.Row) text.Colors {
+	c.t.SetRowPainter(func(row table.Row) text.Colors {
 		if passed, ok := row[3].(bool); ok {
 			if !passed {
 				if typ, ok := row[2].(CheckType); ok {
@@ -122,7 +123,7 @@ func (c *SimpleTemplate) Output() string {
 			}
 		}
 		return nil
-	}))
+	})
 	res := c.t.Render()
 	summary := "\n"
 	if c.criticalFailedCount > 0 {
diff --git a/br/pkg/lightning/restore/meta_manager.go b/br/pkg/lightning/restore/meta_manager.go
index 544b91c0b5f90..49358a9aee102 100644
--- a/br/pkg/lightning/restore/meta_manager.go
+++ b/br/pkg/lightning/restore/meta_manager.go
@@ -1027,9 +1027,65 @@ func (m noopTableMetaMgr) UpdateTableBaseChecksum(ctx context.Context, checksum
 }
 
 func (m noopTableMetaMgr) CheckAndUpdateLocalChecksum(ctx context.Context, checksum *verify.KVChecksum, hasLocalDupes bool) (bool, bool, *verify.KVChecksum, error) {
-	return false, false, nil, nil
+	return true, true, &verify.KVChecksum{}, nil
 }
 
 func (m noopTableMetaMgr) FinishTable(ctx context.Context) error {
 	return nil
 }
+
+type singleMgrBuilder struct{}
+
+func (b singleMgrBuilder) Init(context.Context) error {
+	return nil
+}
+
+func (b singleMgrBuilder) TaskMetaMgr(pd *pdutil.PdController) taskMetaMgr {
+	return &singleTaskMetaMgr{
+		pd: pd,
+	}
+}
+
+func (b singleMgrBuilder) TableMetaMgr(tr *TableRestore) tableMetaMgr {
+	return noopTableMetaMgr{}
+}
+
+type singleTaskMetaMgr struct {
+	pd *pdutil.PdController
+}
+
+func (m *singleTaskMetaMgr) InitTask(ctx context.Context, source int64) error {
+	return nil
+}
+
+func (m *singleTaskMetaMgr) CheckTasksExclusively(ctx context.Context, action func(tasks []taskMeta) ([]taskMeta, error)) error {
+	_, err := action(nil)
+	return err
+}
+
+func (m *singleTaskMetaMgr) CheckAndPausePdSchedulers(ctx context.Context) (pdutil.UndoFunc, error) {
+	return m.pd.RemoveSchedulers(ctx)
+}
+
+func (m *singleTaskMetaMgr) CheckTaskExist(ctx context.Context) (bool, error) {
+	return true, nil
+}
+
+func (m *singleTaskMetaMgr) CheckAndFinishRestore(context.Context, bool) (shouldSwitchBack bool, shouldCleanupMeta bool, err error) {
+	return true, true, nil
+}
+
+func (m *singleTaskMetaMgr) Cleanup(ctx context.Context) error {
+	return nil
+}
+
+func (m *singleTaskMetaMgr) CleanupTask(ctx context.Context) error {
+	return nil
+}
+
+func (m *singleTaskMetaMgr) CleanupAllMetas(ctx context.Context) error {
+	return nil
+}
+
+func (m *singleTaskMetaMgr) Close() {
+}
diff --git a/br/pkg/lightning/restore/restore.go b/br/pkg/lightning/restore/restore.go
index 0ea46ea67bf14..fcebfa5f139cc 100644
--- a/br/pkg/lightning/restore/restore.go
+++ b/br/pkg/lightning/restore/restore.go
@@ -379,14 +379,17 @@ func NewRestoreControllerWithPauser(
 	}
 
 	var metaBuilder metaMgrBuilder
-	switch cfg.TikvImporter.Backend {
-	case config.BackendLocal, config.BackendImporter:
+	isSSTImport := cfg.TikvImporter.Backend == config.BackendLocal || cfg.TikvImporter.Backend == config.BackendImporter
+	switch {
+	case isSSTImport && cfg.TikvImporter.IncrementalImport:
 		metaBuilder = &dbMetaMgrBuilder{
 			db:           db,
 			taskID:       cfg.TaskID,
 			schema:       cfg.App.MetaSchemaName,
 			needChecksum: cfg.PostRestore.Checksum != config.OpLevelOff,
 		}
+	case isSSTImport:
+		metaBuilder = singleMgrBuilder{}
 	default:
 		metaBuilder = noopMetaMgrBuilder{}
 	}
@@ -1926,8 +1929,7 @@ func (rc *Controller) preCheckRequirements(ctx context.Context) error {
 		if !taskExist && rc.taskMgr != nil {
 			rc.taskMgr.CleanupTask(ctx)
 		}
-		return errors.Errorf("tidb-lightning check failed."+
-			" Please fix the failed check(s):\n %s", rc.checkTemplate.FailedMsg())
+		return errors.Errorf("tidb-lightning pre-check failed: %s", rc.checkTemplate.FailedMsg())
 	}
 	return nil
 }
@@ -1978,6 +1980,11 @@ func (rc *Controller) DataCheck(ctx context.Context) error {
 	} else {
 		rc.checkTemplate.Collect(Critical, true, "table schemas are valid")
 	}
+
+	if err := rc.checkTableEmpty(ctx); err != nil {
+		return errors.Trace(err)
+	}
+
 	return nil
 }
diff --git a/br/pkg/storage/gcs.go b/br/pkg/storage/gcs.go
index 07ce5c8a862b9..e4835e0eb6111 100644
--- a/br/pkg/storage/gcs.go
+++ b/br/pkg/storage/gcs.go
@@ -180,11 +180,6 @@ func (s *gcsStorage) WalkDir(ctx context.Context, opt *WalkOption, fn func(strin
 		opt = &WalkOption{}
 	}
 
-	maxKeys := int64(1000)
-	if opt.ListCount > 0 {
-		maxKeys = opt.ListCount
-	}
-
 	prefix := path.Join(s.gcs.Prefix, opt.SubDir)
 	if len(prefix) > 0 && !strings.HasSuffix(prefix, "/") {
 		prefix += "/"
@@ -194,7 +189,7 @@ func (s *gcsStorage) WalkDir(ctx context.Context, opt *WalkOption, fn func(strin
 	// only need each object's name and size
 	query.SetAttrSelection([]string{"Name", "Size"})
 	iter := s.bucket.Objects(ctx, query)
-	for i := int64(0); i != maxKeys; i++ {
+	for {
 		attrs, err := iter.Next()
 		if err == iterator.Done {
 			break
diff --git a/br/pkg/storage/gcs_test.go b/br/pkg/storage/gcs_test.go
index c3e63d6d410a2..ccf3927497bea 100644
--- a/br/pkg/storage/gcs_test.go
+++ b/br/pkg/storage/gcs_test.go
@@ -4,6 +4,7 @@ package storage
 import (
 	"context"
+	"fmt"
 	"io"
 	"os"
 
@@ -95,6 +96,31 @@ func (r *testStorageSuite) TestGCS(c *C) {
 	c.Assert(list, Equals, "keykey1key2")
 	c.Assert(totalSize, Equals, int64(42))
+	// test 1003 files in total, which exceeds the removed 1000-key limit of WalkDir
+	totalSize = 0
+	for i := 0; i < 1000; i++ {
+		err = stg.WriteFile(ctx, fmt.Sprintf("f%d", i), []byte("data"))
+		c.Assert(err, IsNil)
+	}
+	filesSet := make(map[string]struct{}, 1003)
+	err = stg.WalkDir(ctx, nil, func(name string, size int64) error {
+		filesSet[name] = struct{}{}
+		totalSize += size
+		return nil
+	})
+	c.Assert(err, IsNil)
+	c.Assert(totalSize, Equals, int64(42+4000))
+	_, ok := filesSet["key"]
+	c.Assert(ok, IsTrue)
+	_, ok = filesSet["key1"]
+	c.Assert(ok, IsTrue)
+	_, ok = filesSet["key2"]
+	c.Assert(ok, IsTrue)
+	for i := 0; i < 1000; i++ {
+		_, ok = filesSet[fmt.Sprintf("f%d", i)]
+		c.Assert(ok, IsTrue)
+	}
+
 	efr, err := stg.Open(ctx, "key2")
 	c.Assert(err, IsNil)
diff --git a/br/tests/lightning_distributed_import/config.toml b/br/tests/lightning_distributed_import/config.toml
index 200af8e45dfdc..947b16037dd5d 100644
--- a/br/tests/lightning_distributed_import/config.toml
+++ b/br/tests/lightning_distributed_import/config.toml
@@ -1,6 +1,7 @@
 [tikv-importer]
 backend = 'local'
 duplicate-resolution = 'none'
+incremental-import = true
 
 [post-restore]
 checksum = "required"
diff --git a/br/tests/lightning_duplicate_detection/config1.toml b/br/tests/lightning_duplicate_detection/config1.toml
index 0b2b6df2a70e8..6497e9e30949b 100644
--- a/br/tests/lightning_duplicate_detection/config1.toml
+++ b/br/tests/lightning_duplicate_detection/config1.toml
@@ -6,6 +6,7 @@ table-concurrency = 10
 [tikv-importer]
 backend = "local"
 duplicate-resolution = 'record'
+incremental-import = true
 
 [checkpoint]
 enable = true
diff --git a/br/tests/lightning_duplicate_detection/config2.toml b/br/tests/lightning_duplicate_detection/config2.toml
index e978ffb9cd8b5..760f50168508a 100644
--- a/br/tests/lightning_duplicate_detection/config2.toml
+++ b/br/tests/lightning_duplicate_detection/config2.toml
@@ -6,6 +6,7 @@ table-concurrency = 10
 [tikv-importer]
 backend = "local"
 duplicate-resolution = 'record'
+incremental-import = true
 
 [checkpoint]
 enable = true
diff --git a/br/tests/lightning_incremental/config.toml b/br/tests/lightning_incremental/config.toml
index e69de29bb2d1d..761e60b91b804 100644
--- a/br/tests/lightning_incremental/config.toml
+++ b/br/tests/lightning_incremental/config.toml
@@ -0,0 +1,2 @@
+[tikv-importer]
+incremental-import = true
diff --git a/br/tests/lightning_local_backend/run.sh b/br/tests/lightning_local_backend/run.sh
index 6d0e7e9864145..5843210fea738 100755
--- a/br/tests/lightning_local_backend/run.sh
+++ b/br/tests/lightning_local_backend/run.sh
@@ -20,12 +20,23 @@ check_cluster_version 4 0 0 'local backend' || exit 0
 
 ENGINE_COUNT=6
 
-# First, verify that inject with not leader error is fine.
-rm -f "$TEST_DIR/lightning-local.log"
+# Test that non-empty target tables are detected
 rm -f "/tmp/tidb_lightning_checkpoint_local_backend_test.pb"
+rm -rf $TEST_DIR/lightning.log
 run_sql 'DROP DATABASE IF EXISTS cpeng;'
-export GO_FAILPOINTS='github.com/pingcap/tidb/br/pkg/lightning/backend/local/FailIngestMeta=1*return("notleader")'
+run_sql 'CREATE DATABASE cpeng;'
+run_sql 'CREATE TABLE cpeng.a (c int);'
+run_sql 'CREATE TABLE cpeng.b (c int);'
+run_sql "INSERT INTO cpeng.a values (1), (2);"
+run_sql "INSERT INTO cpeng.b values (3);"
+! run_lightning --backend local --enable-checkpoint=0
+grep -Fq 'table(s) [`cpeng`.`a`, `cpeng`.`b`] are not empty' $TEST_DIR/lightning.log
+
+# First, verify that inject with not leader error is fine.
+export GO_FAILPOINTS='github.com/pingcap/tidb/br/pkg/lightning/backend/local/FailIngestMeta=1*return("notleader")'
+rm -f "$TEST_DIR/lightning-local.log"
+run_sql 'DROP DATABASE IF EXISTS cpeng;'
 run_lightning --backend local --enable-checkpoint=1 --log-file "$TEST_DIR/lightning-local.log" --config "tests/$TEST_NAME/config.toml"
 
 # Check that everything is correctly imported
diff --git a/br/tests/lightning_tidb_rowid/data/rowid.pre_rebase-schema.sql b/br/tests/lightning_tidb_rowid/data/rowid.pre_rebase-schema.sql
index 887540be58110..1738b64457de6 100644
--- a/br/tests/lightning_tidb_rowid/data/rowid.pre_rebase-schema.sql
+++ b/br/tests/lightning_tidb_rowid/data/rowid.pre_rebase-schema.sql
@@ -1 +1 @@
-create table pre_rebase (pk varchar(6) primary key) auto_increment=70000;
+create table pre_rebase (pk varchar(6) primary key /*T![clustered_index] NONCLUSTERED */) auto_increment=70000;
diff --git a/br/tests/lightning_tidb_rowid/run.sh b/br/tests/lightning_tidb_rowid/run.sh
index e877f420cf43f..ae762c514d93c 100755
--- a/br/tests/lightning_tidb_rowid/run.sh
+++ b/br/tests/lightning_tidb_rowid/run.sh
@@ -58,8 +58,13 @@ for BACKEND in local importer tidb; do
     run_sql 'SELECT count(*), min(_tidb_rowid), max(_tidb_rowid) FROM rowid.pre_rebase'
     check_contains 'count(*): 1'
-    check_contains 'min(_tidb_rowid): 70000'
-    check_contains 'max(_tidb_rowid): 70000'
+    if [ "$BACKEND" == 'tidb' ]; then
+        check_contains 'min(_tidb_rowid): 70000'
+        check_contains 'max(_tidb_rowid): 70000'
+    else
+        check_contains 'min(_tidb_rowid): 1'
+        check_contains 'max(_tidb_rowid): 1'
+    fi
     run_sql 'INSERT INTO rowid.pre_rebase VALUES ("?")'
    run_sql 'SELECT _tidb_rowid > 70000 FROM rowid.pre_rebase WHERE pk = "?"'
     check_contains '_tidb_rowid > 70000: 1'
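
A note on the failpoint mechanism this patch relies on: `failpoint.Inject` and `failpoint.Return` are inert marker calls until `failpoint-ctl` rewrites the source (in TiDB, via `make failpoint-enable`); after that rewrite, a failpoint can be activated either programmatically, as `TestLocalResource` does with `failpoint.Enable`, or through the `GO_FAILPOINTS` environment variable, as the shell tests do. Below is a minimal, self-contained sketch of the same pattern; the `main/DiskFree` name and the `diskFree` function are illustrative stand-ins, not part of this patch.

package main

import (
	"fmt"

	"github.com/pingcap/failpoint"
)

// diskFree mirrors the shape of GetStorageSize: when the failpoint is
// active, the injected value short-circuits the real system call.
func diskFree() (uint64, error) {
	failpoint.Inject("DiskFree", func(val failpoint.Value) {
		// val carries the argument of the "return(...)" term used to
		// enable the failpoint, e.g. 2048 below.
		failpoint.Return(uint64(val.(int)), nil)
	})
	// ... the real statfs / GetDiskFreeSpaceExW call would go here ...
	return 0, nil
}

func main() {
	// A full failpoint name is "<package path>/<name>"; plain "main"
	// stands in for a real import path here.
	if err := failpoint.Enable("main/DiskFree", "return(2048)"); err != nil {
		panic(err)
	}
	defer func() {
		_ = failpoint.Disable("main/DiskFree")
	}()

	free, _ := diskFree()
	fmt.Println(free) // prints 2048 once the marker calls have been rewritten
}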