Skip to content

Commit

Permalink
Merge pull request agola-io#224 from sgotti/executor_use_cancellable_…
Browse files Browse the repository at this point in the history
…context_in_executetask

executor: use cancellable context in executetask
  • Loading branch information
sgotti authored Mar 2, 2020
2 parents 72c7f4b + 2a35132 commit b8ce97e
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 16 deletions.
6 changes: 5 additions & 1 deletion internal/services/executor/driver/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -614,7 +614,11 @@ func (dp *DockerPod) Exec(ctx context.Context, execConfig *ExecConfig) (Containe

func (e *DockerContainerExec) Wait(ctx context.Context) (int, error) {
// ignore error, we'll use the exit code of the exec
<-e.endCh
select {
case <-ctx.Done():
return 0, ctx.Err()
case <-e.endCh:
}

var exitCode int
for {
Expand Down
48 changes: 33 additions & 15 deletions internal/services/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -724,7 +724,7 @@ func (e *Executor) sendExecutorTaskStatus(ctx context.Context, et *types.Executo
return err
}

func (e *Executor) executeTask(ctx context.Context, rt *runningTask) {
func (e *Executor) executeTask(rt *runningTask) {
// * save in local state that we have a running task
// * start the pod
// * then update the executortask status to in-progress
Expand All @@ -733,10 +733,21 @@ func (e *Executor) executeTask(ctx context.Context, rt *runningTask) {
// have an in progress running task

rt.Lock()
ctx := rt.ctx

// wait for context to be done and then stop the pod if running
go func() {
<-ctx.Done()
if rt.pod != nil {
if err := rt.pod.Stop(context.Background()); err != nil {
log.Errorf("error stopping the pod: %+v", err)
}
}
}()

defer func() {
rt.Lock()
rt.executing = false
rt.cancel()
rt.Unlock()
}()

Expand Down Expand Up @@ -1070,9 +1081,12 @@ func (e *Executor) executorTasksStatusSenderLoop(ctx context.Context) {
}

// remove running task if send was successful and it's not executing
if !rt.executing {
select {
case <-ctx.Done():
e.runningTasks.delete(rtID)
default:
}

rt.Unlock()
}

Expand Down Expand Up @@ -1116,14 +1130,18 @@ func (e *Executor) tasksUpdater(ctx context.Context) error {
e.taskUpdater(ctx, et)
}

// remove runningTasks not existing in the runservice
// stop and remove runningTasks not existing in the runservice
etIDsMap := map[string]struct{}{}
for _, et := range ets {
etIDsMap[et.ID] = struct{}{}
}

for _, rtID := range e.runningTasks.ids() {
if _, ok := etIDsMap[rtID]; !ok {
if _, ok := etIDsMap[rtID]; ok {
continue
}
if rt, ok := e.runningTasks.get(rtID); ok {
rt.cancel()
e.runningTasks.delete(rtID)
}
}
Expand All @@ -1145,11 +1163,8 @@ func (e *Executor) taskUpdater(ctx context.Context, et *types.ExecutorTask) {
if !rt.et.Spec.Stop && et.Spec.Stop {
rt.et.Spec.Stop = et.Spec.Stop

if !rt.et.Status.Phase.IsFinished() && rt.pod != nil {
if err := rt.pod.Stop(ctx); err != nil {
log.Errorf("err: %+v", err)
}
}
// cancel the running task
rt.cancel()
}
rt.Unlock()

Expand Down Expand Up @@ -1193,17 +1208,19 @@ func (e *Executor) taskUpdater(ctx context.Context, et *types.ExecutorTask) {
if activeTasks > e.c.ActiveTasksLimit {
return
}
rtCtx, rtCancel := context.WithCancel(ctx)
rt := &runningTask{
et: et,
executing: true,
et: et,
ctx: rtCtx,
cancel: rtCancel,
}

if !e.runningTasks.addIfNotExists(et.ID, rt) {
log.Warnf("task %s already running, this shouldn't happen", et.ID)
return
}

go e.executeTask(ctx, rt)
go e.executeTask(rt)
}
}

Expand Down Expand Up @@ -1266,10 +1283,11 @@ type runningTasks struct {
type runningTask struct {
sync.Mutex

ctx context.Context
cancel context.CancelFunc

et *types.ExecutorTask
pod driver.Pod

executing bool
}

func (r *runningTasks) get(rtID string) (*runningTask, bool) {
Expand Down

0 comments on commit b8ce97e

Please sign in to comment.