diff --git a/internal/runner/runner.go b/internal/runner/runner.go index e829c9f4f..da2dc8116 100644 --- a/internal/runner/runner.go +++ b/internal/runner/runner.go @@ -38,12 +38,15 @@ import ( apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" - v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" v2 "github.com/fluxcd/helm-controller/api/v2beta1" ) +var accessor = meta.NewAccessor() + type ActionError struct { Err error CapturedLogs string @@ -95,7 +98,7 @@ func postRenderers(hr v2.HelmRelease) (postrender.PostRenderer, error) { return &combinedRenderer, nil } -// Install runs an Helm install action for the given v2beta1.HelmRelease. +// Install runs a Helm install action for the given v2beta1.HelmRelease. func (r *Runner) Install(hr v2.HelmRelease, chart *chart.Chart, values chartutil.Values) (*release.Release, error) { r.mu.Lock() defer r.mu.Unlock() @@ -110,33 +113,31 @@ func (r *Runner) Install(hr v2.HelmRelease, chart *chart.Chart, values chartutil install.DisableHooks = hr.Spec.GetInstall().DisableHooks install.DisableOpenAPIValidation = hr.Spec.GetInstall().DisableOpenAPIValidation install.Replace = hr.Spec.GetInstall().Replace - var legacyCRDsPolicy = v2.Create - if hr.Spec.GetInstall().SkipCRDs { - legacyCRDsPolicy = v2.Skip + install.SkipCRDs = true + install.Devel = true + + if hr.Spec.TargetNamespace != "" { + install.CreateNamespace = hr.Spec.GetInstall().CreateNamespace } - cRDsPolicy, err := r.validateCRDsPolicy(hr.Spec.GetInstall().CRDs, legacyCRDsPolicy) + + renderer, err := postRenderers(hr) if err != nil { return nil, wrapActionErr(r.logBuffer, err) } - if cRDsPolicy == v2.Skip || cRDsPolicy == v2.CreateReplace { - install.SkipCRDs = true + install.PostRenderer = renderer + + // If user opted-in to install (or replace) CRDs, install them first. + var legacyCRDsPolicy = v2.Create + if hr.Spec.GetInstall().SkipCRDs { + legacyCRDsPolicy = v2.Skip } - install.Devel = true - renderer, err := postRenderers(hr) + cRDsPolicy, err := r.validateCRDsPolicy(hr.Spec.GetInstall().CRDs, legacyCRDsPolicy) if err != nil { return nil, wrapActionErr(r.logBuffer, err) } - install.PostRenderer = renderer - if hr.Spec.TargetNamespace != "" { - install.CreateNamespace = hr.Spec.GetInstall().CreateNamespace - } - - if cRDsPolicy == v2.CreateReplace { - crds := chart.CRDObjects() - if len(crds) > 0 { - if err := r.applyCRDs(cRDsPolicy, hr, chart); err != nil { - return nil, wrapActionErr(r.logBuffer, err) - } + if cRDsPolicy != v2.Skip && len(chart.CRDObjects()) > 0 { + if err := r.applyCRDs(cRDsPolicy, chart, setOriginVisitor(hr.Namespace, hr.Name)); err != nil { + return nil, wrapActionErr(r.logBuffer, err) } } @@ -163,24 +164,24 @@ func (r *Runner) Upgrade(hr v2.HelmRelease, chart *chart.Chart, values chartutil upgrade.Force = hr.Spec.GetUpgrade().Force upgrade.CleanupOnFail = hr.Spec.GetUpgrade().CleanupOnFail upgrade.Devel = true + renderer, err := postRenderers(hr) if err != nil { return nil, wrapActionErr(r.logBuffer, err) } upgrade.PostRenderer = renderer + // If user opted-in to upgrade CRDs, upgrade them first. cRDsPolicy, err := r.validateCRDsPolicy(hr.Spec.GetUpgrade().CRDs, v2.Skip) if err != nil { return nil, wrapActionErr(r.logBuffer, err) } - if cRDsPolicy != v2.Skip { - crds := chart.CRDObjects() - if len(crds) > 0 { - if err := r.applyCRDs(cRDsPolicy, hr, chart); err != nil { - return nil, wrapActionErr(r.logBuffer, err) - } + if cRDsPolicy != v2.Skip && len(chart.CRDObjects()) > 0 { + if err := r.applyCRDs(cRDsPolicy, chart, setOriginVisitor(hr.Namespace, hr.Name)); err != nil { + return nil, wrapActionErr(r.logBuffer, err) } } + rel, err := upgrade.Run(hr.GetReleaseName(), chart, values.AsMap()) return rel, wrapActionErr(r.logBuffer, err) } @@ -196,7 +197,7 @@ func (r *Runner) validateCRDsPolicy(policy v2.CRDsPolicy, defaultValue v2.CRDsPo case v2.CreateReplace: break default: - return policy, fmt.Errorf("invalid CRD upgrade policy '%s' defined in field upgradeCRDs, valid values are '%s', '%s' or '%s'", + return policy, fmt.Errorf("invalid CRD policy '%s' defined in field CRDsPolicy, valid values are '%s', '%s' or '%s'", policy, v2.Skip, v2.Create, v2.CreateReplace, ) } @@ -210,37 +211,45 @@ func (*rootScoped) Name() meta.RESTScopeName { } // This has been adapted from https://github.com/helm/helm/blob/v3.5.4/pkg/action/install.go#L127 -func (r *Runner) applyCRDs(policy v2.CRDsPolicy, hr v2.HelmRelease, chart *chart.Chart) error { - cfg := r.config - cfg.Log("apply CRDs with policy %s", policy) +func (r *Runner) applyCRDs(policy v2.CRDsPolicy, chart *chart.Chart, visitorFunc ...resource.VisitorFunc) error { + r.config.Log("apply CRDs with policy %s", policy) + // Collect all CRDs from all files in `crds` directory. allCrds := make(kube.ResourceList, 0) for _, obj := range chart.CRDObjects() { // Read in the resources - res, err := cfg.KubeClient.Build(bytes.NewBuffer(obj.File.Data), false) + res, err := r.config.KubeClient.Build(bytes.NewBuffer(obj.File.Data), false) if err != nil { - cfg.Log("failed to parse CRDs from %s: %s", obj.Name, err) + r.config.Log("failed to parse CRDs from %s: %s", obj.Name, err) return errors.New(fmt.Sprintf("failed to parse CRDs from %s: %s", obj.Name, err)) } allCrds = append(allCrds, res...) } - totalItems := []*resource.Info{} + + // Visit CRDs with any provided visitor functions. + for _, visitor := range visitorFunc { + if err := allCrds.Visit(visitor); err != nil { + return err + } + } + + var totalItems []*resource.Info switch policy { case v2.Skip: break case v2.Create: for i := range allCrds { - if rr, err := cfg.KubeClient.Create(allCrds[i : i+1]); err != nil { + if rr, err := r.config.KubeClient.Create(allCrds[i : i+1]); err != nil { crdName := allCrds[i].Name // If the error is CRD already exists, continue. if apierrors.IsAlreadyExists(err) { - cfg.Log("CRD %s is already present. Skipping.", crdName) + r.config.Log("CRD %s is already present. Skipping.", crdName) if rr != nil && rr.Created != nil { totalItems = append(totalItems, rr.Created...) } continue } - cfg.Log("failed to create CRD %s: %s", crdName, err) + r.config.Log("failed to create CRD %s: %s", crdName, err) return errors.New(fmt.Sprintf("failed to create CRD %s: %s", crdName, err)) } else { if rr != nil && rr.Created != nil { @@ -248,16 +257,15 @@ func (r *Runner) applyCRDs(policy v2.CRDsPolicy, hr v2.HelmRelease, chart *chart } } } - break case v2.CreateReplace: config, err := r.config.RESTClientGetter.ToRESTConfig() if err != nil { - r.logBuffer.Log("Error while creating Kubernetes client config: %s", err) + r.config.Log("Error while creating Kubernetes client config: %s", err) return err } clientset, err := apiextension.NewForConfig(config) if err != nil { - r.logBuffer.Log("Error while creating Kubernetes clientset for apiextension: %s", err) + r.config.Log("Error while creating Kubernetes clientset for apiextension: %s", err) return err } client := clientset.ApiextensionsV1().CustomResourceDefinitions() @@ -265,21 +273,21 @@ func (r *Runner) applyCRDs(policy v2.CRDsPolicy, hr v2.HelmRelease, chart *chart // Note, we build the originals from the current set of CRDs // and therefore this upgrade will never delete CRDs that existed in the former release // but no longer exist in the current release. - for _, r := range allCrds { - if o, err := client.Get(context.TODO(), r.Name, v1.GetOptions{}); err == nil && o != nil { + for _, res := range allCrds { + if o, err := client.Get(context.TODO(), res.Name, metav1.GetOptions{}); err == nil && o != nil { o.GetResourceVersion() original = append(original, &resource.Info{ Client: clientset.ApiextensionsV1().RESTClient(), Mapping: &meta.RESTMapping{ Resource: schema.GroupVersionResource{ Group: "apiextensions.k8s.io", - Version: r.Mapping.GroupVersionKind.Version, + Version: res.Mapping.GroupVersionKind.Version, Resource: "customresourcedefinition", }, GroupVersionKind: schema.GroupVersionKind{ Kind: "CustomResourceDefinition", Group: "apiextensions.k8s.io", - Version: r.Mapping.GroupVersionKind.Version, + Version: res.Mapping.GroupVersionKind.Version, }, Scope: &rootScoped{}, }, @@ -289,13 +297,13 @@ func (r *Runner) applyCRDs(policy v2.CRDsPolicy, hr v2.HelmRelease, chart *chart ResourceVersion: o.ObjectMeta.ResourceVersion, }) } else if !apierrors.IsNotFound(err) { - cfg.Log("failed to get CRD %s: %s", r.Name, err) + r.config.Log("failed to get CRD %s: %s", res.Name, err) return err } } // Send them to Kube - if rr, err := cfg.KubeClient.Update(original, allCrds, true); err != nil { - cfg.Log("failed to apply CRD %s", err) + if rr, err := r.config.KubeClient.Update(original, allCrds, true); err != nil { + r.config.Log("failed to apply CRD %s", err) return errors.New(fmt.Sprintf("failed to apply CRD %s", err)) } else { if rr != nil { @@ -310,21 +318,21 @@ func (r *Runner) applyCRDs(policy v2.CRDsPolicy, hr v2.HelmRelease, chart *chart } } } - break } + if len(totalItems) > 0 { // Invalidate the local cache, since it will not have the new CRDs // present. - discoveryClient, err := cfg.RESTClientGetter.ToDiscoveryClient() + discoveryClient, err := r.config.RESTClientGetter.ToDiscoveryClient() if err != nil { - cfg.Log("Error in cfg.RESTClientGetter.ToDiscoveryClient(): %s", err) + r.config.Log("Error in cfg.RESTClientGetter.ToDiscoveryClient(): %s", err) return err } - cfg.Log("Clearing discovery cache") + r.config.Log("Clearing discovery cache") discoveryClient.Invalidate() // Give time for the CRD to be recognized. - if err := cfg.KubeClient.Wait(totalItems, 60*time.Second); err != nil { - cfg.Log("Error waiting for items: %s", err) + if err := r.config.KubeClient.Wait(totalItems, 60*time.Second); err != nil { + r.config.Log("Error waiting for items: %s", err) return err } // Make sure to force a rebuild of the cache. @@ -402,3 +410,45 @@ func wrapActionErr(log *LogBuffer, err error) error { } return err } + +func setOriginVisitor(namespace, name string) resource.VisitorFunc { + return func(info *resource.Info, err error) error { + if err != nil { + return err + } + if err = mergeLabels(info.Object, originLabels(namespace, name)); err != nil { + return fmt.Errorf( + "%s origin labels could not be updated: %s", + resourceString(info), err, + ) + } + return nil + } +} + +func mergeLabels(obj runtime.Object, labels map[string]string) error { + current, err := accessor.Labels(obj) + if err != nil { + return err + } + return accessor.SetLabels(obj, mergeStrStrMaps(current, labels)) +} + +func resourceString(info *resource.Info) string { + _, k := info.Mapping.GroupVersionKind.ToAPIVersionAndKind() + return fmt.Sprintf( + "%s %q in namespace %q", + k, info.Name, info.Namespace, + ) +} + +func mergeStrStrMaps(current, desired map[string]string) map[string]string { + result := make(map[string]string) + for k, v := range current { + result[k] = v + } + for k, desiredVal := range desired { + result[k] = desiredVal + } + return result +}