diff --git a/api/openapi-spec/swagger.json b/api/openapi-spec/swagger.json index d06bb86382d5..8a224673432f 100644 --- a/api/openapi-spec/swagger.json +++ b/api/openapi-spec/swagger.json @@ -546,6 +546,10 @@ "description": "MergeStrategy is the strategy used to merge a patch. It defaults to \"strategic\" Must be one of: strategic, merge, json", "type": "string" }, + "setOwnerReference": { + "description": "SetOwnerReference sets the reference to the workflow on the OwnerReference of generated resource.", + "type": "boolean" + }, "successCondition": { "description": "SuccessCondition is a label selector expression which describes the conditions of the k8s resource in which it is acceptable to proceed to the following step", "type": "string" diff --git a/cmd/argoexec/commands/resource.go b/cmd/argoexec/commands/resource.go index d024e1683ae9..f93221a50f67 100644 --- a/cmd/argoexec/commands/resource.go +++ b/cmd/argoexec/commands/resource.go @@ -41,7 +41,7 @@ func execResource(action string) error { wfExecutor.AddError(err) return err } - resourceNamespace, resourceName, err := wfExecutor.ExecResource(action, common.ExecutorResourceManifestPath, isDelete) + resourceNamespace, resourceName, err := wfExecutor.ExecResource(action, common.ExecutorResourceManifestPath) if err != nil { wfExecutor.AddError(err) return err diff --git a/examples/k8s-set-owner-reference.yaml b/examples/k8s-set-owner-reference.yaml new file mode 100644 index 000000000000..a9de5a737d84 --- /dev/null +++ b/examples/k8s-set-owner-reference.yaml @@ -0,0 +1,25 @@ +# This example creates a Kubernetes resource that will be deleted +# when the workflow is deleted via Kubernetes GC. +# +# A configmap is used for this example, but the same approach would apply +# to other more interesting resource types. +# +# https://kubernetes.io/docs/concepts/workloads/controllers/garbage-collection/ +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + generateName: k8s-set-owner-reference- +spec: + entrypoint: k8s-set-owner-reference + templates: + - name: k8s-set-owner-reference + resource: + action: create + setOwnerReference: true + manifest: | + apiVersion: v1 + kind: ConfigMap + metadata: + generateName: owned-eg- + data: + some: value diff --git a/pkg/apis/workflow/v1alpha1/openapi_generated.go b/pkg/apis/workflow/v1alpha1/openapi_generated.go index 51974196de83..dc8b401bf297 100644 --- a/pkg/apis/workflow/v1alpha1/openapi_generated.go +++ b/pkg/apis/workflow/v1alpha1/openapi_generated.go @@ -1026,6 +1026,13 @@ func schema_pkg_apis_workflow_v1alpha1_ResourceTemplate(ref common.ReferenceCall Format: "", }, }, + "setOwnerReference": { + SchemaProps: spec.SchemaProps{ + Description: "SetOwnerReference sets the reference to the workflow on the OwnerReference of generated resource.", + Type: []string{"boolean"}, + Format: "", + }, + }, "successCondition": { SchemaProps: spec.SchemaProps{ Description: "SuccessCondition is a label selector expression which describes the conditions of the k8s resource in which it is acceptable to proceed to the following step", diff --git a/pkg/apis/workflow/v1alpha1/types.go b/pkg/apis/workflow/v1alpha1/types.go index 855b3ad51473..1d9d4308a0ba 100644 --- a/pkg/apis/workflow/v1alpha1/types.go +++ b/pkg/apis/workflow/v1alpha1/types.go @@ -825,6 +825,9 @@ type ResourceTemplate struct { // Manifest contains the kubernetes manifest Manifest string `json:"manifest"` + // SetOwnerReference sets the reference to the workflow on the OwnerReference of generated resource. + SetOwnerReference bool `json:"setOwnerReference,omitempty"` + // SuccessCondition is a label selector expression which describes the conditions // of the k8s resource in which it is acceptable to proceed to the following step SuccessCondition string `json:"successCondition,omitempty"` diff --git a/workflow/controller/operator.go b/workflow/controller/operator.go index 3e14d1b7611b..b19f8c6accdf 100644 --- a/workflow/controller/operator.go +++ b/workflow/controller/operator.go @@ -15,11 +15,13 @@ import ( argokubeerr "github.com/argoproj/pkg/kube/errors" "github.com/argoproj/pkg/strftime" jsonpatch "github.com/evanphx/json-patch" + "github.com/ghodss/yaml" log "github.com/sirupsen/logrus" "github.com/valyala/fasttemplate" apiv1 "k8s.io/api/core/v1" apierr "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/client-go/tools/cache" "k8s.io/utils/pointer" @@ -1541,16 +1543,36 @@ func (woc *wfOperationCtx) addChildNode(parent string, child string) { // executeResource is runs a kubectl command against a manifest func (woc *wfOperationCtx) executeResource(nodeName string, tmpl *wfv1.Template, boundaryID string) *wfv1.NodeStatus { + tmpl = tmpl.DeepCopy() + node := woc.getNodeByName(nodeName) if node != nil { return node } + + // Try to unmarshal the given manifest. + obj := unstructured.Unstructured{} + err := yaml.Unmarshal([]byte(tmpl.Resource.Manifest), &obj) + if err != nil { + return woc.initializeNode(nodeName, wfv1.NodeTypePod, tmpl.Name, boundaryID, wfv1.NodeError, err.Error()) + } + + if tmpl.Resource.SetOwnerReference { + ownerReferences := obj.GetOwnerReferences() + obj.SetOwnerReferences(append(ownerReferences, *metav1.NewControllerRef(woc.wf, wfv1.SchemaGroupVersionKind))) + bytes, err := yaml.Marshal(obj.Object) + if err != nil { + return woc.initializeNode(nodeName, wfv1.NodeTypePod, tmpl.Name, boundaryID, wfv1.NodeError, err.Error()) + } + tmpl.Resource.Manifest = string(bytes) + } + mainCtr := woc.newExecContainer(common.MainContainerName) mainCtr.Command = []string{"argoexec", "resource", tmpl.Resource.Action} mainCtr.VolumeMounts = []apiv1.VolumeMount{ volumeMountPodMetadata, } - _, err := woc.createWorkflowPod(nodeName, *mainCtr, tmpl) + _, err = woc.createWorkflowPod(nodeName, *mainCtr, tmpl) if err != nil { return woc.initializeNode(nodeName, wfv1.NodeTypePod, tmpl.Name, boundaryID, wfv1.NodeError, err.Error()) } diff --git a/workflow/controller/operator_test.go b/workflow/controller/operator_test.go index b30b88fd2d9c..f92b4bbae502 100644 --- a/workflow/controller/operator_test.go +++ b/workflow/controller/operator_test.go @@ -6,7 +6,9 @@ import ( wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1" "github.com/argoproj/argo/test" + "github.com/argoproj/argo/workflow/common" "github.com/argoproj/argo/workflow/util" + "github.com/ghodss/yaml" "github.com/stretchr/testify/assert" apiv1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" @@ -352,11 +354,11 @@ func TestDAGTemplateParallelismLimit(t *testing.T) { } var nestedParallelism = ` -# Example with vertical and horizontal scalability -# -# Imagine we have 'M' workers which work in parallel, -# each worker should performs 'N' loops sequentially -# +# Example with vertical and horizontal scalability +# +# Imagine we have 'M' workers which work in parallel, +# each worker should performs 'N' loops sequentially +# apiVersion: argoproj.io/v1alpha1 kind: Workflow metadata: @@ -1003,3 +1005,152 @@ func TestResolveIOPathPlaceholders(t *testing.T) { assert.Equal(t, []string{"sh", "-c", "head -n 3 <\"/inputs/text/data\" | tee \"/outputs/text/data\" | wc -l > \"/outputs/actual-lines-count/data\""}, pods.Items[0].Spec.Containers[1].Command) } + +var resourceTemplate = ` +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + name: resource-template +spec: + entrypoint: resource + templates: + - name: resource + resource: + manifest: | + apiVersion: v1 + kind: ConfigMap + metadata: + name: resource-cm +` + +func TestResourceTemplate(t *testing.T) { + controller := newController() + wfcset := controller.wfclientset.ArgoprojV1alpha1().Workflows("") + + // operate the workflow. it should create a pod. + wf := unmarshalWF(resourceTemplate) + wf, err := wfcset.Create(wf) + assert.Nil(t, err) + woc := newWorkflowOperationCtx(wf, controller) + woc.operate() + wf, err = wfcset.Get(wf.ObjectMeta.Name, metav1.GetOptions{}) + assert.Nil(t, err) + assert.Equal(t, wfv1.NodeRunning, wf.Status.Phase) + + pod, err := controller.kubeclientset.CoreV1().Pods("").Get("resource-template", metav1.GetOptions{}) + if !assert.Nil(t, err) { + t.Fatal(err) + } + tmplStr := pod.Annotations[common.AnnotationKeyTemplate] + tmpl := wfv1.Template{} + err = yaml.Unmarshal([]byte(tmplStr), &tmpl) + if !assert.Nil(t, err) { + t.Fatal(err) + } + cm := apiv1.ConfigMap{} + err = yaml.Unmarshal([]byte(tmpl.Resource.Manifest), &cm) + if !assert.Nil(t, err) { + t.Fatal(err) + } + assert.Equal(t, "resource-cm", cm.Name) + assert.Empty(t, cm.ObjectMeta.OwnerReferences) +} + +var resourceWithOwnerReferenceTemplate = ` +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + name: resource-with-ownerreference-template +spec: + entrypoint: start + templates: + - name: start + steps: + - - name: resource-1 + template: resource-1 + - name: resource-2 + template: resource-2 + - name: resource-3 + template: resource-3 + - name: resource-1 + resource: + manifest: | + apiVersion: v1 + kind: ConfigMap + metadata: + name: resource-cm-1 + ownerReferences: + - apiVersion: argoproj.io/v1alpha1 + blockOwnerDeletion: true + kind: Workflow + name: "manual-ref-name" + uid: "manual-ref-uid" + - name: resource-2 + resource: + setOwnerReference: true + manifest: | + apiVersion: v1 + kind: ConfigMap + metadata: + name: resource-cm-2 + - name: resource-3 + resource: + setOwnerReference: true + manifest: | + apiVersion: v1 + kind: ConfigMap + metadata: + name: resource-cm-3 + ownerReferences: + - apiVersion: argoproj.io/v1alpha1 + blockOwnerDeletion: true + kind: Workflow + name: "manual-ref-name" + uid: "manual-ref-uid" +` + +func TestResourceWithOwnerReferenceTemplate(t *testing.T) { + controller := newController() + wfcset := controller.wfclientset.ArgoprojV1alpha1().Workflows("") + + // operate the workflow. it should create a pod. + wf := unmarshalWF(resourceWithOwnerReferenceTemplate) + wf, err := wfcset.Create(wf) + assert.Nil(t, err) + woc := newWorkflowOperationCtx(wf, controller) + woc.operate() + wf, err = wfcset.Get(wf.ObjectMeta.Name, metav1.GetOptions{}) + assert.Nil(t, err) + assert.Equal(t, wfv1.NodeRunning, wf.Status.Phase) + + pods, err := controller.kubeclientset.CoreV1().Pods("").List(metav1.ListOptions{}) + if !assert.Nil(t, err) { + t.Fatal(err) + } + + objectMetas := map[string]metav1.ObjectMeta{} + for _, pod := range pods.Items { + tmplStr := pod.Annotations[common.AnnotationKeyTemplate] + tmpl := wfv1.Template{} + err = yaml.Unmarshal([]byte(tmplStr), &tmpl) + if !assert.Nil(t, err) { + t.Fatal(err) + } + cm := apiv1.ConfigMap{} + err = yaml.Unmarshal([]byte(tmpl.Resource.Manifest), &cm) + if !assert.Nil(t, err) { + t.Fatal(err) + } + objectMetas[cm.Name] = cm.ObjectMeta + } + if assert.Equal(t, 1, len(objectMetas["resource-cm-1"].OwnerReferences)) { + assert.Equal(t, "manual-ref-name", objectMetas["resource-cm-1"].OwnerReferences[0].Name) + } + if assert.Equal(t, 1, len(objectMetas["resource-cm-2"].OwnerReferences)) { + assert.Equal(t, "resource-with-ownerreference-template", objectMetas["resource-cm-2"].OwnerReferences[0].Name) + } + if assert.Equal(t, 2, len(objectMetas["resource-cm-3"].OwnerReferences)) { + assert.Equal(t, "manual-ref-name", objectMetas["resource-cm-3"].OwnerReferences[0].Name) + assert.Equal(t, "resource-with-ownerreference-template", objectMetas["resource-cm-3"].OwnerReferences[1].Name) + } +} diff --git a/workflow/executor/resource.go b/workflow/executor/resource.go index 270ab69298dc..cdc573b0a3a5 100644 --- a/workflow/executor/resource.go +++ b/workflow/executor/resource.go @@ -20,7 +20,8 @@ import ( ) // ExecResource will run kubectl action against a manifest -func (we *WorkflowExecutor) ExecResource(action string, manifestPath string, isDelete bool) (string, string, error) { +func (we *WorkflowExecutor) ExecResource(action string, manifestPath string) (string, string, error) { + isDelete := action == "delete" args := []string{ action, } diff --git a/workflow/validate/validate.go b/workflow/validate/validate.go index 007ae4b7d86a..b5714c9ba747 100644 --- a/workflow/validate/validate.go +++ b/workflow/validate/validate.go @@ -8,7 +8,9 @@ import ( "regexp" "strings" + "github.com/ghodss/yaml" "github.com/valyala/fasttemplate" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" apivalidation "k8s.io/apimachinery/pkg/util/validation" "github.com/argoproj/argo/errors" @@ -325,6 +327,14 @@ func validateLeaf(scope map[string]interface{}, tmpl *wfv1.Template) error { mountPaths[art.Path] = fmt.Sprintf("inputs.artifacts.%s", art.Name) } } + if tmpl.Resource != nil { + // Try to unmarshal the given manifest. + obj := unstructured.Unstructured{} + err := yaml.Unmarshal([]byte(tmpl.Resource.Manifest), &obj) + if err != nil { + return errors.Errorf(errors.CodeBadRequest, "templates.%s.resource.manifest must be a valid yaml", tmpl.Name) + } + } if tmpl.ActiveDeadlineSeconds != nil { if *tmpl.ActiveDeadlineSeconds <= 0 { return errors.Errorf(errors.CodeBadRequest, "templates.%s.activeDeadlineSeconds must be a positive integer > 0", tmpl.Name) diff --git a/workflow/validate/validate_test.go b/workflow/validate/validate_test.go index 25774083052b..b1875e91bd52 100644 --- a/workflow/validate/validate_test.go +++ b/workflow/validate/validate_test.go @@ -1356,3 +1356,51 @@ func TestBaseImageOutputVerify(t *testing.T) { assert.NoError(t, err) } } + +var validResourceWorkflow = ` +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + generateName: valid-resource- +spec: + entrypoint: whalesay + templates: + - name: whalesay + resource: + manifest: | + apiVersion: v1 + kind: ConfigMap + metadata: + name: whalesay-cm +` + +// TestValidResourceWorkflow verifies a workflow of a valid resource. +func TestValidResourceWorkflow(t *testing.T) { + wf := unmarshalWf(validResourceWorkflow) + err := ValidateWorkflow(wf, ValidateOpts{}) + assert.Equal(t, err, nil) +} + +var invalidResourceWorkflow = ` +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + generateName: invalid-resource- +spec: + entrypoint: whalesay + templates: + - name: whalesay + resource: + manifest: | + invalid-yaml-line + kind: ConfigMap + metadata: + name: whalesay-cm +` + +// TestInvalidResourceWorkflow verifies an error against a workflow of an invalid resource. +func TestInvalidResourceWorkflow(t *testing.T) { + wf := unmarshalWf(invalidResourceWorkflow) + err := ValidateWorkflow(wf, ValidateOpts{}) + assert.Error(t, err, "templates.whalesay.resource.manifest must be a valid yaml") +}