From 0d61ac3908629e2b6c13bf94c43a16f883e753e9 Mon Sep 17 00:00:00 2001 From: Simone Gotti Date: Fri, 28 Feb 2020 10:19:03 +0100 Subject: [PATCH] executor: use cancellable context in executetask Use a cancellable context to handle running task stop. When the context is done the pod will be stopped. --- internal/services/executor/driver/docker.go | 6 ++- internal/services/executor/executor.go | 48 ++++++++++++++------- 2 files changed, 38 insertions(+), 16 deletions(-) diff --git a/internal/services/executor/driver/docker.go b/internal/services/executor/driver/docker.go index 9c2fd2c55..0092ea270 100644 --- a/internal/services/executor/driver/docker.go +++ b/internal/services/executor/driver/docker.go @@ -614,7 +614,11 @@ func (dp *DockerPod) Exec(ctx context.Context, execConfig *ExecConfig) (Containe func (e *DockerContainerExec) Wait(ctx context.Context) (int, error) { // ignore error, we'll use the exit code of the exec - <-e.endCh + select { + case <-ctx.Done(): + return 0, ctx.Err() + case <-e.endCh: + } var exitCode int for { diff --git a/internal/services/executor/executor.go b/internal/services/executor/executor.go index d006a59b9..87100fc2d 100644 --- a/internal/services/executor/executor.go +++ b/internal/services/executor/executor.go @@ -724,7 +724,7 @@ func (e *Executor) sendExecutorTaskStatus(ctx context.Context, et *types.Executo return err } -func (e *Executor) executeTask(ctx context.Context, rt *runningTask) { +func (e *Executor) executeTask(rt *runningTask) { // * save in local state that we have a running task // * start the pod // * then update the executortask status to in-progress @@ -733,10 +733,21 @@ func (e *Executor) executeTask(ctx context.Context, rt *runningTask) { // have an in progress running task rt.Lock() + ctx := rt.ctx + + // wait for context to be done and then stop the pod if running + go func() { + <-ctx.Done() + if rt.pod != nil { + if err := rt.pod.Stop(context.Background()); err != nil { + log.Errorf("error stopping the pod: %+v", err) + } + } + }() defer func() { rt.Lock() - rt.executing = false + rt.cancel() rt.Unlock() }() @@ -1070,9 +1081,12 @@ func (e *Executor) executorTasksStatusSenderLoop(ctx context.Context) { } // remove running task if send was successful and it's not executing - if !rt.executing { + select { + case <-ctx.Done(): e.runningTasks.delete(rtID) + default: } + rt.Unlock() } @@ -1116,14 +1130,18 @@ func (e *Executor) tasksUpdater(ctx context.Context) error { e.taskUpdater(ctx, et) } - // remove runningTasks not existing in the runservice + // stop and remove runningTasks not existing in the runservice etIDsMap := map[string]struct{}{} for _, et := range ets { etIDsMap[et.ID] = struct{}{} } for _, rtID := range e.runningTasks.ids() { - if _, ok := etIDsMap[rtID]; !ok { + if _, ok := etIDsMap[rtID]; ok { + continue + } + if rt, ok := e.runningTasks.get(rtID); ok { + rt.cancel() e.runningTasks.delete(rtID) } } @@ -1145,11 +1163,8 @@ func (e *Executor) taskUpdater(ctx context.Context, et *types.ExecutorTask) { if !rt.et.Spec.Stop && et.Spec.Stop { rt.et.Spec.Stop = et.Spec.Stop - if !rt.et.Status.Phase.IsFinished() && rt.pod != nil { - if err := rt.pod.Stop(ctx); err != nil { - log.Errorf("err: %+v", err) - } - } + // cancel the running task + rt.cancel() } rt.Unlock() @@ -1193,9 +1208,11 @@ func (e *Executor) taskUpdater(ctx context.Context, et *types.ExecutorTask) { if activeTasks > e.c.ActiveTasksLimit { return } + rtCtx, rtCancel := context.WithCancel(ctx) rt := &runningTask{ - et: et, - executing: true, + et: et, + ctx: rtCtx, + cancel: rtCancel, } if !e.runningTasks.addIfNotExists(et.ID, rt) { @@ -1203,7 +1220,7 @@ func (e *Executor) taskUpdater(ctx context.Context, et *types.ExecutorTask) { return } - go e.executeTask(ctx, rt) + go e.executeTask(rt) } } @@ -1266,10 +1283,11 @@ type runningTasks struct { type runningTask struct { sync.Mutex + ctx context.Context + cancel context.CancelFunc + et *types.ExecutorTask pod driver.Pod - - executing bool } func (r *runningTasks) get(rtID string) (*runningTask, bool) {