Merge branch 'release-5.4' into release-5.4-1287eab595d0
winoros authored Apr 27, 2022
2 parents 1543821 + 9687ea4 commit 81a3a87
Showing 15 changed files with 222 additions and 50 deletions.
32 changes: 27 additions & 5 deletions br/pkg/lightning/restore/meta_manager.go
@@ -1035,15 +1035,18 @@ func (m noopTableMetaMgr) FinishTable(ctx context.Context) error {
 	return nil
 }
 
-type singleMgrBuilder struct{}
+type singleMgrBuilder struct {
+	taskID int64
+}
 
 func (b singleMgrBuilder) Init(context.Context) error {
 	return nil
 }
 
 func (b singleMgrBuilder) TaskMetaMgr(pd *pdutil.PdController) taskMetaMgr {
 	return &singleTaskMetaMgr{
-		pd: pd,
+		pd:     pd,
+		taskID: b.taskID,
 	}
 }
 
@@ -1052,15 +1055,34 @@ func (b singleMgrBuilder) TableMetaMgr(tr *TableRestore) tableMetaMgr {
 }
 
 type singleTaskMetaMgr struct {
-	pd *pdutil.PdController
+	pd           *pdutil.PdController
+	taskID       int64
+	initialized  bool
+	sourceBytes  uint64
+	clusterAvail uint64
 }
 
 func (m *singleTaskMetaMgr) InitTask(ctx context.Context, source int64) error {
+	m.sourceBytes = uint64(source)
+	m.initialized = true
 	return nil
 }
 
 func (m *singleTaskMetaMgr) CheckTasksExclusively(ctx context.Context, action func(tasks []taskMeta) ([]taskMeta, error)) error {
-	_, err := action(nil)
+	newTasks, err := action([]taskMeta{
+		{
+			taskID:       m.taskID,
+			status:       taskMetaStatusInitial,
+			sourceBytes:  m.sourceBytes,
+			clusterAvail: m.clusterAvail,
+		},
+	})
+	for _, t := range newTasks {
+		if m.taskID == t.taskID {
+			m.sourceBytes = t.sourceBytes
+			m.clusterAvail = t.clusterAvail
+		}
+	}
 	return err
 }
 
@@ -1069,7 +1091,7 @@ func (m *singleTaskMetaMgr) CheckAndPausePdSchedulers(ctx context.Context) (pdut
 }
 
 func (m *singleTaskMetaMgr) CheckTaskExist(ctx context.Context) (bool, error) {
-	return true, nil
+	return m.initialized, nil
 }
 
 func (m *singleTaskMetaMgr) CheckAndFinishRestore(context.Context, bool) (shouldSwitchBack bool, shouldCleanupMeta bool, err error) {
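
The in-memory single-task manager now round-trips task metadata through `CheckTasksExclusively`: the callback receives the single task's current meta, and any returned entry with a matching `taskID` is written back to the manager. Below is a minimal, self-contained sketch of that read-modify-write flow; the `taskMeta` and manager types here are simplified stand-ins for lightning's unexported types, not the real implementation.

```go
package main

import "fmt"

// Simplified stand-ins for lightning's unexported taskMeta and
// singleTaskMetaMgr, only to illustrate the read-modify-write flow
// that CheckTasksExclusively supports after this change.
type taskMeta struct {
	taskID       int64
	sourceBytes  uint64
	clusterAvail uint64
}

type singleTaskMetaMgr struct {
	taskID       int64
	sourceBytes  uint64
	clusterAvail uint64
}

// CheckTasksExclusively hands the current meta to the callback and
// persists any returned entry whose taskID matches, mirroring the diff.
func (m *singleTaskMetaMgr) CheckTasksExclusively(action func([]taskMeta) ([]taskMeta, error)) error {
	newTasks, err := action([]taskMeta{{
		taskID:       m.taskID,
		sourceBytes:  m.sourceBytes,
		clusterAvail: m.clusterAvail,
	}})
	for _, t := range newTasks {
		if t.taskID == m.taskID {
			m.sourceBytes = t.sourceBytes
			m.clusterAvail = t.clusterAvail
		}
	}
	return err
}

func main() {
	m := &singleTaskMetaMgr{taskID: 1, sourceBytes: 1 << 30}
	// The callback can adjust the recorded source size and have it persisted.
	_ = m.CheckTasksExclusively(func(tasks []taskMeta) ([]taskMeta, error) {
		tasks[0].sourceBytes = 2 << 30
		return tasks, nil
	})
	fmt.Println(m.sourceBytes) // 2147483648
}
```
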
26 changes: 26 additions & 0 deletions br/pkg/lightning/restore/meta_manager_test.go
@@ -6,6 +6,7 @@ import (
 	"context"
 	"database/sql/driver"
 	"sort"
+	"time"
 
 	"github.com/DATA-DOG/go-sqlmock"
 	. "github.com/pingcap/check"
@@ -323,3 +324,28 @@ func (s *taskMetaMgrSuite) TestCheckTasksExclusively(c *C) {
 	c.Assert(err, IsNil)
 
 }
+
+func (s *taskMetaMgrSuite) TestSingleTaskMetaMgr(c *C) {
+	metaBuilder := singleMgrBuilder{
+		taskID: time.Now().UnixNano(),
+	}
+	metaMgr := metaBuilder.TaskMetaMgr(nil)
+
+	ok, err := metaMgr.CheckTaskExist(context.Background())
+	c.Assert(err, IsNil)
+	c.Assert(ok, IsFalse)
+
+	err = metaMgr.InitTask(context.Background(), 1<<30)
+	c.Assert(err, IsNil)
+
+	ok, err = metaMgr.CheckTaskExist(context.Background())
+	c.Assert(err, IsNil)
+	c.Assert(ok, IsTrue)
+
+	err = metaMgr.CheckTasksExclusively(context.Background(), func(tasks []taskMeta) ([]taskMeta, error) {
+		c.Assert(len(tasks), Equals, 1)
+		c.Assert(tasks[0].sourceBytes, Equals, uint64(1<<30))
+		return nil, nil
+	})
+	c.Assert(err, IsNil)
+}
18 changes: 16 additions & 2 deletions br/pkg/lightning/restore/restore.go
@@ -389,7 +389,9 @@ func NewRestoreControllerWithPauser(
 			needChecksum: cfg.PostRestore.Checksum != config.OpLevelOff,
 		}
 	case isSSTImport:
-		metaBuilder = singleMgrBuilder{}
+		metaBuilder = singleMgrBuilder{
+			taskID: cfg.TaskID,
+		}
 	default:
 		metaBuilder = noopMetaMgrBuilder{}
 	}
@@ -1893,7 +1895,19 @@ func (rc *Controller) preCheckRequirements(ctx context.Context) error {
 		if err = rc.taskMgr.InitTask(ctx, source); err != nil {
 			return errors.Trace(err)
 		}
-		if rc.cfg.App.CheckRequirements {
+	}
+	if rc.cfg.App.CheckRequirements {
+		needCheck := true
+		if rc.cfg.Checkpoint.Enable {
+			taskCheckpoints, err := rc.checkpointsDB.TaskCheckpoint(ctx)
+			if err != nil {
+				return errors.Trace(err)
+			}
+			// If the task checkpoint is already initialized, the check has been performed before.
+			// We don't need to, and shouldn't, check again, because lightning may have already imported some data.
+			needCheck = taskCheckpoints == nil
+		}
+		if needCheck {
 			err = rc.localResource(source)
 			if err != nil {
 				return errors.Trace(err)
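
A condensed sketch of the new guard in `preCheckRequirements` (a hypothetical standalone form, with the config and checkpoint lookups reduced to plain booleans): the local-resource check only runs when requirements checking is enabled and, if checkpoints are enabled, no task checkpoint exists yet, since an existing checkpoint implies data may already have been imported.

```go
package main

import "fmt"

// needLocalResourceCheck distills the added guard; it is a sketch, not
// lightning's API (the real code reads rc.cfg and rc.checkpointsDB and
// also handles the checkpoint-lookup error path).
func needLocalResourceCheck(checkRequirements, checkpointEnabled, taskCheckpointExists bool) bool {
	if !checkRequirements {
		return false
	}
	if checkpointEnabled && taskCheckpointExists {
		// The check already ran in a previous invocation and some data
		// may already be imported, so don't run it again.
		return false
	}
	return true
}

func main() {
	fmt.Println(needLocalResourceCheck(true, true, true))  // false: resumed task
	fmt.Println(needLocalResourceCheck(true, true, false)) // true: fresh task
}
```
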
37 changes: 19 additions & 18 deletions domain/domain.go
@@ -811,6 +811,25 @@ func (do *Domain) Init(ddlLease time.Duration, sysExecutorFactory func(*Domain)
 			do.ddl = d
 		}
 	})
+
+	if config.GetGlobalConfig().Experimental.EnableGlobalKill {
+		if do.etcdClient != nil {
+			err := do.acquireServerID(ctx)
+			if err != nil {
+				logutil.BgLogger().Error("acquire serverID failed", zap.Error(err))
+				do.isLostConnectionToPD.Set(1) // will retry in `do.serverIDKeeper`
+			} else {
+				do.isLostConnectionToPD.Set(0)
+			}
+
+			do.wg.Add(1)
+			go do.serverIDKeeper()
+		} else {
+			// set serverID for standalone deployment to enable 'KILL'.
+			atomic.StoreUint64(&do.serverID, serverIDForStandalone)
+		}
+	}
+
 	// step 1: prepare the info/schema syncer which domain reload needed.
 	skipRegisterToDashboard := config.GetGlobalConfig().SkipRegisterToDashboard
 	do.info, err = infosync.GlobalInfoSyncerInit(ctx, do.ddl.GetID(), do.ServerID, do.etcdClient, skipRegisterToDashboard)
@@ -833,24 +852,6 @@ func (do *Domain) Init(ddlLease time.Duration, sysExecutorFactory func(*Domain)
 		return err
 	}
 
-	if config.GetGlobalConfig().Experimental.EnableGlobalKill {
-		if do.etcdClient != nil {
-			err := do.acquireServerID(ctx)
-			if err != nil {
-				logutil.BgLogger().Error("acquire serverID failed", zap.Error(err))
-				do.isLostConnectionToPD.Set(1) // will retry in `do.serverIDKeeper`
-			} else {
-				do.isLostConnectionToPD.Set(0)
-			}
-
-			do.wg.Add(1)
-			go do.serverIDKeeper()
-		} else {
-			// set serverID for standalone deployment to enable 'KILL'.
-			atomic.StoreUint64(&do.serverID, serverIDForStandalone)
-		}
-	}
-
 	// Only when the store is local that the lease value is 0.
 	// If the store is local, it doesn't need loadSchemaInLoop.
 	if ddlLease > 0 {
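
The global-kill block is moved ahead of the info-syncer setup rather than after it. Presumably this is because `GlobalInfoSyncerInit` is handed `do.ServerID` as a getter right below the new location, so the server ID (or the standalone fallback) should be settled before the syncer starts using it. The snippet below is only a condensed ordering sketch with made-up helper names, not the actual Domain code.

```go
package main

import "fmt"

const serverIDForStandalone = 1 // placeholder value, not TiDB's real constant

// initServerID sketches the new ordering: settle the server ID first
// (etcd-acquired or the standalone fallback), then hand a getter to the
// info syncer. acquire is a stand-in for Domain.acquireServerID.
func initServerID(hasEtcd bool, acquire func() (uint64, error)) func() uint64 {
	var serverID uint64
	if hasEtcd {
		if id, err := acquire(); err == nil {
			serverID = id
		}
		// On error the real code marks the PD connection as lost and
		// keeps retrying in the serverIDKeeper goroutine.
	} else {
		// Standalone deployment: a fixed ID is enough to make KILL work.
		serverID = serverIDForStandalone
	}
	return func() uint64 { return serverID }
}

func main() {
	getServerID := initServerID(false, nil)
	fmt.Println(getServerID()) // 1: the standalone fallback
	// getServerID would then be passed to the info syncer, which can
	// report a valid ID from the start.
}
```
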
3 changes: 0 additions & 3 deletions executor/cte.go
@@ -230,8 +230,6 @@ func (e *CTEExec) Close() (err error) {
 func (e *CTEExec) computeSeedPart(ctx context.Context) (err error) {
 	e.curIter = 0
 	e.iterInTbl.SetIter(e.curIter)
-	// This means iterInTbl's can be read.
-	defer close(e.iterInTbl.GetBegCh())
 	chks := make([]*chunk.Chunk, 0, 10)
 	for {
 		if e.limitDone(e.iterInTbl) {
@@ -384,7 +382,6 @@ func (e *CTEExec) setupTblsForNewIteration() (err error) {
 	if err = e.iterInTbl.Reopen(); err != nil {
 		return err
 	}
-	defer close(e.iterInTbl.GetBegCh())
 	if e.isDistinct {
 		// Already deduplicated by resTbl, adding directly is ok.
 		for _, chk := range chks {
3 changes: 0 additions & 3 deletions executor/cte_table_reader.go
@@ -41,9 +41,6 @@ func (e *CTETableReaderExec) Open(ctx context.Context) error {
 func (e *CTETableReaderExec) Next(ctx context.Context, req *chunk.Chunk) (err error) {
 	req.Reset()
 
-	// Wait until iterInTbl can be read. This is controlled by corresponding CTEExec.
-	<-e.iterInTbl.GetBegCh()
-
 	// We should read `iterInTbl` from the beginning when the next iteration starts.
 	// Can not directly judge whether to start the next iteration based on e.chkIdx,
 	// because some operators(Selection) may use forloop to read all data in `iterInTbl`.
32 changes: 32 additions & 0 deletions executor/cte_test.go
@@ -22,7 +22,9 @@ import (
 
 	"github.com/pingcap/failpoint"
 	"github.com/pingcap/tidb/config"
+	"github.com/pingcap/tidb/parser/terror"
 	"github.com/pingcap/tidb/testkit"
+	"github.com/pingcap/tidb/types"
 	"github.com/stretchr/testify/require"
 )
 
@@ -408,3 +410,33 @@ func TestSpillToDisk(t *testing.T) {
 	}
 	rows.Check(testkit.Rows(resRows...))
 }
+
+func TestCTEExecError(t *testing.T) {
+	store, clean := testkit.CreateMockStore(t)
+	defer clean()
+
+	tk := testkit.NewTestKit(t, store)
+	tk.MustExec("use test;")
+	tk.MustExec("drop table if exists src;")
+	tk.MustExec("create table src(first int, second int);")
+
+	insertStr := fmt.Sprintf("insert into src values (%d, %d)", rand.Intn(1000), rand.Intn(1000))
+	for i := 0; i < 1000; i++ {
+		insertStr += fmt.Sprintf(",(%d, %d)", rand.Intn(1000), rand.Intn(1000))
+	}
+	insertStr += ";"
+	tk.MustExec(insertStr)
+
+	// Increase projection concurrency and decrease chunk size
+	// to increase the probability of reproducing the problem.
+	tk.MustExec("set tidb_max_chunk_size = 32")
+	tk.MustExec("set tidb_projection_concurrency = 20")
+	for i := 0; i < 10; i++ {
+		err := tk.QueryToErr("with recursive cte(iter, first, second, result) as " +
+			"(select 1, first, second, first+second from src " +
+			" union all " +
+			"select iter+1, second, result, second+result from cte where iter < 80 )" +
+			"select * from cte")
+		require.True(t, terror.ErrorEqual(err, types.ErrOverflow))
+	}
+}
9 changes: 7 additions & 2 deletions executor/index_lookup_hash_join.go
@@ -70,7 +70,8 @@ type IndexNestedLoopHashJoin struct {
 	// taskCh is only used when `keepOuterOrder` is true.
 	taskCh chan *indexHashJoinTask
 
-	stats *indexLookUpJoinRuntimeStats
+	stats    *indexLookUpJoinRuntimeStats
+	prepared bool
 }
 
 type indexHashJoinOuterWorker struct {
@@ -155,7 +156,6 @@ func (e *IndexNestedLoopHashJoin) Open(ctx context.Context) error {
 		e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats)
 	}
 	e.finished.Store(false)
-	e.startWorkers(ctx)
 	return nil
 }
 
@@ -229,6 +229,10 @@ func (e *IndexNestedLoopHashJoin) wait4JoinWorkers() {
 
 // Next implements the IndexNestedLoopHashJoin Executor interface.
 func (e *IndexNestedLoopHashJoin) Next(ctx context.Context, req *chunk.Chunk) error {
+	if !e.prepared {
+		e.startWorkers(ctx)
+		e.prepared = true
+	}
 	req.Reset()
 	if e.keepOuterOrder {
 		return e.runInOrder(ctx, req)
@@ -321,6 +325,7 @@ func (e *IndexNestedLoopHashJoin) Close() error {
 	}
 	e.joinChkResourceCh = nil
 	e.finished.Store(false)
+	e.prepared = false
 	return e.baseExecutor.Close()
 }
 
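
The same change is applied to `IndexLookUpJoin` and `IndexLookUpMergeJoin` below: the worker goroutines are no longer launched in `Open`, but lazily on the first `Next` call, guarded by a `prepared` flag that `Close` resets. A stripped-down sketch of the pattern (generic names, not TiDB's `Executor` interface):

```go
package main

import (
	"context"
	"fmt"
)

// lazyExec sketches the "start workers on first Next" pattern this
// commit applies to the index-join executors (simplified; the real
// executors implement TiDB's Executor interface and spawn goroutines).
type lazyExec struct {
	prepared bool
}

func (e *lazyExec) Open(ctx context.Context) error {
	// No workers are started here, so an executor that is opened but
	// never iterated does not spin up goroutines it never needs.
	return nil
}

func (e *lazyExec) startWorkers(ctx context.Context) {
	fmt.Println("workers started")
}

func (e *lazyExec) Next(ctx context.Context) error {
	if !e.prepared {
		e.startWorkers(ctx)
		e.prepared = true
	}
	// ... produce the next chunk of rows ...
	return nil
}

func (e *lazyExec) Close() error {
	// Reset the flag so a reused executor starts fresh workers next time.
	e.prepared = false
	return nil
}

func main() {
	e := &lazyExec{}
	ctx := context.Background()
	_ = e.Open(ctx)
	_ = e.Next(ctx) // workers start here, not in Open
	_ = e.Next(ctx) // already prepared, nothing extra happens
	_ = e.Close()
}
```
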
7 changes: 6 additions & 1 deletion executor/index_lookup_join.go
@@ -86,6 +86,7 @@ type IndexLookUpJoin struct {
 
 	stats    *indexLookUpJoinRuntimeStats
 	finished *atomic.Value
+	prepared bool
 }
 
 type outerCtx struct {
@@ -174,7 +175,6 @@ func (e *IndexLookUpJoin) Open(ctx context.Context) error {
 		e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats)
 	}
 	e.cancelFunc = nil
-	e.startWorkers(ctx)
 	return nil
 }
 
@@ -258,6 +258,10 @@ func (e *IndexLookUpJoin) newInnerWorker(taskCh chan *lookUpJoinTask) *innerWork
 
 // Next implements the Executor interface.
 func (e *IndexLookUpJoin) Next(ctx context.Context, req *chunk.Chunk) error {
+	if !e.prepared {
+		e.startWorkers(ctx)
+		e.prepared = true
+	}
 	if e.isOuterJoin {
 		atomic.StoreInt64(&e.requiredRows, int64(req.RequiredRows()))
 	}
@@ -764,6 +768,7 @@ func (e *IndexLookUpJoin) Close() error {
 	e.memTracker = nil
 	e.task = nil
 	e.finished.Store(false)
+	e.prepared = false
 	return e.baseExecutor.Close()
 }
 
Expand Down
7 changes: 6 additions & 1 deletion executor/index_lookup_merge_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ type IndexLookUpMergeJoin struct {
lastColHelper *plannercore.ColWithCmpFuncManager

memTracker *memory.Tracker // track memory usage
prepared bool
}

type outerMergeCtx struct {
Expand Down Expand Up @@ -184,7 +185,6 @@ func (e *IndexLookUpMergeJoin) Open(ctx context.Context) error {
}
e.memTracker = memory.NewTracker(e.id, -1)
e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker)
e.startWorkers(ctx)
return nil
}

Expand Down Expand Up @@ -271,6 +271,10 @@ func (e *IndexLookUpMergeJoin) newInnerMergeWorker(taskCh chan *lookUpMergeJoinT

// Next implements the Executor interface
func (e *IndexLookUpMergeJoin) Next(ctx context.Context, req *chunk.Chunk) error {
if !e.prepared {
e.startWorkers(ctx)
e.prepared = true
}
if e.isOuterJoin {
atomic.StoreInt64(&e.requiredRows, int64(req.RequiredRows()))
}
Expand Down Expand Up @@ -753,5 +757,6 @@ func (e *IndexLookUpMergeJoin) Close() error {
// cancelFunc control the outer worker and outer worker close the task channel.
e.workerWg.Wait()
e.memTracker = nil
e.prepared = false
return e.baseExecutor.Close()
}
6 changes: 3 additions & 3 deletions executor/infoschema_reader_test.go
@@ -918,7 +918,7 @@ func (s *testInfoschemaClusterTableSuite) TestTableStorageStats(c *C) {
 	tk.MustQuery("select TABLE_SCHEMA, sum(TABLE_SIZE) from information_schema.TABLE_STORAGE_STATS where TABLE_SCHEMA = 'test' group by TABLE_SCHEMA;").Check(testkit.Rows(
 		"test 2",
 	))
-	c.Assert(len(tk.MustQuery("select TABLE_NAME from information_schema.TABLE_STORAGE_STATS where TABLE_SCHEMA = 'mysql';").Rows()), Equals, 28)
+	c.Assert(len(tk.MustQuery("select TABLE_NAME from information_schema.TABLE_STORAGE_STATS where TABLE_SCHEMA = 'mysql';").Rows()), Equals, 29)
 
 	// More tests about the privileges.
 	tk.MustExec("create user 'testuser'@'localhost'")
@@ -944,14 +944,14 @@ func (s *testInfoschemaClusterTableSuite) TestTableStorageStats(c *C) {
 		Hostname: "localhost",
 	}, nil, nil), Equals, true)
 
-	tk.MustQuery("select count(1) from information_schema.TABLE_STORAGE_STATS where TABLE_SCHEMA = 'mysql'").Check(testkit.Rows("28"))
+	tk.MustQuery("select count(1) from information_schema.TABLE_STORAGE_STATS where TABLE_SCHEMA = 'mysql'").Check(testkit.Rows("29"))
 
 	c.Assert(tk.Se.Auth(&auth.UserIdentity{
 		Username: "testuser3",
 		Hostname: "localhost",
 	}, nil, nil), Equals, true)
 
-	tk.MustQuery("select count(1) from information_schema.TABLE_STORAGE_STATS where TABLE_SCHEMA = 'mysql'").Check(testkit.Rows("28"))
+	tk.MustQuery("select count(1) from information_schema.TABLE_STORAGE_STATS where TABLE_SCHEMA = 'mysql'").Check(testkit.Rows("29"))
 }
 
 func (s *testInfoschemaTableSuite) TestSequences(c *C) {
3 changes: 2 additions & 1 deletion planner/core/fragment.go
@@ -126,7 +126,8 @@ func (f *Fragment) init(p PhysicalPlan) error {
 		}
 		f.TableScan = x
 	case *PhysicalExchangeReceiver:
-		f.singleton = x.children[0].(*PhysicalExchangeSender).ExchangeType == tipb.ExchangeType_PassThrough
+		// TODO: after we support partial merge, we should check whether all the target exchangeReceivers are the same.
+		f.singleton = f.singleton || x.children[0].(*PhysicalExchangeSender).ExchangeType == tipb.ExchangeType_PassThrough
 		f.ExchangeReceivers = append(f.ExchangeReceivers, x)
 	case *PhysicalUnionAll:
 		return errors.New("unexpected union all detected")
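
A small illustration of why the OR-accumulation matters: `init` walks every receiver of a fragment, and with plain assignment a later receiver whose sender is not PassThrough would overwrite `singleton` back to false. The helper below is a hypothetical reduction of that loop, not planner code.

```go
package main

import "fmt"

// isSingleton reduces the receiver walk to one boolean per receiver,
// true when that receiver's sender uses the PassThrough exchange type.
func isSingleton(receiverIsPassThrough []bool) bool {
	singleton := false
	for _, passThrough := range receiverIsPassThrough {
		// Accumulate instead of overwriting, mirroring
		// `f.singleton = f.singleton || ...` in the diff.
		singleton = singleton || passThrough
	}
	return singleton
}

func main() {
	// With plain assignment the second receiver would reset the flag.
	fmt.Println(isSingleton([]bool{true, false})) // true
}
```
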
(The remaining 3 changed files in this commit are not shown here.)
