From 42ad0d6630593698d69dd09905ca7f260a5b7ae6 Mon Sep 17 00:00:00 2001 From: yisaer Date: Wed, 23 Nov 2022 16:27:16 +0800 Subject: [PATCH 1/9] support Signed-off-by: yisaer --- config/config.go | 17 +-- domain/domain.go | 35 +++++- domain/plan_replayer.go | 160 ++++++++++++++++++++++++---- domain/plan_replayer_handle_test.go | 5 +- session/session.go | 15 ++- 5 files changed, 195 insertions(+), 37 deletions(-) diff --git a/config/config.go b/config/config.go index 1ba2c7fbd1595..7e7e0c35377f0 100644 --- a/config/config.go +++ b/config/config.go @@ -655,14 +655,15 @@ type Performance struct { // Deprecated MemProfileInterval string `toml:"-" json:"-"` - IndexUsageSyncLease string `toml:"index-usage-sync-lease" json:"index-usage-sync-lease"` - PlanReplayerGCLease string `toml:"plan-replayer-gc-lease" json:"plan-replayer-gc-lease"` - GOGC int `toml:"gogc" json:"gogc"` - EnforceMPP bool `toml:"enforce-mpp" json:"enforce-mpp"` - StatsLoadConcurrency uint `toml:"stats-load-concurrency" json:"stats-load-concurrency"` - StatsLoadQueueSize uint `toml:"stats-load-queue-size" json:"stats-load-queue-size"` - AnalyzePartitionConcurrencyQuota uint `toml:"analyze-partition-concurrency-quota" json:"analyze-partition-concurrency-quota"` - EnableStatsCacheMemQuota bool `toml:"enable-stats-cache-mem-quota" json:"enable-stats-cache-mem-quota"` + IndexUsageSyncLease string `toml:"index-usage-sync-lease" json:"index-usage-sync-lease"` + PlanReplayerGCLease string `toml:"plan-replayer-gc-lease" json:"plan-replayer-gc-lease"` + GOGC int `toml:"gogc" json:"gogc"` + EnforceMPP bool `toml:"enforce-mpp" json:"enforce-mpp"` + StatsLoadConcurrency uint `toml:"stats-load-concurrency" json:"stats-load-concurrency"` + StatsLoadQueueSize uint `toml:"stats-load-queue-size" json:"stats-load-queue-size"` + AnalyzePartitionConcurrencyQuota uint `toml:"analyze-partition-concurrency-quota" json:"analyze-partition-concurrency-quota"` + PlanReplayerDumpWorkerConcurrency uint `toml:"plan-replayer-dump-worker-concurrency" json:"plan-replayer-dump-worker-concurrency"` + EnableStatsCacheMemQuota bool `toml:"enable-stats-cache-mem-quota" json:"enable-stats-cache-mem-quota"` // The following items are deprecated. We need to keep them here temporarily // to support the upgrade process. They can be removed in future. 
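The new Performance.PlanReplayerDumpWorkerConcurrency field above is the knob that sizes the pool of plan replayer dump workers; the session.go hunk later in this patch reads it at bootstrap to decide how many worker sessions to create. A minimal sketch of how the knob is consumed (the GetGlobalConfig call is the one the patch itself uses; the standalone main is illustrative only):

	package main

	import (
		"fmt"

		"github.com/pingcap/tidb/config"
	)

	func main() {
		// PATCH 2/9 defaults this to 1; operators can raise it with
		// plan-replayer-dump-worker-concurrency under [performance] in tidb.toml.
		cnt := config.GetGlobalConfig().Performance.PlanReplayerDumpWorkerConcurrency
		fmt.Printf("plan replayer dump workers: %d\n", cnt)
	}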
diff --git a/domain/domain.go b/domain/domain.go index 66fcf3ca0e3b3..b4e6523d6d03a 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -1572,23 +1572,46 @@ func (do *Domain) TelemetryRotateSubWindowLoop(ctx sessionctx.Context) { } // SetupPlanReplayerHandle setup plan replayer handle -func (do *Domain) SetupPlanReplayerHandle(collectorSctx, dumperSctx sessionctx.Context) { +func (do *Domain) SetupPlanReplayerHandle(collectorSctx sessionctx.Context, workersSctxs []sessionctx.Context) { ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats) do.planReplayerHandle = &planReplayerHandle{} do.planReplayerHandle.planReplayerTaskCollectorHandle = &planReplayerTaskCollectorHandle{ ctx: ctx, sctx: collectorSctx, } + taskCH := make(chan *PlanReplayerDumpTask, 16) + finished := &atomic.Bool{} + taskStatus := &planReplayerDumpTaskStatus{} + taskStatus.finishedTaskMu.finishedTask = map[PlanReplayerTaskKey]struct{}{} + taskStatus.runningTaskMu.runningTasks = map[PlanReplayerTaskKey]struct{}{} + do.planReplayerHandle.planReplayerTaskDumpHandle = &planReplayerTaskDumpHandle{ - ctx: ctx, - sctx: dumperSctx, - taskCH: make(chan *PlanReplayerDumpTask, 16), + taskCH: taskCH, + status: taskStatus, + finished: finished, + } + senderTaskCH := make(chan *PlanReplayerDumpTask, len(workersSctxs)) + do.planReplayerHandle.planReplayerTaskDumpHandle.sender = &planReplayerTaskDumpSender{ + taskCH: senderTaskCH, + } + do.planReplayerHandle.planReplayerTaskDumpHandle.workers = make([]*planReplayerTaskDumpWorker, 0) + for i := 0; i < len(workersSctxs); i++ { + worker := &planReplayerTaskDumpWorker{ + ctx: ctx, + sctx: workersSctxs[i], + taskCH: senderTaskCH, + status: taskStatus, + taskHandle: do.planReplayerHandle.planReplayerTaskCollectorHandle, + finished: finished, + } + do.planReplayerHandle.planReplayerTaskDumpHandle.workers = append(do.planReplayerHandle.planReplayerTaskDumpHandle.workers, worker) } } // SetupDumpFileGCChecker setup sctx func (do *Domain) SetupDumpFileGCChecker(ctx sessionctx.Context) { do.dumpFileGcChecker.setupSctx(ctx) + do.dumpFileGcChecker.planReplayerTaskStatus = do.planReplayerHandle.status } var planReplayerHandleLease atomic.Uint64 @@ -1635,9 +1658,13 @@ func (do *Domain) StartPlanReplayerHandle() { logutil.BgLogger().Info("PlanReplayerTaskDumpHandle exited.") util.Recover(metrics.LabelDomain, "PlanReplayerTaskDumpHandle", nil, false) }() + for _, worker := range do.planReplayerHandle.planReplayerTaskDumpHandle.workers { + go worker.run() + } for { select { case <-do.exit: + do.planReplayerHandle.planReplayerTaskDumpHandle.Finish() return case task := <-do.planReplayerHandle.planReplayerTaskDumpHandle.taskCH: do.planReplayerHandle.HandlePlanReplayerDumpTask(task) diff --git a/domain/plan_replayer.go b/domain/plan_replayer.go index f20db239a6ca4..23d9cfa4cb217 100644 --- a/domain/plan_replayer.go +++ b/domain/plan_replayer.go @@ -25,6 +25,7 @@ import ( "strconv" "strings" "sync" + "sync/atomic" "time" "github.com/pingcap/errors" @@ -49,9 +50,10 @@ import ( // For now it is used by `plan replayer` and `trace plan` statement type dumpFileGcChecker struct { sync.Mutex - gcLease time.Duration - paths []string - sctx sessionctx.Context + gcLease time.Duration + paths []string + sctx sessionctx.Context + planReplayerTaskStatus *planReplayerDumpTaskStatus } // GetPlanReplayerDirName returns plan replayer directory path. 
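The SetupPlanReplayerHandle change above fans one shared channel out to one dump worker per session context, and StartPlanReplayerHandle launches each worker goroutine (later patches in the series collapse the intermediate sender so workers read the 16-slot task channel directly). Reduced to self-contained Go, the concurrency shape looks like the sketch below; the dumpTask type and worker body are illustrative stand-ins for the patch's types:

	package main

	import (
		"fmt"
		"sync"
	)

	type dumpTask struct{ key string }

	func main() {
		taskCH := make(chan *dumpTask, 16) // same capacity the patch uses
		var wg sync.WaitGroup
		for i := 0; i < 2; i++ { // one goroutine per worker session context
			wg.Add(1)
			go func(id int) {
				defer wg.Done()
				for task := range taskCH { // drains until the channel is closed
					fmt.Printf("worker %d dumped %s\n", id, task.key)
				}
			}(i)
		}
		taskCH <- &dumpTask{key: "sqlDigest:planDigest"}
		close(taskCH) // later patches close the channel on domain exit so workers drain and stop
		wg.Wait()
	}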
@@ -119,6 +121,7 @@ func (p *dumpFileGcChecker) gcDumpFilesByPath(path string, t time.Duration) { logutil.BgLogger().Info("dumpFileGcChecker successful", zap.String("filename", fileName)) if isPlanReplayer && p.sctx != nil { deletePlanReplayerStatus(context.Background(), p.sctx, fileName) + p.planReplayerTaskStatus.clearFinishedTask() } } } @@ -130,12 +133,8 @@ type planReplayerHandle struct { } // HandlePlanReplayerDumpTask handle dump task -func (h *planReplayerHandle) HandlePlanReplayerDumpTask(task *PlanReplayerDumpTask) bool { - success := h.dumpPlanReplayerDumpTask(task) - if success { - h.removeTask(task.PlanReplayerTaskKey) - } - return success +func (h *planReplayerHandle) HandlePlanReplayerDumpTask(task *PlanReplayerDumpTask) { + h.sender.sendTask(task) } type planReplayerTaskCollectorHandle struct { @@ -270,21 +269,111 @@ func (h *planReplayerTaskCollectorHandle) collectAllPlanReplayerTask(ctx context return allKeys, nil } -type planReplayerTaskDumpHandle struct { - ctx context.Context - sctx sessionctx.Context +type planReplayerTaskDumpSender struct { taskCH chan *PlanReplayerDumpTask } -// DrainTask drain a task for unit test -func (h *planReplayerTaskDumpHandle) DrainTask() *PlanReplayerDumpTask { - return <-h.taskCH +func (s *planReplayerTaskDumpSender) sendTask(task *PlanReplayerDumpTask) { + s.taskCH <- task +} + +type planReplayerDumpTaskStatus struct { + runningTaskMu struct { + sync.RWMutex + runningTasks map[PlanReplayerTaskKey]struct{} + } + + finishedTaskMu struct { + sync.RWMutex + finishedTask map[PlanReplayerTaskKey]struct{} + } +} + +// GetRunningTaskStatusLen used for unit test +func (r *planReplayerDumpTaskStatus) GetRunningTaskStatusLen() int { + r.runningTaskMu.RLock() + defer r.runningTaskMu.RUnlock() + return len(r.runningTaskMu.runningTasks) +} + +// GetFinishedTaskStatusLen used for unit test +func (r *planReplayerDumpTaskStatus) GetFinishedTaskStatusLen() int { + r.finishedTaskMu.RLock() + defer r.finishedTaskMu.RUnlock() + return len(r.finishedTaskMu.finishedTask) +} + +func (r *planReplayerDumpTaskStatus) occupyRunningTaskKey(task *PlanReplayerDumpTask) bool { + r.runningTaskMu.Lock() + defer r.runningTaskMu.Unlock() + _, ok := r.runningTaskMu.runningTasks[task.PlanReplayerTaskKey] + if ok { + return false + } + r.runningTaskMu.runningTasks[task.PlanReplayerTaskKey] = struct{}{} + return true +} + +func (r *planReplayerDumpTaskStatus) releaseRunningTaskKey(task *PlanReplayerDumpTask) { + r.runningTaskMu.Lock() + defer r.runningTaskMu.Unlock() + delete(r.runningTaskMu.runningTasks, task.PlanReplayerTaskKey) +} + +func (r *planReplayerDumpTaskStatus) checkTaskKeyFinishedBefore(task *PlanReplayerDumpTask) bool { + r.finishedTaskMu.RLock() + defer r.finishedTaskMu.RUnlock() + _, ok := r.finishedTaskMu.finishedTask[task.PlanReplayerTaskKey] + return ok +} + +func (r *planReplayerDumpTaskStatus) setTaskFinished(task *PlanReplayerDumpTask) { + r.finishedTaskMu.Lock() + defer r.finishedTaskMu.Unlock() + r.finishedTaskMu.finishedTask[task.PlanReplayerTaskKey] = struct{}{} +} + +func (r *planReplayerDumpTaskStatus) clearFinishedTask() { + r.finishedTaskMu.Lock() + defer r.finishedTaskMu.Unlock() + r.finishedTaskMu.finishedTask = map[PlanReplayerTaskKey]struct{}{} +} + +type planReplayerTaskDumpWorker struct { + ctx context.Context + sctx sessionctx.Context + taskCH <-chan *PlanReplayerDumpTask + status *planReplayerDumpTaskStatus + taskHandle *planReplayerTaskCollectorHandle + finished *atomic.Bool +} + +func (w *planReplayerTaskDumpWorker) run() { + for task := 
range w.taskCH { + if w.status.checkTaskKeyFinishedBefore(task) { + continue + } + if !w.status.occupyRunningTaskKey(task) { + continue + } + w.HandleTask(task) + w.status.releaseRunningTaskKey(task) + if w.finished.Load() { + return + } + } } -// HandlePlanReplayerDumpTask handled the task -func (h *planReplayerTaskDumpHandle) dumpPlanReplayerDumpTask(task *PlanReplayerDumpTask) (success bool) { +// HandleTask handles the dump task +func (w *planReplayerTaskDumpWorker) HandleTask(task *PlanReplayerDumpTask) (success bool) { + defer func() { + if success { + w.taskHandle.removeTask(task.PlanReplayerTaskKey) + w.status.setTaskFinished(task) + } + }() taskKey := task.PlanReplayerTaskKey - unhandled, err := checkUnHandledReplayerTask(h.ctx, h.sctx, taskKey) + unhandled, err := checkUnHandledReplayerTask(w.ctx, w.sctx, taskKey) if err != nil { logutil.BgLogger().Warn("check plan replayer capture task failed", zap.String("sqlDigest", taskKey.SQLDigest), @@ -303,13 +392,13 @@ func (h *planReplayerTaskDumpHandle) dumpPlanReplayerDumpTask(task *PlanReplayer zap.String("sqlDigest", taskKey.SQLDigest), zap.String("planDigest", taskKey.PlanDigest), zap.Error(err)) - return + return false } task.Zf = file task.FileName = fileName task.EncodedPlan, _ = task.EncodePlan(task.SessionVars.StmtCtx, false) jsStats := make(map[int64]*handle.JSONTable) - is := GetDomain(h.sctx).InfoSchema() + is := GetDomain(w.sctx).InfoSchema() for tblID, stat := range task.TblStats { tbl, ok := is.TableByID(tblID) if !ok { @@ -329,7 +418,7 @@ func (h *planReplayerTaskDumpHandle) dumpPlanReplayerDumpTask(task *PlanReplayer } jsStats[tblID] = r } - err = DumpPlanReplayerInfo(h.ctx, h.sctx, task) + err = DumpPlanReplayerInfo(w.ctx, w.sctx, task) if err != nil { logutil.BgLogger().Warn("dump plan replayer capture task result failed", zap.String("sqlDigest", taskKey.SQLDigest), @@ -340,6 +429,35 @@ func (h *planReplayerTaskDumpHandle) dumpPlanReplayerDumpTask(task *PlanReplayer return true } +type planReplayerTaskDumpHandle struct { + taskCH chan *PlanReplayerDumpTask + finished *atomic.Bool + status *planReplayerDumpTaskStatus + + sender *planReplayerTaskDumpSender + workers []*planReplayerTaskDumpWorker +} + +// GetTaskStatus used for test +func (h *planReplayerTaskDumpHandle) GetTaskStatus() *planReplayerDumpTaskStatus { + return h.status +} + +// GetWorker used for test +func (h *planReplayerTaskDumpHandle) GetWorker() *planReplayerTaskDumpWorker { + return h.workers[0] +} + +// Finish makes the finished flag true +func (h *planReplayerTaskDumpHandle) Finish() { + h.finished.Store(true) +} + +// DrainTask drain a task for unit test +func (h *planReplayerTaskDumpHandle) DrainTask() *PlanReplayerDumpTask { + return <-h.taskCH +} + // SendTask send dumpTask in background task handler func (h *planReplayerTaskDumpHandle) SendTask(task *PlanReplayerDumpTask) { select { diff --git a/domain/plan_replayer_handle_test.go b/domain/plan_replayer_handle_test.go index 5a824ef4eeeb6..dccb400ecd5b6 100644 --- a/domain/plan_replayer_handle_test.go +++ b/domain/plan_replayer_handle_test.go @@ -89,8 +89,11 @@ func TestPlanReplayerHandleDumpTask(t *testing.T) { tk.MustQuery("select * from t;") task := prHandle.DrainTask() require.NotNil(t, task) - success := prHandle.HandlePlanReplayerDumpTask(task) + worker := prHandle.GetWorker() + success := worker.HandleTask(task) require.True(t, success) + require.Equal(t, prHandle.GetTaskStatus().GetRunningTaskStatusLen(), 0) + require.Equal(t, prHandle.GetTaskStatus().GetFinishedTaskStatusLen(), 1) // assert memory
task consumed require.Len(t, prHandle.GetTasks(), 0) diff --git a/session/session.go b/session/session.go index 51a3e22d39aac..940359e4b0a39 100644 --- a/session/session.go +++ b/session/session.go @@ -2938,7 +2938,7 @@ func BootstrapSession(store kv.Storage) (*domain.Domain, error) { analyzeConcurrencyQuota := int(config.GetGlobalConfig().Performance.AnalyzePartitionConcurrencyQuota) concurrency := int(config.GetGlobalConfig().Performance.StatsLoadConcurrency) - ses, err := createSessions(store, 9) + ses, err := createSessions(store, 8) if err != nil { return nil, err } @@ -3012,11 +3012,20 @@ func BootstrapSession(store kv.Storage) (*domain.Domain, error) { }() } + planReplayerWorkerCnt := config.GetGlobalConfig().Performance.PlanReplayerDumpWorkerConcurrency + planReplayerWorkersSctx := make([]sessionctx.Context, planReplayerWorkerCnt) + pworkerSes, err := createSessions(store, 2) + if err != nil { + return nil, err + } + for i := 0; i < int(planReplayerWorkerCnt); i++ { + planReplayerWorkersSctx[i] = pworkerSes[i] + } // setup plan replayer handle - dom.SetupPlanReplayerHandle(ses[6], ses[7]) + dom.SetupPlanReplayerHandle(ses[6], planReplayerWorkersSctx) dom.StartPlanReplayerHandle() // setup dumpFileGcChecker - dom.SetupDumpFileGCChecker(ses[8]) + dom.SetupDumpFileGCChecker(ses[7]) dom.DumpFileGcCheckerLoop() // A sub context for update table stats, and other contexts for concurrent stats loading. From 3946c7ee48b8fd8ed7362b333c4752424f93002a Mon Sep 17 00:00:00 2001 From: yisaer Date: Thu, 24 Nov 2022 11:01:01 +0800 Subject: [PATCH 2/9] support Signed-off-by: yisaer --- config/config.go | 21 +++++++++++---------- session/session.go | 2 +- 2 files changed, 12 insertions(+), 11 deletions(-) diff --git a/config/config.go b/config/config.go index 7e7e0c35377f0..afd4ed07707e0 100644 --- a/config/config.go +++ b/config/config.go @@ -921,16 +921,17 @@ var defaultConf = Config{ CommitterConcurrency: defTiKVCfg.CommitterConcurrency, MaxTxnTTL: defTiKVCfg.MaxTxnTTL, // 1hour // TODO: set indexUsageSyncLease to 60s. 
- IndexUsageSyncLease: "0s", - GOGC: 100, - EnforceMPP: false, - PlanReplayerGCLease: "10m", - StatsLoadConcurrency: 5, - StatsLoadQueueSize: 1000, - AnalyzePartitionConcurrencyQuota: 16, - EnableStatsCacheMemQuota: false, - RunAutoAnalyze: true, - EnableLoadFMSketch: false, + IndexUsageSyncLease: "0s", + GOGC: 100, + EnforceMPP: false, + PlanReplayerGCLease: "10m", + StatsLoadConcurrency: 5, + StatsLoadQueueSize: 1000, + AnalyzePartitionConcurrencyQuota: 16, + PlanReplayerDumpWorkerConcurrency: 1, + EnableStatsCacheMemQuota: false, + RunAutoAnalyze: true, + EnableLoadFMSketch: false, }, ProxyProtocol: ProxyProtocol{ Networks: "", diff --git a/session/session.go b/session/session.go index 940359e4b0a39..e4e9441948da5 100644 --- a/session/session.go +++ b/session/session.go @@ -3014,7 +3014,7 @@ func BootstrapSession(store kv.Storage) (*domain.Domain, error) { planReplayerWorkerCnt := config.GetGlobalConfig().Performance.PlanReplayerDumpWorkerConcurrency planReplayerWorkersSctx := make([]sessionctx.Context, planReplayerWorkerCnt) - pworkerSes, err := createSessions(store, 2) + pworkerSes, err := createSessions(store, int(planReplayerWorkerCnt)) if err != nil { return nil, err } From f7efd5ea9a1e5e53071a8dd84d46ba2c06a03c57 Mon Sep 17 00:00:00 2001 From: yisaer Date: Thu, 24 Nov 2022 11:24:35 +0800 Subject: [PATCH 3/9] support Signed-off-by: yisaer --- domain/plan_replayer.go | 1 + 1 file changed, 1 insertion(+) diff --git a/domain/plan_replayer.go b/domain/plan_replayer.go index 23d9cfa4cb217..9cfa3d657e524 100644 --- a/domain/plan_replayer.go +++ b/domain/plan_replayer.go @@ -450,6 +450,7 @@ func (h *planReplayerTaskDumpHandle) GetWorker() *planReplayerTaskDumpWorker { // Finish makes the finished flag true func (h *planReplayerTaskDumpHandle) Finish() { + close(h.sender.taskCH) h.finished.Store(true) } From 227345b2c8ccaa3273d96b0f88eacc156580bf4a Mon Sep 17 00:00:00 2001 From: yisaer Date: Thu, 24 Nov 2022 11:29:04 +0800 Subject: [PATCH 4/9] support Signed-off-by: yisaer --- domain/domain.go | 8 +------- domain/plan_replayer.go | 21 ++++----------------- 2 files changed, 5 insertions(+), 24 deletions(-) diff --git a/domain/domain.go b/domain/domain.go index b4e6523d6d03a..1a4c60c7429ef 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -1590,16 +1590,12 @@ func (do *Domain) SetupPlanReplayerHandle(collectorSctx sessionctx.Context, work status: taskStatus, finished: finished, } - senderTaskCH := make(chan *PlanReplayerDumpTask, len(workersSctxs)) - do.planReplayerHandle.planReplayerTaskDumpHandle.sender = &planReplayerTaskDumpSender{ - taskCH: senderTaskCH, - } do.planReplayerHandle.planReplayerTaskDumpHandle.workers = make([]*planReplayerTaskDumpWorker, 0) for i := 0; i < len(workersSctxs); i++ { worker := &planReplayerTaskDumpWorker{ ctx: ctx, sctx: workersSctxs[i], - taskCH: senderTaskCH, + taskCH: taskCH, status: taskStatus, taskHandle: do.planReplayerHandle.planReplayerTaskCollectorHandle, finished: finished, @@ -1666,8 +1662,6 @@ func (do *Domain) StartPlanReplayerHandle() { case <-do.exit: do.planReplayerHandle.planReplayerTaskDumpHandle.Finish() return - case task := <-do.planReplayerHandle.planReplayerTaskDumpHandle.taskCH: - do.planReplayerHandle.HandlePlanReplayerDumpTask(task) } } }() diff --git a/domain/plan_replayer.go b/domain/plan_replayer.go index 9cfa3d657e524..1d31086331c6d 100644 --- a/domain/plan_replayer.go +++ b/domain/plan_replayer.go @@ -132,11 +132,6 @@ type planReplayerHandle struct { *planReplayerTaskDumpHandle } -// HandlePlanReplayerDumpTask handle dump task
-func (h *planReplayerHandle) HandlePlanReplayerDumpTask(task *PlanReplayerDumpTask) { - h.sender.sendTask(task) -} - type planReplayerTaskCollectorHandle struct { taskMu struct { sync.RWMutex @@ -269,14 +264,6 @@ func (h *planReplayerTaskCollectorHandle) collectAllPlanReplayerTask(ctx context return allKeys, nil } -type planReplayerTaskDumpSender struct { - taskCH chan *PlanReplayerDumpTask -} - -func (s *planReplayerTaskDumpSender) sendTask(task *PlanReplayerDumpTask) { - s.taskCH <- task -} - type planReplayerDumpTaskStatus struct { runningTaskMu struct { sync.RWMutex @@ -433,9 +420,7 @@ type planReplayerTaskDumpHandle struct { taskCH chan *PlanReplayerDumpTask finished *atomic.Bool status *planReplayerDumpTaskStatus - - sender *planReplayerTaskDumpSender - workers []*planReplayerTaskDumpWorker + workers []*planReplayerTaskDumpWorker } // GetTaskStatus used for test @@ -450,7 +435,7 @@ func (h *planReplayerTaskDumpHandle) GetWorker() *planReplayerTaskDumpWorker { // Finish makes the finished flag true func (h *planReplayerTaskDumpHandle) Finish() { - close(h.sender.taskCH) + close(h.taskCH) h.finished.Store(true) } @@ -464,6 +449,8 @@ func (h *planReplayerTaskDumpHandle) SendTask(task *PlanReplayerDumpTask) { select { case h.taskCH <- task: default: + logutil.BgLogger().Info("discard one plan replayer dump task", + zap.String("sql digest", task.SQLDigest), zap.String("plan digest", task.PlanDigest)) // TODO: add metrics here // directly discard the task if the task channel is full in order not to block the query process } } From d002af8a372f6b9ef9776c0675314f693bc1a6b6 Mon Sep 17 00:00:00 2001 From: yisaer Date: Thu, 24 Nov 2022 11:56:31 +0800 Subject: [PATCH 5/9] support Signed-off-by: yisaer --- domain/domain.go | 18 ++++----- domain/plan_replayer.go | 87 ++++++++++++++++++++--------------------- executor/compiler.go | 1 + 3 files changed, 50 insertions(+), 56 deletions(-) diff --git a/domain/domain.go b/domain/domain.go index 1a4c60c7429ef..328f29016db3e 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -1580,25 +1580,21 @@ func (do *Domain) SetupPlanReplayerHandle(collectorSctx sessionctx.Context, work sctx: collectorSctx, } taskCH := make(chan *PlanReplayerDumpTask, 16) - finished := &atomic.Bool{} taskStatus := &planReplayerDumpTaskStatus{} taskStatus.finishedTaskMu.finishedTask = map[PlanReplayerTaskKey]struct{}{} taskStatus.runningTaskMu.runningTasks = map[PlanReplayerTaskKey]struct{}{} do.planReplayerHandle.planReplayerTaskDumpHandle = &planReplayerTaskDumpHandle{ - taskCH: taskCH, - status: taskStatus, - finished: finished, + taskCH: taskCH, + status: taskStatus, } do.planReplayerHandle.planReplayerTaskDumpHandle.workers = make([]*planReplayerTaskDumpWorker, 0) for i := 0; i < len(workersSctxs); i++ { worker := &planReplayerTaskDumpWorker{ - ctx: ctx, - sctx: workersSctxs[i], - taskCH: taskCH, - status: taskStatus, - taskHandle: do.planReplayerHandle.planReplayerTaskCollectorHandle, - finished: finished, + ctx: ctx, + sctx: workersSctxs[i], + taskCH: taskCH, + status: taskStatus, } do.planReplayerHandle.planReplayerTaskDumpHandle.workers = append(do.planReplayerHandle.planReplayerTaskDumpHandle.workers, worker) } @@ -1660,7 +1656,7 @@ func (do *Domain) StartPlanReplayerHandle() { for { select { case <-do.exit: - do.planReplayerHandle.planReplayerTaskDumpHandle.Finish() + do.planReplayerHandle.planReplayerTaskDumpHandle.Close() return } } diff --git a/domain/plan_replayer.go b/domain/plan_replayer.go index 1d31086331c6d..ff9d5b654976f 100644 --- a/domain/plan_replayer.go +++
b/domain/plan_replayer.go @@ -25,7 +25,6 @@ import ( "strconv" "strings" "sync" - "sync/atomic" "time" "github.com/pingcap/errors" @@ -127,20 +126,6 @@ func (p *dumpFileGcChecker) gcDumpFilesByPath(path string, t time.Duration) { } } -type planReplayerHandle struct { - *planReplayerTaskCollectorHandle - *planReplayerTaskDumpHandle -} - -type planReplayerTaskCollectorHandle struct { - taskMu struct { - sync.RWMutex - tasks map[PlanReplayerTaskKey]struct{} - } - ctx context.Context - sctx sessionctx.Context -} - func deletePlanReplayerStatus(ctx context.Context, sctx sessionctx.Context, token string) { ctx1 := kv.WithInternalSourceType(ctx, kv.InternalTxnStats) exec := sctx.(sqlexec.SQLExecutor) @@ -192,6 +177,35 @@ func insertPlanReplayerSuccessStatusRecord(ctx context.Context, sctx sessionctx. } } +type planReplayerHandle struct { + *planReplayerTaskCollectorHandle + *planReplayerTaskDumpHandle +} + +// SendTask sends a dump task to the background task handler +func (h *planReplayerHandle) SendTask(task *PlanReplayerDumpTask) { + select { + case h.planReplayerTaskDumpHandle.taskCH <- task: + // we directly remove the task key once the task is put into the channel successfully; if the task fails to dump, + // the task handle will re-add the task in the next loop + h.planReplayerTaskCollectorHandle.removeTask(task.PlanReplayerTaskKey) + default: + // TODO: add metrics here + // directly discard the task if the task channel is full in order not to block the query process + logutil.BgLogger().Info("discard one plan replayer dump task", + zap.String("sql digest", task.SQLDigest), zap.String("plan digest", task.PlanDigest)) + } +} + +type planReplayerTaskCollectorHandle struct { + taskMu struct { + sync.RWMutex + tasks map[PlanReplayerTaskKey]struct{} + } + ctx context.Context + sctx sessionctx.Context +} + // CollectPlanReplayerTask collects all unhandled plan replayer task func (h *planReplayerTaskCollectorHandle) CollectPlanReplayerTask() error { allKeys, err := h.collectAllPlanReplayerTask(h.ctx) @@ -265,11 +279,13 @@ func (h *planReplayerTaskCollectorHandle) collectAllPlanReplayerTask(ctx context } type planReplayerDumpTaskStatus struct { + // runningTaskMu records the tasks being run by all workers in order to avoid multiple workers running the same task key runningTaskMu struct { sync.RWMutex runningTasks map[PlanReplayerTaskKey]struct{} } + // finishedTaskMu records the finished tasks in order to avoid running a finished task key again finishedTaskMu struct { sync.RWMutex finishedTask map[PlanReplayerTaskKey]struct{} } @@ -327,12 +343,10 @@ func (r *planReplayerDumpTaskStatus) clearFinishedTask() { } type planReplayerTaskDumpWorker struct { - ctx context.Context - sctx sessionctx.Context - taskCH <-chan *PlanReplayerDumpTask - status *planReplayerDumpTaskStatus - taskHandle *planReplayerTaskCollectorHandle - finished *atomic.Bool + ctx context.Context + sctx sessionctx.Context + taskCH <-chan *PlanReplayerDumpTask + status *planReplayerDumpTaskStatus } func (w *planReplayerTaskDumpWorker) run() { @@ -340,14 +354,12 @@ func (w *planReplayerTaskDumpWorker) run() { if w.status.checkTaskKeyFinishedBefore(task) { continue } - if !w.status.occupyRunningTaskKey(task) { + successOccupy := w.status.occupyRunningTaskKey(task) + if !successOccupy { continue } w.HandleTask(task) w.status.releaseRunningTaskKey(task) - if w.finished.Load() { - return - } } } @@ -355,7 +367,6 @@ func (w *planReplayerTaskDumpWorker) HandleTask(task *PlanReplayerDumpTask) (success bool) { defer func() { if success
{ - w.taskHandle.removeTask(task.PlanReplayerTaskKey) w.status.setTaskFinished(task) } }() @@ -417,10 +428,9 @@ func (w *planReplayerTaskDumpWorker) HandleTask(task *PlanReplayerDumpTask) (suc } type planReplayerTaskDumpHandle struct { - taskCH chan *PlanReplayerDumpTask - finished *atomic.Bool - status *planReplayerDumpTaskStatus - workers []*planReplayerTaskDumpWorker + taskCH chan *PlanReplayerDumpTask + status *planReplayerDumpTaskStatus + workers []*planReplayerTaskDumpWorker } // GetTaskStatus used for test @@ -433,10 +443,9 @@ func (h *planReplayerTaskDumpHandle) GetWorker() *planReplayerTaskDumpWorker { return h.workers[0] } -// Finish makes the finished flag true -func (h *planReplayerTaskDumpHandle) Finish() { +// Close closes the task channel +func (h *planReplayerTaskDumpHandle) Close() { close(h.taskCH) - h.finished.Store(true) } // DrainTask drain a task for unit test @@ -444,18 +453,6 @@ func (h *planReplayerTaskDumpHandle) DrainTask() *PlanReplayerDumpTask { return <-h.taskCH } -// SendTask send dumpTask in background task handler -func (h *planReplayerTaskDumpHandle) SendTask(task *PlanReplayerDumpTask) { - select { - case h.taskCH <- task: - default: - logutil.BgLogger().Info("discard one plan replayer dump task", - zap.String("sql digest", task.SQLDigest), zap.String("plan digest", task.PlanDigest)) - // TODO: add metrics here - // directly discard the task if the task channel is full in order not to block the query process - } -} - func checkUnHandledReplayerTask(ctx context.Context, sctx sessionctx.Context, task PlanReplayerTaskKey) (bool, error) { exec := sctx.(sqlexec.SQLExecutor) rs, err := exec.ExecuteInternal(ctx, fmt.Sprintf("select * from mysql.plan_replayer_status where sql_digest = '%v' and plan_digest = '%v' and fail_reason is null", task.SQLDigest, task.PlanDigest)) diff --git a/executor/compiler.go b/executor/compiler.go index 6d62b6e3272cf..923f3936a3e97 100644 --- a/executor/compiler.go +++ b/executor/compiler.go @@ -170,6 +170,7 @@ func checkPlanReplayerCaptureTask(sctx sessionctx.Context, stmtNode ast.StmtNode for _, task := range tasks { if task.SQLDigest == sqlDigest.String() && task.PlanDigest == planDigest.String() { sendPlanReplayerDumpTask(sqlDigest.String(), planDigest.String(), sctx, stmtNode) + return } } } From 94b8373c46bb913b819e16854edfbc3c27642528 Mon Sep 17 00:00:00 2001 From: yisaer Date: Thu, 24 Nov 2022 13:48:36 +0800 Subject: [PATCH 6/9] add log Signed-off-by: yisaer --- domain/domain.go | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/domain/domain.go b/domain/domain.go index 328f29016db3e..8e197a42e14b6 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -1653,13 +1653,8 @@ func (do *Domain) StartPlanReplayerHandle() { for _, worker := range do.planReplayerHandle.planReplayerTaskDumpHandle.workers { go worker.run() } - for { - select { - case <-do.exit: - do.planReplayerHandle.planReplayerTaskDumpHandle.Close() - return - } - } + <-do.exit + do.planReplayerHandle.planReplayerTaskDumpHandle.Close() }() } From 630fab204a9f7173e80bf14dd03c1516a3760be5 Mon Sep 17 00:00:00 2001 From: yisaer Date: Mon, 28 Nov 2022 14:17:28 +0800 Subject: [PATCH 7/9] fix Signed-off-by: yisaer --- executor/compiler.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/executor/compiler.go b/executor/compiler.go index 923f3936a3e97..25dac1f8d8cc3 100644 --- a/executor/compiler.go +++ b/executor/compiler.go @@ -164,7 +164,11 @@ func (c *Compiler) Compile(ctx context.Context, stmtNode ast.StmtNode) (_ *ExecS } func
checkPlanReplayerCaptureTask(sctx sessionctx.Context, stmtNode ast.StmtNode) { - tasks := domain.GetDomain(sctx).GetPlanReplayerHandle().GetTasks() + handle := domain.GetDomain(sctx).GetPlanReplayerHandle() + if handle == nil { + return + } + tasks := handle.GetTasks() _, sqlDigest := sctx.GetSessionVars().StmtCtx.SQLDigest() _, planDigest := getPlanDigest(sctx.GetSessionVars().StmtCtx) for _, task := range tasks { From 3a908f9962cc73e3425ffd4ed3cf142e5ff9dba7 Mon Sep 17 00:00:00 2001 From: yisaer Date: Mon, 28 Nov 2022 14:22:28 +0800 Subject: [PATCH 8/9] fix Signed-off-by: yisaer --- executor/compiler.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/executor/compiler.go b/executor/compiler.go index 25dac1f8d8cc3..5d16a4fbea6e7 100644 --- a/executor/compiler.go +++ b/executor/compiler.go @@ -164,7 +164,11 @@ func (c *Compiler) Compile(ctx context.Context, stmtNode ast.StmtNode) (_ *ExecS } func checkPlanReplayerCaptureTask(sctx sessionctx.Context, stmtNode ast.StmtNode) { - handle := domain.GetDomain(sctx).GetPlanReplayerHandle() + dom := domain.GetDomain(sctx) + if dom == nil { + return + } + handle := dom.GetPlanReplayerHandle() if handle == nil { return } From 164f95800d40b6d3125e99dbc1e97e475d73aade Mon Sep 17 00:00:00 2001 From: yisaer Date: Wed, 7 Dec 2022 12:15:41 +0800 Subject: [PATCH 9/9] fix stats --- domain/plan_replayer.go | 1 + 1 file changed, 1 insertion(+) diff --git a/domain/plan_replayer.go b/domain/plan_replayer.go index ff9d5b654976f..d237445f5404d 100644 --- a/domain/plan_replayer.go +++ b/domain/plan_replayer.go @@ -416,6 +416,7 @@ func (w *planReplayerTaskDumpWorker) HandleTask(task *PlanReplayerDumpTask) (suc } jsStats[tblID] = r } + task.JSONTblStats = jsStats err = DumpPlanReplayerInfo(w.ctx, w.sctx, task) if err != nil { logutil.BgLogger().Warn("dump plan replayer capture task result failed",
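Once PATCH 9/9 wires jsStats into task.JSONTblStats, the series is complete: captured tasks flow through planReplayerHandle.SendTask into the shared channel, and each worker consults planReplayerDumpTaskStatus so that an already-dumped key is skipped and no two workers dump the same key concurrently. The skip/claim/release discipline, reduced to a self-contained Go sketch (the single mutex and the names are illustrative; the patch keeps separate RWMutex-guarded maps for running and finished tasks):

	package main

	import (
		"fmt"
		"sync"
	)

	type taskKey struct{ sqlDigest, planDigest string }

	type dumpTaskStatus struct {
		mu       sync.RWMutex
		running  map[taskKey]struct{}
		finished map[taskKey]struct{}
	}

	// occupy claims a key for one worker; false means another worker holds it.
	func (s *dumpTaskStatus) occupy(k taskKey) bool {
		s.mu.Lock()
		defer s.mu.Unlock()
		if _, ok := s.running[k]; ok {
			return false
		}
		s.running[k] = struct{}{}
		return true
	}

	// release gives the key back once the dump attempt ends.
	func (s *dumpTaskStatus) release(k taskKey) {
		s.mu.Lock()
		defer s.mu.Unlock()
		delete(s.running, k)
	}

	// finishedBefore reports whether the key was already dumped successfully.
	func (s *dumpTaskStatus) finishedBefore(k taskKey) bool {
		s.mu.RLock()
		defer s.mu.RUnlock()
		_, ok := s.finished[k]
		return ok
	}

	func (s *dumpTaskStatus) markFinished(k taskKey) {
		s.mu.Lock()
		defer s.mu.Unlock()
		s.finished[k] = struct{}{}
	}

	func main() {
		s := &dumpTaskStatus{running: map[taskKey]struct{}{}, finished: map[taskKey]struct{}{}}
		k := taskKey{sqlDigest: "sql", planDigest: "plan"}
		if !s.finishedBefore(k) && s.occupy(k) { // same checks as the worker's run loop
			fmt.Println("dumping", k)
			s.markFinished(k)
			s.release(k)
		}
	}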