diff --git a/test/e2e/fixtures/when.go b/test/e2e/fixtures/when.go index c51cf9045501..bceef8536026 100644 --- a/test/e2e/fixtures/when.go +++ b/test/e2e/fixtures/when.go @@ -35,6 +35,7 @@ type When struct { cronWorkflowName string kubeClient kubernetes.Interface resourceQuota *corev1.ResourceQuota + storageQuota *corev1.ResourceQuota configMap *corev1.ConfigMap } @@ -227,6 +228,24 @@ func (w *When) MemoryQuota(quota string) *When { return w } +func (w *When) StorageQuota(quota string) *When { + obj, err := util.CreateHardStorageQuota(w.kubeClient, "argo", "storage-quota", quota) + if err != nil { + w.t.Fatal(err) + } + w.storageQuota = obj + return w +} + +func (w *When) DeleteStorageQuota() *When { + err := util.DeleteQuota(w.kubeClient, w.storageQuota) + if err != nil { + w.t.Fatal(err) + } + w.storageQuota = nil + return w +} + func (w *When) DeleteQuota() *When { err := util.DeleteQuota(w.kubeClient, w.resourceQuota) if err != nil { diff --git a/test/e2e/functional_test.go b/test/e2e/functional_test.go index 9fc50e1127c5..460479a780de 100644 --- a/test/e2e/functional_test.go +++ b/test/e2e/functional_test.go @@ -4,6 +4,7 @@ package e2e import ( "regexp" + "strings" "testing" "time" @@ -639,6 +640,24 @@ spec: }) } +func (s *FunctionalSuite) TestStorageQuotaLimit() { + s.Given(). + Workflow("@testdata/storage-limit.yaml"). + When(). + StorageQuota("5Mi"). + SubmitWorkflow(). + WaitForWorkflowToStart(5*time.Second). + WaitForWorkflowCondition(func(wf *wfv1.Workflow) bool { + return strings.Contains(wf.Status.Message, "Waiting for a PVC to be created") + }, "PVC pending", 10*time.Second). + DeleteStorageQuota(). + WaitForWorkflow(30 * time.Second). + Then(). + ExpectWorkflow(func(t *testing.T, _ *metav1.ObjectMeta, status *wfv1.WorkflowStatus) { + assert.Equal(t, wfv1.NodeSucceeded, status.Phase) + }) +} + func TestFunctionalSuite(t *testing.T) { suite.Run(t, new(FunctionalSuite)) } diff --git a/test/e2e/testdata/storage-limit.yaml b/test/e2e/testdata/storage-limit.yaml new file mode 100644 index 000000000000..51b1e79644ad --- /dev/null +++ b/test/e2e/testdata/storage-limit.yaml @@ -0,0 +1,22 @@ +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + name: storage-quota-limit + labels: + argo-e2e: true +spec: + entrypoint: wait + volumeClaimTemplates: # define volume, same syntax as k8s Pod spec + - metadata: + name: workdir1 # name of volume claim + spec: + accessModes: [ "ReadWriteMany" ] + resources: + requests: + storage: 20Mi + + templates: + - name: wait + script: + image: argoproj/argosay:v2 + args: [echo, ":) Hello Argo!"] diff --git a/test/util/resourcequota.go b/test/util/resourcequota.go index e6343ddf0d46..63874b8c1bc5 100644 --- a/test/util/resourcequota.go +++ b/test/util/resourcequota.go @@ -8,15 +8,27 @@ import ( ) func CreateHardMemoryQuota(clientset kubernetes.Interface, namespace, name, memoryLimit string) (*corev1.ResourceQuota, error) { + resourceList := corev1.ResourceList{ + corev1.ResourceLimitsMemory: resource.MustParse(memoryLimit), + } + return CreateResourceQuota(clientset, namespace, name, resourceList) +} + +func CreateHardStorageQuota(clientset kubernetes.Interface, namespace, name, storageLimit string) (*corev1.ResourceQuota, error) { + resourceList := corev1.ResourceList{ + "requests.storage": resource.MustParse(storageLimit), + } + return CreateResourceQuota(clientset, namespace, name, resourceList) +} + +func CreateResourceQuota(clientset kubernetes.Interface, namespace, name string, rl corev1.ResourceList) (*corev1.ResourceQuota, error) { return clientset.CoreV1().ResourceQuotas(namespace).Create(&corev1.ResourceQuota{ ObjectMeta: metav1.ObjectMeta{ Name: name, Labels: map[string]string{"argo-e2e": "true"}, }, Spec: corev1.ResourceQuotaSpec{ - Hard: corev1.ResourceList{ - corev1.ResourceLimitsMemory: resource.MustParse(memoryLimit), - }, + Hard: rl, }, }) } diff --git a/workflow/controller/operator.go b/workflow/controller/operator.go index 89be36f0c631..1fb929d3d75e 100644 --- a/workflow/controller/operator.go +++ b/workflow/controller/operator.go @@ -282,11 +282,21 @@ func (woc *wfOperationCtx) operate() { err = woc.createPVCs() if err != nil { + if apierr.IsForbidden(err) { + // Error was most likely caused by a lack of resources. + // In this case, Workflow will be in pending state and requeue. + woc.markWorkflowPhase(wfv1.NodePending, false, fmt.Sprintf("Waiting for a PVC to be created. %v", err)) + woc.requeue(10) + return + } msg := "pvc create error" woc.log.WithError(err).Error(msg) woc.markWorkflowError(err, true) woc.eventRecorder.Event(woc.wf, apiv1.EventTypeWarning, "WorkflowFailed", fmt.Sprintf("%s %s: %+v", woc.wf.ObjectMeta.Name, msg, err)) return + } else if woc.wf.Status.Phase == wfv1.NodePending { + // Workflow might be in pending state if previous PVC creation is forbidden + woc.markWorkflowRunning() } node, err := woc.executeTemplate(woc.wf.ObjectMeta.Name, execTmplRef, tmplCtx, execArgs, &executeTemplateOpts{}) @@ -1226,8 +1236,8 @@ func inferFailedReason(pod *apiv1.Pod) (wfv1.NodePhase, string) { } func (woc *wfOperationCtx) createPVCs() error { - if woc.wf.Status.Phase != wfv1.NodeRunning { - // Only attempt to create PVCs if workflow transitioned to Running state + if !(woc.wf.Status.Phase == wfv1.NodePending || woc.wf.Status.Phase == wfv1.NodeRunning) { + // Only attempt to create PVCs if workflow is in Pending or Running state // (e.g. passed validation, or didn't already complete) return nil } @@ -1236,9 +1246,6 @@ func (woc *wfOperationCtx) createPVCs() error { // This will also handle the case where workflow has no volumeClaimTemplates. return nil } - if len(woc.wf.Status.PersistentVolumeClaims) == 0 { - woc.wf.Status.PersistentVolumeClaims = make([]apiv1.Volume, len(woc.wfSpec.VolumeClaimTemplates)) - } pvcClient := woc.controller.kubeclientset.CoreV1().PersistentVolumeClaims(woc.wf.ObjectMeta.Namespace) for i, pvcTmpl := range woc.wfSpec.VolumeClaimTemplates { if pvcTmpl.ObjectMeta.Name == "" { @@ -1290,7 +1297,7 @@ func (woc *wfOperationCtx) createPVCs() error { }, }, } - woc.wf.Status.PersistentVolumeClaims[i] = vol + woc.wf.Status.PersistentVolumeClaims = append(woc.wf.Status.PersistentVolumeClaims, vol) woc.updated = true } return nil @@ -1673,7 +1680,7 @@ func (woc *wfOperationCtx) hasDaemonNodes() bool { } func (woc *wfOperationCtx) markWorkflowRunning() { - woc.markWorkflowPhase(wfv1.NodeRunning, false) + woc.markWorkflowPhase(wfv1.NodeRunning, false, "") } func (woc *wfOperationCtx) markWorkflowSuccess() { diff --git a/workflow/controller/operator_test.go b/workflow/controller/operator_test.go index 95e79813df86..fa49d45d1020 100644 --- a/workflow/controller/operator_test.go +++ b/workflow/controller/operator_test.go @@ -4101,7 +4101,7 @@ status: defer cancel() woc := newWorkflowOperationCtx(wf, controller) woc.operate() - assert.Equal(t, wfv1.NodePending, woc.wf.Status.Phase) + assert.Equal(t, wfv1.NodeRunning, woc.wf.Status.Phase) for _, node := range woc.wf.Status.Nodes { switch node.TemplateName { case "main": diff --git a/workflow/util/util.go b/workflow/util/util.go index 793bdcc36674..a092431b1bd0 100644 --- a/workflow/util/util.go +++ b/workflow/util/util.go @@ -566,7 +566,7 @@ func FormulateResubmitWorkflow(wf *wfv1.Workflow, memoized bool) (*wfv1.Workflow } newWF.Status.Conditions = wfv1.Conditions{{Status: metav1.ConditionFalse, Type: wfv1.ConditionTypeCompleted}} - newWF.Status.Phase = wfv1.NodePending + newWF.Status.Phase = "" return &newWF, nil }