diff --git a/executor/infoschema_cluster_table_test.go b/executor/infoschema_cluster_table_test.go index 5729af8de9dd1..be2f04cb5c6ac 100644 --- a/executor/infoschema_cluster_table_test.go +++ b/executor/infoschema_cluster_table_test.go @@ -290,7 +290,7 @@ func TestTableStorageStats(t *testing.T) { "test 2", )) rows := tk.MustQuery("select TABLE_NAME from information_schema.TABLE_STORAGE_STATS where TABLE_SCHEMA = 'mysql';").Rows() - result := 44 + result := 45 require.Len(t, rows, result) // More tests about the privileges. diff --git a/expression/expression.go b/expression/expression.go index eeb407a983bee..6b6ce4c26e0a0 100644 --- a/expression/expression.go +++ b/expression/expression.go @@ -1576,6 +1576,8 @@ func Args2Expressions4Test(args ...interface{}) []Expression { ft = types.NewFieldType(mysql.TypeVarString) case types.KindMysqlTime: ft = types.NewFieldType(mysql.TypeTimestamp) + case types.KindBytes: + ft = types.NewFieldType(mysql.TypeBlob) default: exprs[i] = nil continue diff --git a/session/bootstrap.go b/session/bootstrap.go index 56509ffafebfd..74eecb28a68a4 100644 --- a/session/bootstrap.go +++ b/session/bootstrap.go @@ -479,7 +479,7 @@ const ( PRIMARY KEY (Host,User,Password_timestamp ) ) COMMENT='Password history for user accounts' ` - // CreateTTLTableStatus is a table about TTL task schedule + // CreateTTLTableStatus is a table about TTL job schedule CreateTTLTableStatus = `CREATE TABLE IF NOT EXISTS mysql.tidb_ttl_table_status ( table_id bigint(64) PRIMARY KEY, parent_table_id bigint(64), @@ -498,6 +498,24 @@ const ( current_job_state text DEFAULT NULL, current_job_status varchar(64) DEFAULT NULL, current_job_status_update_time timestamp NULL DEFAULT NULL);` + + // CreateTTLTask is a table about parallel ttl tasks + CreateTTLTask = `CREATE TABLE IF NOT EXISTS mysql.tidb_ttl_task ( + job_id varchar(64) NOT NULL, + table_id bigint(64) NOT NULL, + scan_id int NOT NULL, + scan_range_start BLOB, + scan_range_end BLOB, + expire_time timestamp NOT NULL, + owner_id varchar(64) DEFAULT NULL, + owner_addr varchar(64) DEFAULT NULL, + owner_hb_time timestamp DEFAULT NULL, + status varchar(64) DEFAULT 'waiting', + status_update_time timestamp NULL DEFAULT NULL, + state text, + created_time timestamp NOT NULL, + primary key(job_id, scan_id), + key(created_time));` ) // bootstrap initiates system DB for a store. @@ -739,11 +757,13 @@ const ( version109 = 109 // version110 sets tidb_enable_gc_aware_memory_track to off when a cluster upgrades from some version lower than v6.5.0. version110 = 110 + // version111 adds the table tidb_ttl_task + version111 = 111 ) // currentBootstrapVersion is defined as a variable, so we can modify its value for testing. // please make sure this is the largest version -var currentBootstrapVersion int64 = version109 +var currentBootstrapVersion int64 = version111 // DDL owner key's expired time is ManagerSessionTTL seconds, we should wait the time and give more time to have a chance to finish it. var internalSQLTimeout = owner.ManagerSessionTTL + 15 @@ -861,6 +881,7 @@ var ( upgradeToVer108, upgradeToVer109, upgradeToVer110, + upgradeToVer111, } ) @@ -2213,6 +2234,13 @@ func upgradeToVer110(s Session, ver int64) { mysql.SystemDB, mysql.GlobalVariablesTable, variable.TiDBEnableGCAwareMemoryTrack, 0) } +func upgradeToVer111(s Session, ver int64) { + if ver >= version111 { + return + } + doReentrantDDL(s, CreateTTLTask) +} + func writeOOMAction(s Session) { comment := "oom-action is `log` by default in v3.0.x, `cancel` by default in v4.0.11+" mustExecute(s, `INSERT HIGH_PRIORITY INTO %n.%n VALUES (%?, %?, %?) ON DUPLICATE KEY UPDATE VARIABLE_VALUE= %?`, @@ -2319,6 +2347,8 @@ func doDDLWorks(s Session) { mustExecute(s, CreateStatsTableLocked) // Create tidb_ttl_table_status table mustExecute(s, CreateTTLTableStatus) + // Create tidb_ttl_task table + mustExecute(s, CreateTTLTask) } // doBootstrapSQLFile executes SQL commands in a file as the last stage of bootstrap. diff --git a/store/mockstore/unistore/tikv/mvcc.go b/store/mockstore/unistore/tikv/mvcc.go index b173ff6b623d2..7cedfeabbbf1a 100644 --- a/store/mockstore/unistore/tikv/mvcc.go +++ b/store/mockstore/unistore/tikv/mvcc.go @@ -1018,16 +1018,16 @@ func (store *MVCCStore) buildPrewriteLock(reqCtx *requestCtx, m *kvrpcpb.Mutatio lock.Op = uint8(kvrpcpb.Op_Put) } if rowcodec.IsRowKey(m.Key) && lock.Op == uint8(kvrpcpb.Op_Put) { - if rowcodec.IsNewFormat(m.Value) { - reqCtx.buf = m.Value - } else { + if !rowcodec.IsNewFormat(m.Value) { reqCtx.buf, err = encodeFromOldRow(m.Value, reqCtx.buf) if err != nil { log.Error("encode data failed", zap.Binary("value", m.Value), zap.Binary("key", m.Key), zap.Stringer("op", m.Op), zap.Error(err)) return nil, err } + + lock.Value = make([]byte, len(reqCtx.buf)) + copy(lock.Value, reqCtx.buf) } - lock.Value = reqCtx.buf } lock.ForUpdateTS = req.ForUpdateTs diff --git a/ttl/cache/BUILD.bazel b/ttl/cache/BUILD.bazel index 8dd9724472dba..c926ee231cd93 100644 --- a/ttl/cache/BUILD.bazel +++ b/ttl/cache/BUILD.bazel @@ -6,6 +6,7 @@ go_library( "base.go", "infoschema.go", "table.go", + "task.go", "ttlstatus.go", ], importpath = "github.com/pingcap/tidb/ttl/cache", @@ -40,6 +41,7 @@ go_test( "main_test.go", "split_test.go", "table_test.go", + "task_test.go", "ttlstatus_test.go", ], embed = [":cache"], @@ -49,6 +51,7 @@ go_test( "//kv", "//parser/model", "//server", + "//session", "//store/helper", "//tablecodec", "//testkit", diff --git a/ttl/cache/task.go b/ttl/cache/task.go new file mode 100644 index 0000000000000..2d06958ddb8da --- /dev/null +++ b/ttl/cache/task.go @@ -0,0 +1,183 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package cache + +import ( + "encoding/json" + "time" + + "github.com/pingcap/tidb/sessionctx" + "github.com/pingcap/tidb/types" + "github.com/pingcap/tidb/util/chunk" + "github.com/pingcap/tidb/util/codec" +) + +const selectFromTTLTask = `SELECT LOW_PRIORITY + job_id, + table_id, + scan_id, + scan_range_start, + scan_range_end, + expire_time, + owner_id, + owner_addr, + owner_hb_time, + status, + status_update_time, + state, + created_time FROM mysql.tidb_ttl_task` +const insertIntoTTLTask = `INSERT LOW_PRIORITY INTO mysql.tidb_ttl_task SET + job_id = %?, + table_id = %?, + scan_id = %?, + scan_range_start = %?, + scan_range_end = %?, + expire_time = %?, + created_time = %?` + +// SelectFromTTLTaskWithID returns an SQL statement to get all tasks of the specified job in mysql.tidb_ttl_task +func SelectFromTTLTaskWithID(jobID string) (string, []interface{}) { + return selectFromTTLTask + " WHERE job_id = %?", []interface{}{jobID} +} + +// InsertIntoTTLTask returns an SQL statement to insert a ttl task into mysql.tidb_ttl_task +func InsertIntoTTLTask(sctx sessionctx.Context, jobID string, tableID int64, scanID int, scanRangeStart []types.Datum, scanRangeEnd []types.Datum, expireTime time.Time, createdTime time.Time) (string, []interface{}, error) { + rangeStart, err := codec.EncodeKey(sctx.GetSessionVars().StmtCtx, []byte{}, scanRangeStart...) + if err != nil { + return "", nil, err + } + rangeEnd, err := codec.EncodeKey(sctx.GetSessionVars().StmtCtx, []byte{}, scanRangeEnd...) + if err != nil { + return "", nil, err + } + return insertIntoTTLTask, []interface{}{jobID, tableID, int64(scanID), rangeStart, rangeEnd, expireTime, createdTime}, nil +} + +// TaskStatus represents the current status of a task +type TaskStatus string + +const ( + // TaskStatusWaiting means the task hasn't started + TaskStatusWaiting TaskStatus = "waiting" + // TaskStatusRunning means this task is running + TaskStatusRunning = "running" + // TaskStatusFinished means this task has finished + TaskStatusFinished = "finished" +) + +// TTLTask is a row recorded in mysql.tidb_ttl_task +type TTLTask struct { + JobID string + TableID int64 + ScanID int64 + ScanRangeStart []types.Datum + ScanRangeEnd []types.Datum + ExpireTime time.Time + OwnerID string + OwnerAddr string + OwnerHBTime time.Time + Status TaskStatus + StatusUpdateTime time.Time + State *TTLTaskState + CreatedTime time.Time +} + +// TTLTaskState records the internal states of the ttl task +type TTLTaskState struct { + TotalRows uint64 `json:"total_rows"` + SuccessRows uint64 `json:"success_rows"` + ErrorRows uint64 `json:"error_rows"` + + ScanTaskErr string `json:"scan_task_err"` +} + +// RowToTTLTask converts a row into TTL task +func RowToTTLTask(sctx sessionctx.Context, row chunk.Row) (*TTLTask, error) { + var err error + timeZone := sctx.GetSessionVars().Location() + + task := &TTLTask{ + JobID: row.GetString(0), + TableID: row.GetInt64(1), + ScanID: row.GetInt64(2), + } + if !row.IsNull(3) { + scanRangeStartBuf := row.GetBytes(3) + // it's still posibble to be nil even this column is not NULL + if scanRangeStartBuf != nil { + task.ScanRangeStart, err = codec.Decode(scanRangeStartBuf, len(scanRangeStartBuf)) + if err != nil { + return nil, err + } + } + } + if !row.IsNull(4) { + scanRangeEndBuf := row.GetBytes(4) + // it's still posibble to be nil even this column is not NULL + if scanRangeEndBuf != nil { + task.ScanRangeEnd, err = codec.Decode(scanRangeEndBuf, len(scanRangeEndBuf)) + if err != nil { + return nil, err + } + } + } + + task.ExpireTime, err = row.GetTime(5).GoTime(timeZone) + if err != nil { + return nil, err + } + + if !row.IsNull(6) { + task.OwnerID = row.GetString(6) + } + if !row.IsNull(7) { + task.OwnerAddr = row.GetString(7) + } + if !row.IsNull(8) { + task.OwnerHBTime, err = row.GetTime(8).GoTime(timeZone) + if err != nil { + return nil, err + } + } + if !row.IsNull(9) { + status := row.GetString(9) + if len(status) == 0 { + status = "waiting" + } + task.Status = TaskStatus(status) + } + if !row.IsNull(10) { + task.StatusUpdateTime, err = row.GetTime(10).GoTime(timeZone) + if err != nil { + return nil, err + } + } + if !row.IsNull(11) { + stateStr := row.GetString(11) + state := &TTLTaskState{} + err = json.Unmarshal([]byte(stateStr), state) + if err != nil { + return nil, err + } + task.State = state + } + + task.CreatedTime, err = row.GetTime(12).GoTime(timeZone) + if err != nil { + return nil, err + } + + return task, nil +} diff --git a/ttl/cache/task_test.go b/ttl/cache/task_test.go new file mode 100644 index 0000000000000..555ad8566f887 --- /dev/null +++ b/ttl/cache/task_test.go @@ -0,0 +1,117 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package cache_test + +import ( + "context" + "testing" + "time" + + "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/session" + "github.com/pingcap/tidb/testkit" + "github.com/pingcap/tidb/ttl/cache" + "github.com/pingcap/tidb/types" + "github.com/pingcap/tidb/util/codec" + "github.com/stretchr/testify/require" +) + +type taskGetter struct { + ctx context.Context + t *testing.T + tk *testkit.TestKit +} + +func newTaskGetter(ctx context.Context, t *testing.T, tk *testkit.TestKit) *taskGetter { + return &taskGetter{ + ctx, t, tk, + } +} + +func (tg *taskGetter) mustGetTestTask() *cache.TTLTask { + sql, args := cache.SelectFromTTLTaskWithID("test-job") + rs, err := tg.tk.Session().ExecuteInternal(tg.ctx, sql, args...) + require.NoError(tg.t, err) + rows, err := session.GetRows4Test(context.Background(), tg.tk.Session(), rs) + require.NoError(tg.t, err) + task, err := cache.RowToTTLTask(tg.tk.Session(), rows[0]) + require.NoError(tg.t, err) + return task +} + +func TestRowToTTLTask(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.Session().GetSessionVars().TimeZone = time.Local + + ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnTTL) + tg := newTaskGetter(ctx, t, tk) + + now := time.Now() + now = now.Round(time.Second) + + sql, args, err := cache.InsertIntoTTLTask(tk.Session(), "test-job", 1, 1, nil, nil, now, now) + require.NoError(t, err) + // tk.MustExec cannot handle the NULL parameter, use the `tk.Session().ExecuteInternal` instead here. + _, err = tk.Session().ExecuteInternal(ctx, sql, args...) + require.NoError(t, err) + task := tg.mustGetTestTask() + require.Equal(t, "test-job", task.JobID) + require.Equal(t, int64(1), task.TableID) + require.Equal(t, int64(1), task.ScanID) + require.Nil(t, task.ScanRangeStart) + require.Nil(t, task.ScanRangeEnd) + require.Equal(t, now, task.ExpireTime) + require.Equal(t, now, task.CreatedTime) + + rangeStart, err := codec.EncodeKey(tk.Session().GetSessionVars().StmtCtx, []byte{}, []types.Datum{types.NewDatum(1)}...) + require.NoError(t, err) + rangeEnd, err := codec.EncodeKey(tk.Session().GetSessionVars().StmtCtx, []byte{}, []types.Datum{types.NewDatum(2)}...) + require.NoError(t, err) + tk.MustExec("UPDATE mysql.tidb_ttl_task SET scan_range_start = ?, scan_range_end = ? WHERE job_id = 'test-job'", rangeStart, rangeEnd) + + task = tg.mustGetTestTask() + require.Equal(t, []types.Datum{types.NewDatum(1)}, task.ScanRangeStart) + require.Equal(t, []types.Datum{types.NewDatum(2)}, task.ScanRangeEnd) +} + +func TestInsertIntoTTLTask(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.Session().GetSessionVars().TimeZone = time.Local + + ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnTTL) + tg := newTaskGetter(ctx, t, tk) + + rangeStart := []types.Datum{types.NewDatum(1)} + rangeEnd := []types.Datum{types.NewDatum(2)} + + now := time.Now() + now = now.Round(time.Second) + + sql, args, err := cache.InsertIntoTTLTask(tk.Session(), "test-job", 1, 1, rangeStart, rangeEnd, now, now) + require.NoError(t, err) + // tk.MustExec cannot handle the NULL parameter, use the `tk.Session().ExecuteInternal` instead here. + _, err = tk.Session().ExecuteInternal(ctx, sql, args...) + require.NoError(t, err) + task := tg.mustGetTestTask() + require.Equal(t, "test-job", task.JobID) + require.Equal(t, int64(1), task.TableID) + require.Equal(t, int64(1), task.ScanID) + require.Equal(t, []types.Datum{types.NewDatum(1)}, task.ScanRangeStart) + require.Equal(t, []types.Datum{types.NewDatum(2)}, task.ScanRangeEnd) + require.Equal(t, now, task.ExpireTime) + require.Equal(t, now, task.CreatedTime) +} diff --git a/ttl/ttlworker/BUILD.bazel b/ttl/ttlworker/BUILD.bazel index 4657828a9061a..48e6a411b1826 100644 --- a/ttl/ttlworker/BUILD.bazel +++ b/ttl/ttlworker/BUILD.bazel @@ -31,6 +31,7 @@ go_library( "//util/logutil", "//util/sqlexec", "//util/timeutil", + "@com_github_google_uuid//:uuid", "@com_github_ngaut_pools//:pools", "@com_github_pingcap_errors//:errors", "@com_github_pingcap_failpoint//:failpoint", diff --git a/ttl/ttlworker/job.go b/ttl/ttlworker/job.go index 02f402f90888d..4b5ffb147bf1d 100644 --- a/ttl/ttlworker/job.go +++ b/ttl/ttlworker/job.go @@ -45,6 +45,7 @@ const finishJobTemplate = `UPDATE mysql.tidb_ttl_table_status current_job_status_update_time = NULL WHERE table_id = %? AND current_job_id = %?` const updateJobStateTemplate = "UPDATE mysql.tidb_ttl_table_status SET current_job_state = %? WHERE table_id = %? AND current_job_id = %? AND current_job_owner_id = %?" +const removeTaskForJobTemplate = "DELETE FROM mysql.tidb_ttl_task WHERE job_id = %?" func updateJobCurrentStatusSQL(tableID int64, oldStatus cache.JobStatus, newStatus cache.JobStatus, jobID string) (string, []interface{}) { return updateJobCurrentStatusTemplate, []interface{}{string(newStatus), tableID, string(oldStatus), jobID} @@ -58,6 +59,10 @@ func updateJobState(tableID int64, currentJobID string, currentJobState string, return updateJobStateTemplate, []interface{}{currentJobState, tableID, currentJobID, currentJobOwnerID} } +func removeTaskForJob(jobID string) (string, []interface{}) { + return removeTaskForJobTemplate, []interface{}{jobID} +} + type ttlJob struct { id string ownerID string @@ -128,12 +133,27 @@ func (job *ttlJob) finish(se session.Session, now time.Time) { if err != nil { logutil.Logger(job.ctx).Warn("fail to generate summary for ttl job", zap.Error(err)) } + // at this time, the job.ctx may have been canceled (to cancel this job) // even when it's canceled, we'll need to update the states, so use another context - sql, args := finishJobSQL(job.tbl.ID, now, summary, job.id) - _, err = se.ExecuteSQL(context.TODO(), sql, args...) + err = se.RunInTxn(context.TODO(), func() error { + sql, args := finishJobSQL(job.tbl.ID, now, summary, job.id) + _, err = se.ExecuteSQL(context.TODO(), sql, args...) + if err != nil { + return errors.Wrapf(err, "execute sql: %s", sql) + } + + sql, args = removeTaskForJob(job.id) + _, err = se.ExecuteSQL(context.TODO(), sql, args...) + if err != nil { + return errors.Wrapf(err, "execute sql: %s", sql) + } + + return nil + }) + if err != nil { - logutil.Logger(job.ctx).Error("fail to finish a ttl job", zap.Error(err), zap.Int64("tableID", job.tbl.ID), zap.String("jobID", job.id), zap.String("sql", sql), zap.Any("arguments", args)) + logutil.Logger(job.ctx).Error("fail to finish a ttl job", zap.Error(err), zap.Int64("tableID", job.tbl.ID), zap.String("jobID", job.id)) } } diff --git a/ttl/ttlworker/job_manager.go b/ttl/ttlworker/job_manager.go index 4eb014e47ad52..910038666c1b6 100644 --- a/ttl/ttlworker/job_manager.go +++ b/ttl/ttlworker/job_manager.go @@ -20,7 +20,9 @@ import ( "strings" "time" + "github.com/google/uuid" "github.com/pingcap/errors" + "github.com/pingcap/failpoint" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/parser/duration" "github.com/pingcap/tidb/parser/terror" @@ -38,7 +40,7 @@ import ( const insertNewTableIntoStatusTemplate = "INSERT INTO mysql.tidb_ttl_table_status (table_id,parent_table_id) VALUES (%?, %?)" const setTableStatusOwnerTemplate = `UPDATE mysql.tidb_ttl_table_status - SET current_job_id = UUID(), + SET current_job_id = %?, current_job_owner_id = %?, current_job_start_time = %?, current_job_status = 'waiting', @@ -54,8 +56,8 @@ func insertNewTableIntoStatusSQL(tableID int64, parentTableID int64) (string, [] return insertNewTableIntoStatusTemplate, []interface{}{tableID, parentTableID} } -func setTableStatusOwnerSQL(tableID int64, now time.Time, currentJobTTLExpire time.Time, id string) (string, []interface{}) { - return setTableStatusOwnerTemplate, []interface{}{id, now.Format(timeFormat), now.Format(timeFormat), currentJobTTLExpire.Format(timeFormat), now.Format(timeFormat), tableID} +func setTableStatusOwnerSQL(uuid string, tableID int64, now time.Time, currentJobTTLExpire time.Time, id string) (string, []interface{}) { + return setTableStatusOwnerTemplate, []interface{}{uuid, id, now.Format(timeFormat), now.Format(timeFormat), currentJobTTLExpire.Format(timeFormat), now.Format(timeFormat), tableID} } func updateHeartBeatSQL(tableID int64, now time.Time, id string) (string, []interface{}) { @@ -706,9 +708,33 @@ func (m *JobManager) lockNewJob(ctx context.Context, se session.Session, table * return err } - sql, args = setTableStatusOwnerSQL(table.ID, now, expireTime, m.id) + jobID := uuid.New().String() + failpoint.Inject("set-job-uuid", func(val failpoint.Value) { + jobID = val.(string) + }) + + sql, args = setTableStatusOwnerSQL(jobID, table.ID, now, expireTime, m.id) _, err = se.ExecuteSQL(ctx, sql, args...) - return errors.Wrapf(err, "execute sql: %s", sql) + if err != nil { + return errors.Wrapf(err, "execute sql: %s", sql) + } + + ranges, err := table.SplitScanRanges(ctx, m.store, splitScanCount) + if err != nil { + return errors.Wrap(err, "split scan ranges") + } + for scanID, r := range ranges { + sql, args, err = cache.InsertIntoTTLTask(se, jobID, table.ID, scanID, r.Start, r.End, expireTime, now) + if err != nil { + return errors.Wrap(err, "encode scan task") + } + _, err = se.ExecuteSQL(ctx, sql, args...) + if err != nil { + return errors.Wrapf(err, "execute sql: %s", sql) + } + } + + return nil }) if err != nil { return nil, err diff --git a/ttl/ttlworker/job_manager_integration_test.go b/ttl/ttlworker/job_manager_integration_test.go index 918d05a0766f1..a1e284540a594 100644 --- a/ttl/ttlworker/job_manager_integration_test.go +++ b/ttl/ttlworker/job_manager_integration_test.go @@ -126,6 +126,7 @@ func TestFinishJob(t *testing.T) { job.Finish(se, time.Now()) tk.MustQuery("select table_id, last_job_summary from mysql.tidb_ttl_table_status").Check(testkit.Rows("2 {\"total_rows\":0,\"success_rows\":0,\"error_rows\":0,\"total_scan_task\":1,\"scheduled_scan_task\":0,\"finished_scan_task\":0,\"scan_task_err\":\"\\\"'an error message contains both single and double quote'\\\"\"}")) + tk.MustQuery("select * from mysql.tidb_ttl_task").Check(testkit.Rows()) } func TestTTLAutoAnalyze(t *testing.T) { diff --git a/ttl/ttlworker/job_manager_test.go b/ttl/ttlworker/job_manager_test.go index 3c31fd021e0e2..97a84b3cb82a2 100644 --- a/ttl/ttlworker/job_manager_test.go +++ b/ttl/ttlworker/job_manager_test.go @@ -20,6 +20,7 @@ import ( "time" "github.com/pingcap/errors" + "github.com/pingcap/failpoint" "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tidb/sessionctx/variable" @@ -28,6 +29,7 @@ import ( "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/chunk" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func newTTLTableStatusRows(status ...*cache.TableStatus) []chunk.Row { @@ -248,6 +250,16 @@ func TestLockNewTable(t *testing.T) { args, } } + getExecuteInfoWithErr := func(sql string, args []interface{}, err error) executeInfo { + require.NoError(t, err) + return executeInfo{ + sql, + args, + } + } + failpoint.Enable("github.com/pingcap/tidb/ttl/ttlworker/set-job-uuid", `return("test-job-id")`) + defer failpoint.Disable("github.com/pingcap/tidb/ttl/ttlworker/set-job-uuid") + type sqlExecute struct { executeInfo @@ -267,7 +279,11 @@ func TestLockNewTable(t *testing.T) { newTTLTableStatusRows(&cache.TableStatus{TableID: 1}), nil, }, { - getExecuteInfo(setTableStatusOwnerSQL(1, now, expireTime, "test-id")), + getExecuteInfo(setTableStatusOwnerSQL("test-job-id", 1, now, expireTime, "test-id")), + nil, nil, + }, + { + getExecuteInfoWithErr(cache.InsertIntoTTLTask(newMockSession(t), "test-job-id", 1, 0, nil, nil, expireTime, now)), nil, nil, }, { @@ -289,7 +305,11 @@ func TestLockNewTable(t *testing.T) { newTTLTableStatusRows(&cache.TableStatus{TableID: 1}), nil, }, { - getExecuteInfo(setTableStatusOwnerSQL(1, now, expireTime, "test-id")), + getExecuteInfo(setTableStatusOwnerSQL("test-job-id", 1, now, expireTime, "test-id")), + nil, nil, + }, + { + getExecuteInfoWithErr(cache.InsertIntoTTLTask(newMockSession(t), "test-job-id", 1, 0, nil, nil, expireTime, now)), nil, nil, }, { @@ -303,9 +323,13 @@ func TestLockNewTable(t *testing.T) { newTTLTableStatusRows(&cache.TableStatus{TableID: 1}), nil, }, { - getExecuteInfo(setTableStatusOwnerSQL(1, now, expireTime, "test-id")), + getExecuteInfo(setTableStatusOwnerSQL("test-job-id", 1, now, expireTime, "test-id")), nil, errors.New("test error message"), }, + { + getExecuteInfoWithErr(cache.InsertIntoTTLTask(newMockSession(t), "test-job-id", 1, 0, nil, nil, expireTime, now)), + nil, nil, + }, }, false, true}, } @@ -317,8 +341,8 @@ func TestLockNewTable(t *testing.T) { se := newMockSession(t) se.executeSQL = func(ctx context.Context, sql string, args ...interface{}) (rows []chunk.Row, err error) { assert.Less(t, sqlCounter, len(c.sqls)) - assert.Equal(t, sql, c.sqls[sqlCounter].sql) - assert.Equal(t, args, c.sqls[sqlCounter].args) + assert.Equal(t, c.sqls[sqlCounter].sql, sql) + assert.Equal(t, c.sqls[sqlCounter].args, args) rows = c.sqls[sqlCounter].rows err = c.sqls[sqlCounter].err @@ -554,7 +578,7 @@ func TestCheckFinishedJob(t *testing.T) { now := se.Now() jobID := m.runningJobs[0].id se.executeSQL = func(ctx context.Context, sql string, args ...interface{}) ([]chunk.Row, error) { - if len(args) > 0 { + if len(args) > 1 { meetArg = true expectedSQL, expectedArgs := finishJobSQL(tbl.ID, now, "{\"total_rows\":1,\"success_rows\":1,\"error_rows\":0,\"total_scan_task\":1,\"scheduled_scan_task\":1,\"finished_scan_task\":1}", jobID) assert.Equal(t, expectedSQL, sql)