From 706ae02ebf2e590650e04d824de964c112288864 Mon Sep 17 00:00:00 2001 From: TommyLike Date: Tue, 7 May 2019 14:58:44 +0800 Subject: [PATCH 1/4] Fix combine volume feature issues&Adding testcase --- example/job.yaml | 8 +- .../volcano/templates/batch_v1alpha1_job.yaml | 43 +++---- pkg/apis/batch/v1alpha1/job.go | 2 + pkg/controllers/job/helpers/helpers.go | 14 ++- pkg/controllers/job/job_controller_actions.go | 114 ++++++------------ pkg/controllers/job/state/pending.go | 4 +- test/e2e/job_controlled_resource.go | 71 +++++++++++ test/e2e/util.go | 3 + 8 files changed, 143 insertions(+), 116 deletions(-) create mode 100644 test/e2e/job_controlled_resource.go diff --git a/example/job.yaml b/example/job.yaml index 8a1e87ab5a..ace1d986f6 100644 --- a/example/job.yaml +++ b/example/job.yaml @@ -8,10 +8,10 @@ spec: policies: - event: PodEvicted action: RestartJob - input: - mountPath: "/myinput" - output: - mountPath: "/myoutput" + volumes: + - mountPath: "/myinput" + - mountPath: "/myoutput" + volumeClaimName: "testvolumeclaimname" volumeClaim: accessModes: [ "ReadWriteOnce" ] storageClassName: "my-storage-class" diff --git a/installer/chart/volcano/templates/batch_v1alpha1_job.yaml b/installer/chart/volcano/templates/batch_v1alpha1_job.yaml index eb463bb248..ad397c069e 100644 --- a/installer/chart/volcano/templates/batch_v1alpha1_job.yaml +++ b/installer/chart/volcano/templates/batch_v1alpha1_job.yaml @@ -32,36 +32,27 @@ spec: description: Specification of the desired behavior of a cron job, including the minAvailable properties: - input: - description: The volume mount for input of Job - properties: - volumeClaim: - description: VolumeClaim defines the PVC used by the VolumeMount. - type: object - mountPath: - description: Path within the container at which the volume should - be mounted. Must not contain ':'. - type: string - required: - - mountPath - type: object + volumes: + description: The volumes for Job + items: + properties: + volumeClaim: + description: VolumeClaim defines the PVC used by the VolumeMount. + type: object + mountPath: + description: Path within the container at which the volume should be mounted. + Must not contain ':'. + type: string + volumeClaimName: + description: The name of the volume claim. + type: object + required: + - mountPath + type: array minAvailable: description: The minimal available pods to run for this Job format: int32 type: integer - output: - description: The volume mount for output of Job - properties: - volumeClaim: - description: VolumeClaim defines the PVC used by the VolumeMount. - type: object - mountPath: - description: Path within the container at which the volume should - be mounted. Must not contain ':'. - type: string - required: - - mountPath - type: object policies: description: Specifies the default lifecycle of tasks items: diff --git a/pkg/apis/batch/v1alpha1/job.go b/pkg/apis/batch/v1alpha1/job.go index 84b4f576eb..a9406dd365 100644 --- a/pkg/apis/batch/v1alpha1/job.go +++ b/pkg/apis/batch/v1alpha1/job.go @@ -90,6 +90,8 @@ type JobEvent string const ( CommandIssued JobEvent = "CommandIssued" PluginError JobEvent = "PluginError" + PVCError JobEvent = "PVCError" + PodGroupError JobEvent = "PodGroupError" ) // Event represent the phase of Job, e.g. pod-failed. diff --git a/pkg/controllers/job/helpers/helpers.go b/pkg/controllers/job/helpers/helpers.go index ca44a7cadc..10fd9a7dde 100644 --- a/pkg/controllers/job/helpers/helpers.go +++ b/pkg/controllers/job/helpers/helpers.go @@ -18,15 +18,15 @@ package helpers import ( "fmt" + "k8s.io/api/core/v1" "math/rand" "strings" "time" - - v1 "k8s.io/api/core/v1" ) const ( - PodNameFmt = "%s-%s-%d" + PodNameFmt = "%s-%s-%d" + VolumeClaimFmt = "%s-volume-%s" ) func GetTaskIndex(pod *v1.Pod) string { @@ -42,13 +42,17 @@ func MakePodName(jobName string, taskName string, index int) string { return fmt.Sprintf(PodNameFmt, jobName, taskName, index) } -func GenRandomStr(l int) string { +func genRandomStr(l int) string { str := "0123456789abcdefghijklmnopqrstuvwxyz" bytes := []byte(str) - result := []byte{} + var result []byte r := rand.New(rand.NewSource(time.Now().UnixNano())) for i := 0; i < l; i++ { result = append(result, bytes[r.Intn(len(bytes))]) } return string(result) } + +func MakeVolumeClaimName(jobName string) string { + return fmt.Sprintf(VolumeClaimFmt, jobName, genRandomStr(12)) +} diff --git a/pkg/controllers/job/job_controller_actions.go b/pkg/controllers/job/job_controller_actions.go index e3fcd736a6..3ff0824a3c 100644 --- a/pkg/controllers/job/job_controller_actions.go +++ b/pkg/controllers/job/job_controller_actions.go @@ -138,17 +138,13 @@ func (cc *Controller) killJob(jobInfo *apis.JobInfo, updateStatus state.UpdateSt return nil } -func (cc *Controller) createJob(jobInfo *apis.JobInfo, nextState state.UpdateStatusFn) error { +func (cc *Controller) createJob(jobInfo *apis.JobInfo, updateStatus state.UpdateStatusFn) error { glog.V(3).Infof("Starting to create Job <%s/%s>", jobInfo.Job.Namespace, jobInfo.Job.Name) defer glog.V(3).Infof("Finished Job <%s/%s> create", jobInfo.Job.Namespace, jobInfo.Job.Name) job := jobInfo.Job.DeepCopy() glog.Infof("Current Version is: %d of job: %s/%s", job.Status.Version, job.Namespace, job.Name) - if update, err := cc.filljob(job); err != nil || update { - return err - } - if err := cc.pluginOnJobAdd(job); err != nil { cc.recorder.Event(job, v1.EventTypeWarning, string(vkv1.PluginError), fmt.Sprintf("Execute plugin when job add failed, err: %v", err)) @@ -156,22 +152,31 @@ func (cc *Controller) createJob(jobInfo *apis.JobInfo, nextState state.UpdateSta } if err := cc.createPodGroupIfNotExist(job); err != nil { + cc.recorder.Event(job, v1.EventTypeWarning, string(vkv1.PodGroupError), + fmt.Sprintf("Failed to create PodGroup, err: %v", err)) return err } - if err := cc.createJobIOIfNotExist(job); err != nil { + err, job := cc.createJobIOIfNotExist(job) + if err != nil { + cc.recorder.Event(job, v1.EventTypeWarning, string(vkv1.PVCError), + fmt.Sprintf("Failed to create PVC, err: %v", err)) return err } + if updateStatus != nil { + updateStatus(&job.Status) + } + if job, err := cc.vkClients.BatchV1alpha1().Jobs(job.Namespace).UpdateStatus(job); err != nil { glog.Errorf("Failed to update status of Job %v/%v: %v", job.Namespace, job.Name, err) return err } else { - if e := cc.cache.Update(job); e != nil { + if err := cc.cache.Update(job); err != nil { glog.Errorf("CreateJob - Failed to update Job %v/%v in cache: %v", - job.Namespace, job.Name, e) - return e + job.Namespace, job.Name, err) + return err } } @@ -329,24 +334,21 @@ func (cc *Controller) syncJob(jobInfo *apis.JobInfo, updateStatus state.UpdateSt return nil } -func (cc *Controller) calculateVersion(current int32, bumpVersion bool) int32 { - if current == 0 { - current += 1 - } - if bumpVersion { - current += 1 - } - return current -} - -func (cc *Controller) createJobIOIfNotExist(job *vkv1.Job) error { +func (cc *Controller) createJobIOIfNotExist(job *vkv1.Job) (error, *vkv1.Job) { // If PVC does not exist, create them for Job. + var needUpdate bool volumes := job.Spec.Volumes - for _, volume := range volumes { + for index, volume := range volumes { vcName := volume.VolumeClaimName + if len(vcName) == 0 { + //If volume claim name doesn't exist, generate a new one,ignore the case when duplicated names are generated. + vcName = vkjobhelpers.MakeVolumeClaimName(job.Name) + job.Spec.Volumes[index].VolumeClaimName = vcName + needUpdate = true + } exist, err := cc.checkPVCExist(job, vcName) if err != nil { - return err + return err, nil } if !exist { if job.Status.ControlledResources == nil { @@ -354,7 +356,7 @@ func (cc *Controller) createJobIOIfNotExist(job *vkv1.Job) error { } if volume.VolumeClaim != nil { if err := cc.createPVC(job, vcName, volume.VolumeClaim); err != nil { - return err + return err, nil } job.Status.ControlledResources["volume-pvc-"+vcName] = vcName } else { @@ -362,37 +364,17 @@ func (cc *Controller) createJobIOIfNotExist(job *vkv1.Job) error { } } } - return nil -} - -func (cc *Controller) needUpdateForVolumeClaim(job *vkv1.Job) (bool, *vkv1.Job, error) { - // If VolumeClaimName does not exist, generate them for Job. - var newJob *vkv1.Job - volumes := job.Spec.Volumes - update := false - for index, volume := range volumes { - vcName := volume.VolumeClaimName - if len(vcName) == 0 { - for { - randomStr := vkjobhelpers.GenRandomStr(12) - vcName = fmt.Sprintf("%s-volume-%s", job.Name, randomStr) - exist, err := cc.checkPVCExist(job, vcName) - if err != nil { - return false, nil, err - } - if exist { - continue - } - if newJob == nil { - newJob = job.DeepCopy() - } - newJob.Spec.Volumes[index].VolumeClaimName = vcName - update = true - break - } + if needUpdate { + newJob, err := cc.vkClients.BatchV1alpha1().Jobs(job.Namespace).Update(job) + if err != nil { + glog.Errorf("Failed to update Job %v/%v for volume claim name: %v ", + job.Namespace, job.Name, err) + return err, nil + } else { + return nil, newJob } } - return update, newJob, nil + return nil, job } func (cc *Controller) checkPVCExist(job *vkv1.Job, vcName string) (bool, error) { @@ -506,31 +488,3 @@ func (cc *Controller) calcPGMinResources(job *vkv1.Job) *v1.ResourceList { return minAvailableTasksRes.Convert2K8sResource() } - -func (cc *Controller) filljob(job *vkv1.Job) (bool, error) { - update, newJob, err := cc.needUpdateForVolumeClaim(job) - if err != nil { - return false, err - } - if update { - if _, err := cc.vkClients.BatchV1alpha1().Jobs(job.Namespace).Update(newJob); err != nil { - glog.Errorf("Failed to update Job %v/%v: %v", - job.Namespace, job.Name, err) - return false, err - } - return true, nil - } else if job.Status.State.Phase == "" { - job.Status.State.Phase = vkv1.Pending - if j, err := cc.vkClients.BatchV1alpha1().Jobs(job.Namespace).UpdateStatus(job); err != nil { - glog.Errorf("Failed to update status of Job %v/%v: %v", - job.Namespace, job.Name, err) - } else { - if e := cc.cache.Update(j); e != nil { - glog.Error("Failed to update cache status of Job %v/%v: %v", job.Namespace, job.Name, e) - } - } - return true, nil - } - - return false, nil -} diff --git a/pkg/controllers/job/state/pending.go b/pkg/controllers/job/state/pending.go index 0595023daa..f3bb4b501f 100644 --- a/pkg/controllers/job/state/pending.go +++ b/pkg/controllers/job/state/pending.go @@ -67,6 +67,8 @@ func (ps *pendingState) Execute(action vkv1.Action) error { status.State.Phase = phase }) default: - return CreateJob(ps.job, nil) + return CreateJob(ps.job, func(status *vkv1.JobStatus) { + status.State.Phase = vkv1.Pending + }) } } diff --git a/test/e2e/job_controlled_resource.go b/test/e2e/job_controlled_resource.go new file mode 100644 index 0000000000..f6277f872d --- /dev/null +++ b/test/e2e/job_controlled_resource.go @@ -0,0 +1,71 @@ +/* +Copyright 2019 The Volcano Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package e2e + +import ( + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + "k8s.io/apimachinery/pkg/apis/meta/v1" + "volcano.sh/volcano/pkg/apis/batch/v1alpha1" +) + +var _ = Describe("Job E2E Test: Test Job PVCs", func() { + It("Generate PVC name if not specified", func() { + jobName := "job-pvc-name-empty" + namespace := "test" + taskName := "task" + pvcName := "specifiedpvcname" + context := initTestContext() + defer cleanupTestContext(context) + + job := createJob(context, &jobSpec{ + namespace: namespace, + name: jobName, + tasks: []taskSpec{ + { + img: defaultNginxImage, + req: oneCPU, + min: 1, + rep: 1, + name: taskName, + }, + }, + volumes: []v1alpha1.VolumeSpec{ + { + MountPath: "/mountone", + VolumeClaimName: pvcName, + }, + { + MountPath: "/mounttwo", + }, + }, + }) + + err := waitJobReady(context, job) + Expect(err).NotTo(HaveOccurred()) + + job, err = context.vkclient.BatchV1alpha1().Jobs(namespace).Get(jobName, v1.GetOptions{}) + Expect(err).NotTo(HaveOccurred()) + + Expect(len(job.Spec.Volumes)).To(Equal(2), + "Two volumes should be created") + for _, volume := range job.Spec.Volumes { + Expect(volume.VolumeClaimName).Should(Or(ContainSubstring(jobName), Equal(pvcName)), + "PVC name should be generated for manually specified.") + } + }) +}) diff --git a/test/e2e/util.go b/test/e2e/util.go index a8a33a67ca..cd9149889d 100644 --- a/test/e2e/util.go +++ b/test/e2e/util.go @@ -276,6 +276,7 @@ type jobSpec struct { policies []vkv1.LifecyclePolicy min int32 plugins map[string][]string + volumes []vkv1.VolumeSpec } func getNS(context *context, job *jobSpec) string { @@ -358,6 +359,8 @@ func createJobInner(context *context, jobSpec *jobSpec) (*vkv1.Job, error) { job.Spec.MinAvailable = min } + job.Spec.Volumes = jobSpec.volumes + return context.vkclient.BatchV1alpha1().Jobs(job.Namespace).Create(job) } From 8b779d2bb6dd936631f146361107715fec8c8b6c Mon Sep 17 00:00:00 2001 From: TommyLike Date: Wed, 8 May 2019 11:05:56 +0800 Subject: [PATCH 2/4] Add loop for generate names --- pkg/controllers/job/job_controller_actions.go | 34 +++++++++++++------ 1 file changed, 24 insertions(+), 10 deletions(-) diff --git a/pkg/controllers/job/job_controller_actions.go b/pkg/controllers/job/job_controller_actions.go index 3ff0824a3c..129524bc46 100644 --- a/pkg/controllers/job/job_controller_actions.go +++ b/pkg/controllers/job/job_controller_actions.go @@ -336,21 +336,35 @@ func (cc *Controller) syncJob(jobInfo *apis.JobInfo, updateStatus state.UpdateSt func (cc *Controller) createJobIOIfNotExist(job *vkv1.Job) (error, *vkv1.Job) { // If PVC does not exist, create them for Job. - var needUpdate bool + var needUpdate, nameExist bool volumes := job.Spec.Volumes for index, volume := range volumes { + nameExist = false vcName := volume.VolumeClaimName if len(vcName) == 0 { - //If volume claim name doesn't exist, generate a new one,ignore the case when duplicated names are generated. - vcName = vkjobhelpers.MakeVolumeClaimName(job.Name) - job.Spec.Volumes[index].VolumeClaimName = vcName - needUpdate = true - } - exist, err := cc.checkPVCExist(job, vcName) - if err != nil { - return err, nil + //NOTE(k82cn): Ensure never have duplicated generated names. + for { + vcName = vkjobhelpers.MakeVolumeClaimName(job.Name) + exist, err := cc.checkPVCExist(job, vcName) + if err != nil { + return err, nil + } + if exist { + continue + } + job.Spec.Volumes[index].VolumeClaimName = vcName + needUpdate = true + break + } + } else { + exist, err := cc.checkPVCExist(job, vcName) + if err != nil { + return err, nil + } + nameExist = exist } - if !exist { + + if !nameExist { if job.Status.ControlledResources == nil { job.Status.ControlledResources = make(map[string]string) } From b43736b65777bc3de65e594741cfefcf0c4c420d Mon Sep 17 00:00:00 2001 From: TommyLike Date: Wed, 8 May 2019 15:06:30 +0800 Subject: [PATCH 3/4] Update volcano crds --- .../integrations/tensorflow/tf-example.yaml | 1 + .../duplicatedPolicyEvent-webhook-deny.yaml | 0 .../duplicatedTaskName-webhook-deny.yaml | 0 .../minAvailable-webhook-deny.yaml | 0 example/job.yaml | 56 +++++++++-------- example/kube-batch-conf.yaml | 11 ---- example/openmpi-hello.yaml | 56 ----------------- example/role.yaml | 12 ---- example/tensorflow-benchmark.yaml | 61 ------------------- .../volcano/templates/batch_v1alpha1_job.yaml | 21 +++++++ 10 files changed, 53 insertions(+), 165 deletions(-) rename example/{ => invalid_jobs}/duplicatedPolicyEvent-webhook-deny.yaml (100%) rename example/{ => invalid_jobs}/duplicatedTaskName-webhook-deny.yaml (100%) rename example/{ => invalid_jobs}/minAvailable-webhook-deny.yaml (100%) delete mode 100644 example/kube-batch-conf.yaml delete mode 100644 example/openmpi-hello.yaml delete mode 100644 example/role.yaml delete mode 100644 example/tensorflow-benchmark.yaml diff --git a/example/integrations/tensorflow/tf-example.yaml b/example/integrations/tensorflow/tf-example.yaml index d2090bbf68..05bc67d781 100644 --- a/example/integrations/tensorflow/tf-example.yaml +++ b/example/integrations/tensorflow/tf-example.yaml @@ -37,6 +37,7 @@ spec: minAvailable: 2 schedulerName: kube-batch plugins: + env: [] svc: [] policies: - event: PodEvicted diff --git a/example/duplicatedPolicyEvent-webhook-deny.yaml b/example/invalid_jobs/duplicatedPolicyEvent-webhook-deny.yaml similarity index 100% rename from example/duplicatedPolicyEvent-webhook-deny.yaml rename to example/invalid_jobs/duplicatedPolicyEvent-webhook-deny.yaml diff --git a/example/duplicatedTaskName-webhook-deny.yaml b/example/invalid_jobs/duplicatedTaskName-webhook-deny.yaml similarity index 100% rename from example/duplicatedTaskName-webhook-deny.yaml rename to example/invalid_jobs/duplicatedTaskName-webhook-deny.yaml diff --git a/example/minAvailable-webhook-deny.yaml b/example/invalid_jobs/minAvailable-webhook-deny.yaml similarity index 100% rename from example/minAvailable-webhook-deny.yaml rename to example/invalid_jobs/minAvailable-webhook-deny.yaml diff --git a/example/job.yaml b/example/job.yaml index ace1d986f6..c5247f8e7b 100644 --- a/example/job.yaml +++ b/example/job.yaml @@ -6,30 +6,36 @@ spec: minAvailable: 3 schedulerName: kube-batch policies: - - event: PodEvicted - action: RestartJob + - event: PodEvicted + action: RestartJob + plugins: + ssh: [] + env: [] + svc: [] + maxRetry: 5 + queue: default volumes: - - mountPath: "/myinput" - - mountPath: "/myoutput" - volumeClaimName: "testvolumeclaimname" - volumeClaim: - accessModes: [ "ReadWriteOnce" ] - storageClassName: "my-storage-class" - resources: - requests: - storage: 1Gi + - mountPath: "/myinput" + - mountPath: "/myoutput" + volumeClaimName: "testvolumeclaimname" + volumeClaim: + accessModes: [ "ReadWriteOnce" ] + storageClassName: "my-storage-class" + resources: + requests: + storage: 1Gi tasks: - - replicas: 6 - name: "default-nginx" - template: - metadata: - name: web - spec: - containers: - - image: nginx - imagePullPolicy: IfNotPresent - name: nginx - resources: - requests: - cpu: "1" - restartPolicy: OnFailure + - replicas: 6 + name: "default-nginx" + template: + metadata: + name: web + spec: + containers: + - image: nginx + imagePullPolicy: IfNotPresent + name: nginx + resources: + requests: + cpu: "1" + restartPolicy: OnFailure diff --git a/example/kube-batch-conf.yaml b/example/kube-batch-conf.yaml deleted file mode 100644 index add2e0b890..0000000000 --- a/example/kube-batch-conf.yaml +++ /dev/null @@ -1,11 +0,0 @@ -actions: "enqueue, reclaim, allocate, backfill, preempt" -tiers: - - plugins: - - name: priority - - name: gang - - name: conformance - - plugins: - - name: drf - - name: predicates - - name: proportion - - name: nodeorder diff --git a/example/openmpi-hello.yaml b/example/openmpi-hello.yaml deleted file mode 100644 index 5f136c736a..0000000000 --- a/example/openmpi-hello.yaml +++ /dev/null @@ -1,56 +0,0 @@ -apiVersion: batch.volcano.sh/v1alpha1 -kind: Job -metadata: - name: openmpi-hello -spec: - minAvailable: 3 - schedulerName: scheduler - plugins: - ssh: [] - env: [] - svc: [] - tasks: - - replicas: 1 - name: mpimaster - policies: - - event: TaskCompleted - action: CompleteJob - template: - spec: - imagePullSecrets: - - name: default-secret - containers: - - command: - - /bin/sh - - -c - - | - MPI_HOST=`cat /etc/volcano/mpiworker.host | tr "\n" ","`; - mkdir -p /var/run/sshd; /usr/sbin/sshd; - mpiexec --allow-run-as-root --host ${MPI_HOST} -np 2 mpi_hello_world > /home/re - image: 100.125.5.235:20202/l00427178/openmpi-hello:3.28 - name: mpimaster - ports: - - containerPort: 22 - name: mpijob-port - workingDir: /home - restartPolicy: OnFailure - - replicas: 2 - name: mpiworker - template: - spec: - imagePullSecrets: - - name: default-secret - containers: - - command: - - /bin/sh - - -c - - | - mkdir -p /var/run/sshd; /usr/sbin/sshd -D; - image: 100.125.5.235:20202/l00427178/openmpi-hello:3.28 - name: mpiworker - ports: - - containerPort: 22 - name: mpijob-port - workingDir: /home - restartPolicy: OnFailure - diff --git a/example/role.yaml b/example/role.yaml deleted file mode 100644 index fe6bdfacc0..0000000000 --- a/example/role.yaml +++ /dev/null @@ -1,12 +0,0 @@ -apiVersion: rbac.authorization.k8s.io/v1beta1 -kind: ClusterRoleBinding -metadata: - name: default-as-admin -subjects: - - kind: ServiceAccount - name: default - namespace: kube-system -roleRef: - kind: ClusterRole - name: cluster-admin - apiGroup: rbac.authorization.k8s.io diff --git a/example/tensorflow-benchmark.yaml b/example/tensorflow-benchmark.yaml deleted file mode 100644 index 49912a57e3..0000000000 --- a/example/tensorflow-benchmark.yaml +++ /dev/null @@ -1,61 +0,0 @@ -apiVersion: batch.volcano.sh/v1alpha1 -kind: Job -metadata: - name: tensorflow-benchmark -spec: - minAvailable: 5 - schedulerName: scheduler - plugins: - env: [] - svc: [] - policies: - - event: PodEvicted - action: RestartJob - tasks: - - replicas: 2 - name: ps - template: - spec: - imagePullSecrets: - - name: default-secret - containers: - - command: - - sh - - -c - - | - PS_HOST=`cat /etc/volcano/ps.host | sed 's/$/&:2222/g' | tr "\n" ","`; - WORKER_HOST=`cat /etc/volcano/worker.host | sed 's/$/&:2222/g' | tr "\n" ","`; - python tf_cnn_benchmarks.py --batch_size=32 --model=resnet50 --variable_update=parameter_server --flush_stdout=true --num_gpus=1 --local_parameter_device=cpu --device=cpu --data_format=NHWC --job_name=ps --task_index=${VK_TASK_INDEX} --ps_hosts=${PS_HOST} --worker_hosts=${WORKER_HOST} - image: 100.125.5.235:20202/l00427178/tf-benchmarks-cpu:v20171202-bdab599-dirty-284af3 - name: tensorflow - ports: - - containerPort: 2222 - name: tfjob-port - resources: {} - workingDir: /opt/tf-benchmarks/scripts/tf_cnn_benchmarks - restartPolicy: OnFailure - - replicas: 3 - name: worker - policies: - - event: TaskCompleted - action: CompleteJob - template: - spec: - imagePullSecrets: - - name: default-secret - containers: - - command: - - sh - - -c - - | - PS_HOST=`cat /etc/volcano/ps.host | sed 's/$/&:2222/g' | tr "\n" ","`; - WORKER_HOST=`cat /etc/volcano/worker.host | sed 's/$/&:2222/g' | tr "\n" ","`; - python tf_cnn_benchmarks.py --batch_size=32 --model=resnet50 --variable_update=parameter_server --flush_stdout=true --num_gpus=1 --local_parameter_device=cpu --device=cpu --data_format=NHWC --job_name=worker --task_index=${VK_TASK_INDEX} --ps_hosts=${PS_HOST} --worker_hosts=${WORKER_HOST} - image: 100.125.5.235:20202/l00427178/tf-benchmarks-cpu:v20171202-bdab599-dirty-284af3 - name: tensorflow - ports: - - containerPort: 2222 - name: tfjob-port - resources: {} - workingDir: /opt/tf-benchmarks/scripts/tf_cnn_benchmarks - restartPolicy: OnFailure diff --git a/installer/chart/volcano/templates/batch_v1alpha1_job.yaml b/installer/chart/volcano/templates/batch_v1alpha1_job.yaml index ad397c069e..eb54dca40b 100644 --- a/installer/chart/volcano/templates/batch_v1alpha1_job.yaml +++ b/installer/chart/volcano/templates/batch_v1alpha1_job.yaml @@ -74,6 +74,11 @@ spec: schedulerName: description: SchedulerName is the default value of `tasks.template.spec.schedulerName`. type: string + plugins: + description: Enabled task plugins when creating job. + type: object + additionalProperties: + type: array tasks: description: Tasks specifies the task specification of Job items: @@ -111,6 +116,13 @@ spec: type: object type: object type: array + queue: + description: The name of the queue on which job should been created + type: string + maxRetry: + description: The limit for retrying submiting job, default is 3 + format: int32 + type: integer type: object status: description: Current status of Job @@ -139,6 +151,15 @@ spec: description: Job's current version format: int32 type: integer + retryCount: + description: The number that volcano retried to submit the job. + format: int32 + type: integer + ControlledResources: + description: All of the resources that are controlled by this job. + type: object + additionalProperties: + type: string state: description: Current state of Job. properties: From 1f7e5cc1c4e41aad704b13b041facc3d98d7bf86 Mon Sep 17 00:00:00 2001 From: TommyLike Date: Wed, 8 May 2019 17:25:12 +0800 Subject: [PATCH 4/4] Fix gen admission secret issue --- hack/run-e2e-kind.sh | 12 +++++++++--- .../gen-admission-secret/gen-admission-secret.sh | 4 ++-- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/hack/run-e2e-kind.sh b/hack/run-e2e-kind.sh index a9e37d658d..796c885ae4 100755 --- a/hack/run-e2e-kind.sh +++ b/hack/run-e2e-kind.sh @@ -64,14 +64,20 @@ function install-volcano { echo "Install volcano plugin into cluster...." helm plugin install --kubeconfig ${KUBECONFIG} installer/chart/volcano/plugins/gen-admission-secret - helm gen-admission-secret --service integration-admission-service --namespace kube-system + + #If failed to generate secret for admission service, return immediately + helm gen-admission-secret --service ${CLUSTER_NAME}-admission-service --namespace kube-system + if [[ $? != 0 ]]; then + echo "Failed to install secret for admission service, usually we need a retry." + exit 1 + fi echo "Install volcano chart" - helm install installer/chart/volcano --namespace kube-system --name integration --kubeconfig ${KUBECONFIG} --set basic.image_tag_version=${TAG} + helm install installer/chart/volcano --namespace kube-system --name ${CLUSTER_NAME} --kubeconfig ${KUBECONFIG} --set basic.image_tag_version=${TAG} --wait } function uninstall-volcano { - helm delete integration --purge --kubeconfig ${KUBECONFIG} + helm delete ${CLUSTER_NAME} --purge --kubeconfig ${KUBECONFIG} } function generate-log { diff --git a/installer/chart/volcano/plugins/gen-admission-secret/gen-admission-secret.sh b/installer/chart/volcano/plugins/gen-admission-secret/gen-admission-secret.sh index da715a9dcf..84bb061fb1 100755 --- a/installer/chart/volcano/plugins/gen-admission-secret/gen-admission-secret.sh +++ b/installer/chart/volcano/plugins/gen-admission-secret/gen-admission-secret.sh @@ -108,7 +108,7 @@ done # approve and fetch the signed certificate kubectl certificate approve ${csrName} # verify certificate has been signed -for x in $(seq 15); do +for x in $(seq 20); do serverCert=$(kubectl get csr ${csrName} -o jsonpath='{.status.certificate}') if [[ ${serverCert} != '' ]]; then break @@ -116,7 +116,7 @@ for x in $(seq 15); do sleep 1 done if [[ ${serverCert} == '' ]]; then - echo "ERROR: After approving csr ${csrName}, the signed certificate did not appear on the resource. Giving up after 15 attempts." >&2 + echo "ERROR: After approving csr ${csrName}, the signed certificate did not appear on the resource. Giving up after 20 attempts." >&2 exit 1 fi echo ${serverCert} | openssl base64 -d -A -out ${tmpdir}/server-cert.pem