Skip to content

Commit

Permalink
Merge pull request #596 from fluxcd/label-crds
Browse files Browse the repository at this point in the history
Patch CRDs with origin labels
  • Loading branch information
hiddeco authored Feb 1, 2023
2 parents e47b08e + 362a271 commit 60a6b53
Showing 1 changed file with 103 additions and 53 deletions.
156 changes: 103 additions & 53 deletions internal/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,15 @@ import (

apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"

v2 "github.com/fluxcd/helm-controller/api/v2beta1"
)

var accessor = meta.NewAccessor()

type ActionError struct {
Err error
CapturedLogs string
Expand Down Expand Up @@ -95,7 +98,7 @@ func postRenderers(hr v2.HelmRelease) (postrender.PostRenderer, error) {
return &combinedRenderer, nil
}

// Install runs an Helm install action for the given v2beta1.HelmRelease.
// Install runs a Helm install action for the given v2beta1.HelmRelease.
func (r *Runner) Install(hr v2.HelmRelease, chart *chart.Chart, values chartutil.Values) (*release.Release, error) {
r.mu.Lock()
defer r.mu.Unlock()
Expand All @@ -110,33 +113,31 @@ func (r *Runner) Install(hr v2.HelmRelease, chart *chart.Chart, values chartutil
install.DisableHooks = hr.Spec.GetInstall().DisableHooks
install.DisableOpenAPIValidation = hr.Spec.GetInstall().DisableOpenAPIValidation
install.Replace = hr.Spec.GetInstall().Replace
var legacyCRDsPolicy = v2.Create
if hr.Spec.GetInstall().SkipCRDs {
legacyCRDsPolicy = v2.Skip
install.SkipCRDs = true
install.Devel = true

if hr.Spec.TargetNamespace != "" {
install.CreateNamespace = hr.Spec.GetInstall().CreateNamespace
}
cRDsPolicy, err := r.validateCRDsPolicy(hr.Spec.GetInstall().CRDs, legacyCRDsPolicy)

renderer, err := postRenderers(hr)
if err != nil {
return nil, wrapActionErr(r.logBuffer, err)
}
if cRDsPolicy == v2.Skip || cRDsPolicy == v2.CreateReplace {
install.SkipCRDs = true
install.PostRenderer = renderer

// If user opted-in to install (or replace) CRDs, install them first.
var legacyCRDsPolicy = v2.Create
if hr.Spec.GetInstall().SkipCRDs {
legacyCRDsPolicy = v2.Skip
}
install.Devel = true
renderer, err := postRenderers(hr)
cRDsPolicy, err := r.validateCRDsPolicy(hr.Spec.GetInstall().CRDs, legacyCRDsPolicy)
if err != nil {
return nil, wrapActionErr(r.logBuffer, err)
}
install.PostRenderer = renderer
if hr.Spec.TargetNamespace != "" {
install.CreateNamespace = hr.Spec.GetInstall().CreateNamespace
}

if cRDsPolicy == v2.CreateReplace {
crds := chart.CRDObjects()
if len(crds) > 0 {
if err := r.applyCRDs(cRDsPolicy, hr, chart); err != nil {
return nil, wrapActionErr(r.logBuffer, err)
}
if cRDsPolicy != v2.Skip && len(chart.CRDObjects()) > 0 {
if err := r.applyCRDs(cRDsPolicy, chart, setOriginVisitor(hr.Namespace, hr.Name)); err != nil {
return nil, wrapActionErr(r.logBuffer, err)
}
}

Expand All @@ -163,24 +164,24 @@ func (r *Runner) Upgrade(hr v2.HelmRelease, chart *chart.Chart, values chartutil
upgrade.Force = hr.Spec.GetUpgrade().Force
upgrade.CleanupOnFail = hr.Spec.GetUpgrade().CleanupOnFail
upgrade.Devel = true

renderer, err := postRenderers(hr)
if err != nil {
return nil, wrapActionErr(r.logBuffer, err)
}
upgrade.PostRenderer = renderer

// If user opted-in to upgrade CRDs, upgrade them first.
cRDsPolicy, err := r.validateCRDsPolicy(hr.Spec.GetUpgrade().CRDs, v2.Skip)
if err != nil {
return nil, wrapActionErr(r.logBuffer, err)
}
if cRDsPolicy != v2.Skip {
crds := chart.CRDObjects()
if len(crds) > 0 {
if err := r.applyCRDs(cRDsPolicy, hr, chart); err != nil {
return nil, wrapActionErr(r.logBuffer, err)
}
if cRDsPolicy != v2.Skip && len(chart.CRDObjects()) > 0 {
if err := r.applyCRDs(cRDsPolicy, chart, setOriginVisitor(hr.Namespace, hr.Name)); err != nil {
return nil, wrapActionErr(r.logBuffer, err)
}
}

rel, err := upgrade.Run(hr.GetReleaseName(), chart, values.AsMap())
return rel, wrapActionErr(r.logBuffer, err)
}
Expand All @@ -196,7 +197,7 @@ func (r *Runner) validateCRDsPolicy(policy v2.CRDsPolicy, defaultValue v2.CRDsPo
case v2.CreateReplace:
break
default:
return policy, fmt.Errorf("invalid CRD upgrade policy '%s' defined in field upgradeCRDs, valid values are '%s', '%s' or '%s'",
return policy, fmt.Errorf("invalid CRD policy '%s' defined in field CRDsPolicy, valid values are '%s', '%s' or '%s'",
policy, v2.Skip, v2.Create, v2.CreateReplace,
)
}
Expand All @@ -210,76 +211,83 @@ func (*rootScoped) Name() meta.RESTScopeName {
}

// This has been adapted from https://github.com/helm/helm/blob/v3.5.4/pkg/action/install.go#L127
func (r *Runner) applyCRDs(policy v2.CRDsPolicy, hr v2.HelmRelease, chart *chart.Chart) error {
cfg := r.config
cfg.Log("apply CRDs with policy %s", policy)
func (r *Runner) applyCRDs(policy v2.CRDsPolicy, chart *chart.Chart, visitorFunc ...resource.VisitorFunc) error {
r.config.Log("apply CRDs with policy %s", policy)

// Collect all CRDs from all files in `crds` directory.
allCrds := make(kube.ResourceList, 0)
for _, obj := range chart.CRDObjects() {
// Read in the resources
res, err := cfg.KubeClient.Build(bytes.NewBuffer(obj.File.Data), false)
res, err := r.config.KubeClient.Build(bytes.NewBuffer(obj.File.Data), false)
if err != nil {
cfg.Log("failed to parse CRDs from %s: %s", obj.Name, err)
r.config.Log("failed to parse CRDs from %s: %s", obj.Name, err)
return errors.New(fmt.Sprintf("failed to parse CRDs from %s: %s", obj.Name, err))
}
allCrds = append(allCrds, res...)
}
totalItems := []*resource.Info{}

// Visit CRDs with any provided visitor functions.
for _, visitor := range visitorFunc {
if err := allCrds.Visit(visitor); err != nil {
return err
}
}

var totalItems []*resource.Info
switch policy {
case v2.Skip:
break
case v2.Create:
for i := range allCrds {
if rr, err := cfg.KubeClient.Create(allCrds[i : i+1]); err != nil {
if rr, err := r.config.KubeClient.Create(allCrds[i : i+1]); err != nil {
crdName := allCrds[i].Name
// If the error is CRD already exists, continue.
if apierrors.IsAlreadyExists(err) {
cfg.Log("CRD %s is already present. Skipping.", crdName)
r.config.Log("CRD %s is already present. Skipping.", crdName)
if rr != nil && rr.Created != nil {
totalItems = append(totalItems, rr.Created...)
}
continue
}
cfg.Log("failed to create CRD %s: %s", crdName, err)
r.config.Log("failed to create CRD %s: %s", crdName, err)
return errors.New(fmt.Sprintf("failed to create CRD %s: %s", crdName, err))
} else {
if rr != nil && rr.Created != nil {
totalItems = append(totalItems, rr.Created...)
}
}
}
break
case v2.CreateReplace:
config, err := r.config.RESTClientGetter.ToRESTConfig()
if err != nil {
r.logBuffer.Log("Error while creating Kubernetes client config: %s", err)
r.config.Log("Error while creating Kubernetes client config: %s", err)
return err
}
clientset, err := apiextension.NewForConfig(config)
if err != nil {
r.logBuffer.Log("Error while creating Kubernetes clientset for apiextension: %s", err)
r.config.Log("Error while creating Kubernetes clientset for apiextension: %s", err)
return err
}
client := clientset.ApiextensionsV1().CustomResourceDefinitions()
original := make(kube.ResourceList, 0)
// Note, we build the originals from the current set of CRDs
// and therefore this upgrade will never delete CRDs that existed in the former release
// but no longer exist in the current release.
for _, r := range allCrds {
if o, err := client.Get(context.TODO(), r.Name, v1.GetOptions{}); err == nil && o != nil {
for _, res := range allCrds {
if o, err := client.Get(context.TODO(), res.Name, metav1.GetOptions{}); err == nil && o != nil {
o.GetResourceVersion()
original = append(original, &resource.Info{
Client: clientset.ApiextensionsV1().RESTClient(),
Mapping: &meta.RESTMapping{
Resource: schema.GroupVersionResource{
Group: "apiextensions.k8s.io",
Version: r.Mapping.GroupVersionKind.Version,
Version: res.Mapping.GroupVersionKind.Version,
Resource: "customresourcedefinition",
},
GroupVersionKind: schema.GroupVersionKind{
Kind: "CustomResourceDefinition",
Group: "apiextensions.k8s.io",
Version: r.Mapping.GroupVersionKind.Version,
Version: res.Mapping.GroupVersionKind.Version,
},
Scope: &rootScoped{},
},
Expand All @@ -289,13 +297,13 @@ func (r *Runner) applyCRDs(policy v2.CRDsPolicy, hr v2.HelmRelease, chart *chart
ResourceVersion: o.ObjectMeta.ResourceVersion,
})
} else if !apierrors.IsNotFound(err) {
cfg.Log("failed to get CRD %s: %s", r.Name, err)
r.config.Log("failed to get CRD %s: %s", res.Name, err)
return err
}
}
// Send them to Kube
if rr, err := cfg.KubeClient.Update(original, allCrds, true); err != nil {
cfg.Log("failed to apply CRD %s", err)
if rr, err := r.config.KubeClient.Update(original, allCrds, true); err != nil {
r.config.Log("failed to apply CRD %s", err)
return errors.New(fmt.Sprintf("failed to apply CRD %s", err))
} else {
if rr != nil {
Expand All @@ -310,21 +318,21 @@ func (r *Runner) applyCRDs(policy v2.CRDsPolicy, hr v2.HelmRelease, chart *chart
}
}
}
break
}

if len(totalItems) > 0 {
// Invalidate the local cache, since it will not have the new CRDs
// present.
discoveryClient, err := cfg.RESTClientGetter.ToDiscoveryClient()
discoveryClient, err := r.config.RESTClientGetter.ToDiscoveryClient()
if err != nil {
cfg.Log("Error in cfg.RESTClientGetter.ToDiscoveryClient(): %s", err)
r.config.Log("Error in cfg.RESTClientGetter.ToDiscoveryClient(): %s", err)
return err
}
cfg.Log("Clearing discovery cache")
r.config.Log("Clearing discovery cache")
discoveryClient.Invalidate()
// Give time for the CRD to be recognized.
if err := cfg.KubeClient.Wait(totalItems, 60*time.Second); err != nil {
cfg.Log("Error waiting for items: %s", err)
if err := r.config.KubeClient.Wait(totalItems, 60*time.Second); err != nil {
r.config.Log("Error waiting for items: %s", err)
return err
}
// Make sure to force a rebuild of the cache.
Expand Down Expand Up @@ -402,3 +410,45 @@ func wrapActionErr(log *LogBuffer, err error) error {
}
return err
}

func setOriginVisitor(namespace, name string) resource.VisitorFunc {
return func(info *resource.Info, err error) error {
if err != nil {
return err
}
if err = mergeLabels(info.Object, originLabels(namespace, name)); err != nil {
return fmt.Errorf(
"%s origin labels could not be updated: %s",
resourceString(info), err,
)
}
return nil
}
}

func mergeLabels(obj runtime.Object, labels map[string]string) error {
current, err := accessor.Labels(obj)
if err != nil {
return err
}
return accessor.SetLabels(obj, mergeStrStrMaps(current, labels))
}

func resourceString(info *resource.Info) string {
_, k := info.Mapping.GroupVersionKind.ToAPIVersionAndKind()
return fmt.Sprintf(
"%s %q in namespace %q",
k, info.Name, info.Namespace,
)
}

func mergeStrStrMaps(current, desired map[string]string) map[string]string {
result := make(map[string]string)
for k, v := range current {
result[k] = v
}
for k, desiredVal := range desired {
result[k] = desiredVal
}
return result
}

0 comments on commit 60a6b53

Please sign in to comment.