Skip to content

Commit

Permalink
ttl: fix the issue that one task losing heartbeat will block other ta…
Browse files Browse the repository at this point in the history
…sks (#57919) (#58710)

close #57915
  • Loading branch information
ti-chi-bot authored Jan 7, 2025
1 parent c470fb2 commit ea9c4ab
Show file tree
Hide file tree
Showing 6 changed files with 184 additions and 56 deletions.
57 changes: 32 additions & 25 deletions pkg/ttl/ttlworker/job_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,10 +231,7 @@ func (m *JobManager) jobLoop() error {
// Job Schedule loop:
case <-updateJobHeartBeatTicker:
updateHeartBeatCtx, cancel := context.WithTimeout(m.ctx, ttlInternalSQLTimeout)
err = m.updateHeartBeat(updateHeartBeatCtx, se, now)
if err != nil {
logutil.Logger(m.ctx).Warn("fail to update job heart beat", zap.Error(err))
}
m.updateHeartBeat(updateHeartBeatCtx, se, now)
cancel()
case <-jobCheckTicker:
m.checkFinishedJob(se)
Expand Down Expand Up @@ -277,10 +274,7 @@ func (m *JobManager) jobLoop() error {
m.taskManager.resizeWorkersWithSysVar()
case <-updateTaskHeartBeatTicker:
updateHeartBeatCtx, cancel := context.WithTimeout(m.ctx, ttlInternalSQLTimeout)
err = m.taskManager.updateHeartBeat(updateHeartBeatCtx, se, now)
if err != nil {
logutil.Logger(m.ctx).Warn("fail to update task heart beat", zap.Error(err))
}
m.taskManager.updateHeartBeat(updateHeartBeatCtx, se, now)
cancel()
case <-checkScanTaskFinishedTicker:
if m.taskManager.handleScanFinishedTask() {
Expand Down Expand Up @@ -897,29 +891,42 @@ func (m *JobManager) appendLockedJob(id string, se session.Session, createTime t
}

// updateHeartBeat updates the heartbeat for all task with current instance as owner
func (m *JobManager) updateHeartBeat(ctx context.Context, se session.Session, now time.Time) error {
func (m *JobManager) updateHeartBeat(ctx context.Context, se session.Session, now time.Time) {
for _, job := range m.localJobs() {
if job.createTime.Add(ttlJobTimeout).Before(now) {
logutil.Logger(m.ctx).Info("job is timeout", zap.String("jobID", job.id))
summary, err := summarizeErr(errors.New("job is timeout"))
if err != nil {
logutil.Logger(m.ctx).Warn("fail to summarize job", zap.Error(err))
}
err = job.finish(se, now, summary)
if err != nil {
logutil.Logger(m.ctx).Warn("fail to finish job", zap.Error(err))
continue
}
m.removeJob(job)
err := m.updateHeartBeatForJob(ctx, se, now, job)
if err != nil {
logutil.Logger(m.ctx).Warn("fail to update heartbeat for job", zap.Error(err), zap.String("jobID", job.id))
}
}
}

intest.Assert(se.GetSessionVars().TimeZone.String() == now.Location().String())
sql, args := updateHeartBeatSQL(job.tbl.ID, now, m.id)
_, err := se.ExecuteSQL(ctx, sql, args...)
func (m *JobManager) updateHeartBeatForJob(ctx context.Context, se session.Session, now time.Time, job *ttlJob) error {
if job.createTime.Add(ttlJobTimeout).Before(now) {
logutil.Logger(m.ctx).Info("job is timeout", zap.String("jobID", job.id))
summary, err := summarizeErr(errors.New("job is timeout"))
if err != nil {
return errors.Wrapf(err, "execute sql: %s", sql)
return errors.Wrapf(err, "fail to summarize job")
}
err = job.finish(se, now, summary)
if err != nil {
return errors.Wrapf(err, "fail to finish job")
}
m.removeJob(job)
return nil
}

intest.Assert(se.GetSessionVars().TimeZone.String() == now.Location().String())
sql, args := updateHeartBeatSQL(job.tbl.ID, now, m.id)
_, err := se.ExecuteSQL(ctx, sql, args...)
if err != nil {
return errors.Wrapf(err, "execute sql: %s", sql)
}

if se.GetSessionVars().StmtCtx.AffectedRows() != 1 {
return errors.Errorf("fail to update job heartbeat, maybe the owner is not myself (%s), affected rows: %d",
m.id, se.GetSessionVars().StmtCtx.AffectedRows())
}

return nil
}

Expand Down
45 changes: 44 additions & 1 deletion pkg/ttl/ttlworker/job_manager_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -677,7 +677,7 @@ func TestJobTimeout(t *testing.T) {
require.Equal(t, now.Format(time.RFC3339), newTableStatus.CurrentJobStatusUpdateTime.Format(time.RFC3339))

// the timeout will be checked while updating heartbeat
require.NoError(t, m2.UpdateHeartBeat(ctx, se, now.Add(7*time.Hour)))
require.NoError(t, m2.UpdateHeartBeatForJob(ctx, se, now.Add(7*time.Hour), m2.RunningJobs()[0]))
tk.MustQuery("select last_job_summary->>'$.scan_task_err' from mysql.tidb_ttl_table_status").Check(testkit.Rows("job is timeout"))
tk.MustQuery("select count(*) from mysql.tidb_ttl_task").Check(testkit.Rows("0"))
}
Expand Down Expand Up @@ -1617,3 +1617,46 @@ func TestTimerJobAfterDropTable(t *testing.T) {
require.NotNil(t, job)
require.True(t, job.Finished)
}

func TestJobHeartBeatFailNotBlockOthers(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
waitAndStopTTLManager(t, dom)
tk := testkit.NewTestKit(t, store)

sessionFactory := sessionFactory(t, dom)
se := sessionFactory()

tk.MustExec("use test")
tk.MustExec("CREATE TABLE t1 (id INT PRIMARY KEY, created_at DATETIME) TTL = created_at + INTERVAL 1 HOUR")
tk.MustExec("CREATE TABLE t2 (id INT PRIMARY KEY, created_at DATETIME) TTL = created_at + INTERVAL 1 HOUR")
testTable1, err := dom.InfoSchema().TableByName(context.Background(), pmodel.NewCIStr("test"), pmodel.NewCIStr("t1"))
require.NoError(t, err)
testTable2, err := dom.InfoSchema().TableByName(context.Background(), pmodel.NewCIStr("test"), pmodel.NewCIStr("t2"))
require.NoError(t, err)

ctx := context.Background()
m := ttlworker.NewJobManager("test-ttl-job-manager-1", nil, store, nil, nil)

now := se.Now()
// acquire two jobs
require.NoError(t, m.InfoSchemaCache().Update(se))
require.NoError(t, m.TableStatusCache().Update(ctx, se))
_, err = m.LockJob(context.Background(), se, m.InfoSchemaCache().Tables[testTable1.Meta().ID], now, uuid.NewString(), false)
require.NoError(t, err)
_, err = m.LockJob(context.Background(), se, m.InfoSchemaCache().Tables[testTable2.Meta().ID], now, uuid.NewString(), false)
require.NoError(t, err)
tk.MustQuery("select current_job_status from mysql.tidb_ttl_table_status").Check(testkit.Rows("running", "running"))

// assign the first job to another manager
tk.MustExec("update mysql.tidb_ttl_table_status set current_job_owner_id = 'test-ttl-job-manager-2' where table_id = ?", testTable1.Meta().ID)
// the heartbeat of the first job will fail, but the second one will still success
now = now.Add(time.Hour)
require.Error(t, m.UpdateHeartBeatForJob(context.Background(), se, now, m.RunningJobs()[0]))
require.NoError(t, m.UpdateHeartBeatForJob(context.Background(), se, now, m.RunningJobs()[1]))

now = now.Add(time.Hour)
m.UpdateHeartBeat(ctx, se, now)
tk.MustQuery("select table_id, current_job_owner_hb_time from mysql.tidb_ttl_table_status").Sort().Check(testkit.Rows(
fmt.Sprintf("%d %s", testTable1.Meta().ID, now.Add(-2*time.Hour).Format(time.DateTime)),
fmt.Sprintf("%d %s", testTable2.Meta().ID, now.Format(time.DateTime))))
}
8 changes: 6 additions & 2 deletions pkg/ttl/ttlworker/job_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,8 +194,12 @@ func (m *JobManager) TaskManager() *taskManager {
}

// UpdateHeartBeat is an exported version of updateHeartBeat for test
func (m *JobManager) UpdateHeartBeat(ctx context.Context, se session.Session, now time.Time) error {
return m.updateHeartBeat(ctx, se, now)
func (m *JobManager) UpdateHeartBeat(ctx context.Context, se session.Session, now time.Time) {
m.updateHeartBeat(ctx, se, now)
}

func (m *JobManager) UpdateHeartBeatForJob(ctx context.Context, se session.Session, now time.Time, job *ttlJob) error {
return m.updateHeartBeatForJob(ctx, se, now, job)
}

// ReportMetrics is an exported version of reportMetrics
Expand Down
50 changes: 29 additions & 21 deletions pkg/ttl/ttlworker/task_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -439,32 +439,40 @@ func (m *taskManager) syncTaskFromTable(se session.Session, jobID string, scanID
}

// updateHeartBeat updates the heartbeat for all tasks with current instance as owner
func (m *taskManager) updateHeartBeat(ctx context.Context, se session.Session, now time.Time) error {
func (m *taskManager) updateHeartBeat(ctx context.Context, se session.Session, now time.Time) {
for _, task := range m.runningTasks {
state := &cache.TTLTaskState{
TotalRows: task.statistics.TotalRows.Load(),
SuccessRows: task.statistics.SuccessRows.Load(),
ErrorRows: task.statistics.ErrorRows.Load(),
}
if task.result != nil && task.result.err != nil {
state.ScanTaskErr = task.result.err.Error()
}

intest.Assert(se.GetSessionVars().Location().String() == now.Location().String())
sql, args, err := updateTTLTaskHeartBeatSQL(task.JobID, task.ScanID, now, state, m.id)
err := m.updateHeartBeatForTask(ctx, se, now, task)
if err != nil {
return err
}
_, err = se.ExecuteSQL(ctx, sql, args...)
if err != nil {
return errors.Wrapf(err, "execute sql: %s", sql)
logutil.Logger(m.ctx).Warn("fail to update task heart beat", zap.Error(err), zap.String("jobID", task.JobID), zap.Int64("scanID", task.ScanID))
}
}
}

if se.GetSessionVars().StmtCtx.AffectedRows() != 1 {
return errors.Errorf("fail to update task status, maybe the owner is not myself (%s), affected rows: %d",
m.id, se.GetSessionVars().StmtCtx.AffectedRows())
}
func (m *taskManager) updateHeartBeatForTask(ctx context.Context, se session.Session, now time.Time, task *runningScanTask) error {
state := &cache.TTLTaskState{
TotalRows: task.statistics.TotalRows.Load(),
SuccessRows: task.statistics.SuccessRows.Load(),
ErrorRows: task.statistics.ErrorRows.Load(),
}
if task.result != nil && task.result.err != nil {
state.ScanTaskErr = task.result.err.Error()
}

intest.Assert(se.GetSessionVars().Location().String() == now.Location().String())
sql, args, err := updateTTLTaskHeartBeatSQL(task.JobID, task.ScanID, now, state, m.id)
if err != nil {
return err
}
_, err = se.ExecuteSQL(ctx, sql, args...)
if err != nil {
return errors.Wrapf(err, "execute sql: %s", sql)
}

if se.GetSessionVars().StmtCtx.AffectedRows() != 1 {
return errors.Errorf("fail to update task heartbeat, maybe the owner is not myself (%s), affected rows: %d",
m.id, se.GetSessionVars().StmtCtx.AffectedRows())
}

return nil
}

Expand Down
65 changes: 63 additions & 2 deletions pkg/ttl/ttlworker/task_manager_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -481,7 +481,9 @@ func TestTaskCancelledAfterHeartbeatTimeout(t *testing.T) {
tk.MustQuery("select count(1) from mysql.tidb_ttl_task where status = 'running' and owner_id = 'task-manager-2'").Check(testkit.Rows("4"))

// Then m1 cannot update the heartbeat of its task
require.Error(t, m1.UpdateHeartBeat(context.Background(), se, now.Add(time.Hour)))
for i := 0; i < 4; i++ {
require.Error(t, m1.UpdateHeartBeatForTask(context.Background(), se, now.Add(time.Hour), m1.GetRunningTasks()[i]))
}
tk.MustQuery("select owner_hb_time from mysql.tidb_ttl_task").Check(testkit.Rows(
now.Format(time.DateTime),
now.Format(time.DateTime),
Expand All @@ -490,7 +492,9 @@ func TestTaskCancelledAfterHeartbeatTimeout(t *testing.T) {
))

// m2 can successfully update the heartbeat
require.NoError(t, m2.UpdateHeartBeat(context.Background(), se, now.Add(time.Hour)))
for i := 0; i < 4; i++ {
require.NoError(t, m2.UpdateHeartBeatForTask(context.Background(), se, now.Add(time.Hour), m2.GetRunningTasks()[i]))
}
tk.MustQuery("select owner_hb_time from mysql.tidb_ttl_task").Check(testkit.Rows(
now.Add(time.Hour).Format(time.DateTime),
now.Add(time.Hour).Format(time.DateTime),
Expand Down Expand Up @@ -522,3 +526,60 @@ func TestTaskCancelledAfterHeartbeatTimeout(t *testing.T) {
`finished {"total_rows":0,"success_rows":0,"error_rows":0,"scan_task_err":""} task-manager-2`,
))
}

func TestHeartBeatErrorNotBlockOthers(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
pool := wrapPoolForTest(dom.SysSessionPool())
defer pool.AssertNoSessionInUse(t)
waitAndStopTTLManager(t, dom)
tk := testkit.NewTestKit(t, store)
sessionFactory := sessionFactory(t, dom)

tk.MustExec("set global tidb_ttl_running_tasks = 32")

tk.MustExec("create table test.t(id int, created_at datetime) ttl=created_at + interval 1 day")
testTable, err := dom.InfoSchema().TableByName(context.Background(), model.NewCIStr("test"), model.NewCIStr("t"))
require.NoError(t, err)
for id := 0; id < 4; id++ {
sql := fmt.Sprintf("insert into mysql.tidb_ttl_task(job_id,table_id,scan_id,expire_time,created_time) values ('test-job', %d, %d, NOW() - INTERVAL 1 DAY, NOW())", testTable.Meta().ID, id)
tk.MustExec(sql)
}

se := sessionFactory()
now := se.Now()

isc := cache.NewInfoSchemaCache(time.Minute)
require.NoError(t, isc.Update(se))
m := ttlworker.NewTaskManager(context.Background(), pool, isc, "task-manager-1", store)
workers := []ttlworker.Worker{}
for j := 0; j < 4; j++ {
scanWorker := ttlworker.NewMockScanWorker(t)
scanWorker.Start()
workers = append(workers, scanWorker)
}
m.SetScanWorkers4Test(workers)
m.RescheduleTasks(se, now)

// All tasks should be scheduled to m1 and running
tk.MustQuery("select count(1) from mysql.tidb_ttl_task where status = 'running' and owner_id = 'task-manager-1'").Check(testkit.Rows("4"))

// Mock the situation that the owner of task 0 has changed
tk.MustExec("update mysql.tidb_ttl_task set owner_id = 'task-manager-2' where scan_id = 0")
tk.MustQuery("select count(1) from mysql.tidb_ttl_task where status = 'running' and owner_id = 'task-manager-1'").Check(testkit.Rows("3"))

now = now.Add(time.Hour)
require.Error(t, m.UpdateHeartBeatForTask(context.Background(), se, now, m.GetRunningTasks()[0]))
for i := 1; i < 4; i++ {
require.NoError(t, m.UpdateHeartBeatForTask(context.Background(), se, now, m.GetRunningTasks()[i]))
}

now = now.Add(time.Hour)
m.UpdateHeartBeat(context.Background(), se, now)
tk.MustQuery("select count(1) from mysql.tidb_ttl_task where status = 'running' and owner_id = 'task-manager-1'").Check(testkit.Rows("3"))
tk.MustQuery("select scan_id, owner_hb_time from mysql.tidb_ttl_task").Sort().Check(testkit.Rows(
fmt.Sprintf("0 %s", now.Add(-2*time.Hour).Format(time.DateTime)),
fmt.Sprintf("1 %s", now.Format(time.DateTime)),
fmt.Sprintf("2 %s", now.Format(time.DateTime)),
fmt.Sprintf("3 %s", now.Format(time.DateTime)),
))
}
15 changes: 10 additions & 5 deletions pkg/ttl/ttlworker/task_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,16 @@ func (t *runningScanTask) SetResult(err error) {
t.result = t.ttlScanTask.result(err)
}

// UpdateHeartBeat is an exported version of updateHeartBeat
func (m *taskManager) UpdateHeartBeat(ctx context.Context, se session.Session, now time.Time) {
m.updateHeartBeat(ctx, se, now)
}

// UpdateHeartBeatForTask is an exported version of updateHeartBeatForTask
func (m *taskManager) UpdateHeartBeatForTask(ctx context.Context, se session.Session, now time.Time, task *runningScanTask) error {
return m.updateHeartBeatForTask(ctx, se, now, task)
}

// SetCancel sets the cancel function of the task
func (t *runningScanTask) SetCancel(cancel func()) {
t.cancel = cancel
Expand All @@ -111,11 +121,6 @@ func (m *taskManager) CheckInvalidTask(se session.Session) {
m.checkInvalidTask(se)
}

// UpdateHeartBeat is an exported version of updateHeartBeat
func (m *taskManager) UpdateHeartBeat(ctx context.Context, se session.Session, now time.Time) error {
return m.updateHeartBeat(ctx, se, now)
}

func TestResizeWorkers(t *testing.T) {
tbl := newMockTTLTbl(t, "t1")

Expand Down

0 comments on commit ea9c4ab

Please sign in to comment.