Skip to content

Commit

Permalink
add validation of event and action in policies
Browse files Browse the repository at this point in the history
  • Loading branch information
sivanzcw committed Apr 14, 2019
1 parent e3c9966 commit aae4f20
Show file tree
Hide file tree
Showing 6 changed files with 82 additions and 13 deletions.
19 changes: 11 additions & 8 deletions docs/design/job-api.md
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,8 @@ const (
OutOfSyncEvent Event = "OutOfSync"
// CommandIssuedEvent is triggered if a command is raised by user
CommandIssuedEvent Event = "CommandIssued"
// TaskCompletedEvent is triggered if the 'Replicas' amount of pods in one task are succeed
TaskCompletedEvent Event = "TaskCompleted"
)

// Action is the type of event handling
Expand All @@ -217,12 +219,11 @@ const (
AbortJobAction Action = "AbortJob"
// RestartJobAction if this action is set, the whole job will be restarted
RestartJobAction Action = "RestartJob"
// RestartTaskAction if this action is set, only the task will be restarted; default action.
// This action can not work together with job level events, e.g. JobUnschedulable
RestartTaskAction Action = "RestartTask"
// TerminateJobAction if this action is set, the whole job wil be terminated
// and can not be resumed: all Pod of Job will be evicted, and no Pod will be recreated.
TerminateJobAction Action = "TerminateJob"
// CompleteJobAction if this action is set, the unfinished pods will be killed, job completed.
CompleteJobAction Action = "CompleteJob"

// ResumeJobAction is the action to resume an aborted job.
ResumeJobAction Action = "ResumeJob"
Expand Down Expand Up @@ -300,8 +301,8 @@ spec:
```

Some BigData framework (e.g. Spark) may have different requirements. Take Spark as example, the whole job will be restarted
if 'driver' tasks failed and only restart the task if 'executor' tasks failed. As `RestartTask` is the default action of
task events, `RestartJob` is set for driver `spec.tasks.policies` as follow.
if 'driver' tasks failed and only restart the task if 'executor' tasks failed. `OnFailure` restartPolicy is set for executor
and `RestartJob` is set for driver `spec.tasks.policies` as follow.

```yaml
apiVersion: batch.volcano.sh/v1alpha1
Expand All @@ -327,6 +328,7 @@ spec:
containers:
- name: executor
image: executor-img
restartPolicy: OnFailure
```
## Features Interaction
Expand Down Expand Up @@ -508,6 +510,8 @@ const (
OutOfSyncEvent Event = "OutOfSync"
// CommandIssuedEvent is triggered if a command is raised by user
CommandIssuedEvent Event = "CommandIssued"
// TaskCompletedEvent is triggered if the 'Replicas' amount of pods in one task are succeed
TaskCompletedEvent Event = "TaskCompleted"
)

// Action is the action that Job controller will take according to the event.
Expand All @@ -519,12 +523,11 @@ const (
AbortJobAction Action = "AbortJob"
// RestartJobAction if this action is set, the whole job will be restarted
RestartJobAction Action = "RestartJob"
// RestartTaskAction if this action is set, only the task will be restarted; default action.
// This action can not work together with job level events, e.g. JobUnschedulable
RestartTaskAction Action = "RestartTask"
// TerminateJobAction if this action is set, the whole job wil be terminated
// and can not be resumed: all Pod of Job will be evicted, and no Pod will be recreated.
TerminateJobAction Action = "TerminateJob"
// CompleteJobAction if this action is set, the unfinished pods will be killed, job completed.
CompleteJobAction Action = "CompleteJob"

// ResumeJobAction is the action to resume an aborted job.
ResumeJobAction Action = "ResumeJob"
Expand Down
2 changes: 0 additions & 2 deletions example/tensorflow-benchmark.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ spec:
policies:
- event: PodEvicted
action: RestartJob
- event: PodFailed
action: RestartTask
tasks:
- replicas: 2
name: ps
Expand Down
59 changes: 59 additions & 0 deletions pkg/admission/admission_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apimachinery/pkg/util/validation/field"

v1alpha1 "volcano.sh/volcano/pkg/apis/batch/v1alpha1"
)
Expand All @@ -43,6 +44,27 @@ type AdmitFunc func(v1beta1.AdmissionReview) *v1beta1.AdmissionResponse
var scheme = runtime.NewScheme()
var Codecs = serializer.NewCodecFactory(scheme)

// policyEventMap defines all policy events and whether to allow external use
var policyEventMap = map[v1alpha1.Event]bool{
v1alpha1.AnyEvent: true,
v1alpha1.PodFailedEvent: true,
v1alpha1.PodEvictedEvent: true,
v1alpha1.JobUnknownEvent: true,
v1alpha1.TaskCompletedEvent: true,
v1alpha1.OutOfSyncEvent: false,
v1alpha1.CommandIssuedEvent: false,
}

// policyActionMap defines all policy actions and whether to allow external use
var policyActionMap = map[v1alpha1.Action]bool{
v1alpha1.AbortJobAction: true,
v1alpha1.RestartJobAction: true,
v1alpha1.TerminateJobAction: true,
v1alpha1.CompleteJobAction: true,
v1alpha1.ResumeJobAction: true,
v1alpha1.SyncJobAction: false,
}

func init() {
addToScheme(scheme)
}
Expand Down Expand Up @@ -102,3 +124,40 @@ func DecodeJob(object runtime.RawExtension, resource metav1.GroupVersionResource

return job, nil
}

func validatePolicies(policies []v1alpha1.LifecyclePolicy, fldPath *field.Path) error {
errs := field.ErrorList{}
for _, p := range policies {
if allow, ok := policyEventMap[p.Event]; !ok || !allow {
errs = append(errs, field.Invalid(fldPath, p.Event, fmt.Sprintf("invalid policy event")))
}

if allow, ok := policyActionMap[p.Action]; !ok || !allow {
errs = append(errs, field.Invalid(fldPath, p.Action, fmt.Sprintf("invalid policy action")))
}
}

return errs.ToAggregate()
}

func getValidEvents() []v1alpha1.Event {
var events []v1alpha1.Event
for e, allow := range policyEventMap {
if allow {
events = append(events, e)
}
}

return events
}

func getValidActions() []v1alpha1.Action {
var actions []v1alpha1.Action
for a, allow := range policyActionMap {
if allow {
actions = append(actions, a)
}
}

return actions
}
11 changes: 11 additions & 0 deletions pkg/admission/admit_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"k8s.io/api/admission/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/validation/field"

v1alpha1 "volcano.sh/volcano/pkg/apis/batch/v1alpha1"
)
Expand Down Expand Up @@ -99,6 +100,11 @@ func validateJob(job v1alpha1.Job, reviewResponse *v1beta1.AdmissionResponse) st
if duplicateInfo, ok := CheckPolicyDuplicate(task.Policies); ok {
msg = msg + fmt.Sprintf(" duplicated task event policies: %s;", duplicateInfo)
}

if err := validatePolicies(task.Policies, field.NewPath("spec.tasks.policies")); err != nil {
msg = msg + err.Error() + fmt.Sprintf(" valid events are %v, valid actions are %v",
getValidEvents(), getValidActions())
}
}

if totalReplicas < job.Spec.MinAvailable {
Expand All @@ -110,6 +116,11 @@ func validateJob(job v1alpha1.Job, reviewResponse *v1beta1.AdmissionResponse) st
msg = msg + fmt.Sprintf(" duplicated job event policies: %s;", duplicateInfo)
}

if err := validatePolicies(job.Spec.Policies, field.NewPath("spec.policies")); err != nil {
msg = msg + err.Error() + fmt.Sprintf(" valid events are %v, valid actions are %v",
getValidEvents(), getValidActions())
}

if msg != "" {
reviewResponse.Allowed = false
}
Expand Down
3 changes: 0 additions & 3 deletions pkg/apis/batch/v1alpha1/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,6 @@ const (
AbortJobAction Action = "AbortJob"
// RestartJobAction if this action is set, the whole job will be restarted
RestartJobAction Action = "RestartJob"
// RestartTaskAction if this action is set, only the task will be restarted; default action.
// This action can not work together with job level events, e.g. JobUnschedulable
RestartTaskAction Action = "RestartTask"
// TerminateJobAction if this action is set, the whole job wil be terminated
// and can not be resumed: all Pod of Job will be evicted, and no Pod will be recreated.
TerminateJobAction Action = "TerminateJob"
Expand Down
1 change: 1 addition & 0 deletions test/e2e/mpi.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ var _ = Describe("MPI E2E Test", func() {
plugins: map[string][]string{
"ssh": []string{},
"env": []string{},
"svc": []string{},
},
tasks: []taskSpec{
{
Expand Down

0 comments on commit aae4f20

Please sign in to comment.