Skip to content

Commit

Permalink
*: add statistic lock/unlock function (#38387)
Browse files Browse the repository at this point in the history
  • Loading branch information
pingandb authored Nov 11, 2022
1 parent 4b08ccc commit 238caf6
Show file tree
Hide file tree
Showing 23 changed files with 9,972 additions and 8,825 deletions.
4 changes: 3 additions & 1 deletion ddl/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ func TestGetFlashbackKeyRanges(t *testing.T) {
// 3: (stats_extended)
// 4: (stats_fm_sketch)
// 5: (stats_history, stats_meta_history)
require.Len(t, kvRanges, 6)
// 6: (stats_table_locked)
require.Len(t, kvRanges, 7)

tk.MustExec("use test")
tk.MustExec("CREATE TABLE employees (" +
Expand All @@ -73,6 +74,7 @@ func TestGetFlashbackKeyRanges(t *testing.T) {
tk.MustExec("truncate table mysql.stats_fm_sketch")
tk.MustExec("truncate table mysql.stats_history")
tk.MustExec("truncate table mysql.stats_meta_history")
tk.MustExec("truncate table mysql.stats_table_locked")
kvRanges, err = ddl.GetFlashbackKeyRanges(se)
require.NoError(t, err)
require.Len(t, kvRanges, 2)
Expand Down
6 changes: 6 additions & 0 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -1811,6 +1811,7 @@ func (do *Domain) updateStatsWorker(ctx sessionctx.Context, owner owner.Manager)
gcStatsTicker := time.NewTicker(100 * lease)
dumpFeedbackTicker := time.NewTicker(200 * lease)
loadFeedbackTicker := time.NewTicker(5 * lease)
loadLockedTablesTicker := time.NewTicker(5 * lease)
dumpColStatsUsageTicker := time.NewTicker(100 * lease)
readMemTricker := time.NewTicker(memory.ReadMemInterval)
statsHandle := do.StatsHandle()
Expand Down Expand Up @@ -1851,6 +1852,11 @@ func (do *Domain) updateStatsWorker(ctx sessionctx.Context, owner owner.Manager)
if err != nil {
logutil.BgLogger().Debug("update stats using feedback failed", zap.Error(err))
}
case <-loadLockedTablesTicker.C:
err := statsHandle.LoadLockedTables()
if err != nil {
logutil.BgLogger().Debug("load locked table failed", zap.Error(err))
}
case <-dumpFeedbackTicker.C:
err := statsHandle.DumpStatsFeedbackToKV()
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions executor/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ go_library(
"joiner.go",
"load_data.go",
"load_stats.go",
"lock_stats.go",
"mem_reader.go",
"memtable_reader.go",
"merge_join.go",
Expand Down
85 changes: 77 additions & 8 deletions executor/analyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,27 +82,97 @@ const (

// Next implements the Executor Next interface.
func (e *AnalyzeExec) Next(ctx context.Context, _ *chunk.Chunk) error {
statsHandle := domain.GetDomain(e.ctx).StatsHandle()
var tasks []*analyzeTask
tids := make([]int64, 0)
skipedTables := make([]string, 0)
is := e.ctx.GetInfoSchema().(infoschema.InfoSchema)
for _, task := range e.tasks {
var tableID statistics.AnalyzeTableID
switch task.taskType {
case colTask:
tableID = task.colExec.tableID
case idxTask:
tableID = task.idxExec.tableID
case fastTask:
tableID = task.fastExec.tableID
case pkIncrementalTask:
tableID = task.colIncrementalExec.tableID
case idxIncrementalTask:
tableID = task.idxIncrementalExec.tableID
}
// skip locked tables
if !statsHandle.IsTableLocked(tableID.TableID) {
tasks = append(tasks, task)
}
// generate warning message
dup := false
for _, id := range tids {
if id == tableID.TableID {
dup = true
break
}
}
//avoid generate duplicate tables
if !dup {
if statsHandle.IsTableLocked(tableID.TableID) {
tbl, ok := is.TableByID(tableID.TableID)
if !ok {
return nil
}
skipedTables = append(skipedTables, tbl.Meta().Name.L)
}
tids = append(tids, tableID.TableID)
}
}

if len(skipedTables) > 0 {
tables := skipedTables[0]
for i, table := range skipedTables {
if i == 0 {
continue
}
tables += ", " + table
}
var msg string
if len(tids) > 1 {
if len(tids) > len(skipedTables) {
msg = "skip analyze locked tables: " + tables + ", other tables will be analyzed"
} else {
msg = "skip analyze locked tables: " + tables
}
} else {
msg = "skip analyze locked table: " + tables
}

e.ctx.GetSessionVars().StmtCtx.AppendWarning(errors.New(msg))
}

if len(tasks) == 0 {
return nil
}

concurrency, err := getBuildStatsConcurrency(e.ctx)
if err != nil {
return err
}
taskCh := make(chan *analyzeTask, len(e.tasks))
resultsCh := make(chan *statistics.AnalyzeResults, len(e.tasks))
if len(e.tasks) < concurrency {
concurrency = len(e.tasks)
taskCh := make(chan *analyzeTask, len(tasks))
resultsCh := make(chan *statistics.AnalyzeResults, len(tasks))
if len(tasks) < concurrency {
concurrency = len(tasks)
}
for i := 0; i < concurrency; i++ {
e.wg.Run(func() { e.analyzeWorker(taskCh, resultsCh) })
}
for _, task := range e.tasks {
for _, task := range tasks {
prepareV2AnalyzeJobInfo(task.colExec, false)
AddNewAnalyzeJob(e.ctx, task.job)
}
failpoint.Inject("mockKillPendingAnalyzeJob", func() {
dom := domain.GetDomain(e.ctx)
dom.SysProcTracker().KillSysProcess(util.GetAutoAnalyzeProcID(dom.ServerID))
})
for _, task := range e.tasks {
for _, task := range tasks {
taskCh <- task
}
close(taskCh)
Expand All @@ -113,7 +183,7 @@ func (e *AnalyzeExec) Next(ctx context.Context, _ *chunk.Chunk) error {
needGlobalStats := pruneMode == variable.Dynamic
globalStatsMap := make(map[globalStatsKey]globalStatsInfo)
err = e.handleResultsError(ctx, concurrency, needGlobalStats, globalStatsMap, resultsCh)
for _, task := range e.tasks {
for _, task := range tasks {
if task.colExec != nil && task.colExec.memTracker != nil {
task.colExec.memTracker.Detach()
}
Expand All @@ -135,7 +205,6 @@ func (e *AnalyzeExec) Next(ctx context.Context, _ *chunk.Chunk) error {
if err != nil {
e.ctx.GetSessionVars().StmtCtx.AppendWarning(err)
}
statsHandle := domain.GetDomain(e.ctx).StatsHandle()
if e.ctx.GetSessionVars().InRestrictedSQL {
return statsHandle.Update(e.ctx.GetInfoSchema().(infoschema.InfoSchema))
}
Expand Down
20 changes: 20 additions & 0 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,10 @@ func (b *executorBuilder) build(p plannercore.Plan) Executor {
return b.buildLoadData(v)
case *plannercore.LoadStats:
return b.buildLoadStats(v)
case *plannercore.LockStats:
return b.buildLockStats(v)
case *plannercore.UnlockStats:
return b.buildUnlockStats(v)
case *plannercore.IndexAdvise:
return b.buildIndexAdvise(v)
case *plannercore.PlanReplayer:
Expand Down Expand Up @@ -960,6 +964,22 @@ func (b *executorBuilder) buildLoadStats(v *plannercore.LoadStats) Executor {
return e
}

func (b *executorBuilder) buildLockStats(v *plannercore.LockStats) Executor {
e := &LockStatsExec{
baseExecutor: newBaseExecutor(b.ctx, nil, v.ID()),
Tables: v.Tables,
}
return e
}

func (b *executorBuilder) buildUnlockStats(v *plannercore.UnlockStats) Executor {
e := &UnlockStatsExec{
baseExecutor: newBaseExecutor(b.ctx, nil, v.ID()),
Tables: v.Tables,
}
return e
}

func (b *executorBuilder) buildIndexAdvise(v *plannercore.IndexAdvise) Executor {
e := &IndexAdviseExec{
baseExecutor: newBaseExecutor(b.ctx, nil, v.ID()),
Expand Down
2 changes: 1 addition & 1 deletion executor/infoschema_cluster_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ func TestTableStorageStats(t *testing.T) {
"test 2",
))
rows := tk.MustQuery("select TABLE_NAME from information_schema.TABLE_STORAGE_STATS where TABLE_SCHEMA = 'mysql';").Rows()
result := 39
result := 40
require.Len(t, rows, result)

// More tests about the privileges.
Expand Down
157 changes: 157 additions & 0 deletions executor/lock_stats.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
// Copyright 2018 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package executor

import (
"context"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/util/chunk"
)

var _ Executor = &LockStatsExec{}
var _ Executor = &UnlockStatsExec{}

// LockStatsExec represents a lock statistic executor.
type LockStatsExec struct {
baseExecutor
Tables []*ast.TableName
}

// lockStatsVarKeyType is a dummy type to avoid naming collision in context.
type lockStatsVarKeyType int

// String defines a Stringer function for debugging and pretty printing.
func (k lockStatsVarKeyType) String() string {
return "lock_stats_var"
}

// LockStatsVarKey is a variable key for lock statistic.
const LockStatsVarKey lockStatsVarKeyType = 0

// Next implements the Executor Next interface.
func (e *LockStatsExec) Next(ctx context.Context, req *chunk.Chunk) error {
do := domain.GetDomain(e.ctx)
is := do.InfoSchema()
h := do.StatsHandle()
if h == nil {
return errors.New("Lock Stats: handle is nil")
}

tableNum := len(e.Tables)
if tableNum == 0 {
return errors.New("Lock Stats: table should not empty ")
}

tids := make([]int64, 0, len(e.Tables))
pids := make([]int64, 0)
for _, table := range e.Tables {
tbl, err := is.TableByName(table.Schema, table.Name)
if err != nil {
return err
}
tids = append(tids, tbl.Meta().ID)

pi := tbl.Meta().GetPartitionInfo()
if pi == nil {
continue
}
for _, p := range pi.Definitions {
pids = append(pids, p.ID)
}
}
msg, err := h.AddLockedTables(tids, pids, e.Tables)
if msg != "" {
e.ctx.GetSessionVars().StmtCtx.AppendWarning(errors.New(msg))
}
return err
}

// Close implements the Executor Close interface.
func (e *LockStatsExec) Close() error {
return nil
}

// Open implements the Executor Open interface.
func (e *LockStatsExec) Open(ctx context.Context) error {
return nil
}

// UnlockStatsExec represents a unlock statistic executor.
type UnlockStatsExec struct {
baseExecutor
Tables []*ast.TableName
}

// unlockStatsVarKeyType is a dummy type to avoid naming collision in context.
type unlockStatsVarKeyType int

// String defines a Stringer function for debugging and pretty printing.
func (k unlockStatsVarKeyType) String() string {
return "unlock_stats_var"
}

// UnlockStatsVarKey is a variable key for unlock statistic.
const UnlockStatsVarKey unlockStatsVarKeyType = 0

// Next implements the Executor Next interface.
func (e *UnlockStatsExec) Next(ctx context.Context, req *chunk.Chunk) error {
do := domain.GetDomain(e.ctx)
is := do.InfoSchema()
h := do.StatsHandle()
if h == nil {
return errors.New("Unlock Stats: handle is nil")
}

tableNum := len(e.Tables)
if tableNum == 0 {
return errors.New("Unlock Stats: table should not empty ")
}

tids := make([]int64, 0, len(e.Tables))
pids := make([]int64, 0)
for _, table := range e.Tables {
tbl, err := is.TableByName(table.Schema, table.Name)
if err != nil {
return err
}
tids = append(tids, tbl.Meta().ID)

pi := tbl.Meta().GetPartitionInfo()
if pi == nil {
continue
}
for _, p := range pi.Definitions {
pids = append(pids, p.ID)
}
}
msg, err := h.RemoveLockedTables(tids, pids, e.Tables)
if msg != "" {
e.ctx.GetSessionVars().StmtCtx.AppendWarning(errors.New(msg))
}
return err
}

// Close implements the Executor Close interface.
func (e *UnlockStatsExec) Close() error {
return nil
}

// Open implements the Executor Open interface.
func (e *UnlockStatsExec) Open(ctx context.Context) error {
return nil
}
2 changes: 2 additions & 0 deletions executor/show.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,8 @@ func (e *ShowExec) fetchAll(ctx context.Context) error {
case ast.ShowStatsHealthy:
e.fetchShowStatsHealthy()
return nil
case ast.ShowStatsLocked:
return e.fetchShowStatsLocked()
case ast.ShowHistogramsInFlight:
e.fetchShowHistogramsInFlight()
return nil
Expand Down
Loading

0 comments on commit 238caf6

Please sign in to comment.