Skip to content

Commit

Permalink
*(engine): add comments to interface (#7155)
Browse files Browse the repository at this point in the history
ref #4287
  • Loading branch information
lance6716 authored Sep 28, 2022
1 parent 54e3c74 commit 7a9b9f9
Show file tree
Hide file tree
Showing 21 changed files with 191 additions and 151 deletions.
3 changes: 1 addition & 2 deletions engine/executor/cvs/cvstask.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,11 +225,10 @@ func (task *cvsTask) OnMasterMessage(ctx context.Context, topic p2p.Topic, messa
}

// CloseImpl tells the WorkerImpl to quit runStatusWorker and release resources.
func (task *cvsTask) CloseImpl(ctx context.Context) error {
func (task *cvsTask) CloseImpl(ctx context.Context) {
if task.cancelFn != nil {
task.cancelFn()
}
return nil
}

func (task *cvsTask) Receive(ctx context.Context) error {
Expand Down
2 changes: 1 addition & 1 deletion engine/executor/dm/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ func TestStopWorker(t *testing.T) {
require.NoError(t, err)

// mock close by framework
require.NoError(t, dmWorker.CloseImpl(context.Background()))
dmWorker.CloseImpl(context.Background())
}

func TestOperateTask(t *testing.T) {
Expand Down
22 changes: 9 additions & 13 deletions engine/executor/dm/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,33 +164,29 @@ func (w *dmWorker) Workload() model.RescUnit {
return 0
}

// OnMasterFailover implements lib.WorkerImpl.OnMasterFailover.
// It only logs that the callback was invoked; reason is not inspected and
// the method always returns nil.
func (w *dmWorker) OnMasterFailover(reason framework.MasterFailoverReason) error {
w.Logger().Info("dmworker.OnMasterFailover")
return nil
}

// OnMasterMessage implements lib.WorkerImpl.OnMasterMessage.
// It only logs the received topic and message at Info level and always
// returns nil; ctx is not used.
func (w *dmWorker) OnMasterMessage(ctx context.Context, topic p2p.Topic, message p2p.MessageValue) error {
w.Logger().Info("dmworker.OnMasterMessage", zap.String("topic", topic), zap.Any("message", message))
return nil
}

// CloseImpl implements lib.WorkerImpl.CloseImpl
func (w *dmWorker) CloseImpl(ctx context.Context) error {
func (w *dmWorker) CloseImpl(ctx context.Context) {
w.Logger().Info("close the dm worker", zap.String("task-id", w.taskID))
var recordErr error
// unregister jobmaster client

if err := w.unitHolder.Close(ctx); err != nil {
w.Logger().Error("fail to close unit holder", zap.Error(err))
}

if w.messageAgent == nil {
return
}
if err := w.messageAgent.UpdateClient(w.masterID, nil); err != nil {
w.Logger().Error("failed to update message client", zap.Error(err))
recordErr = err
}
w.unitHolder.Close(ctx)
if err := w.messageAgent.Close(ctx); err != nil {
w.Logger().Error("failed to close message client", zap.Error(err))
recordErr = err
}
return recordErr
}

// setupStorage opens and configs external storage
Expand Down
1 change: 0 additions & 1 deletion engine/executor/dm/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,6 @@ func TestWorker(t *testing.T) {

// placeholder
require.Equal(t, model.RescUnit(0), dmWorker.Workload())
require.NoError(t, dmWorker.OnMasterFailover(framework.MasterFailoverReason{}))
require.NoError(t, dmWorker.OnMasterMessage(context.Background(), "", nil))

// Finished
Expand Down
34 changes: 13 additions & 21 deletions engine/framework/base_jobmaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,11 +130,13 @@ type JobMasterImpl interface {
// OnCancel is triggered when a cancel message is received. It can be
// triggered multiple times.
OnCancel(ctx context.Context) error
// OnOpenAPIInitialized is called when the OpenAPI is initialized.
// This is used for the JobMaster to register its OpenAPI handler.
// The implementation must not retain the apiGroup. It must register
// its OpenAPI handler before this function returns.
// OnOpenAPIInitialized is called as the first callback function of the JobMasterImpl
// instance. The business logic should only register the OpenAPI handler in it.
// The implementation must not retain the apiGroup.
// Note: this function is called before Init().
// Concurrent safety:
// - this function is called as the first callback function of a JobMasterImpl
// instance, and it's not concurrent with other callbacks.
OnOpenAPIInitialized(apiGroup *gin.RouterGroup)

// IsJobMasterImpl is an empty function used to prevent accidental implementation
Expand Down Expand Up @@ -252,10 +254,7 @@ func (d *DefaultBaseJobMaster) GetWorkers() map[frameModel.WorkerID]WorkerHandle
// Close implements BaseJobMaster.Close
func (d *DefaultBaseJobMaster) Close(ctx context.Context) error {
d.closeOnce.Do(func() {
err := d.impl.CloseImpl(ctx)
if err != nil {
d.Logger().Error("Failed to close JobMasterImpl", zap.Error(err))
}
d.impl.CloseImpl(ctx)
})

d.master.persistMetaError()
Expand All @@ -269,9 +268,7 @@ func (d *DefaultBaseJobMaster) Stop(ctx context.Context) error {
ctx, cancel := context.WithTimeout(ctx, time.Second*5)
defer cancel()

if err := d.impl.StopImpl(ctx); err != nil {
d.Logger().Error("Failed to stop JobMasterImpl", zap.Error(err))
}
d.impl.StopImpl(ctx)
d.master.doClose()
d.worker.doClose()
return nil
Expand All @@ -287,10 +284,7 @@ func (d *DefaultBaseJobMaster) NotifyExit(ctx context.Context, errIn error) (ret
}

d.closeOnce.Do(func() {
err := d.impl.CloseImpl(ctx)
if err != nil {
log.Error("Failed to close JobMasterImpl", zap.Error(err))
}
d.impl.CloseImpl(ctx)
})

startTime := time.Now()
Expand Down Expand Up @@ -436,9 +430,8 @@ func (j *jobMasterImplAsWorkerImpl) OnMasterMessage(
return nil
}

func (j *jobMasterImplAsWorkerImpl) CloseImpl(ctx context.Context) error {
func (j *jobMasterImplAsWorkerImpl) CloseImpl(ctx context.Context) {
log.Panic("unexpected Close call")
return nil
}

type jobMasterImplAsMasterImpl struct {
Expand Down Expand Up @@ -479,12 +472,11 @@ func (j *jobMasterImplAsMasterImpl) OnWorkerMessage(worker WorkerHandle, topic p
return j.inner.OnWorkerMessage(worker, topic, message)
}

func (j *jobMasterImplAsMasterImpl) CloseImpl(ctx context.Context) error {
func (j *jobMasterImplAsMasterImpl) CloseImpl(ctx context.Context) {
log.Panic("unexpected Close call")
return nil
return
}

func (j *jobMasterImplAsMasterImpl) StopImpl(ctx context.Context) error {
func (j *jobMasterImplAsMasterImpl) StopImpl(ctx context.Context) {
log.Panic("unexpected StopImpl call")
return nil
}
18 changes: 8 additions & 10 deletions engine/framework/base_jobmaster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,20 +72,18 @@ func (m *testJobMasterImpl) Tick(ctx context.Context) error {
return args.Error(0)
}

func (m *testJobMasterImpl) CloseImpl(ctx context.Context) error {
func (m *testJobMasterImpl) CloseImpl(ctx context.Context) {
m.mu.Lock()
defer m.mu.Unlock()

args := m.Called(ctx)
return args.Error(0)
m.Called(ctx)
}

func (m *testJobMasterImpl) StopImpl(ctx context.Context) error {
func (m *testJobMasterImpl) StopImpl(ctx context.Context) {
m.mu.Lock()
defer m.mu.Unlock()

args := m.Called(ctx)
return args.Error(0)
m.Called(ctx)
}

func (m *testJobMasterImpl) OnMasterRecovered(ctx context.Context) error {
Expand Down Expand Up @@ -263,7 +261,7 @@ func TestBaseJobMasterBasics(t *testing.T) {
jobMaster.ExpectedCalls = nil
jobMaster.Calls = nil

jobMaster.On("CloseImpl", mock.Anything).Return(nil)
jobMaster.On("CloseImpl", mock.Anything).Return()
jobMaster.mu.Unlock()

status := jobMaster.Status()
Expand Down Expand Up @@ -400,7 +398,7 @@ func TestJobMasterExit(t *testing.T) {
jobMaster.ExpectedCalls = nil
jobMaster.Calls = nil

jobMaster.On("CloseImpl", mock.Anything).Return(nil)
jobMaster.On("CloseImpl", mock.Anything).Return()
jobMaster.mu.Unlock()

// test exit status
Expand Down Expand Up @@ -442,7 +440,7 @@ func TestJobMasterInitReturnError(t *testing.T) {
// clean status
jobMaster.ExpectedCalls = nil
jobMaster.Calls = nil
jobMaster.On("CloseImpl", mock.Anything).Return(nil)
jobMaster.On("CloseImpl", mock.Anything).Return()
jobMaster.mu.Unlock()

err = jobMaster.base.Close(ctx)
Expand Down Expand Up @@ -495,7 +493,7 @@ func TestJobMasterPollReturnError(t *testing.T) {
// clean status
jobMaster.ExpectedCalls = nil
jobMaster.Calls = nil
jobMaster.On("CloseImpl", mock.Anything).Return(nil)
jobMaster.On("CloseImpl", mock.Anything).Return()
jobMaster.mu.Unlock()

err = jobMaster.base.Close(ctx)
Expand Down
6 changes: 2 additions & 4 deletions engine/framework/fake/fake_master.go
Original file line number Diff line number Diff line change
Expand Up @@ -493,15 +493,13 @@ func (m *Master) OnWorkerStatusUpdated(worker framework.WorkerHandle, newStatus
}

// CloseImpl implements MasterImpl.CloseImpl
func (m *Master) CloseImpl(ctx context.Context) error {
func (m *Master) CloseImpl(ctx context.Context) {
log.Info("FakeMaster: Close", zap.Stack("stack"))
return nil
}

// StopImpl implements MasterImpl.StopImpl
func (m *Master) StopImpl(ctx context.Context) error {
func (m *Master) StopImpl(ctx context.Context) {
log.Info("FakeMaster: Stop", zap.Stack("stack"))
return nil
}

// OnMasterMessage implements MasterImpl.OnMasterMessage
Expand Down
3 changes: 1 addition & 2 deletions engine/framework/fake/fake_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,13 +213,12 @@ func (d *dummyWorker) OnMasterMessage(ctx context.Context, topic p2p.Topic, mess
return nil
}

func (d *dummyWorker) CloseImpl(ctx context.Context) error {
func (d *dummyWorker) CloseImpl(ctx context.Context) {
if atomic.CompareAndSwapInt32(&d.closed, 0, 1) {
if d.cancel != nil {
d.cancel()
}
}
return nil
}

func (d *dummyWorker) setState(code frameModel.WorkerState) {
Expand Down
105 changes: 78 additions & 27 deletions engine/framework/master.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,37 +67,96 @@ type Master interface {
// MasterImpl defines the interface to implement a master, business logic can be
// added in the functions of this interface
type MasterImpl interface {
// InitImpl provides customized logic for the business logic to initialize.
// InitImpl will not be called if the master recovers from an error.
// InitImpl is called the first time the MasterImpl instance is initialized,
// after OnOpenAPIInitialized. When InitImpl returns without error, the framework
// will try to persist an internal state so that a later failover calls OnMasterRecovered
// rather than InitImpl.
// Return:
// - error to let the framework call CloseImpl; the framework may retry InitImpl
// a few times later. For a non-retryable failure, business logic should
// call Exit.
// Concurrent safety:
// - this function is not concurrent with other callbacks.
InitImpl(ctx context.Context) error

// Tick is called on a fixed interval.
Tick(ctx context.Context) error

// OnMasterRecovered is called when the master has recovered from an error.
// OnMasterRecovered is called when the MasterImpl instance has been failed over
// by the framework after an error. For this MasterImpl instance, it's called after OnOpenAPIInitialized.
// Return:
// - error to let the framework call CloseImpl.
// Concurrent safety:
// - this function is not concurrent with other callbacks.
OnMasterRecovered(ctx context.Context) error

// OnWorkerDispatched is called when a request to launch a worker is finished.
// Tick is called on a fixed interval after MasterImpl's InitImpl or OnMasterRecovered,
// business logic can do some periodic tasks here.
// Return:
// - error to let the framework call CloseImpl.
// Concurrent safety:
// - this function may be concurrently called with other callbacks except for
// Tick itself, OnOpenAPIInitialized, InitImpl, OnMasterRecovered, CloseImpl,
// StopImpl.
Tick(ctx context.Context) error

// OnWorkerDispatched is called when the asynchronous action of CreateWorker
// is finished. Only after OnWorkerDispatched, OnWorkerOnline and OnWorkerStatusUpdated
// of the same worker may be called.
// Return:
// - error to let the framework call CloseImpl.
// Concurrent safety:
// - this function may be concurrently called with another worker's OnWorkerXXX,
// Tick, CloseImpl, StopImpl, OnCancel.
OnWorkerDispatched(worker WorkerHandle, result error) error

// OnWorkerOnline is called when the first heartbeat for a worker is received.
// Only after OnWorkerOnline, OnWorkerOffline of the same worker may be called.
// Return:
// - error to let the framework call CloseImpl.
// Concurrent safety:
// - this function may be concurrently called with another worker's OnWorkerXXX,
// Tick, CloseImpl, StopImpl, OnCancel, the same worker's OnWorkerStatusUpdated.
OnWorkerOnline(worker WorkerHandle) error

// OnWorkerOffline is called when a worker exits or has timed out.
// Worker exit scenarios include a normal finish and a manual stop.
// OnWorkerOffline is called as the consequence of the worker's Exit or a heartbeat
// timeout. It's the last callback function among OnWorkerXXX for a worker.
// Return:
// - error to let the framework call CloseImpl.
// Concurrent safety:
// - this function may be concurrently called with another worker's OnWorkerXXX,
// Tick, CloseImpl, StopImpl, OnCancel.
OnWorkerOffline(worker WorkerHandle, reason error) error

// OnWorkerMessage is called when a customized message is received.
OnWorkerMessage(worker WorkerHandle, topic p2p.Topic, message interface{}) error

// OnWorkerStatusUpdated is called when a worker's status is updated.
// OnWorkerStatusUpdated is called as the consequence of worker's UpdateStatus.
// Return:
// - error to let the framework call CloseImpl.
// Concurrent safety:
// - this function may be concurrently called with another worker's OnWorkerXXX,
// Tick, CloseImpl, StopImpl, OnCancel, the same worker's OnWorkerOnline.
OnWorkerStatusUpdated(worker WorkerHandle, newStatus *frameModel.WorkerStatus) error

// CloseImpl is called when the master is being closed
CloseImpl(ctx context.Context) error

// StopImpl is called when the master is being canceled
StopImpl(ctx context.Context) error
// CloseImpl is called as the consequence of returning error from InitImpl,
// OnMasterRecovered or Tick. Tick will be stopped after entering this function,
// and the framework may try to create a new MasterImpl instance afterwards.
// Business logic is expected to release resources here, but business developers
// should be aware that if the runtime crashes, CloseImpl will not get a chance
// to be called.
// TODO: guarantee that no other callbacks are called after or concurrently with CloseImpl.
// Concurrent safety:
// - this function may be concurrently called with OnWorkerMessage, OnCancel,
// OnWorkerDispatched, OnWorkerOnline, OnWorkerOffline, OnWorkerStatusUpdated.
CloseImpl(ctx context.Context)

// StopImpl is called as the consequence of business logic calling Exit. Tick will
// be stopped after entering this function, and the framework will treat this MasterImpl
// as non-recoverable.
// There's at most one invocation of StopImpl after Exit. If the runtime
// crashes, StopImpl will not get a chance to be called.
// Concurrent safety:
// - this function may be concurrently called with OnWorkerMessage, OnCancel,
// OnWorkerDispatched, OnWorkerOnline, OnWorkerOffline, OnWorkerStatusUpdated.
StopImpl(ctx context.Context)
}

const (
Expand Down Expand Up @@ -539,25 +598,17 @@ func (m *DefaultBaseMaster) doClose() {

// Close implements BaseMaster.Close
func (m *DefaultBaseMaster) Close(ctx context.Context) error {
err := m.Impl.CloseImpl(ctx)
// We don't return here if CloseImpl return error to ensure
// that we can close inner resources of the framework
if err != nil {
m.Logger().Error("Failed to close MasterImpl", zap.Error(err))
}
m.Impl.CloseImpl(ctx)

m.persistMetaError()
m.doClose()
return errors.Trace(err)
return nil
}

// Stop implements Master.Stop
func (m *DefaultBaseMaster) Stop(ctx context.Context) error {
err := m.Impl.StopImpl(ctx)
if err != nil {
m.Logger().Error("stop master impl failed", zap.Error(err))
}
return err
m.Impl.StopImpl(ctx)
return nil
}

// refreshMetadata load and update metadata by current epoch, nodeID, advertiseAddr, etc.
Expand Down
Loading

0 comments on commit 7a9b9f9

Please sign in to comment.