Merge branch 'dev' into 'master'
【Issue volcano-sh#57】【BUG2019040101375】【BUG2019032102153】Add validation of policy actions and events; remove the RestartTask action

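Ticket numbers: BUG2019040101375; BUG2019032102153

Feature/module name: volcano-admission

Reason for change:
1. The RestartTask action has no effect when configured in a policy
2. The actions and events in a policy are not validated

Changes:
1. RestartTask is not implemented in the job state handling, so the action is removed
2. Validate the actions and events that can be configured in a policy. Valid actions are "AbortJob", "RestartJob", "TerminateJob", "CompleteJob", "ResumeJob";
    valid events are "*", "PodFailed", "PodEvicted", "Unknown", "TaskCompleted"

Self-verification: passed
Verification yaml [policy.yaml](/uploads/2d684ceab9b5a399571c4aff233e790b/policy.yaml)
Verification result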
![image](/uploads/13973e45fd75d45b611791fd039488be/image.png)

![image](/uploads/d0b9cb5e7a501dc87d53f78a4ac29477/image.png)




Issues info:
Issue ID: 57
Title: 【BUG2019040101375】restartTask has no effect 【BUG2019032102153】events/action combinations are not sensible
Issue url: CBU-PaaS/Community/volcano/volcano#57


See merge request CBU-PaaS/Community/volcano/volcano!92
mada 00483107 committed Apr 15, 2019
2 parents e3c9966 + aae4f20 commit 6c069d4
Showing 6 changed files with 82 additions and 13 deletions.
19 changes: 11 additions & 8 deletions docs/design/job-api.md
@@ -206,6 +206,8 @@ const (
OutOfSyncEvent Event = "OutOfSync"
// CommandIssuedEvent is triggered if a command is raised by user
CommandIssuedEvent Event = "CommandIssued"
// TaskCompletedEvent is triggered if the 'Replicas' amount of pods in one task are succeed
TaskCompletedEvent Event = "TaskCompleted"
)

// Action is the type of event handling
@@ -217,12 +219,11 @@ const (
AbortJobAction Action = "AbortJob"
// RestartJobAction if this action is set, the whole job will be restarted
RestartJobAction Action = "RestartJob"
// RestartTaskAction if this action is set, only the task will be restarted; default action.
// This action can not work together with job level events, e.g. JobUnschedulable
RestartTaskAction Action = "RestartTask"
// TerminateJobAction if this action is set, the whole job wil be terminated
// and can not be resumed: all Pod of Job will be evicted, and no Pod will be recreated.
TerminateJobAction Action = "TerminateJob"
// CompleteJobAction if this action is set, the unfinished pods will be killed, job completed.
CompleteJobAction Action = "CompleteJob"

// ResumeJobAction is the action to resume an aborted job.
ResumeJobAction Action = "ResumeJob"
@@ -300,8 +301,8 @@ spec:
```

Some BigData framework (e.g. Spark) may have different requirements. Take Spark as example, the whole job will be restarted
if 'driver' tasks failed and only restart the task if 'executor' tasks failed. As `RestartTask` is the default action of
task events, `RestartJob` is set for driver `spec.tasks.policies` as follow.
if 'driver' tasks failed and only restart the task if 'executor' tasks failed. `OnFailure` restartPolicy is set for executor
and `RestartJob` is set for driver `spec.tasks.policies` as follow.

```yaml
apiVersion: batch.volcano.sh/v1alpha1
@@ -327,6 +328,7 @@ spec:
containers:
- name: executor
image: executor-img
restartPolicy: OnFailure
```
## Features Interaction
@@ -508,6 +510,8 @@ const (
OutOfSyncEvent Event = "OutOfSync"
// CommandIssuedEvent is triggered if a command is raised by user
CommandIssuedEvent Event = "CommandIssued"
// TaskCompletedEvent is triggered if the 'Replicas' amount of pods in one task are succeed
TaskCompletedEvent Event = "TaskCompleted"
)

// Action is the action that Job controller will take according to the event.
@@ -519,12 +523,11 @@ const (
AbortJobAction Action = "AbortJob"
// RestartJobAction if this action is set, the whole job will be restarted
RestartJobAction Action = "RestartJob"
// RestartTaskAction if this action is set, only the task will be restarted; default action.
// This action can not work together with job level events, e.g. JobUnschedulable
RestartTaskAction Action = "RestartTask"
// TerminateJobAction if this action is set, the whole job wil be terminated
// and can not be resumed: all Pod of Job will be evicted, and no Pod will be recreated.
TerminateJobAction Action = "TerminateJob"
// CompleteJobAction if this action is set, the unfinished pods will be killed, job completed.
CompleteJobAction Action = "CompleteJob"

// ResumeJobAction is the action to resume an aborted job.
ResumeJobAction Action = "ResumeJob"
2 changes: 0 additions & 2 deletions example/tensorflow-benchmark.yaml
@@ -10,8 +10,6 @@ spec:
policies:
- event: PodEvicted
action: RestartJob
- event: PodFailed
action: RestartTask
tasks:
- replicas: 2
name: ps
59 changes: 59 additions & 0 deletions pkg/admission/admission_controller.go
@@ -27,6 +27,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apimachinery/pkg/util/validation/field"

v1alpha1 "volcano.sh/volcano/pkg/apis/batch/v1alpha1"
)
@@ -43,6 +44,27 @@ type AdmitFunc func(v1beta1.AdmissionReview) *v1beta1.AdmissionResponse
var scheme = runtime.NewScheme()
var Codecs = serializer.NewCodecFactory(scheme)

// policyEventMap defines all policy events and whether to allow external use
var policyEventMap = map[v1alpha1.Event]bool{
v1alpha1.AnyEvent: true,
v1alpha1.PodFailedEvent: true,
v1alpha1.PodEvictedEvent: true,
v1alpha1.JobUnknownEvent: true,
v1alpha1.TaskCompletedEvent: true,
v1alpha1.OutOfSyncEvent: false,
v1alpha1.CommandIssuedEvent: false,
}

// policyActionMap defines all policy actions and whether to allow external use
var policyActionMap = map[v1alpha1.Action]bool{
v1alpha1.AbortJobAction: true,
v1alpha1.RestartJobAction: true,
v1alpha1.TerminateJobAction: true,
v1alpha1.CompleteJobAction: true,
v1alpha1.ResumeJobAction: true,
v1alpha1.SyncJobAction: false,
}

func init() {
addToScheme(scheme)
}
@@ -102,3 +124,40 @@ func DecodeJob(object runtime.RawExtension, resource metav1.GroupVersionResource

return job, nil
}

func validatePolicies(policies []v1alpha1.LifecyclePolicy, fldPath *field.Path) error {
errs := field.ErrorList{}
for _, p := range policies {
if allow, ok := policyEventMap[p.Event]; !ok || !allow {
errs = append(errs, field.Invalid(fldPath, p.Event, fmt.Sprintf("invalid policy event")))
}

if allow, ok := policyActionMap[p.Action]; !ok || !allow {
errs = append(errs, field.Invalid(fldPath, p.Action, fmt.Sprintf("invalid policy action")))
}
}

return errs.ToAggregate()
}

func getValidEvents() []v1alpha1.Event {
var events []v1alpha1.Event
for e, allow := range policyEventMap {
if allow {
events = append(events, e)
}
}

return events
}

func getValidActions() []v1alpha1.Action {
var actions []v1alpha1.Action
for a, allow := range policyActionMap {
if allow {
actions = append(actions, a)
}
}

return actions
}
11 changes: 11 additions & 0 deletions pkg/admission/admit_job.go
@@ -25,6 +25,7 @@ import (

"k8s.io/api/admission/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/validation/field"

v1alpha1 "volcano.sh/volcano/pkg/apis/batch/v1alpha1"
)
@@ -99,6 +100,11 @@ func validateJob(job v1alpha1.Job, reviewResponse *v1beta1.AdmissionResponse) st
if duplicateInfo, ok := CheckPolicyDuplicate(task.Policies); ok {
msg = msg + fmt.Sprintf(" duplicated task event policies: %s;", duplicateInfo)
}

if err := validatePolicies(task.Policies, field.NewPath("spec.tasks.policies")); err != nil {
msg = msg + err.Error() + fmt.Sprintf(" valid events are %v, valid actions are %v",
getValidEvents(), getValidActions())
}
}

if totalReplicas < job.Spec.MinAvailable {
@@ -110,6 +116,11 @@ func validateJob(job v1alpha1.Job, reviewResponse *v1beta1.AdmissionResponse) st
msg = msg + fmt.Sprintf(" duplicated job event policies: %s;", duplicateInfo)
}

if err := validatePolicies(job.Spec.Policies, field.NewPath("spec.policies")); err != nil {
msg = msg + err.Error() + fmt.Sprintf(" valid events are %v, valid actions are %v",
getValidEvents(), getValidActions())
}

if msg != "" {
reviewResponse.Allowed = false
}
3 changes: 0 additions & 3 deletions pkg/apis/batch/v1alpha1/job.go
@@ -119,9 +119,6 @@ const (
AbortJobAction Action = "AbortJob"
// RestartJobAction if this action is set, the whole job will be restarted
RestartJobAction Action = "RestartJob"
// RestartTaskAction if this action is set, only the task will be restarted; default action.
// This action can not work together with job level events, e.g. JobUnschedulable
RestartTaskAction Action = "RestartTask"
// TerminateJobAction if this action is set, the whole job wil be terminated
// and can not be resumed: all Pod of Job will be evicted, and no Pod will be recreated.
TerminateJobAction Action = "TerminateJob"
1 change: 1 addition & 0 deletions test/e2e/mpi.go
@@ -25,6 +25,7 @@ var _ = Describe("MPI E2E Test", func() {
plugins: map[string][]string{
"ssh": []string{},
"env": []string{},
"svc": []string{},
},
tasks: []taskSpec{
{