Skip to content

Commit

Permalink
fix conflict
Browse files Browse the repository at this point in the history
Signed-off-by: Yang Keao <yangkeao@chunibyo.icu>
  • Loading branch information
YangKeao committed Feb 11, 2025
1 parent 365d76a commit d8845a8
Show file tree
Hide file tree
Showing 5 changed files with 7 additions and 224 deletions.
20 changes: 1 addition & 19 deletions pkg/ttl/ttlworker/job_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/pingcap/tidb/pkg/ttl/client"
"github.com/pingcap/tidb/pkg/ttl/metrics"
"github.com/pingcap/tidb/pkg/ttl/session"
"github.com/pingcap/tidb/pkg/util/intest"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/pingcap/tidb/pkg/util/timeutil"
clientv3 "go.etcd.io/etcd/client/v3"
Expand Down Expand Up @@ -856,26 +857,7 @@ func (m *JobManager) appendLockedJob(id string, se session.Session, createTime t
// updateHeartBeat updates the heartbeat for all jobs with current instance as owner
func (m *JobManager) updateHeartBeat(ctx context.Context, se session.Session, now time.Time) {
for _, job := range m.localJobs() {
<<<<<<< HEAD
if job.createTime.Add(ttlJobTimeout).Before(now) {
logutil.Logger(m.ctx).Info("job is timeout", zap.String("jobID", job.id))
summary, err := summarizeErr(errors.New("job is timeout"))
if err != nil {
logutil.Logger(m.ctx).Warn("fail to summarize job", zap.Error(err))
}
err = job.finish(se, now, summary)
if err != nil {
logutil.Logger(m.ctx).Warn("fail to finish job", zap.Error(err))
continue
}
m.removeJob(job)
}

sql, args := updateHeartBeatSQL(job.tbl.ID, now, m.id)
_, err := se.ExecuteSQL(ctx, sql, args...)
=======
err := m.updateHeartBeatForJob(ctx, se, now, job)
>>>>>>> 0392cdda767 (ttl: fix the issue that one task losing heartbeat will block other tasks (#57919))
if err != nil {
logutil.Logger(m.ctx).Warn("fail to update heartbeat for job", zap.Error(err), zap.String("jobID", job.id))
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/ttl/ttlworker/job_manager_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1480,9 +1480,9 @@ func TestJobHeartBeatFailNotBlockOthers(t *testing.T) {
tk.MustExec("use test")
tk.MustExec("CREATE TABLE t1 (id INT PRIMARY KEY, created_at DATETIME) TTL = created_at + INTERVAL 1 HOUR")
tk.MustExec("CREATE TABLE t2 (id INT PRIMARY KEY, created_at DATETIME) TTL = created_at + INTERVAL 1 HOUR")
testTable1, err := dom.InfoSchema().TableByName(context.Background(), pmodel.NewCIStr("test"), pmodel.NewCIStr("t1"))
testTable1, err := dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t1"))
require.NoError(t, err)
testTable2, err := dom.InfoSchema().TableByName(context.Background(), pmodel.NewCIStr("test"), pmodel.NewCIStr("t2"))
testTable2, err := dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t2"))
require.NoError(t, err)

ctx := context.Background()
Expand Down
22 changes: 2 additions & 20 deletions pkg/ttl/ttlworker/task_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/pingcap/tidb/pkg/ttl/cache"
"github.com/pingcap/tidb/pkg/ttl/metrics"
"github.com/pingcap/tidb/pkg/ttl/session"
"github.com/pingcap/tidb/pkg/util/intest"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/tikv/client-go/v2/tikv"
"github.com/tikv/client-go/v2/tikvrpc"
Expand Down Expand Up @@ -435,28 +436,9 @@ func (m *taskManager) syncTaskFromTable(se session.Session, jobID string, scanID
// updateHeartBeat updates the heartbeat for all tasks with current instance as owner
func (m *taskManager) updateHeartBeat(ctx context.Context, se session.Session, now time.Time) {
for _, task := range m.runningTasks {
<<<<<<< HEAD
state := &cache.TTLTaskState{
TotalRows: task.statistics.TotalRows.Load(),
SuccessRows: task.statistics.SuccessRows.Load(),
ErrorRows: task.statistics.ErrorRows.Load(),
}
if task.result != nil && task.result.err != nil {
state.ScanTaskErr = task.result.err.Error()
}

sql, args, err := updateTTLTaskHeartBeatSQL(task.JobID, task.ScanID, now, state)
if err != nil {
return err
}
_, err = se.ExecuteSQL(ctx, sql, args...)
if err != nil {
return errors.Wrapf(err, "execute sql: %s", sql)
=======
err := m.updateHeartBeatForTask(ctx, se, now, task)
if err != nil {
logutil.Logger(m.ctx).Warn("fail to update task heart beat", zap.Error(err), zap.String("jobID", task.JobID), zap.Int64("scanID", task.ScanID))
>>>>>>> 0392cdda767 (ttl: fix the issue that one task losing heartbeat will block other tasks (#57919))
}
}
}
Expand All @@ -472,7 +454,7 @@ func (m *taskManager) updateHeartBeatForTask(ctx context.Context, se session.Ses
}

intest.Assert(se.GetSessionVars().Location().String() == now.Location().String())
sql, args, err := updateTTLTaskHeartBeatSQL(task.JobID, task.ScanID, now, state, m.id)
sql, args, err := updateTTLTaskHeartBeatSQL(task.JobID, task.ScanID, now, state)
if err != nil {
return err
}
Expand Down
182 changes: 2 additions & 180 deletions pkg/ttl/ttlworker/task_manager_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,195 +355,18 @@ func TestMeetTTLRunningTasks(t *testing.T) {
require.False(t, dom.TTLJobManager().TaskManager().MeetTTLRunningTasks(3, cache.TaskStatusWaiting))
require.True(t, dom.TTLJobManager().TaskManager().MeetTTLRunningTasks(3, cache.TaskStatusRunning))
}
<<<<<<< HEAD
=======

// TestShrinkScanWorkerTimeout checks what happens when the scan workers are
// shrunk to zero while one of them is stuck: ResizeScanWorkers must report an
// error, the worker list must still end up empty, and a following
// CheckFinishedTask must move all four tasks out of the 'running' status. The
// stuck task is expected to record "timeout to cancel scan task" as its error.
func TestShrinkScanWorkerTimeout(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
pool := wrapPoolForTest(dom.SysSessionPool())
// Every session taken from the wrapped pool must be returned by the end of the test.
defer pool.AssertNoSessionInUse(t)
waitAndStopTTLManager(t, dom)
tk := testkit.NewTestKit(t, store)
sessionFactory := sessionFactory(t, store)

tk.MustExec("set global tidb_ttl_running_tasks = 32")

tk.MustExec("create table test.t(id int, created_at datetime) ttl=created_at + interval 1 day")
testTable, err := dom.InfoSchema().TableByName(context.Background(), model.NewCIStr("test"), model.NewCIStr("t"))
require.NoError(t, err)
// Seed four scan tasks (scan_id 0..3) of job 'test-job' for the table created above.
for id := 0; id < 4; id++ {
sql := fmt.Sprintf("insert into mysql.tidb_ttl_task(job_id,table_id,scan_id,expire_time,created_time) values ('test-job', %d, %d, NOW() - INTERVAL 1 DAY, NOW())", testTable.Meta().ID, id)
tk.MustExec(sql)
}

se := sessionFactory()
now := se.Now()

isc := cache.NewInfoSchemaCache(time.Minute)
require.NoError(t, isc.Update(se))
m := ttlworker.NewTaskManager(context.Background(), pool, isc, "scan-manager-1", store)

// startBlockNotifyCh is signalled from inside the hook installed below;
// blockCancelCh keeps that hook blocked until it is closed at the end of the test.
startBlockNotifyCh := make(chan struct{})
blockCancelCh := make(chan struct{})

workers := []ttlworker.Worker{}
for j := 0; j < 4; j++ {
scanWorker := ttlworker.NewMockScanWorker(t)
// Only the first worker gets the blocking hook (stored in its context under
// the ttlworker.TTLScanPostScanHookForTest key).
if j == 0 {
scanWorker.SetCtx(func(ctx context.Context) context.Context {
return context.WithValue(ctx, ttlworker.TTLScanPostScanHookForTest{}, func() {
startBlockNotifyCh <- struct{}{}
<-blockCancelCh
})
})
}
scanWorker.Start()
workers = append(workers, scanWorker)
}

m.SetScanWorkers4Test(workers)

m.RescheduleTasks(se, now)
require.Len(t, m.GetRunningTasks(), 4)
tk.MustQuery("SELECT count(1) from mysql.tidb_ttl_task where status = 'running'").Check(testkit.Rows("4"))
// Wait until the hooked worker has entered its hook, i.e. it is now blocked.
<-startBlockNotifyCh

// shrink scan workers, one of them will timeout
require.Error(t, m.ResizeScanWorkers(0))
require.Len(t, m.GetScanWorkers(), 0)

// the canceled 3 tasks are still running, but they have results, so after `CheckFinishedTask`, it should be finished
tk.MustQuery("SELECT count(1) from mysql.tidb_ttl_task where status = 'running'").Check(testkit.Rows("4"))
m.CheckFinishedTask(se, now)
require.Len(t, m.GetRunningTasks(), 0)
// now, the task should be finished
tk.MustQuery("SELECT count(1) from mysql.tidb_ttl_task where status = 'running'").Check(testkit.Rows("0"))
// the first task will be finished with "timeout to cancel scan task"
// other tasks will finish with table not found because we didn't mock the table in this test.
tk.MustQuery("SELECT scan_id, json_extract(state, '$.scan_task_err') from mysql.tidb_ttl_task").Sort().Check(testkit.Rows(
"0 \"timeout to cancel scan task\"",
"1 \"table 'test.t' meta changed, should abort current job: [schema:1146]Table 'test.t' doesn't exist\"",
"2 \"table 'test.t' meta changed, should abort current job: [schema:1146]Table 'test.t' doesn't exist\"",
"3 \"table 'test.t' meta changed, should abort current job: [schema:1146]Table 'test.t' doesn't exist\"",
))

require.NoError(t, m.ResizeDelWorkers(0))
// Release the hook that has been blocked on blockCancelCh since the start of the scan.
close(blockCancelCh)
}

// TestTaskCancelledAfterHeartbeatTimeout walks through an ownership handover
// between two task managers. m1 schedules four tasks; one hour later m2
// reschedules and becomes their owner. From then on m1 must fail to update the
// heartbeat and must not be able to finish the tasks, yet it still calls each
// task's cancel function (once in CheckFinishedTask and once more in
// CheckInvalidTask), while m2 can both heartbeat and finish the tasks.
func TestTaskCancelledAfterHeartbeatTimeout(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
pool := wrapPoolForTest(dom.SysSessionPool())
waitAndStopTTLManager(t, dom)
tk := testkit.NewTestKit(t, store)
sessionFactory := sessionFactory(t, store)
se := sessionFactory()

tk.MustExec("set global tidb_ttl_running_tasks = 128")
defer tk.MustExec("set global tidb_ttl_running_tasks = -1")

tk.MustExec("create table test.t(id int, created_at datetime) ttl=created_at + interval 1 day")
table, err := dom.InfoSchema().TableByName(context.Background(), model.NewCIStr("test"), model.NewCIStr("t"))
require.NoError(t, err)
// 4 tasks are inserted into the table
for i := 0; i < 4; i++ {
sql := fmt.Sprintf("insert into mysql.tidb_ttl_task(job_id,table_id,scan_id,expire_time,created_time) values ('test-job', %d, %d, NOW(), NOW())", table.Meta().ID, i)
tk.MustExec(sql)
}
isc := cache.NewInfoSchemaCache(time.Second)
require.NoError(t, isc.Update(se))

// Eight mock scan workers: the first four are handed to m1, the remaining four to m2.
workers := []ttlworker.Worker{}
for j := 0; j < 8; j++ {
scanWorker := ttlworker.NewMockScanWorker(t)
scanWorker.Start()
workers = append(workers, scanWorker)
}

now := se.Now()
m1 := ttlworker.NewTaskManager(context.Background(), pool, isc, "task-manager-1", store)
m1.SetScanWorkers4Test(workers[0:4])
m1.RescheduleTasks(se, now)
// m2 is created but does not reschedule yet, so it owns nothing at this point.
m2 := ttlworker.NewTaskManager(context.Background(), pool, isc, "task-manager-2", store)
m2.SetScanWorkers4Test(workers[4:])

// All tasks should be scheduled to m1 and running
tk.MustQuery("select count(1) from mysql.tidb_ttl_task where status = 'running' and owner_id = 'task-manager-1'").Check(testkit.Rows("4"))

// Replace the cancel function of each of m1's tasks with a counter so that
// every cancellation attempt can be observed below.
var cancelCount atomic.Uint32
for i := 0; i < 4; i++ {
task := m1.GetRunningTasks()[i]
task.SetCancel(func() {
cancelCount.Add(1)
})
}

// After a period of time, the tasks lost heartbeat and will be re-assigned to m2
now = now.Add(time.Hour)
m2.RescheduleTasks(se, now)

// All tasks should be scheduled to m2 and running
tk.MustQuery("select count(1) from mysql.tidb_ttl_task where status = 'running' and owner_id = 'task-manager-2'").Check(testkit.Rows("4"))

// Then m1 cannot update the heartbeat of its task
for i := 0; i < 4; i++ {
require.Error(t, m1.UpdateHeartBeatForTask(context.Background(), se, now.Add(time.Hour), m1.GetRunningTasks()[i]))
}
// The failed updates must leave owner_hb_time at `now`, not at `now + 1h`.
tk.MustQuery("select owner_hb_time from mysql.tidb_ttl_task").Check(testkit.Rows(
now.Format(time.DateTime),
now.Format(time.DateTime),
now.Format(time.DateTime),
now.Format(time.DateTime),
))

// m2 can successfully update the heartbeat
for i := 0; i < 4; i++ {
require.NoError(t, m2.UpdateHeartBeatForTask(context.Background(), se, now.Add(time.Hour), m2.GetRunningTasks()[i]))
}
tk.MustQuery("select owner_hb_time from mysql.tidb_ttl_task").Check(testkit.Rows(
now.Add(time.Hour).Format(time.DateTime),
now.Add(time.Hour).Format(time.DateTime),
now.Add(time.Hour).Format(time.DateTime),
now.Add(time.Hour).Format(time.DateTime),
))

// Although m1 cannot finish the tasks, it still tries to cancel them.
for _, task := range m1.GetRunningTasks() {
task.SetResult(nil)
}
m1.CheckFinishedTask(se, now)
// The rows are still 'running' (m1 could not finish them), but each task was cancelled once.
tk.MustQuery("select count(1) from mysql.tidb_ttl_task where status = 'running'").Check(testkit.Rows("4"))
require.Equal(t, uint32(4), cancelCount.Load())

// Then the tasks in m1 should be cancelled again in `CheckInvalidTask`.
m1.CheckInvalidTask(se)
require.Equal(t, uint32(8), cancelCount.Load())

// m2 can finish the task
for _, task := range m2.GetRunningTasks() {
task.SetResult(nil)
}
m2.CheckFinishedTask(se, now)
tk.MustQuery("select status, state, owner_id from mysql.tidb_ttl_task").Sort().Check(testkit.Rows(
`finished {"total_rows":0,"success_rows":0,"error_rows":0,"scan_task_err":""} task-manager-2`,
`finished {"total_rows":0,"success_rows":0,"error_rows":0,"scan_task_err":""} task-manager-2`,
`finished {"total_rows":0,"success_rows":0,"error_rows":0,"scan_task_err":""} task-manager-2`,
`finished {"total_rows":0,"success_rows":0,"error_rows":0,"scan_task_err":""} task-manager-2`,
))
}

func TestHeartBeatErrorNotBlockOthers(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
pool := wrapPoolForTest(dom.SysSessionPool())
defer pool.AssertNoSessionInUse(t)
pool := dom.SysSessionPool()
waitAndStopTTLManager(t, dom)
tk := testkit.NewTestKit(t, store)
sessionFactory := sessionFactory(t, store)

tk.MustExec("set global tidb_ttl_running_tasks = 32")

tk.MustExec("create table test.t(id int, created_at datetime) ttl=created_at + interval 1 day")
testTable, err := dom.InfoSchema().TableByName(context.Background(), model.NewCIStr("test"), model.NewCIStr("t"))
testTable, err := dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
require.NoError(t, err)
for id := 0; id < 4; id++ {
sql := fmt.Sprintf("insert into mysql.tidb_ttl_task(job_id,table_id,scan_id,expire_time,created_time) values ('test-job', %d, %d, NOW() - INTERVAL 1 DAY, NOW())", testTable.Meta().ID, id)
Expand Down Expand Up @@ -588,4 +411,3 @@ func TestHeartBeatErrorNotBlockOthers(t *testing.T) {
fmt.Sprintf("3 %s", now.Format(time.DateTime)),
))
}
>>>>>>> 0392cdda767 (ttl: fix the issue that one task losing heartbeat will block other tasks (#57919))
3 changes: 0 additions & 3 deletions pkg/ttl/ttlworker/task_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,6 @@ func (t *runningScanTask) SetResult(err error) {
t.result = t.ttlScanTask.result(err)
}

<<<<<<< HEAD
=======
// SetCancel sets the cancel function of the task
func (t *runningScanTask) SetCancel(cancel func()) {
t.cancel = cancel
Expand All @@ -108,7 +106,6 @@ func (m *taskManager) UpdateHeartBeatForTask(ctx context.Context, se session.Ses
return m.updateHeartBeatForTask(ctx, se, now, task)
}

>>>>>>> 0392cdda767 (ttl: fix the issue that one task losing heartbeat will block other tasks (#57919))
func TestResizeWorkers(t *testing.T) {
tbl := newMockTTLTbl(t, "t1")

Expand Down

0 comments on commit d8845a8

Please sign in to comment.