Skip to content

Commit

Permalink
Merge branch 'master' into support-tiflash-extract-duration
Browse files Browse the repository at this point in the history
  • Loading branch information
birdstorm authored Nov 14, 2022
2 parents 3c0ca84 + 78d1905 commit 5ea3ddc
Show file tree
Hide file tree
Showing 21 changed files with 2,657 additions and 997 deletions.
3 changes: 3 additions & 0 deletions domain/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ go_library(
"domainctx.go",
"optimize_trace.go",
"plan_replayer.go",
"plan_replayer_dump.go",
"schema_checker.go",
"schema_validator.go",
"sysvar_cache.go",
Expand Down Expand Up @@ -55,8 +56,10 @@ go_library(
"//util/logutil",
"//util/memory",
"//util/memoryusagealarm",
"//util/printer",
"//util/servermemorylimit",
"//util/sqlexec",
"@com_github_burntsushi_toml//:toml",
"@com_github_ngaut_pools//:pools",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
Expand Down
13 changes: 10 additions & 3 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -1533,9 +1533,16 @@ func (do *Domain) TelemetryRotateSubWindowLoop(ctx sessionctx.Context) {

// SetupPlanReplayerHandle setup plan replayer handle
func (do *Domain) SetupPlanReplayerHandle(ctx sessionctx.Context) {
do.planReplayerHandle = &planReplayerHandle{}
do.planReplayerHandle.sctxMu.sctx = ctx
do.dumpFileGcChecker.setupPlanReplayerHandle(do.planReplayerHandle)
do.planReplayerHandle = &planReplayerHandle{
planReplayerTaskCollectorHandle: &planReplayerTaskCollectorHandle{
sctx: ctx,
},
}
}

// SetupDumpFileGCChecker setup sctx
func (do *Domain) SetupDumpFileGCChecker(ctx sessionctx.Context) {
do.dumpFileGcChecker.setupSctx(ctx)
}

var planReplayerHandleLease = 10 * time.Second
Expand Down
69 changes: 29 additions & 40 deletions domain/plan_replayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@ import (
// For now it is used by `plan replayer` and `trace plan` statement
type dumpFileGcChecker struct {
sync.Mutex
gcLease time.Duration
paths []string
planReplayerHandle *planReplayerHandle
gcLease time.Duration
paths []string
sctx sessionctx.Context
}

// GetPlanReplayerDirName returns plan replayer directory path.
Expand Down Expand Up @@ -85,8 +85,8 @@ func (p *dumpFileGcChecker) gcDumpFiles(t time.Duration) {
}
}

func (p *dumpFileGcChecker) setupPlanReplayerHandle(handle *planReplayerHandle) {
p.planReplayerHandle = handle
func (p *dumpFileGcChecker) setupSctx(sctx sessionctx.Context) {
p.sctx = sctx
}

func (p *dumpFileGcChecker) gcDumpFilesByPath(path string, t time.Duration) {
Expand All @@ -113,39 +113,36 @@ func (p *dumpFileGcChecker) gcDumpFilesByPath(path string, t time.Duration) {
continue
}
logutil.BgLogger().Info("dumpFileGcChecker successful", zap.String("filename", fileName))
if isPlanReplayer && p.planReplayerHandle != nil {
p.planReplayerHandle.deletePlanReplayerStatus(context.Background(), fileName)
if isPlanReplayer && p.sctx != nil {
deletePlanReplayerStatus(context.Background(), p.sctx, fileName)
}
}
}
}

type planReplayerHandle struct {
sctxMu struct {
sync.Mutex
sctx sessionctx.Context
}
*planReplayerTaskCollectorHandle
}

type planReplayerTaskCollectorHandle struct {
taskMu struct {
sync.RWMutex
tasks map[PlanReplayerTaskKey]struct{}
}
sctx sessionctx.Context
}

// DeletePlanReplayerStatus delete mysql.plan_replayer_status record
func (h *planReplayerHandle) deletePlanReplayerStatus(ctx context.Context, token string) {
func deletePlanReplayerStatus(ctx context.Context, sctx sessionctx.Context, token string) {
ctx1 := kv.WithInternalSourceType(ctx, kv.InternalTxnStats)
h.sctxMu.Lock()
defer h.sctxMu.Unlock()
exec := h.sctxMu.sctx.(sqlexec.SQLExecutor)
exec := sctx.(sqlexec.SQLExecutor)
_, err := exec.ExecuteInternal(ctx1, fmt.Sprintf("delete from mysql.plan_replayer_status where token = %v", token))
if err != nil {
logutil.BgLogger().Warn("delete mysql.plan_replayer_status record failed", zap.String("token", token), zap.Error(err))
}
}

// InsertPlanReplayerStatus insert mysql.plan_replayer_status record
func (h *planReplayerHandle) InsertPlanReplayerStatus(ctx context.Context, records []PlanReplayerStatusRecord) {
// insertPlanReplayerStatus insert mysql.plan_replayer_status record
func insertPlanReplayerStatus(ctx context.Context, sctx sessionctx.Context, records []PlanReplayerStatusRecord) {
ctx1 := kv.WithInternalSourceType(ctx, kv.InternalTxnStats)
var instance string
serverInfo, err := infosync.GetServerInfo()
Expand All @@ -158,18 +155,16 @@ func (h *planReplayerHandle) InsertPlanReplayerStatus(ctx context.Context, recor
for _, record := range records {
if !record.Internal {
if len(record.FailedReason) > 0 {
h.insertExternalPlanReplayerErrorStatusRecord(ctx1, instance, record)
insertExternalPlanReplayerErrorStatusRecord(ctx1, sctx, instance, record)
} else {
h.insertExternalPlanReplayerSuccessStatusRecord(ctx1, instance, record)
insertExternalPlanReplayerSuccessStatusRecord(ctx1, sctx, instance, record)
}
}
}
}

func (h *planReplayerHandle) insertExternalPlanReplayerErrorStatusRecord(ctx context.Context, instance string, record PlanReplayerStatusRecord) {
h.sctxMu.Lock()
defer h.sctxMu.Unlock()
exec := h.sctxMu.sctx.(sqlexec.SQLExecutor)
func insertExternalPlanReplayerErrorStatusRecord(ctx context.Context, sctx sessionctx.Context, instance string, record PlanReplayerStatusRecord) {
exec := sctx.(sqlexec.SQLExecutor)
_, err := exec.ExecuteInternal(ctx, fmt.Sprintf(
"insert into mysql.plan_replayer_status (origin_sql, fail_reason, instance) values ('%s','%s','%s')",
record.OriginSQL, record.FailedReason, instance))
Expand All @@ -179,10 +174,8 @@ func (h *planReplayerHandle) insertExternalPlanReplayerErrorStatusRecord(ctx con
}
}

func (h *planReplayerHandle) insertExternalPlanReplayerSuccessStatusRecord(ctx context.Context, instance string, record PlanReplayerStatusRecord) {
h.sctxMu.Lock()
defer h.sctxMu.Unlock()
exec := h.sctxMu.sctx.(sqlexec.SQLExecutor)
func insertExternalPlanReplayerSuccessStatusRecord(ctx context.Context, sctx sessionctx.Context, instance string, record PlanReplayerStatusRecord) {
exec := sctx.(sqlexec.SQLExecutor)
_, err := exec.ExecuteInternal(ctx, fmt.Sprintf(
"insert into mysql.plan_replayer_status (origin_sql, token, instance) values ('%s','%s','%s')",
record.OriginSQL, record.Token, instance))
Expand All @@ -193,15 +186,15 @@ func (h *planReplayerHandle) insertExternalPlanReplayerSuccessStatusRecord(ctx c
}

// CollectPlanReplayerTask collects all unhandled plan replayer task
func (h *planReplayerHandle) CollectPlanReplayerTask(ctx context.Context) error {
func (h *planReplayerTaskCollectorHandle) CollectPlanReplayerTask(ctx context.Context) error {
ctx1 := kv.WithInternalSourceType(ctx, kv.InternalTxnStats)
allKeys, err := h.collectAllPlanReplayerTask(ctx1)
if err != nil {
return err
}
tasks := make([]PlanReplayerTaskKey, 0)
for _, key := range allKeys {
unhandled, err := h.checkUnHandledReplayerTask(ctx1, key)
unhandled, err := checkUnHandledReplayerTask(ctx1, h.sctx, key)
if err != nil {
return err
}
Expand All @@ -214,7 +207,7 @@ func (h *planReplayerHandle) CollectPlanReplayerTask(ctx context.Context) error
}

// GetTasks get all tasks
func (h *planReplayerHandle) GetTasks() []PlanReplayerTaskKey {
func (h *planReplayerTaskCollectorHandle) GetTasks() []PlanReplayerTaskKey {
tasks := make([]PlanReplayerTaskKey, 0)
h.taskMu.RLock()
defer h.taskMu.RUnlock()
Expand All @@ -224,7 +217,7 @@ func (h *planReplayerHandle) GetTasks() []PlanReplayerTaskKey {
return tasks
}

func (h *planReplayerHandle) setupTasks(tasks []PlanReplayerTaskKey) {
func (h *planReplayerTaskCollectorHandle) setupTasks(tasks []PlanReplayerTaskKey) {
r := make(map[PlanReplayerTaskKey]struct{})
for _, task := range tasks {
r[task] = struct{}{}
Expand All @@ -234,10 +227,8 @@ func (h *planReplayerHandle) setupTasks(tasks []PlanReplayerTaskKey) {
h.taskMu.tasks = r
}

func (h *planReplayerHandle) collectAllPlanReplayerTask(ctx context.Context) ([]PlanReplayerTaskKey, error) {
h.sctxMu.Lock()
defer h.sctxMu.Unlock()
exec := h.sctxMu.sctx.(sqlexec.SQLExecutor)
func (h *planReplayerTaskCollectorHandle) collectAllPlanReplayerTask(ctx context.Context) ([]PlanReplayerTaskKey, error) {
exec := h.sctx.(sqlexec.SQLExecutor)
rs, err := exec.ExecuteInternal(ctx, "select sql_digest, plan_digest from mysql.plan_replayer_task")
if err != nil {
return nil, err
Expand All @@ -261,10 +252,8 @@ func (h *planReplayerHandle) collectAllPlanReplayerTask(ctx context.Context) ([]
return allKeys, nil
}

func (h *planReplayerHandle) checkUnHandledReplayerTask(ctx context.Context, task PlanReplayerTaskKey) (bool, error) {
h.sctxMu.Lock()
defer h.sctxMu.Unlock()
exec := h.sctxMu.sctx.(sqlexec.SQLExecutor)
func checkUnHandledReplayerTask(ctx context.Context, sctx sessionctx.Context, task PlanReplayerTaskKey) (bool, error) {
exec := sctx.(sqlexec.SQLExecutor)
rs, err := exec.ExecuteInternal(ctx, fmt.Sprintf("select * from mysql.plan_replayer_status where sql_digest = '%v' and plan_digest = '%v' and fail_reason is null", task.sqlDigest, task.planDigest))
if err != nil {
return false, err
Expand Down
Loading

0 comments on commit 5ea3ddc

Please sign in to comment.