Skip to content

Commit

Permalink
Merge branch 'master' into bump-tidb
Browse files Browse the repository at this point in the history
  • Loading branch information
okJiang authored Nov 11, 2022
2 parents 63e1d5e + 8216bab commit 03189e1
Show file tree
Hide file tree
Showing 11 changed files with 31 additions and 79 deletions.
23 changes: 4 additions & 19 deletions engine/framework/base_jobmaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"github.com/pingcap/tiflow/engine/model"
dcontext "github.com/pingcap/tiflow/engine/pkg/context"
"github.com/pingcap/tiflow/engine/pkg/errctx"
resModel "github.com/pingcap/tiflow/engine/pkg/externalresource/model"
metaModel "github.com/pingcap/tiflow/engine/pkg/meta/model"
"github.com/pingcap/tiflow/engine/pkg/p2p"
"github.com/pingcap/tiflow/engine/pkg/promutil"
Expand Down Expand Up @@ -55,13 +54,9 @@ type BaseJobMaster interface {
GetWorkers() map[frameModel.WorkerID]WorkerHandle

// CreateWorker requires the framework to dispatch a new worker.
// If the worker needs to access certain file system resources,
// their ID's must be passed by `resources`.
CreateWorker(workerType WorkerType, config WorkerConfig, cost model.RescUnit, resources ...resModel.ResourceID) (frameModel.WorkerID, error)

// CreateWorkerV2 is the latest version of CreateWorker, but with
// a more flexible way of passing options.
CreateWorkerV2(
// If the worker needs to access certain file system resources, it must pass
// resource ID via CreateWorkerOpt
CreateWorker(
workerType frameModel.WorkerType,
config WorkerConfig,
opts ...CreateWorkerOpt,
Expand Down Expand Up @@ -305,21 +300,11 @@ func (d *DefaultBaseJobMaster) NotifyExit(ctx context.Context, errIn error) (ret

// CreateWorker implements BaseJobMaster.CreateWorker
func (d *DefaultBaseJobMaster) CreateWorker(
workerType WorkerType,
config WorkerConfig,
cost model.RescUnit,
resources ...resModel.ResourceID,
) (frameModel.WorkerID, error) {
return d.master.CreateWorker(workerType, config, cost, resources...)
}

// CreateWorkerV2 implements BaseJobMaster.CreateWorkerV2
func (d *DefaultBaseJobMaster) CreateWorkerV2(
workerType frameModel.WorkerType,
config WorkerConfig,
opts ...CreateWorkerOpt,
) (frameModel.WorkerID, error) {
return d.master.CreateWorkerV2(workerType, config, opts...)
return d.master.CreateWorker(workerType, config, opts...)
}

// UpdateStatus delegates the UpdateStatus of inner worker
Expand Down
2 changes: 1 addition & 1 deletion engine/framework/fake/fake_master.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ func (m *Master) InitImpl(ctx context.Context) error {

// This function is not thread safe, it must be called with m.workerListMu locked
func (m *Master) createWorker(wcfg *WorkerConfig) error {
workerID, err := m.CreateWorker(frameModel.FakeTask, wcfg, 1)
workerID, err := m.CreateWorker(frameModel.FakeTask, wcfg, framework.CreateWorkerWithCost(1))
if err != nil {
return errors.Trace(err)
}
Expand Down
34 changes: 4 additions & 30 deletions engine/framework/master.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,19 +211,11 @@ type BaseMaster interface {
// NOTE: Currently, no implement has used this method, but we still keep it to make the interface intact
Exit(ctx context.Context, exitReason ExitReason, err error, detail []byte) error

// CreateWorker requires the framework to dispatch a new worker.
// If the worker needs to access certain file system resources,
// their ID's must be passed by `resources`.
CreateWorker(
workerType WorkerType,
config WorkerConfig,
cost model.RescUnit,
resources ...resModel.ResourceID,
) (frameModel.WorkerID, error)

// CreateWorkerV2 is the latest version of CreateWorker, but with
// CreateWorker is the latest version of CreateWorker, but with
// a more flexible way of passing options.
CreateWorkerV2(
// If the worker needs to access certain file system resources, it must pass
// resource ID via CreateWorkerOpt
CreateWorker(
workerType frameModel.WorkerType,
config WorkerConfig,
opts ...CreateWorkerOpt,
Expand Down Expand Up @@ -698,24 +690,6 @@ func (m *DefaultBaseMaster) PrepareWorkerConfig(

// CreateWorker implements BaseMaster.CreateWorker
func (m *DefaultBaseMaster) CreateWorker(
workerType frameModel.WorkerType,
config WorkerConfig,
cost model.RescUnit,
resources ...resModel.ResourceID,
) (frameModel.WorkerID, error) {
m.Logger().Info("CreateWorker",
zap.Stringer("worker-type", workerType),
zap.Any("worker-config", config),
zap.Int("cost", int(cost)),
zap.Any("resources", resources),
zap.String("master-id", m.id))

return m.CreateWorkerV2(workerType, config,
CreateWorkerWithCost(cost), CreateWorkerWithResourceRequirements(resources...))
}

// CreateWorkerV2 implements BaseMaster.CreateWorkerV2
func (m *DefaultBaseMaster) CreateWorkerV2(
workerType frameModel.WorkerType,
config WorkerConfig,
opts ...CreateWorkerOpt,
Expand Down
8 changes: 4 additions & 4 deletions engine/framework/master_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,9 +165,9 @@ func TestMasterCreateWorker(t *testing.T) {
workerID, err := master.CreateWorker(
workerTypePlaceholder,
&dummyConfig{param: 1},
100,
"resource-1",
"resource-2")
CreateWorkerWithCost(100),
CreateWorkerWithResourceRequirements("resource-1", "resource-2"),
)
require.NoError(t, err)
require.Equal(t, workerID1, workerID)

Expand Down Expand Up @@ -275,7 +275,7 @@ func TestMasterCreateWorkerMetError(t *testing.T) {
close(done)
})

_, err = master.CreateWorker(workerTypePlaceholder, &dummyConfig{param: 1}, 100)
_, err = master.CreateWorker(workerTypePlaceholder, &dummyConfig{param: 1}, CreateWorkerWithCost(100))
require.NoError(t, err)

for {
Expand Down
3 changes: 2 additions & 1 deletion engine/jobmaster/cvsjob/cvs_job_master.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,8 @@ func (jm *JobMaster) Tick(ctx context.Context) error {
for idx, workerInfo := range jm.syncFilesInfo {
// check if need to recreate worker
if workerInfo.needCreate.Load() {
workerID, err := jm.CreateWorker(frameModel.CvsTask, getTaskConfig(jm.jobStatus, idx), 10)
workerID, err := jm.CreateWorker(frameModel.CvsTask,
getTaskConfig(jm.jobStatus, idx), framework.CreateWorkerWithCost(10))
if err != nil {
log.Warn("create worker failed, try next time", zap.Any("master id", jm.workerID), zap.Error(err))
} else {
Expand Down
5 changes: 3 additions & 2 deletions engine/jobmaster/dm/dm_jobmaster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ import (
"github.com/pingcap/tiflow/engine/pkg/deps"
dmpkg "github.com/pingcap/tiflow/engine/pkg/dm"
"github.com/pingcap/tiflow/engine/pkg/externalresource/broker"
resModel "github.com/pingcap/tiflow/engine/pkg/externalresource/model"
kvmock "github.com/pingcap/tiflow/engine/pkg/meta/mock"
metaModel "github.com/pingcap/tiflow/engine/pkg/meta/model"
pkgOrm "github.com/pingcap/tiflow/engine/pkg/orm"
Expand Down Expand Up @@ -517,7 +516,9 @@ func (m *MockBaseJobmaster) MetaKVClient() metaModel.KVClient {
return args.Get(0).(metaModel.KVClient)
}

func (m *MockBaseJobmaster) CreateWorker(workerType framework.WorkerType, config framework.WorkerConfig, cost model.RescUnit, resources ...resModel.ResourceID) (frameModel.WorkerID, error) {
func (m *MockBaseJobmaster) CreateWorker(workerType framework.WorkerType,
config framework.WorkerConfig, opts ...framework.CreateWorkerOpt,
) (frameModel.WorkerID, error) {
m.mu.Lock()
defer m.mu.Unlock()
args := m.Called()
Expand Down
8 changes: 4 additions & 4 deletions engine/jobmaster/dm/worker_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"github.com/pingcap/tiflow/engine/jobmaster/dm/metadata"
"github.com/pingcap/tiflow/engine/jobmaster/dm/runtime"
"github.com/pingcap/tiflow/engine/jobmaster/dm/ticker"
"github.com/pingcap/tiflow/engine/model"
dmpkg "github.com/pingcap/tiflow/engine/pkg/dm"
resModel "github.com/pingcap/tiflow/engine/pkg/externalresource/model"
"go.uber.org/zap"
Expand All @@ -45,8 +44,7 @@ type WorkerAgent interface {
CreateWorker(
workerType framework.WorkerType,
config framework.WorkerConfig,
cost model.RescUnit,
resources ...resModel.ResourceID,
opts ...framework.CreateWorkerOpt,
) (frameModel.WorkerID, error)
}

Expand Down Expand Up @@ -356,7 +354,9 @@ func (wm *WorkerManager) createWorker(
resources ...resModel.ResourceID,
) error {
wm.logger.Info("start to create worker", zap.String("task_id", taskID), zap.Stringer("unit", unit))
workerID, err := wm.workerAgent.CreateWorker(unit, taskCfg, 1, resources...)
workerID, err := wm.workerAgent.CreateWorker(unit, taskCfg,
framework.CreateWorkerWithCost(1),
framework.CreateWorkerWithResourceRequirements(resources...))
if err != nil {
wm.logger.Error("failed to create workers", zap.String("task_id", taskID), zap.Stringer("unit", unit), zap.Error(err))
}
Expand Down
7 changes: 4 additions & 3 deletions engine/jobmaster/dm/worker_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,7 @@ import (
"github.com/pingcap/tiflow/engine/jobmaster/dm/config"
"github.com/pingcap/tiflow/engine/jobmaster/dm/metadata"
"github.com/pingcap/tiflow/engine/jobmaster/dm/runtime"
"github.com/pingcap/tiflow/engine/model"
dmpkg "github.com/pingcap/tiflow/engine/pkg/dm"
resModel "github.com/pingcap/tiflow/engine/pkg/externalresource/model"
kvmock "github.com/pingcap/tiflow/engine/pkg/meta/mock"
"github.com/pingcap/tiflow/pkg/errors"
"github.com/stretchr/testify/mock"
Expand Down Expand Up @@ -617,7 +615,10 @@ type MockWorkerAgent struct {
mock.Mock
}

func (mockAgent *MockWorkerAgent) CreateWorker(workerType framework.WorkerType, taskCfg interface{}, cost model.RescUnit, resources ...resModel.ResourceID) (frameModel.WorkerID, error) {
func (mockAgent *MockWorkerAgent) CreateWorker(
workerType framework.WorkerType, taskCfg interface{},
opts ...framework.CreateWorkerOpt,
) (frameModel.WorkerID, error) {
mockAgent.Lock()
defer mockAgent.Unlock()
args := mockAgent.Called()
Expand Down
3 changes: 2 additions & 1 deletion engine/jobmaster/example/master_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ type exampleMaster struct {
func (e *exampleMaster) InitImpl(ctx context.Context) (err error) {
log.Info("InitImpl")
e.worker.mu.Lock()
e.worker.id, err = e.CreateWorker(exampleWorkerType, exampleWorkerCfg, exampleWorkerCost)
e.worker.id, err = e.CreateWorker(exampleWorkerType, exampleWorkerCfg,
framework.CreateWorkerWithCost(exampleWorkerCost))
e.worker.mu.Unlock()
return
}
Expand Down
6 changes: 3 additions & 3 deletions engine/servermaster/jobmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ func (jm *JobManagerImpl) CreateJob(ctx context.Context, req *pb.CreateJobReques

// CreateWorker here is to create job master actually
// TODO: use correct worker cost
workerID, err := jm.BaseMaster.CreateWorkerV2(
workerID, err := jm.BaseMaster.CreateWorker(
meta.Type, meta,
framework.CreateWorkerWithCost(defaultJobMasterCost),
framework.CreateWorkerWithSelectors(selectors...))
Expand Down Expand Up @@ -618,7 +618,7 @@ func (jm *JobManagerImpl) Tick(ctx context.Context) error {
return "", errors.ErrMasterCreateWorkerBackoff.FastGenByArgs()
}
return jm.BaseMaster.CreateWorker(
job.Type, job, defaultJobMasterCost)
job.Type, job, framework.CreateWorkerWithCost(defaultJobMasterCost))
})
if _, err = filterQuotaError(err); err != nil {
return err
Expand All @@ -645,7 +645,7 @@ func (jm *JobManagerImpl) Tick(ctx context.Context) error {
err = jm.JobFsm.IterWaitAckJobs(
func(job *frameModel.MasterMeta) (string, error) {
return jm.BaseMaster.CreateWorker(
job.Type, job, defaultJobMasterCost)
job.Type, job, framework.CreateWorkerWithCost(defaultJobMasterCost))
})
exceedQuota, err := filterQuotaError(err)
if err != nil {
Expand Down
11 changes: 0 additions & 11 deletions engine/servermaster/jobmanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,9 @@ import (
"github.com/pingcap/tiflow/engine/framework"
"github.com/pingcap/tiflow/engine/framework/metadata"
frameModel "github.com/pingcap/tiflow/engine/framework/model"
"github.com/pingcap/tiflow/engine/model"
"github.com/pingcap/tiflow/engine/pkg/clock"
"github.com/pingcap/tiflow/engine/pkg/ctxmu"
resManager "github.com/pingcap/tiflow/engine/pkg/externalresource/manager"
resModel "github.com/pingcap/tiflow/engine/pkg/externalresource/model"
jobMock "github.com/pingcap/tiflow/engine/pkg/httputil/mock"
"github.com/pingcap/tiflow/engine/pkg/notifier"
pkgOrm "github.com/pingcap/tiflow/engine/pkg/orm"
Expand Down Expand Up @@ -125,15 +123,6 @@ type mockBaseMasterCreateWorkerFailed struct {
}

func (m *mockBaseMasterCreateWorkerFailed) CreateWorker(
workerType framework.WorkerType,
config framework.WorkerConfig,
cost model.RescUnit,
resources ...resModel.ResourceID,
) (frameModel.WorkerID, error) {
return "", errors.ErrMasterConcurrencyExceeded.FastGenByArgs()
}

func (m *mockBaseMasterCreateWorkerFailed) CreateWorkerV2(
workerType framework.WorkerType,
config framework.WorkerConfig,
opts ...framework.CreateWorkerOpt,
Expand Down

0 comments on commit 03189e1

Please sign in to comment.