From ea9c4abcd6c3cf1d86e04014b4f9e230ed1ff4fc Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Tue, 7 Jan 2025 22:18:28 +0800 Subject: [PATCH] ttl: fix the issue that one task losing heartbeat will block other tasks (#57919) (#58710) close pingcap/tidb#57915 --- pkg/ttl/ttlworker/job_manager.go | 57 +++++++++------- .../ttlworker/job_manager_integration_test.go | 45 ++++++++++++- pkg/ttl/ttlworker/job_manager_test.go | 8 ++- pkg/ttl/ttlworker/task_manager.go | 50 ++++++++------ .../task_manager_integration_test.go | 65 ++++++++++++++++++- pkg/ttl/ttlworker/task_manager_test.go | 15 +++-- 6 files changed, 184 insertions(+), 56 deletions(-) diff --git a/pkg/ttl/ttlworker/job_manager.go b/pkg/ttl/ttlworker/job_manager.go index f7278dfb6a1e7..a0047f641b0c0 100644 --- a/pkg/ttl/ttlworker/job_manager.go +++ b/pkg/ttl/ttlworker/job_manager.go @@ -231,10 +231,7 @@ func (m *JobManager) jobLoop() error { // Job Schedule loop: case <-updateJobHeartBeatTicker: updateHeartBeatCtx, cancel := context.WithTimeout(m.ctx, ttlInternalSQLTimeout) - err = m.updateHeartBeat(updateHeartBeatCtx, se, now) - if err != nil { - logutil.Logger(m.ctx).Warn("fail to update job heart beat", zap.Error(err)) - } + m.updateHeartBeat(updateHeartBeatCtx, se, now) cancel() case <-jobCheckTicker: m.checkFinishedJob(se) @@ -277,10 +274,7 @@ func (m *JobManager) jobLoop() error { m.taskManager.resizeWorkersWithSysVar() case <-updateTaskHeartBeatTicker: updateHeartBeatCtx, cancel := context.WithTimeout(m.ctx, ttlInternalSQLTimeout) - err = m.taskManager.updateHeartBeat(updateHeartBeatCtx, se, now) - if err != nil { - logutil.Logger(m.ctx).Warn("fail to update task heart beat", zap.Error(err)) - } + m.taskManager.updateHeartBeat(updateHeartBeatCtx, se, now) cancel() case <-checkScanTaskFinishedTicker: if m.taskManager.handleScanFinishedTask() { @@ -897,29 +891,42 @@ func (m *JobManager) appendLockedJob(id string, se session.Session, createTime t } // updateHeartBeat updates the heartbeat for all task with current instance as owner -func (m *JobManager) updateHeartBeat(ctx context.Context, se session.Session, now time.Time) error { +func (m *JobManager) updateHeartBeat(ctx context.Context, se session.Session, now time.Time) { for _, job := range m.localJobs() { - if job.createTime.Add(ttlJobTimeout).Before(now) { - logutil.Logger(m.ctx).Info("job is timeout", zap.String("jobID", job.id)) - summary, err := summarizeErr(errors.New("job is timeout")) - if err != nil { - logutil.Logger(m.ctx).Warn("fail to summarize job", zap.Error(err)) - } - err = job.finish(se, now, summary) - if err != nil { - logutil.Logger(m.ctx).Warn("fail to finish job", zap.Error(err)) - continue - } - m.removeJob(job) + err := m.updateHeartBeatForJob(ctx, se, now, job) + if err != nil { + logutil.Logger(m.ctx).Warn("fail to update heartbeat for job", zap.Error(err), zap.String("jobID", job.id)) } + } +} - intest.Assert(se.GetSessionVars().TimeZone.String() == now.Location().String()) - sql, args := updateHeartBeatSQL(job.tbl.ID, now, m.id) - _, err := se.ExecuteSQL(ctx, sql, args...) +func (m *JobManager) updateHeartBeatForJob(ctx context.Context, se session.Session, now time.Time, job *ttlJob) error { + if job.createTime.Add(ttlJobTimeout).Before(now) { + logutil.Logger(m.ctx).Info("job is timeout", zap.String("jobID", job.id)) + summary, err := summarizeErr(errors.New("job is timeout")) if err != nil { - return errors.Wrapf(err, "execute sql: %s", sql) + return errors.Wrapf(err, "fail to summarize job") } + err = job.finish(se, now, summary) + if err != nil { + return errors.Wrapf(err, "fail to finish job") + } + m.removeJob(job) + return nil } + + intest.Assert(se.GetSessionVars().TimeZone.String() == now.Location().String()) + sql, args := updateHeartBeatSQL(job.tbl.ID, now, m.id) + _, err := se.ExecuteSQL(ctx, sql, args...) + if err != nil { + return errors.Wrapf(err, "execute sql: %s", sql) + } + + if se.GetSessionVars().StmtCtx.AffectedRows() != 1 { + return errors.Errorf("fail to update job heartbeat, maybe the owner is not myself (%s), affected rows: %d", + m.id, se.GetSessionVars().StmtCtx.AffectedRows()) + } + return nil } diff --git a/pkg/ttl/ttlworker/job_manager_integration_test.go b/pkg/ttl/ttlworker/job_manager_integration_test.go index c37d3ed237b54..6d7788851bf40 100644 --- a/pkg/ttl/ttlworker/job_manager_integration_test.go +++ b/pkg/ttl/ttlworker/job_manager_integration_test.go @@ -677,7 +677,7 @@ func TestJobTimeout(t *testing.T) { require.Equal(t, now.Format(time.RFC3339), newTableStatus.CurrentJobStatusUpdateTime.Format(time.RFC3339)) // the timeout will be checked while updating heartbeat - require.NoError(t, m2.UpdateHeartBeat(ctx, se, now.Add(7*time.Hour))) + require.NoError(t, m2.UpdateHeartBeatForJob(ctx, se, now.Add(7*time.Hour), m2.RunningJobs()[0])) tk.MustQuery("select last_job_summary->>'$.scan_task_err' from mysql.tidb_ttl_table_status").Check(testkit.Rows("job is timeout")) tk.MustQuery("select count(*) from mysql.tidb_ttl_task").Check(testkit.Rows("0")) } @@ -1617,3 +1617,46 @@ func TestTimerJobAfterDropTable(t *testing.T) { require.NotNil(t, job) require.True(t, job.Finished) } + +func TestJobHeartBeatFailNotBlockOthers(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + waitAndStopTTLManager(t, dom) + tk := testkit.NewTestKit(t, store) + + sessionFactory := sessionFactory(t, dom) + se := sessionFactory() + + tk.MustExec("use test") + tk.MustExec("CREATE TABLE t1 (id INT PRIMARY KEY, created_at DATETIME) TTL = created_at + INTERVAL 1 HOUR") + tk.MustExec("CREATE TABLE t2 (id INT PRIMARY KEY, created_at DATETIME) TTL = created_at + INTERVAL 1 HOUR") + testTable1, err := dom.InfoSchema().TableByName(context.Background(), pmodel.NewCIStr("test"), pmodel.NewCIStr("t1")) + require.NoError(t, err) + testTable2, err := dom.InfoSchema().TableByName(context.Background(), pmodel.NewCIStr("test"), pmodel.NewCIStr("t2")) + require.NoError(t, err) + + ctx := context.Background() + m := ttlworker.NewJobManager("test-ttl-job-manager-1", nil, store, nil, nil) + + now := se.Now() + // acquire two jobs + require.NoError(t, m.InfoSchemaCache().Update(se)) + require.NoError(t, m.TableStatusCache().Update(ctx, se)) + _, err = m.LockJob(context.Background(), se, m.InfoSchemaCache().Tables[testTable1.Meta().ID], now, uuid.NewString(), false) + require.NoError(t, err) + _, err = m.LockJob(context.Background(), se, m.InfoSchemaCache().Tables[testTable2.Meta().ID], now, uuid.NewString(), false) + require.NoError(t, err) + tk.MustQuery("select current_job_status from mysql.tidb_ttl_table_status").Check(testkit.Rows("running", "running")) + + // assign the first job to another manager + tk.MustExec("update mysql.tidb_ttl_table_status set current_job_owner_id = 'test-ttl-job-manager-2' where table_id = ?", testTable1.Meta().ID) + // the heartbeat of the first job will fail, but the second one will still success + now = now.Add(time.Hour) + require.Error(t, m.UpdateHeartBeatForJob(context.Background(), se, now, m.RunningJobs()[0])) + require.NoError(t, m.UpdateHeartBeatForJob(context.Background(), se, now, m.RunningJobs()[1])) + + now = now.Add(time.Hour) + m.UpdateHeartBeat(ctx, se, now) + tk.MustQuery("select table_id, current_job_owner_hb_time from mysql.tidb_ttl_table_status").Sort().Check(testkit.Rows( + fmt.Sprintf("%d %s", testTable1.Meta().ID, now.Add(-2*time.Hour).Format(time.DateTime)), + fmt.Sprintf("%d %s", testTable2.Meta().ID, now.Format(time.DateTime)))) +} diff --git a/pkg/ttl/ttlworker/job_manager_test.go b/pkg/ttl/ttlworker/job_manager_test.go index ec24c91d5c679..bb539f996ed9d 100644 --- a/pkg/ttl/ttlworker/job_manager_test.go +++ b/pkg/ttl/ttlworker/job_manager_test.go @@ -194,8 +194,12 @@ func (m *JobManager) TaskManager() *taskManager { } // UpdateHeartBeat is an exported version of updateHeartBeat for test -func (m *JobManager) UpdateHeartBeat(ctx context.Context, se session.Session, now time.Time) error { - return m.updateHeartBeat(ctx, se, now) +func (m *JobManager) UpdateHeartBeat(ctx context.Context, se session.Session, now time.Time) { + m.updateHeartBeat(ctx, se, now) +} + +func (m *JobManager) UpdateHeartBeatForJob(ctx context.Context, se session.Session, now time.Time, job *ttlJob) error { + return m.updateHeartBeatForJob(ctx, se, now, job) } // ReportMetrics is an exported version of reportMetrics diff --git a/pkg/ttl/ttlworker/task_manager.go b/pkg/ttl/ttlworker/task_manager.go index 9285d86970e4d..d8dfd3e608ed6 100644 --- a/pkg/ttl/ttlworker/task_manager.go +++ b/pkg/ttl/ttlworker/task_manager.go @@ -439,32 +439,40 @@ func (m *taskManager) syncTaskFromTable(se session.Session, jobID string, scanID } // updateHeartBeat updates the heartbeat for all tasks with current instance as owner -func (m *taskManager) updateHeartBeat(ctx context.Context, se session.Session, now time.Time) error { +func (m *taskManager) updateHeartBeat(ctx context.Context, se session.Session, now time.Time) { for _, task := range m.runningTasks { - state := &cache.TTLTaskState{ - TotalRows: task.statistics.TotalRows.Load(), - SuccessRows: task.statistics.SuccessRows.Load(), - ErrorRows: task.statistics.ErrorRows.Load(), - } - if task.result != nil && task.result.err != nil { - state.ScanTaskErr = task.result.err.Error() - } - - intest.Assert(se.GetSessionVars().Location().String() == now.Location().String()) - sql, args, err := updateTTLTaskHeartBeatSQL(task.JobID, task.ScanID, now, state, m.id) + err := m.updateHeartBeatForTask(ctx, se, now, task) if err != nil { - return err - } - _, err = se.ExecuteSQL(ctx, sql, args...) - if err != nil { - return errors.Wrapf(err, "execute sql: %s", sql) + logutil.Logger(m.ctx).Warn("fail to update task heart beat", zap.Error(err), zap.String("jobID", task.JobID), zap.Int64("scanID", task.ScanID)) } + } +} - if se.GetSessionVars().StmtCtx.AffectedRows() != 1 { - return errors.Errorf("fail to update task status, maybe the owner is not myself (%s), affected rows: %d", - m.id, se.GetSessionVars().StmtCtx.AffectedRows()) - } +func (m *taskManager) updateHeartBeatForTask(ctx context.Context, se session.Session, now time.Time, task *runningScanTask) error { + state := &cache.TTLTaskState{ + TotalRows: task.statistics.TotalRows.Load(), + SuccessRows: task.statistics.SuccessRows.Load(), + ErrorRows: task.statistics.ErrorRows.Load(), + } + if task.result != nil && task.result.err != nil { + state.ScanTaskErr = task.result.err.Error() + } + + intest.Assert(se.GetSessionVars().Location().String() == now.Location().String()) + sql, args, err := updateTTLTaskHeartBeatSQL(task.JobID, task.ScanID, now, state, m.id) + if err != nil { + return err } + _, err = se.ExecuteSQL(ctx, sql, args...) + if err != nil { + return errors.Wrapf(err, "execute sql: %s", sql) + } + + if se.GetSessionVars().StmtCtx.AffectedRows() != 1 { + return errors.Errorf("fail to update task heartbeat, maybe the owner is not myself (%s), affected rows: %d", + m.id, se.GetSessionVars().StmtCtx.AffectedRows()) + } + return nil } diff --git a/pkg/ttl/ttlworker/task_manager_integration_test.go b/pkg/ttl/ttlworker/task_manager_integration_test.go index 9f95edd13cd78..195a57fe939e1 100644 --- a/pkg/ttl/ttlworker/task_manager_integration_test.go +++ b/pkg/ttl/ttlworker/task_manager_integration_test.go @@ -481,7 +481,9 @@ func TestTaskCancelledAfterHeartbeatTimeout(t *testing.T) { tk.MustQuery("select count(1) from mysql.tidb_ttl_task where status = 'running' and owner_id = 'task-manager-2'").Check(testkit.Rows("4")) // Then m1 cannot update the heartbeat of its task - require.Error(t, m1.UpdateHeartBeat(context.Background(), se, now.Add(time.Hour))) + for i := 0; i < 4; i++ { + require.Error(t, m1.UpdateHeartBeatForTask(context.Background(), se, now.Add(time.Hour), m1.GetRunningTasks()[i])) + } tk.MustQuery("select owner_hb_time from mysql.tidb_ttl_task").Check(testkit.Rows( now.Format(time.DateTime), now.Format(time.DateTime), @@ -490,7 +492,9 @@ func TestTaskCancelledAfterHeartbeatTimeout(t *testing.T) { )) // m2 can successfully update the heartbeat - require.NoError(t, m2.UpdateHeartBeat(context.Background(), se, now.Add(time.Hour))) + for i := 0; i < 4; i++ { + require.NoError(t, m2.UpdateHeartBeatForTask(context.Background(), se, now.Add(time.Hour), m2.GetRunningTasks()[i])) + } tk.MustQuery("select owner_hb_time from mysql.tidb_ttl_task").Check(testkit.Rows( now.Add(time.Hour).Format(time.DateTime), now.Add(time.Hour).Format(time.DateTime), @@ -522,3 +526,60 @@ func TestTaskCancelledAfterHeartbeatTimeout(t *testing.T) { `finished {"total_rows":0,"success_rows":0,"error_rows":0,"scan_task_err":""} task-manager-2`, )) } + +func TestHeartBeatErrorNotBlockOthers(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + pool := wrapPoolForTest(dom.SysSessionPool()) + defer pool.AssertNoSessionInUse(t) + waitAndStopTTLManager(t, dom) + tk := testkit.NewTestKit(t, store) + sessionFactory := sessionFactory(t, dom) + + tk.MustExec("set global tidb_ttl_running_tasks = 32") + + tk.MustExec("create table test.t(id int, created_at datetime) ttl=created_at + interval 1 day") + testTable, err := dom.InfoSchema().TableByName(context.Background(), model.NewCIStr("test"), model.NewCIStr("t")) + require.NoError(t, err) + for id := 0; id < 4; id++ { + sql := fmt.Sprintf("insert into mysql.tidb_ttl_task(job_id,table_id,scan_id,expire_time,created_time) values ('test-job', %d, %d, NOW() - INTERVAL 1 DAY, NOW())", testTable.Meta().ID, id) + tk.MustExec(sql) + } + + se := sessionFactory() + now := se.Now() + + isc := cache.NewInfoSchemaCache(time.Minute) + require.NoError(t, isc.Update(se)) + m := ttlworker.NewTaskManager(context.Background(), pool, isc, "task-manager-1", store) + workers := []ttlworker.Worker{} + for j := 0; j < 4; j++ { + scanWorker := ttlworker.NewMockScanWorker(t) + scanWorker.Start() + workers = append(workers, scanWorker) + } + m.SetScanWorkers4Test(workers) + m.RescheduleTasks(se, now) + + // All tasks should be scheduled to m1 and running + tk.MustQuery("select count(1) from mysql.tidb_ttl_task where status = 'running' and owner_id = 'task-manager-1'").Check(testkit.Rows("4")) + + // Mock the situation that the owner of task 0 has changed + tk.MustExec("update mysql.tidb_ttl_task set owner_id = 'task-manager-2' where scan_id = 0") + tk.MustQuery("select count(1) from mysql.tidb_ttl_task where status = 'running' and owner_id = 'task-manager-1'").Check(testkit.Rows("3")) + + now = now.Add(time.Hour) + require.Error(t, m.UpdateHeartBeatForTask(context.Background(), se, now, m.GetRunningTasks()[0])) + for i := 1; i < 4; i++ { + require.NoError(t, m.UpdateHeartBeatForTask(context.Background(), se, now, m.GetRunningTasks()[i])) + } + + now = now.Add(time.Hour) + m.UpdateHeartBeat(context.Background(), se, now) + tk.MustQuery("select count(1) from mysql.tidb_ttl_task where status = 'running' and owner_id = 'task-manager-1'").Check(testkit.Rows("3")) + tk.MustQuery("select scan_id, owner_hb_time from mysql.tidb_ttl_task").Sort().Check(testkit.Rows( + fmt.Sprintf("0 %s", now.Add(-2*time.Hour).Format(time.DateTime)), + fmt.Sprintf("1 %s", now.Format(time.DateTime)), + fmt.Sprintf("2 %s", now.Format(time.DateTime)), + fmt.Sprintf("3 %s", now.Format(time.DateTime)), + )) +} diff --git a/pkg/ttl/ttlworker/task_manager_test.go b/pkg/ttl/ttlworker/task_manager_test.go index bd2381b1e3eff..2b51aa12960e1 100644 --- a/pkg/ttl/ttlworker/task_manager_test.go +++ b/pkg/ttl/ttlworker/task_manager_test.go @@ -101,6 +101,16 @@ func (t *runningScanTask) SetResult(err error) { t.result = t.ttlScanTask.result(err) } +// UpdateHeartBeat is an exported version of updateHeartBeat +func (m *taskManager) UpdateHeartBeat(ctx context.Context, se session.Session, now time.Time) { + m.updateHeartBeat(ctx, se, now) +} + +// UpdateHeartBeatForTask is an exported version of updateHeartBeatForTask +func (m *taskManager) UpdateHeartBeatForTask(ctx context.Context, se session.Session, now time.Time, task *runningScanTask) error { + return m.updateHeartBeatForTask(ctx, se, now, task) +} + // SetCancel sets the cancel function of the task func (t *runningScanTask) SetCancel(cancel func()) { t.cancel = cancel @@ -111,11 +121,6 @@ func (m *taskManager) CheckInvalidTask(se session.Session) { m.checkInvalidTask(se) } -// UpdateHeartBeat is an exported version of updateHeartBeat -func (m *taskManager) UpdateHeartBeat(ctx context.Context, se session.Session, now time.Time) error { - return m.updateHeartBeat(ctx, se, now) -} - func TestResizeWorkers(t *testing.T) { tbl := newMockTTLTbl(t, "t1")