Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

disttask: remove useless task state #50071

Merged
merged 11 commits into from
Jan 9, 2024
Merged
2 changes: 1 addition & 1 deletion pkg/disttask/framework/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func TestFrameworkWithQuery(t *testing.T) {
distContext.Close()
}

func TestFrameworkCancelGTask(t *testing.T) {
func TestFrameworkCancelTask(t *testing.T) {
ctx, ctrl, testContext, distContext := testutil.InitTestContext(t, 2)
defer ctrl.Finish()

Expand Down
8 changes: 2 additions & 6 deletions pkg/disttask/framework/handle/handle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,7 @@ func TestRunWithRetry(t *testing.T) {
)
require.Error(t, err)
}()
require.Eventually(t, func() bool {
return end.Load()
}, 5*time.Second, 100*time.Millisecond)
require.Eventually(t, end.Load, 5*time.Second, 100*time.Millisecond)

// fail with retryable error once, then success
end.Store(false)
Expand All @@ -134,9 +132,7 @@ func TestRunWithRetry(t *testing.T) {
)
require.NoError(t, err)
}()
require.Eventually(t, func() bool {
return end.Load()
}, 5*time.Second, 100*time.Millisecond)
require.Eventually(t, end.Load, 5*time.Second, 100*time.Millisecond)

// context done
subctx, cancel := context.WithCancel(ctx)
Expand Down
4 changes: 3 additions & 1 deletion pkg/disttask/framework/proto/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ go_library(
"node.go",
"subtask.go",
"task.go",
"type.go",
],
importpath = "github.com/pingcap/tidb/pkg/disttask/framework/proto",
visibility = ["//visibility:public"],
Expand All @@ -17,9 +18,10 @@ go_test(
srcs = [
"subtask_test.go",
"task_test.go",
"type_test.go",
],
embed = [":proto"],
flaky = True,
shard_count = 4,
shard_count = 5,
deps = ["@com_github_stretchr_testify//require"],
)
2 changes: 1 addition & 1 deletion pkg/disttask/framework/proto/subtask.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,10 @@ const (
SubtaskStateFailed SubtaskState = "failed"
SubtaskStateCanceled SubtaskState = "canceled"
SubtaskStatePaused SubtaskState = "paused"
SubtaskStateRevertPending SubtaskState = "revert_pending"
SubtaskStateReverting SubtaskState = "reverting"
SubtaskStateReverted SubtaskState = "reverted"
SubtaskStateRevertFailed SubtaskState = "revert_failed"
SubtaskStateRevertPending SubtaskState = "revert_pending"
)

type (
Expand Down
57 changes: 11 additions & 46 deletions pkg/disttask/framework/proto/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,19 +43,17 @@ import (
//
// TODO: we don't have revert_failed task for now.
const (
TaskStatePending TaskState = "pending"
TaskStateRunning TaskState = "running"
TaskStateSucceed TaskState = "succeed"
TaskStateReverting TaskState = "reverting"
TaskStateFailed TaskState = "failed"
TaskStateRevertFailed TaskState = "revert_failed"
TaskStateCancelling TaskState = "cancelling"
TaskStateCanceled TaskState = "canceled"
TaskStatePausing TaskState = "pausing"
TaskStatePaused TaskState = "paused"
TaskStateResuming TaskState = "resuming"
TaskStateRevertPending TaskState = "revert_pending"
TaskStateReverted TaskState = "reverted"
TaskStatePending TaskState = "pending"
TaskStateRunning TaskState = "running"
TaskStateSucceed TaskState = "succeed"
TaskStateFailed TaskState = "failed"
TaskStateReverting TaskState = "reverting"
TaskStateReverted TaskState = "reverted"
TaskStateRevertFailed TaskState = "revert_failed"
TaskStateCancelling TaskState = "cancelling"
TaskStatePausing TaskState = "pausing"
TaskStatePaused TaskState = "paused"
TaskStateResuming TaskState = "resuming"
)

type (
Expand Down Expand Up @@ -148,36 +146,3 @@ func (t *Task) Compare(other *Task) int {
}
return int(t.ID - other.ID)
}

const (
// TaskTypeExample is TaskType of Example.
TaskTypeExample TaskType = "Example"
// ImportInto is TaskType of ImportInto.
ImportInto TaskType = "ImportInto"
// Backfill is TaskType of add index Backfilling process.
Backfill TaskType = "backfill"
)

// Type2Int converts task type to int.
func Type2Int(t TaskType) int {
switch t {
case TaskTypeExample:
return 1
case ImportInto:
return 2
default:
return 0
}
}

// Int2Type converts int to task type.
func Int2Type(i int) TaskType {
switch i {
case 1:
return TaskTypeExample
case 2:
return ImportInto
default:
return ""
}
}
1 change: 0 additions & 1 deletion pkg/disttask/framework/proto/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ func TestTaskIsDone(t *testing.T) {
{TaskStateFailed, true},
{TaskStateRevertFailed, false},
{TaskStateCancelling, false},
{TaskStateCanceled, false},
{TaskStatePausing, false},
{TaskStatePaused, false},
{TaskStateReverted, true},
Expand Down
52 changes: 52 additions & 0 deletions pkg/disttask/framework/proto/type.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package proto

const (
// TaskTypeExample is TaskType of Example.
TaskTypeExample TaskType = "Example"
// ImportInto is TaskType of ImportInto.
ImportInto TaskType = "ImportInto"
// Backfill is TaskType of add index Backfilling process.
Backfill TaskType = "backfill"
)

// Type2Int converts task type to int.
func Type2Int(t TaskType) int {
switch t {
case TaskTypeExample:
return 1
case ImportInto:
return 2
case Backfill:
return 3
default:
return 0
}
}

// Int2Type converts int to task type.
func Int2Type(i int) TaskType {
switch i {
case 1:
return TaskTypeExample
case 2:
return ImportInto
case 3:
return Backfill
default:
return ""
}
}
40 changes: 40 additions & 0 deletions pkg/disttask/framework/proto/type_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package proto

import (
"testing"

"github.com/stretchr/testify/require"
)

func TestTaskType(t *testing.T) {
cases := []struct {
tp TaskType
val int
}{
{TaskTypeExample, 1},
{ImportInto, 2},
{Backfill, 3},
{"", 0},
}
for _, c := range cases {
require.Equal(t, c.val, Type2Int(c.tp))
}

for _, c := range cases {
require.Equal(t, c.tp, Int2Type(c.val))
}
}
10 changes: 5 additions & 5 deletions pkg/disttask/framework/scheduler/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@ import (
"go.uber.org/goleak"
)

// GetRunningGTaskCnt implements Scheduler.GetRunningGTaskCnt interface.
// GetRunningTaskCnt implements Scheduler.GetRunningTaskCnt interface.
func (sm *Manager) GetRunningTaskCnt() int {
return sm.getSchedulerCount()
}

// DelRunningGTask implements Scheduler.DelRunningGTask interface.
// DelRunningTask implements Scheduler.DelRunningTask interface.
func (sm *Manager) DelRunningTask(id int64) {
sm.delScheduler(id)
}
Expand All @@ -48,9 +48,9 @@ func TestMain(m *testing.M) {
testsetup.SetupForCommonTest()

// Make test more fast.
checkTaskRunningInterval = checkTaskRunningInterval / 10
checkTaskFinishedInterval = checkTaskFinishedInterval / 10
RetrySQLInterval = RetrySQLInterval / 20
checkTaskRunningInterval /= 10
checkTaskFinishedInterval /= 10
RetrySQLInterval /= 20

opts := []goleak.Option{
goleak.IgnoreTopFunction("github.com/golang/glog.(*fileSink).flushDaemon"),
Expand Down
3 changes: 0 additions & 3 deletions pkg/disttask/framework/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,6 @@ const (
subtaskCnt = 3
)

var mockedAllServerInfos = []*infosync.ServerInfo{}

func getTestSchedulerExt(ctrl *gomock.Controller) scheduler.Extension {
mockScheduler := mockDispatch.NewMockExtension(ctrl)
mockScheduler.EXPECT().OnTick(gomock.Any(), gomock.Any()).Return().AnyTimes()
Expand Down Expand Up @@ -486,7 +484,6 @@ func TestVerifyTaskStateTransform(t *testing.T) {
{proto.TaskStateRunning, proto.TaskStatePausing, true},
{proto.TaskStateRunning, proto.TaskStateResuming, false},
{proto.TaskStateCancelling, proto.TaskStateRunning, false},
{proto.TaskStateCanceled, proto.TaskStateRunning, false},
}
for _, tc := range testCases {
require.Equal(t, tc.expect, scheduler.VerifyTaskStateTransform(tc.oldState, tc.newState))
Expand Down
6 changes: 1 addition & 5 deletions pkg/disttask/framework/scheduler/state_transform.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,7 @@ func VerifyTaskStateTransform(from, to proto.TaskState) bool {
proto.TaskStateRevertFailed: {},
proto.TaskStateCancelling: {
proto.TaskStateReverting,
// no canceled now
// proto.TaskStateCanceled,
},
proto.TaskStateCanceled: {},
proto.TaskStatePausing: {
proto.TaskStatePaused,
},
Expand All @@ -58,8 +55,7 @@ func VerifyTaskStateTransform(from, to proto.TaskState) bool {
proto.TaskStateResuming: {
proto.TaskStateRunning,
},
proto.TaskStateRevertPending: {},
proto.TaskStateReverted: {},
proto.TaskStateReverted: {},
}

if from == to {
Expand Down
2 changes: 1 addition & 1 deletion pkg/disttask/framework/storage/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ go_test(
embed = [":storage"],
flaky = True,
race = "on",
shard_count = 18,
shard_count = 19,
deps = [
"//pkg/config",
"//pkg/disttask/framework/proto",
Expand Down
20 changes: 18 additions & 2 deletions pkg/disttask/framework/storage/table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -816,7 +816,7 @@ func TestBothTaskAndSubTaskTable(t *testing.T) {

// test transactional
require.NoError(t, sm.DeleteSubtasksByTaskID(ctx, 1))
failpoint.Enable("github.com/pingcap/tidb/pkg/disttask/framework/storage/MockUpdateTaskErr", "1*return(true)")
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/disttask/framework/storage/MockUpdateTaskErr", "1*return(true)"))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/storage/MockUpdateTaskErr"))
}()
Expand Down Expand Up @@ -987,7 +987,7 @@ func TestSubtaskHistoryTable(t *testing.T) {
require.Len(t, subTasks, 3)

// test GC history table.
failpoint.Enable("github.com/pingcap/tidb/pkg/disttask/framework/storage/subtaskHistoryKeepSeconds", "return(1)")
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/disttask/framework/storage/subtaskHistoryKeepSeconds", "return(1)"))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/storage/subtaskHistoryKeepSeconds"))
}()
Expand Down Expand Up @@ -1148,3 +1148,19 @@ func TestInitMeta(t *testing.T) {
tk.MustExec(`set global tidb_service_scope="background"`)
tk.MustQuery("select @@global.tidb_service_scope").Check(testkit.Rows("background"))
}

func TestSubtaskType(t *testing.T) {
_, sm, ctx := testutil.InitTableTest(t)
cases := []proto.TaskType{
proto.TaskTypeExample,
proto.ImportInto,
proto.Backfill,
"",
}
for i, c := range cases {
testutil.InsertSubtask(t, sm, int64(i+1), proto.StepOne, "tidb-1", []byte(""), proto.SubtaskStateRunning, c, 12)
subtask, err := sm.GetFirstSubtaskInStates(ctx, "tidb-1", int64(i+1), proto.StepOne, proto.SubtaskStateRunning)
require.NoError(t, err)
require.Equal(t, c, subtask.Type)
}
}
4 changes: 2 additions & 2 deletions pkg/disttask/framework/storage/task_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -1064,9 +1064,9 @@ func (mgr *TaskManager) UpdateTaskAndAddSubTasks(ctx context.Context, task *prot
}
})
if len(subtasks) > 0 {
subtaskState := proto.TaskStatePending
subtaskState := proto.SubtaskStatePending
if task.State == proto.TaskStateReverting {
subtaskState = proto.TaskStateRevertPending
subtaskState = proto.SubtaskStateRevertPending
}

sql := new(strings.Builder)
Expand Down
2 changes: 1 addition & 1 deletion pkg/disttask/framework/taskexecutor/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func TestManageTask(t *testing.T) {
ctx4, cancel4 := context.WithCancelCause(context.Background())
m.registerCancelFunc(1, cancel4)
mockTaskTable.EXPECT().PauseSubtasks(m.ctx, "test", int64(1)).Return(nil)
m.onPausingTasks([]*proto.Task{{ID: 1}})
require.NoError(t, m.onPausingTasks([]*proto.Task{{ID: 1}}))
require.Equal(t, context.Canceled, ctx4.Err())
}

Expand Down