diff --git a/br/pkg/lightning/restore/meta_manager.go b/br/pkg/lightning/restore/meta_manager.go
index a8b43167c83ea..d3e1712679a55 100644
--- a/br/pkg/lightning/restore/meta_manager.go
+++ b/br/pkg/lightning/restore/meta_manager.go
@@ -1035,7 +1035,9 @@ func (m noopTableMetaMgr) FinishTable(ctx context.Context) error {
 	return nil
 }
 
-type singleMgrBuilder struct{}
+type singleMgrBuilder struct {
+	taskID int64
+}
 
 func (b singleMgrBuilder) Init(context.Context) error {
 	return nil
@@ -1043,7 +1045,8 @@ func (b singleMgrBuilder) Init(context.Context) error {
 
 func (b singleMgrBuilder) TaskMetaMgr(pd *pdutil.PdController) taskMetaMgr {
 	return &singleTaskMetaMgr{
-		pd: pd,
+		pd:     pd,
+		taskID: b.taskID,
 	}
 }
 
@@ -1052,15 +1055,34 @@ func (b singleMgrBuilder) TableMetaMgr(tr *TableRestore) tableMetaMgr {
 }
 
 type singleTaskMetaMgr struct {
-	pd *pdutil.PdController
+	pd           *pdutil.PdController
+	taskID       int64
+	initialized  bool
+	sourceBytes  uint64
+	clusterAvail uint64
 }
 
 func (m *singleTaskMetaMgr) InitTask(ctx context.Context, source int64) error {
+	m.sourceBytes = uint64(source)
+	m.initialized = true
 	return nil
 }
 
 func (m *singleTaskMetaMgr) CheckTasksExclusively(ctx context.Context, action func(tasks []taskMeta) ([]taskMeta, error)) error {
-	_, err := action(nil)
+	newTasks, err := action([]taskMeta{
+		{
+			taskID:       m.taskID,
+			status:       taskMetaStatusInitial,
+			sourceBytes:  m.sourceBytes,
+			clusterAvail: m.clusterAvail,
+		},
+	})
+	for _, t := range newTasks {
+		if m.taskID == t.taskID {
+			m.sourceBytes = t.sourceBytes
+			m.clusterAvail = t.clusterAvail
+		}
+	}
 	return err
 }
 
@@ -1069,7 +1091,7 @@ func (m *singleTaskMetaMgr) CheckAndPausePdSchedulers(ctx context.Context) (pdut
 }
 
 func (m *singleTaskMetaMgr) CheckTaskExist(ctx context.Context) (bool, error) {
-	return true, nil
+	return m.initialized, nil
 }
 
 func (m *singleTaskMetaMgr) CheckAndFinishRestore(context.Context, bool) (shouldSwitchBack bool, shouldCleanupMeta bool, err error) {
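For readers outside the lightning codebase: the change above makes the single-task meta manager stateful. InitTask now records the source size in memory, and CheckTasksExclusively hands that single task to the callback and stores back whatever the callback returns. A minimal, self-contained sketch of that round trip follows; the names (taskMeta, memTaskMgr) are illustrative and not the lightning API.

// Sketch only: hypothetical in-memory task manager mirroring the pattern above.
package sketch

type taskMeta struct {
	taskID      int64
	sourceBytes uint64
}

type memTaskMgr struct {
	taskID      int64
	initialized bool
	sourceBytes uint64
}

// InitTask remembers the total source size instead of writing it to a meta table.
func (m *memTaskMgr) InitTask(source int64) {
	m.sourceBytes = uint64(source)
	m.initialized = true
}

// CheckTasksExclusively exposes the single task to the callback and stores back
// any update the callback returns, just like the SQL-backed implementation.
func (m *memTaskMgr) CheckTasksExclusively(action func([]taskMeta) ([]taskMeta, error)) error {
	updated, err := action([]taskMeta{{taskID: m.taskID, sourceBytes: m.sourceBytes}})
	for _, t := range updated {
		if t.taskID == m.taskID {
			m.sourceBytes = t.sourceBytes
		}
	}
	return err
}

// CheckTaskExist is only true after InitTask, which is what the new unit test asserts.
func (m *memTaskMgr) CheckTaskExist() bool { return m.initialized }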
"github.com/pingcap/check" @@ -323,3 +324,28 @@ func (s *taskMetaMgrSuite) TestCheckTasksExclusively(c *C) { c.Assert(err, IsNil) } + +func (s *taskMetaMgrSuite) TestSingleTaskMetaMgr(c *C) { + metaBuilder := singleMgrBuilder{ + taskID: time.Now().UnixNano(), + } + metaMgr := metaBuilder.TaskMetaMgr(nil) + + ok, err := metaMgr.CheckTaskExist(context.Background()) + c.Assert(err, IsNil) + c.Assert(ok, IsFalse) + + err = metaMgr.InitTask(context.Background(), 1<<30) + c.Assert(err, IsNil) + + ok, err = metaMgr.CheckTaskExist(context.Background()) + c.Assert(err, IsNil) + c.Assert(ok, IsTrue) + + err = metaMgr.CheckTasksExclusively(context.Background(), func(tasks []taskMeta) ([]taskMeta, error) { + c.Assert(len(tasks), Equals, 1) + c.Assert(tasks[0].sourceBytes, Equals, uint64(1<<30)) + return nil, nil + }) + c.Assert(err, IsNil) +} diff --git a/br/pkg/lightning/restore/restore.go b/br/pkg/lightning/restore/restore.go index f8bcebbfcd0e0..907d00d979a95 100644 --- a/br/pkg/lightning/restore/restore.go +++ b/br/pkg/lightning/restore/restore.go @@ -389,7 +389,9 @@ func NewRestoreControllerWithPauser( needChecksum: cfg.PostRestore.Checksum != config.OpLevelOff, } case isSSTImport: - metaBuilder = singleMgrBuilder{} + metaBuilder = singleMgrBuilder{ + taskID: cfg.TaskID, + } default: metaBuilder = noopMetaMgrBuilder{} } @@ -1893,7 +1895,19 @@ func (rc *Controller) preCheckRequirements(ctx context.Context) error { if err = rc.taskMgr.InitTask(ctx, source); err != nil { return errors.Trace(err) } - if rc.cfg.App.CheckRequirements { + } + if rc.cfg.App.CheckRequirements { + needCheck := true + if rc.cfg.Checkpoint.Enable { + taskCheckpoints, err := rc.checkpointsDB.TaskCheckpoint(ctx) + if err != nil { + return errors.Trace(err) + } + // If task checkpoint is initialized, it means check has been performed before. + // We don't need and shouldn't check again, because lightning may have already imported some data. + needCheck = taskCheckpoints == nil + } + if needCheck { err = rc.localResource(source) if err != nil { return errors.Trace(err) diff --git a/domain/domain.go b/domain/domain.go index 073a2dbb0b94a..94191d0c91394 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -811,6 +811,25 @@ func (do *Domain) Init(ddlLease time.Duration, sysExecutorFactory func(*Domain) do.ddl = d } }) + + if config.GetGlobalConfig().Experimental.EnableGlobalKill { + if do.etcdClient != nil { + err := do.acquireServerID(ctx) + if err != nil { + logutil.BgLogger().Error("acquire serverID failed", zap.Error(err)) + do.isLostConnectionToPD.Set(1) // will retry in `do.serverIDKeeper` + } else { + do.isLostConnectionToPD.Set(0) + } + + do.wg.Add(1) + go do.serverIDKeeper() + } else { + // set serverID for standalone deployment to enable 'KILL'. + atomic.StoreUint64(&do.serverID, serverIDForStandalone) + } + } + // step 1: prepare the info/schema syncer which domain reload needed. 
diff --git a/domain/domain.go b/domain/domain.go
index 073a2dbb0b94a..94191d0c91394 100644
--- a/domain/domain.go
+++ b/domain/domain.go
@@ -811,6 +811,25 @@ func (do *Domain) Init(ddlLease time.Duration, sysExecutorFactory func(*Domain)
 			do.ddl = d
 		}
 	})
+
+	if config.GetGlobalConfig().Experimental.EnableGlobalKill {
+		if do.etcdClient != nil {
+			err := do.acquireServerID(ctx)
+			if err != nil {
+				logutil.BgLogger().Error("acquire serverID failed", zap.Error(err))
+				do.isLostConnectionToPD.Set(1) // will retry in `do.serverIDKeeper`
+			} else {
+				do.isLostConnectionToPD.Set(0)
+			}
+
+			do.wg.Add(1)
+			go do.serverIDKeeper()
+		} else {
+			// set serverID for standalone deployment to enable 'KILL'.
+			atomic.StoreUint64(&do.serverID, serverIDForStandalone)
+		}
+	}
+
+	// step 1: prepare the info/schema syncer which domain reload needed.
 	skipRegisterToDashboard := config.GetGlobalConfig().SkipRegisterToDashboard
 	do.info, err = infosync.GlobalInfoSyncerInit(ctx, do.ddl.GetID(), do.ServerID, do.etcdClient, skipRegisterToDashboard)
@@ -833,24 +852,6 @@ func (do *Domain) Init(ddlLease time.Duration, sysExecutorFactory func(*Domain)
 		return err
 	}
 
-	if config.GetGlobalConfig().Experimental.EnableGlobalKill {
-		if do.etcdClient != nil {
-			err := do.acquireServerID(ctx)
-			if err != nil {
-				logutil.BgLogger().Error("acquire serverID failed", zap.Error(err))
-				do.isLostConnectionToPD.Set(1) // will retry in `do.serverIDKeeper`
-			} else {
-				do.isLostConnectionToPD.Set(0)
-			}
-
-			do.wg.Add(1)
-			go do.serverIDKeeper()
-		} else {
-			// set serverID for standalone deployment to enable 'KILL'.
-			atomic.StoreUint64(&do.serverID, serverIDForStandalone)
-		}
-	}
-
 	// Only when the store is local that the lease value is 0.
 	// If the store is local, it doesn't need loadSchemaInLoop.
 	if ddlLease > 0 {
diff --git a/executor/cte.go b/executor/cte.go
index a9f083fa70479..7ee4a78dc417b 100644
--- a/executor/cte.go
+++ b/executor/cte.go
@@ -230,8 +230,6 @@ func (e *CTEExec) Close() (err error) {
 func (e *CTEExec) computeSeedPart(ctx context.Context) (err error) {
 	e.curIter = 0
 	e.iterInTbl.SetIter(e.curIter)
-	// This means iterInTbl's can be read.
-	defer close(e.iterInTbl.GetBegCh())
 	chks := make([]*chunk.Chunk, 0, 10)
 	for {
 		if e.limitDone(e.iterInTbl) {
@@ -384,7 +382,6 @@ func (e *CTEExec) setupTblsForNewIteration() (err error) {
 	if err = e.iterInTbl.Reopen(); err != nil {
 		return err
 	}
-	defer close(e.iterInTbl.GetBegCh())
 	if e.isDistinct {
 		// Already deduplicated by resTbl, adding directly is ok.
 		for _, chk := range chks {
diff --git a/executor/cte_table_reader.go b/executor/cte_table_reader.go
index efd5a0387e6cb..4afd8aabbb79f 100644
--- a/executor/cte_table_reader.go
+++ b/executor/cte_table_reader.go
@@ -41,9 +41,6 @@ func (e *CTETableReaderExec) Open(ctx context.Context) error {
 func (e *CTETableReaderExec) Next(ctx context.Context, req *chunk.Chunk) (err error) {
 	req.Reset()
 
-	// Wait until iterInTbl can be read. This is controlled by corresponding CTEExec.
-	<-e.iterInTbl.GetBegCh()
-
 	// We should read `iterInTbl` from the beginning when the next iteration starts.
 	// Can not directly judge whether to start the next iteration based on e.chkIdx,
 	// because some operators(Selection) may use forloop to read all data in `iterInTbl`.
+ tk.MustExec("set tidb_max_chunk_size = 32") + tk.MustExec("set tidb_projection_concurrency = 20") + for i := 0; i < 10; i++ { + err := tk.QueryToErr("with recursive cte(iter, first, second, result) as " + + "(select 1, first, second, first+second from src " + + " union all " + + "select iter+1, second, result, second+result from cte where iter < 80 )" + + "select * from cte") + require.True(t, terror.ErrorEqual(err, types.ErrOverflow)) + } +} diff --git a/executor/index_lookup_hash_join.go b/executor/index_lookup_hash_join.go index 503dad70a8731..069c7b49a51bc 100644 --- a/executor/index_lookup_hash_join.go +++ b/executor/index_lookup_hash_join.go @@ -70,7 +70,8 @@ type IndexNestedLoopHashJoin struct { // taskCh is only used when `keepOuterOrder` is true. taskCh chan *indexHashJoinTask - stats *indexLookUpJoinRuntimeStats + stats *indexLookUpJoinRuntimeStats + prepared bool } type indexHashJoinOuterWorker struct { @@ -155,7 +156,6 @@ func (e *IndexNestedLoopHashJoin) Open(ctx context.Context) error { e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats) } e.finished.Store(false) - e.startWorkers(ctx) return nil } @@ -229,6 +229,10 @@ func (e *IndexNestedLoopHashJoin) wait4JoinWorkers() { // Next implements the IndexNestedLoopHashJoin Executor interface. func (e *IndexNestedLoopHashJoin) Next(ctx context.Context, req *chunk.Chunk) error { + if !e.prepared { + e.startWorkers(ctx) + e.prepared = true + } req.Reset() if e.keepOuterOrder { return e.runInOrder(ctx, req) @@ -321,6 +325,7 @@ func (e *IndexNestedLoopHashJoin) Close() error { } e.joinChkResourceCh = nil e.finished.Store(false) + e.prepared = false return e.baseExecutor.Close() } diff --git a/executor/index_lookup_join.go b/executor/index_lookup_join.go index 44a7bdb4b9ca1..994385566dde3 100644 --- a/executor/index_lookup_join.go +++ b/executor/index_lookup_join.go @@ -86,6 +86,7 @@ type IndexLookUpJoin struct { stats *indexLookUpJoinRuntimeStats finished *atomic.Value + prepared bool } type outerCtx struct { @@ -174,7 +175,6 @@ func (e *IndexLookUpJoin) Open(ctx context.Context) error { e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats) } e.cancelFunc = nil - e.startWorkers(ctx) return nil } @@ -258,6 +258,10 @@ func (e *IndexLookUpJoin) newInnerWorker(taskCh chan *lookUpJoinTask) *innerWork // Next implements the Executor interface. 
diff --git a/executor/index_lookup_join.go b/executor/index_lookup_join.go
index 44a7bdb4b9ca1..994385566dde3 100644
--- a/executor/index_lookup_join.go
+++ b/executor/index_lookup_join.go
@@ -86,6 +86,7 @@ type IndexLookUpJoin struct {
 	stats    *indexLookUpJoinRuntimeStats
 	finished *atomic.Value
+	prepared bool
 }
 
 type outerCtx struct {
@@ -174,7 +175,6 @@ func (e *IndexLookUpJoin) Open(ctx context.Context) error {
 		e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats)
 	}
 	e.cancelFunc = nil
-	e.startWorkers(ctx)
 	return nil
 }
 
@@ -258,6 +258,10 @@ func (e *IndexLookUpJoin) newInnerWorker(taskCh chan *lookUpJoinTask) *innerWork
 
 // Next implements the Executor interface.
 func (e *IndexLookUpJoin) Next(ctx context.Context, req *chunk.Chunk) error {
+	if !e.prepared {
+		e.startWorkers(ctx)
+		e.prepared = true
+	}
 	if e.isOuterJoin {
 		atomic.StoreInt64(&e.requiredRows, int64(req.RequiredRows()))
 	}
@@ -764,6 +768,7 @@ func (e *IndexLookUpJoin) Close() error {
 	e.memTracker = nil
 	e.task = nil
 	e.finished.Store(false)
+	e.prepared = false
 	return e.baseExecutor.Close()
 }
diff --git a/executor/index_lookup_merge_join.go b/executor/index_lookup_merge_join.go
index 7e4ac6a515ae9..999d22144bee9 100644
--- a/executor/index_lookup_merge_join.go
+++ b/executor/index_lookup_merge_join.go
@@ -73,6 +73,7 @@ type IndexLookUpMergeJoin struct {
 	lastColHelper *plannercore.ColWithCmpFuncManager
 
 	memTracker *memory.Tracker // track memory usage
+	prepared   bool
 }
 
 type outerMergeCtx struct {
@@ -184,7 +185,6 @@ func (e *IndexLookUpMergeJoin) Open(ctx context.Context) error {
 	}
 	e.memTracker = memory.NewTracker(e.id, -1)
 	e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker)
-	e.startWorkers(ctx)
 	return nil
 }
 
@@ -271,6 +271,10 @@ func (e *IndexLookUpMergeJoin) newInnerMergeWorker(taskCh chan *lookUpMergeJoinT
 
 // Next implements the Executor interface
 func (e *IndexLookUpMergeJoin) Next(ctx context.Context, req *chunk.Chunk) error {
+	if !e.prepared {
+		e.startWorkers(ctx)
+		e.prepared = true
+	}
 	if e.isOuterJoin {
 		atomic.StoreInt64(&e.requiredRows, int64(req.RequiredRows()))
 	}
@@ -753,5 +757,6 @@ func (e *IndexLookUpMergeJoin) Close() error {
 	// cancelFunc control the outer worker and outer worker close the task channel.
 	e.workerWg.Wait()
 	e.memTracker = nil
+	e.prepared = false
 	return e.baseExecutor.Close()
 }
diff --git a/executor/infoschema_reader_test.go b/executor/infoschema_reader_test.go
index 5e40147b99939..62345610c28b7 100644
--- a/executor/infoschema_reader_test.go
+++ b/executor/infoschema_reader_test.go
@@ -918,7 +918,7 @@ func (s *testInfoschemaClusterTableSuite) TestTableStorageStats(c *C) {
 	tk.MustQuery("select TABLE_SCHEMA, sum(TABLE_SIZE) from information_schema.TABLE_STORAGE_STATS where TABLE_SCHEMA = 'test' group by TABLE_SCHEMA;").Check(testkit.Rows(
 		"test 2",
 	))
-	c.Assert(len(tk.MustQuery("select TABLE_NAME from information_schema.TABLE_STORAGE_STATS where TABLE_SCHEMA = 'mysql';").Rows()), Equals, 28)
+	c.Assert(len(tk.MustQuery("select TABLE_NAME from information_schema.TABLE_STORAGE_STATS where TABLE_SCHEMA = 'mysql';").Rows()), Equals, 29)
 
 	// More tests about the privileges.
 	tk.MustExec("create user 'testuser'@'localhost'")
tk.MustExec("create user 'testuser'@'localhost'") @@ -944,14 +944,14 @@ func (s *testInfoschemaClusterTableSuite) TestTableStorageStats(c *C) { Hostname: "localhost", }, nil, nil), Equals, true) - tk.MustQuery("select count(1) from information_schema.TABLE_STORAGE_STATS where TABLE_SCHEMA = 'mysql'").Check(testkit.Rows("28")) + tk.MustQuery("select count(1) from information_schema.TABLE_STORAGE_STATS where TABLE_SCHEMA = 'mysql'").Check(testkit.Rows("29")) c.Assert(tk.Se.Auth(&auth.UserIdentity{ Username: "testuser3", Hostname: "localhost", }, nil, nil), Equals, true) - tk.MustQuery("select count(1) from information_schema.TABLE_STORAGE_STATS where TABLE_SCHEMA = 'mysql'").Check(testkit.Rows("28")) + tk.MustQuery("select count(1) from information_schema.TABLE_STORAGE_STATS where TABLE_SCHEMA = 'mysql'").Check(testkit.Rows("29")) } func (s *testInfoschemaTableSuite) TestSequences(c *C) { diff --git a/planner/core/fragment.go b/planner/core/fragment.go index 491f35aed83e7..f84c1421185c3 100644 --- a/planner/core/fragment.go +++ b/planner/core/fragment.go @@ -126,7 +126,8 @@ func (f *Fragment) init(p PhysicalPlan) error { } f.TableScan = x case *PhysicalExchangeReceiver: - f.singleton = x.children[0].(*PhysicalExchangeSender).ExchangeType == tipb.ExchangeType_PassThrough + // TODO: after we support partial merge, we should check whether all the target exchangeReceiver is same. + f.singleton = f.singleton || x.children[0].(*PhysicalExchangeSender).ExchangeType == tipb.ExchangeType_PassThrough f.ExchangeReceivers = append(f.ExchangeReceivers, x) case *PhysicalUnionAll: return errors.New("unexpected union all detected") diff --git a/planner/core/fragment_test.go b/planner/core/fragment_test.go new file mode 100644 index 0000000000000..fa8ec9e99763c --- /dev/null +++ b/planner/core/fragment_test.go @@ -0,0 +1,53 @@ +// Copyright 2020 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
diff --git a/planner/core/fragment_test.go b/planner/core/fragment_test.go
new file mode 100644
index 0000000000000..fa8ec9e99763c
--- /dev/null
+++ b/planner/core/fragment_test.go
@@ -0,0 +1,53 @@
+// Copyright 2020 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package core
+
+import (
+	"github.com/pingcap/tipb/go-tipb"
+	"github.com/stretchr/testify/require"
+
+	"testing"
+)
+
+func TestFragmentInitSingleton(t *testing.T) {
+	r1, r2 := &PhysicalExchangeReceiver{}, &PhysicalExchangeReceiver{}
+	r1.SetChildren(&PhysicalExchangeSender{ExchangeType: tipb.ExchangeType_PassThrough})
+	r2.SetChildren(&PhysicalExchangeSender{ExchangeType: tipb.ExchangeType_Broadcast})
+	p := &PhysicalHashJoin{}
+
+	f := &Fragment{}
+	p.SetChildren(r1, r1)
+	err := f.init(p)
+	require.NoError(t, err)
+	require.Equal(t, f.singleton, true)
+
+	f = &Fragment{}
+	p.SetChildren(r1, r2)
+	err = f.init(p)
+	require.NoError(t, err)
+	require.Equal(t, f.singleton, true)
+
+	f = &Fragment{}
+	p.SetChildren(r2, r1)
+	err = f.init(p)
+	require.NoError(t, err)
+	require.Equal(t, f.singleton, true)
+
+	f = &Fragment{}
+	p.SetChildren(r2, r2)
+	err = f.init(p)
+	require.NoError(t, err)
+	require.Equal(t, f.singleton, false)
+}
diff --git a/session/bootstrap.go b/session/bootstrap.go
index a3466cb8c87cb..5a1d4edb697b5 100644
--- a/session/bootstrap.go
+++ b/session/bootstrap.go
@@ -376,6 +376,16 @@ const (
 		column_ids TEXT(19372),
 		PRIMARY KEY (table_id) CLUSTERED
 	);`
+	// CreateStatsHistory stores the historical stats.
+	CreateStatsHistory = `CREATE TABLE IF NOT EXISTS mysql.stats_history (
+		table_id bigint(64) NOT NULL,
+		stats_data longblob NOT NULL,
+		seq_no bigint(64) NOT NULL comment 'sequence number of the gzipped data slice',
+		version bigint(64) NOT NULL comment 'stats version which corresponding to stats:version in EXPLAIN',
+		create_time datetime(6) NOT NULL,
+		UNIQUE KEY table_version_seq (table_id, version, seq_no),
+		KEY table_create_time (table_id, create_time, seq_no)
+	);`
 )
 
 // bootstrap initiates system DB for a store.
@@ -557,11 +567,14 @@ const (
 	version81 = 81
 	// version82 adds the mysql.analyze_options table
 	version82 = 82
+	// version83 adds the table mysql.stats_history.
+	version83 = 83
 )
 
 // currentBootstrapVersion is defined as a variable, so we can modify its value for testing.
 // please make sure this is the largest version
-var currentBootstrapVersion int64 = version82
+var currentBootstrapVersion int64 = version83
 
 var (
 	bootstrapVersion = []func(Session, int64){
@@ -647,6 +660,7 @@ var (
 		upgradeToVer80,
 		upgradeToVer81,
 		upgradeToVer82,
+		upgradeToVer83,
 	}
 )
 
@@ -1703,6 +1717,14 @@ func upgradeToVer82(s Session, ver int64) {
 	doReentrantDDL(s, CreateAnalyzeOptionsTable)
 }
 
+func upgradeToVer83(s Session, ver int64) {
+	if ver >= version83 {
+		return
+	}
+	doReentrantDDL(s, CreateStatsHistory)
+}
+
 func writeOOMAction(s Session) {
 	comment := "oom-action is `log` by default in v3.0.x, `cancel` by default in v4.0.11+"
 	mustExecute(s, `INSERT HIGH_PRIORITY INTO %n.%n VALUES (%?, %?, %?) ON DUPLICATE KEY UPDATE VARIABLE_VALUE= %?`,
@@ -1789,6 +1811,8 @@ func doDDLWorks(s Session) {
 	mustExecute(s, CreateTableCacheMetaTable)
 	// Create analyze_options table.
 	mustExecute(s, CreateAnalyzeOptionsTable)
+	// Create stats_history table.
+	mustExecute(s, CreateStatsHistory)
 }
 
 // doDMLWorks executes DML statements in bootstrap stage.
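The bootstrap change follows the usual convention: bump currentBootstrapVersion, append an upgradeToVerN function that is a no-op when the stored version is already new enough, and create the table with IF NOT EXISTS so the step stays reentrant. A hedged sketch of that shape, using illustrative types rather than the session package API:

// Sketch only: the reentrant shape of a bootstrap upgrade step.
package sketch

type execer interface {
	Exec(sql string) error
}

const version83 = 83

func upgradeToVer83Sketch(s execer, storedVer int64) error {
	if storedVer >= version83 {
		return nil // already at or past this version: nothing to do
	}
	// IF NOT EXISTS keeps the DDL safe to re-run if the process dies after the
	// DDL succeeds but before the stored bootstrap version is bumped.
	// (Column list abbreviated for the sketch.)
	return s.Exec(`CREATE TABLE IF NOT EXISTS mysql.stats_history (table_id bigint(64) NOT NULL)`)
}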
diff --git a/util/cteutil/storage.go b/util/cteutil/storage.go
index 19b1bd5151fdc..a629398000898 100644
--- a/util/cteutil/storage.go
+++ b/util/cteutil/storage.go
@@ -82,11 +82,6 @@ type Storage interface {
 	SetIter(iter int)
 	GetIter() int
 
-	// We use this channel to notify reader that Storage is ready to read.
-	// It exists only to solve the special implementation of IndexLookUpJoin.
-	// We will find a better way and remove this later.
-	GetBegCh() chan struct{}
-
 	GetMemTracker() *memory.Tracker
 	GetDiskTracker() *disk.Tracker
 	ActionSpill() *chunk.SpillDiskAction
@@ -239,11 +234,6 @@ func (s *StorageRC) GetIter() int {
 	return s.iter
 }
 
-// GetBegCh impls Storage GetBegCh interface.
-func (s *StorageRC) GetBegCh() chan struct{} {
-	return s.begCh
-}
-
 // GetMemTracker impls Storage GetMemTracker interface.
 func (s *StorageRC) GetMemTracker() *memory.Tracker {
 	return s.rc.GetMemTracker()