diff --git a/pkg/ttl/ttlworker/job_manager.go b/pkg/ttl/ttlworker/job_manager.go index ac41ea138f1fb..2a835f15f0a05 100644 --- a/pkg/ttl/ttlworker/job_manager.go +++ b/pkg/ttl/ttlworker/job_manager.go @@ -33,6 +33,7 @@ import ( "github.com/pingcap/tidb/pkg/ttl/client" "github.com/pingcap/tidb/pkg/ttl/metrics" "github.com/pingcap/tidb/pkg/ttl/session" + "github.com/pingcap/tidb/pkg/util/intest" "github.com/pingcap/tidb/pkg/util/logutil" "github.com/pingcap/tidb/pkg/util/timeutil" clientv3 "go.etcd.io/etcd/client/v3" @@ -856,26 +857,7 @@ func (m *JobManager) appendLockedJob(id string, se session.Session, createTime t // updateHeartBeat updates the heartbeat for all task with current instance as owner func (m *JobManager) updateHeartBeat(ctx context.Context, se session.Session, now time.Time) { for _, job := range m.localJobs() { -<<<<<<< HEAD - if job.createTime.Add(ttlJobTimeout).Before(now) { - logutil.Logger(m.ctx).Info("job is timeout", zap.String("jobID", job.id)) - summary, err := summarizeErr(errors.New("job is timeout")) - if err != nil { - logutil.Logger(m.ctx).Warn("fail to summarize job", zap.Error(err)) - } - err = job.finish(se, now, summary) - if err != nil { - logutil.Logger(m.ctx).Warn("fail to finish job", zap.Error(err)) - continue - } - m.removeJob(job) - } - - sql, args := updateHeartBeatSQL(job.tbl.ID, now, m.id) - _, err := se.ExecuteSQL(ctx, sql, args...) -======= err := m.updateHeartBeatForJob(ctx, se, now, job) ->>>>>>> 0392cdda767 (ttl: fix the issue that one task losing heartbeat will block other tasks (#57919)) if err != nil { logutil.Logger(m.ctx).Warn("fail to update heartbeat for job", zap.Error(err), zap.String("jobID", job.id)) } diff --git a/pkg/ttl/ttlworker/job_manager_integration_test.go b/pkg/ttl/ttlworker/job_manager_integration_test.go index e0a70952b9fba..2e75c903b6f2a 100644 --- a/pkg/ttl/ttlworker/job_manager_integration_test.go +++ b/pkg/ttl/ttlworker/job_manager_integration_test.go @@ -1480,9 +1480,9 @@ func TestJobHeartBeatFailNotBlockOthers(t *testing.T) { tk.MustExec("use test") tk.MustExec("CREATE TABLE t1 (id INT PRIMARY KEY, created_at DATETIME) TTL = created_at + INTERVAL 1 HOUR") tk.MustExec("CREATE TABLE t2 (id INT PRIMARY KEY, created_at DATETIME) TTL = created_at + INTERVAL 1 HOUR") - testTable1, err := dom.InfoSchema().TableByName(context.Background(), pmodel.NewCIStr("test"), pmodel.NewCIStr("t1")) + testTable1, err := dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t1")) require.NoError(t, err) - testTable2, err := dom.InfoSchema().TableByName(context.Background(), pmodel.NewCIStr("test"), pmodel.NewCIStr("t2")) + testTable2, err := dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t2")) require.NoError(t, err) ctx := context.Background() diff --git a/pkg/ttl/ttlworker/task_manager.go b/pkg/ttl/ttlworker/task_manager.go index 0a07f540fb1c7..4f14e11108a02 100644 --- a/pkg/ttl/ttlworker/task_manager.go +++ b/pkg/ttl/ttlworker/task_manager.go @@ -26,6 +26,7 @@ import ( "github.com/pingcap/tidb/pkg/ttl/cache" "github.com/pingcap/tidb/pkg/ttl/metrics" "github.com/pingcap/tidb/pkg/ttl/session" + "github.com/pingcap/tidb/pkg/util/intest" "github.com/pingcap/tidb/pkg/util/logutil" "github.com/tikv/client-go/v2/tikv" "github.com/tikv/client-go/v2/tikvrpc" @@ -435,28 +436,9 @@ func (m *taskManager) syncTaskFromTable(se session.Session, jobID string, scanID // updateHeartBeat updates the heartbeat for all tasks with current instance as owner func (m *taskManager) updateHeartBeat(ctx context.Context, se session.Session, now time.Time) { for _, task := range m.runningTasks { -<<<<<<< HEAD - state := &cache.TTLTaskState{ - TotalRows: task.statistics.TotalRows.Load(), - SuccessRows: task.statistics.SuccessRows.Load(), - ErrorRows: task.statistics.ErrorRows.Load(), - } - if task.result != nil && task.result.err != nil { - state.ScanTaskErr = task.result.err.Error() - } - - sql, args, err := updateTTLTaskHeartBeatSQL(task.JobID, task.ScanID, now, state) - if err != nil { - return err - } - _, err = se.ExecuteSQL(ctx, sql, args...) - if err != nil { - return errors.Wrapf(err, "execute sql: %s", sql) -======= err := m.updateHeartBeatForTask(ctx, se, now, task) if err != nil { logutil.Logger(m.ctx).Warn("fail to update task heart beat", zap.Error(err), zap.String("jobID", task.JobID), zap.Int64("scanID", task.ScanID)) ->>>>>>> 0392cdda767 (ttl: fix the issue that one task losing heartbeat will block other tasks (#57919)) } } } @@ -472,7 +454,7 @@ func (m *taskManager) updateHeartBeatForTask(ctx context.Context, se session.Ses } intest.Assert(se.GetSessionVars().Location().String() == now.Location().String()) - sql, args, err := updateTTLTaskHeartBeatSQL(task.JobID, task.ScanID, now, state, m.id) + sql, args, err := updateTTLTaskHeartBeatSQL(task.JobID, task.ScanID, now, state) if err != nil { return err } diff --git a/pkg/ttl/ttlworker/task_manager_integration_test.go b/pkg/ttl/ttlworker/task_manager_integration_test.go index f03b23b243a0c..4d997b91cfb90 100644 --- a/pkg/ttl/ttlworker/task_manager_integration_test.go +++ b/pkg/ttl/ttlworker/task_manager_integration_test.go @@ -355,187 +355,10 @@ func TestMeetTTLRunningTasks(t *testing.T) { require.False(t, dom.TTLJobManager().TaskManager().MeetTTLRunningTasks(3, cache.TaskStatusWaiting)) require.True(t, dom.TTLJobManager().TaskManager().MeetTTLRunningTasks(3, cache.TaskStatusRunning)) } -<<<<<<< HEAD -======= - -func TestShrinkScanWorkerTimeout(t *testing.T) { - store, dom := testkit.CreateMockStoreAndDomain(t) - pool := wrapPoolForTest(dom.SysSessionPool()) - defer pool.AssertNoSessionInUse(t) - waitAndStopTTLManager(t, dom) - tk := testkit.NewTestKit(t, store) - sessionFactory := sessionFactory(t, store) - - tk.MustExec("set global tidb_ttl_running_tasks = 32") - - tk.MustExec("create table test.t(id int, created_at datetime) ttl=created_at + interval 1 day") - testTable, err := dom.InfoSchema().TableByName(context.Background(), model.NewCIStr("test"), model.NewCIStr("t")) - require.NoError(t, err) - for id := 0; id < 4; id++ { - sql := fmt.Sprintf("insert into mysql.tidb_ttl_task(job_id,table_id,scan_id,expire_time,created_time) values ('test-job', %d, %d, NOW() - INTERVAL 1 DAY, NOW())", testTable.Meta().ID, id) - tk.MustExec(sql) - } - - se := sessionFactory() - now := se.Now() - - isc := cache.NewInfoSchemaCache(time.Minute) - require.NoError(t, isc.Update(se)) - m := ttlworker.NewTaskManager(context.Background(), pool, isc, "scan-manager-1", store) - - startBlockNotifyCh := make(chan struct{}) - blockCancelCh := make(chan struct{}) - - workers := []ttlworker.Worker{} - for j := 0; j < 4; j++ { - scanWorker := ttlworker.NewMockScanWorker(t) - if j == 0 { - scanWorker.SetCtx(func(ctx context.Context) context.Context { - return context.WithValue(ctx, ttlworker.TTLScanPostScanHookForTest{}, func() { - startBlockNotifyCh <- struct{}{} - <-blockCancelCh - }) - }) - } - scanWorker.Start() - workers = append(workers, scanWorker) - } - - m.SetScanWorkers4Test(workers) - - m.RescheduleTasks(se, now) - require.Len(t, m.GetRunningTasks(), 4) - tk.MustQuery("SELECT count(1) from mysql.tidb_ttl_task where status = 'running'").Check(testkit.Rows("4")) - <-startBlockNotifyCh - - // shrink scan workers, one of them will timeout - require.Error(t, m.ResizeScanWorkers(0)) - require.Len(t, m.GetScanWorkers(), 0) - - // the canceled 3 tasks are still running, but they have results, so after `CheckFinishedTask`, it should be finished - tk.MustQuery("SELECT count(1) from mysql.tidb_ttl_task where status = 'running'").Check(testkit.Rows("4")) - m.CheckFinishedTask(se, now) - require.Len(t, m.GetRunningTasks(), 0) - // now, the task should be finished - tk.MustQuery("SELECT count(1) from mysql.tidb_ttl_task where status = 'running'").Check(testkit.Rows("0")) - // the first task will be finished with "timeout to cancel scan task" - // other tasks will finish with table not found because we didn't mock the table in this test. - tk.MustQuery("SELECT scan_id, json_extract(state, '$.scan_task_err') from mysql.tidb_ttl_task").Sort().Check(testkit.Rows( - "0 \"timeout to cancel scan task\"", - "1 \"table 'test.t' meta changed, should abort current job: [schema:1146]Table 'test.t' doesn't exist\"", - "2 \"table 'test.t' meta changed, should abort current job: [schema:1146]Table 'test.t' doesn't exist\"", - "3 \"table 'test.t' meta changed, should abort current job: [schema:1146]Table 'test.t' doesn't exist\"", - )) - - require.NoError(t, m.ResizeDelWorkers(0)) - close(blockCancelCh) -} - -func TestTaskCancelledAfterHeartbeatTimeout(t *testing.T) { - store, dom := testkit.CreateMockStoreAndDomain(t) - pool := wrapPoolForTest(dom.SysSessionPool()) - waitAndStopTTLManager(t, dom) - tk := testkit.NewTestKit(t, store) - sessionFactory := sessionFactory(t, store) - se := sessionFactory() - - tk.MustExec("set global tidb_ttl_running_tasks = 128") - defer tk.MustExec("set global tidb_ttl_running_tasks = -1") - - tk.MustExec("create table test.t(id int, created_at datetime) ttl=created_at + interval 1 day") - table, err := dom.InfoSchema().TableByName(context.Background(), model.NewCIStr("test"), model.NewCIStr("t")) - require.NoError(t, err) - // 4 tasks are inserted into the table - for i := 0; i < 4; i++ { - sql := fmt.Sprintf("insert into mysql.tidb_ttl_task(job_id,table_id,scan_id,expire_time,created_time) values ('test-job', %d, %d, NOW(), NOW())", table.Meta().ID, i) - tk.MustExec(sql) - } - isc := cache.NewInfoSchemaCache(time.Second) - require.NoError(t, isc.Update(se)) - - workers := []ttlworker.Worker{} - for j := 0; j < 8; j++ { - scanWorker := ttlworker.NewMockScanWorker(t) - scanWorker.Start() - workers = append(workers, scanWorker) - } - - now := se.Now() - m1 := ttlworker.NewTaskManager(context.Background(), pool, isc, "task-manager-1", store) - m1.SetScanWorkers4Test(workers[0:4]) - m1.RescheduleTasks(se, now) - m2 := ttlworker.NewTaskManager(context.Background(), pool, isc, "task-manager-2", store) - m2.SetScanWorkers4Test(workers[4:]) - - // All tasks should be scheduled to m1 and running - tk.MustQuery("select count(1) from mysql.tidb_ttl_task where status = 'running' and owner_id = 'task-manager-1'").Check(testkit.Rows("4")) - - var cancelCount atomic.Uint32 - for i := 0; i < 4; i++ { - task := m1.GetRunningTasks()[i] - task.SetCancel(func() { - cancelCount.Add(1) - }) - } - - // After a period of time, the tasks lost heartbeat and will be re-asisgned to m2 - now = now.Add(time.Hour) - m2.RescheduleTasks(se, now) - - // All tasks should be scheduled to m2 and running - tk.MustQuery("select count(1) from mysql.tidb_ttl_task where status = 'running' and owner_id = 'task-manager-2'").Check(testkit.Rows("4")) - - // Then m1 cannot update the heartbeat of its task - for i := 0; i < 4; i++ { - require.Error(t, m1.UpdateHeartBeatForTask(context.Background(), se, now.Add(time.Hour), m1.GetRunningTasks()[i])) - } - tk.MustQuery("select owner_hb_time from mysql.tidb_ttl_task").Check(testkit.Rows( - now.Format(time.DateTime), - now.Format(time.DateTime), - now.Format(time.DateTime), - now.Format(time.DateTime), - )) - - // m2 can successfully update the heartbeat - for i := 0; i < 4; i++ { - require.NoError(t, m2.UpdateHeartBeatForTask(context.Background(), se, now.Add(time.Hour), m2.GetRunningTasks()[i])) - } - tk.MustQuery("select owner_hb_time from mysql.tidb_ttl_task").Check(testkit.Rows( - now.Add(time.Hour).Format(time.DateTime), - now.Add(time.Hour).Format(time.DateTime), - now.Add(time.Hour).Format(time.DateTime), - now.Add(time.Hour).Format(time.DateTime), - )) - - // Although m1 cannot finish the task. It'll also try to cancel the task. - for _, task := range m1.GetRunningTasks() { - task.SetResult(nil) - } - m1.CheckFinishedTask(se, now) - tk.MustQuery("select count(1) from mysql.tidb_ttl_task where status = 'running'").Check(testkit.Rows("4")) - require.Equal(t, uint32(4), cancelCount.Load()) - - // Then the tasks in m1 should be cancelled again in `CheckInvalidTask`. - m1.CheckInvalidTask(se) - require.Equal(t, uint32(8), cancelCount.Load()) - - // m2 can finish the task - for _, task := range m2.GetRunningTasks() { - task.SetResult(nil) - } - m2.CheckFinishedTask(se, now) - tk.MustQuery("select status, state, owner_id from mysql.tidb_ttl_task").Sort().Check(testkit.Rows( - `finished {"total_rows":0,"success_rows":0,"error_rows":0,"scan_task_err":""} task-manager-2`, - `finished {"total_rows":0,"success_rows":0,"error_rows":0,"scan_task_err":""} task-manager-2`, - `finished {"total_rows":0,"success_rows":0,"error_rows":0,"scan_task_err":""} task-manager-2`, - `finished {"total_rows":0,"success_rows":0,"error_rows":0,"scan_task_err":""} task-manager-2`, - )) -} func TestHeartBeatErrorNotBlockOthers(t *testing.T) { store, dom := testkit.CreateMockStoreAndDomain(t) - pool := wrapPoolForTest(dom.SysSessionPool()) - defer pool.AssertNoSessionInUse(t) + pool := dom.SysSessionPool() waitAndStopTTLManager(t, dom) tk := testkit.NewTestKit(t, store) sessionFactory := sessionFactory(t, store) @@ -543,7 +366,7 @@ func TestHeartBeatErrorNotBlockOthers(t *testing.T) { tk.MustExec("set global tidb_ttl_running_tasks = 32") tk.MustExec("create table test.t(id int, created_at datetime) ttl=created_at + interval 1 day") - testTable, err := dom.InfoSchema().TableByName(context.Background(), model.NewCIStr("test"), model.NewCIStr("t")) + testTable, err := dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t")) require.NoError(t, err) for id := 0; id < 4; id++ { sql := fmt.Sprintf("insert into mysql.tidb_ttl_task(job_id,table_id,scan_id,expire_time,created_time) values ('test-job', %d, %d, NOW() - INTERVAL 1 DAY, NOW())", testTable.Meta().ID, id) @@ -588,4 +411,3 @@ func TestHeartBeatErrorNotBlockOthers(t *testing.T) { fmt.Sprintf("3 %s", now.Format(time.DateTime)), )) } ->>>>>>> 0392cdda767 (ttl: fix the issue that one task losing heartbeat will block other tasks (#57919)) diff --git a/pkg/ttl/ttlworker/task_manager_test.go b/pkg/ttl/ttlworker/task_manager_test.go index 49a3c75b95447..f422f87748bf3 100644 --- a/pkg/ttl/ttlworker/task_manager_test.go +++ b/pkg/ttl/ttlworker/task_manager_test.go @@ -86,8 +86,6 @@ func (t *runningScanTask) SetResult(err error) { t.result = t.ttlScanTask.result(err) } -<<<<<<< HEAD -======= // SetCancel sets the cancel function of the task func (t *runningScanTask) SetCancel(cancel func()) { t.cancel = cancel @@ -108,7 +106,6 @@ func (m *taskManager) UpdateHeartBeatForTask(ctx context.Context, se session.Ses return m.updateHeartBeatForTask(ctx, se, now, task) } ->>>>>>> 0392cdda767 (ttl: fix the issue that one task losing heartbeat will block other tasks (#57919)) func TestResizeWorkers(t *testing.T) { tbl := newMockTTLTbl(t, "t1")