Skip to content

Commit

Permalink
Added EFS AccessPoints Tags Update
Browse files Browse the repository at this point in the history
  • Loading branch information
anirudhAgniRedhat committed Nov 8, 2024
1 parent d59cb71 commit 77f5f8c
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 68 deletions.
4 changes: 2 additions & 2 deletions pkg/driver/aws-efs/aws_efs.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,8 @@ func GetAWSEFSOperatorControllerConfig(ctx context.Context, flavour generator.Cl
cfg.AddDaemonSetHookBuilders(c, withFIPSDaemonSetHook, withVolumeMetricsDaemonSetHook)
cfg.AddCredentialsRequestHook(stsCredentialsRequestHook)

volumeTagController := NewEFSVolumeTagsController(cfg.GetControllerName("EFSVolumeTagsController"), c, c.EventRecorder)
cfg.ExtraControlPlaneControllers = append(cfg.ExtraControlPlaneControllers, volumeTagController)
accessPointsTagController := NewEFSAccessPointTagsController(cfg.GetControllerName("EFSAccessPointTagsController"), c, c.EventRecorder)
cfg.ExtraControlPlaneControllers = append(cfg.ExtraControlPlaneControllers, accessPointsTagController)
cfg.DeploymentInformers = append(cfg.DeploymentInformers, c.KubeInformers.InformersFor("").Core().V1().PersistentVolumes().Informer())

return cfg, nil
Expand Down
126 changes: 60 additions & 66 deletions pkg/driver/aws-efs/aws_efs_tags_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,14 @@ import (
"crypto/sha256"
"encoding/hex"
"fmt"
"k8s.io/apimachinery/pkg/labels"
"os"
"sort"
"strings"
"time"

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog/v2"

Expand All @@ -31,26 +32,26 @@ const (
awsEFSSecretNamespace = "openshift-cluster-csi-drivers"
awsEFSSecretName = "aws-efs-cloud-credentials"
efsDriverName = "efs.csi.aws.com"
tagHashAnnotationKey = "efs.openshift.io/volume-tags-hash"
tagHashAnnotationKey = "efs.openshift.io/access-point-tags-hash"
infrastructureName = "cluster"

operationDelay = 2 * time.Second
operationBackoffFactor = 1.2
operationRetryCount = 5
)

type EFSVolumeTagsController struct {
type EFSAccessPointTagsController struct {
name string
commonClient *clients.Clients
eventRecorder events.Recorder
}

func NewEFSVolumeTagsController(
func NewEFSAccessPointTagsController(
name string,
commonClient *clients.Clients,
eventRecorder events.Recorder) factory.Controller {

c := &EFSVolumeTagsController{
c := &EFSAccessPointTagsController{
name: name,
commonClient: commonClient,
eventRecorder: eventRecorder,
Expand All @@ -67,9 +68,9 @@ func NewEFSVolumeTagsController(
)
}

func (c *EFSVolumeTagsController) Sync(ctx context.Context, syncCtx factory.SyncContext) error {
klog.Infof("EFSVolumeTagsController sync started")
defer klog.Infof("EFSVolumeTagsController sync finished")
func (c *EFSAccessPointTagsController) Sync(ctx context.Context, syncCtx factory.SyncContext) error {
klog.Infof("EFSAccessPointTagsController sync started")
defer klog.Infof("EFSAccessPointTagsController sync finished")

opSpec, _, _, err := c.commonClient.OperatorClient.GetOperatorState()
if err != nil {
Expand All @@ -83,15 +84,10 @@ func (c *EFSVolumeTagsController) Sync(ctx context.Context, syncCtx factory.Sync
if err != nil {
return err
}
var infraRegion = ""
if infra.Status.PlatformStatus != nil && infra.Status.PlatformStatus.AWS != nil {
infraRegion = infra.Status.PlatformStatus.AWS.Region
}
efsClient, err := c.getEFSClient(ctx, infraRegion)
if err != nil {
return err
if infra == nil || infra.Status.PlatformStatus == nil || infra.Status.PlatformStatus.AWS == nil {
return nil
}
err = c.processInfrastructure(ctx, infra, efsClient)
err = c.processInfrastructure(ctx, infra)
if err != nil {
return err
}
Expand All @@ -100,7 +96,7 @@ func (c *EFSVolumeTagsController) Sync(ctx context.Context, syncCtx factory.Sync
}

// getEFSClient retrieves AWS credentials from the secret and creates an AWS EFS client using session.Options
func (c *EFSVolumeTagsController) getEFSClient(ctx context.Context, awsRegion string) (*efs.EFS, error) {
func (c *EFSAccessPointTagsController) getEFSClient(ctx context.Context, awsRegion string) (*efs.EFS, error) {
secret, err := c.getEFSCloudCredSecret(ctx)
if err != nil {
return nil, fmt.Errorf("error retrieving AWS credentials secret: %v", err)
Expand Down Expand Up @@ -185,29 +181,16 @@ func writeCredentialsToTempFile(data []byte) (string, error) {
}

// getInfrastructure retrieves the Infrastructure resource in OpenShift
func (c *EFSVolumeTagsController) getInfrastructure(ctx context.Context) (*configv1.Infrastructure, error) {
backoff := wait.Backoff{
Duration: operationDelay,
Factor: operationBackoffFactor,
Steps: operationRetryCount,
func (c *EFSAccessPointTagsController) getInfrastructure(ctx context.Context) (*configv1.Infrastructure, error) {
infra, err := c.commonClient.ConfigInformers.Config().V1().Infrastructures().Lister().Get(infrastructureName)
if err != nil {
klog.Errorf("error listing infrastructures objects: %v", err)
return nil, err
}
infra := &configv1.Infrastructure{}
err := wait.ExponentialBackoffWithContext(ctx, backoff, func(ctx context.Context) (bool, error) {
var apiError error
infra, apiError = c.commonClient.ConfigInformers.Config().V1().Infrastructures().Lister().Get(infrastructureName)
if apiError != nil {
klog.Errorf("error listing infrastructures objects: %v", apiError)
return false, nil
}
if infra != nil {
return true, nil
}
return false, nil
})
return infra, err
}

func (c *EFSVolumeTagsController) getEFSCloudCredSecret(ctx context.Context) (*v1.Secret, error) {
func (c *EFSAccessPointTagsController) getEFSCloudCredSecret(ctx context.Context) (*v1.Secret, error) {
backoff := wait.Backoff{
Duration: operationDelay,
Factor: operationBackoffFactor,
Expand All @@ -231,11 +214,10 @@ func (c *EFSVolumeTagsController) getEFSCloudCredSecret(ctx context.Context) (*v
}

// processInfrastructure processes the Infrastructure resource and updates EFS tags
func (c *EFSVolumeTagsController) processInfrastructure(ctx context.Context, infra *configv1.Infrastructure, efsClient *efs.EFS) error {
func (c *EFSAccessPointTagsController) processInfrastructure(ctx context.Context, infra *configv1.Infrastructure) error {
if infra.Status.PlatformStatus != nil && infra.Status.PlatformStatus.AWS != nil &&
infra.Status.PlatformStatus.AWS.ResourceTags != nil {
awsInfra := infra.Status.PlatformStatus.AWS
err := c.fetchPVsAndUpdateTags(ctx, awsInfra.ResourceTags, efsClient)
err := c.fetchPVsAndUpdateTags(ctx, infra)
if err != nil {
klog.Errorf("Error processing PVs for infrastructure update: %v", err)
return err
Expand All @@ -245,22 +227,30 @@ func (c *EFSVolumeTagsController) processInfrastructure(ctx context.Context, inf
}

// fetchPVsAndUpdateTags retrieves all PVs and updates the AWS EFS tags in batches of 100
func (c *EFSVolumeTagsController) fetchPVsAndUpdateTags(ctx context.Context, resourceTags []configv1.AWSResourceTag, efsClient *efs.EFS) error {
pvs, err := c.listPersistentVolumesWithRetry(ctx)
func (c *EFSAccessPointTagsController) fetchPVsAndUpdateTags(ctx context.Context, infra *configv1.Infrastructure) error {
pvs, err := c.listPersistentVolumes()
if err != nil {
return fmt.Errorf("error fetching PVs: %v", err)
}
// Compute the hash for the new set of tags
newTagsHash := computeTagsHash(resourceTags)
newTagsHash := computeTagsHash(infra.Status.PlatformStatus.AWS.ResourceTags)
pvsToBeUpdated := filterUpdatableVolumes(pvs, newTagsHash)

// If there are no volumes to update, return early
if len(pvsToBeUpdated) == 0 {
klog.Infof("No volume tags to update as hashes are unchanged")
return nil
}
var infraRegion = ""
if infra.Status.PlatformStatus != nil && infra.Status.PlatformStatus.AWS != nil {
infraRegion = infra.Status.PlatformStatus.AWS.Region
}
efsClient, err := c.getEFSClient(ctx, infraRegion)
if err != nil {
return err
}
for _, volume := range pvsToBeUpdated {
err = c.updateEFSVolumeTags(volume, efsClient, resourceTags)
err = c.updateEFSVolumeTags(volume, efsClient, infra.Status.PlatformStatus.AWS.ResourceTags)
if err != nil {
klog.Errorf("Error updating volume %s tags: %v", volume.Name, err)
c.eventRecorder.Warning("EFSVolumeTagsUpdateFailed", fmt.Sprintf("Failed to update tags for batch %v: %v", volume.Name, err.Error()))
Expand All @@ -281,15 +271,14 @@ func (c *EFSVolumeTagsController) fetchPVsAndUpdateTags(ctx context.Context, res
}

// updateEFSTags updates the tags of an AWS EFS volume
func (c *EFSVolumeTagsController) updateEFSVolumeTags(pv *v1.PersistentVolume, efsClient *efs.EFS, resourceTags []configv1.AWSResourceTag) error {
func (c *EFSAccessPointTagsController) updateEFSVolumeTags(pv *v1.PersistentVolume, efsClient *efs.EFS, resourceTags []configv1.AWSResourceTag) error {

// Merge the existing tags with new resource tags
mergedTags := newAndUpdatedTags(resourceTags)
tags := newAndUpdatedTags(resourceTags)

// Create or update the tags
_, err := efsClient.TagResource(&efs.TagResourceInput{
ResourceId: aws.String(pv.Spec.CSI.VolumeHandle),
Tags: mergedTags,
ResourceId: aws.String(parseAccessPointID(pv.Spec.CSI.VolumeHandle)),
Tags: tags,
})
if err != nil {
klog.Errorf("Error updating tags for PV %s: %v", pv.Spec.CSI.VolumeHandle, err)
Expand All @@ -298,7 +287,7 @@ func (c *EFSVolumeTagsController) updateEFSVolumeTags(pv *v1.PersistentVolume, e
return nil
}

func (c *EFSVolumeTagsController) updateVolumeWithRetry(ctx context.Context, pv *v1.PersistentVolume) error {
func (c *EFSAccessPointTagsController) updateVolumeWithRetry(ctx context.Context, pv *v1.PersistentVolume) error {
backoff := wait.Backoff{
Duration: operationDelay,
Factor: operationBackoffFactor,
Expand All @@ -316,23 +305,13 @@ func (c *EFSVolumeTagsController) updateVolumeWithRetry(ctx context.Context, pv
return err
}

func (c *EFSVolumeTagsController) listPersistentVolumesWithRetry(ctx context.Context) ([]*v1.PersistentVolume, error) {
backoff := wait.Backoff{
Duration: operationDelay,
Factor: operationBackoffFactor,
Steps: operationRetryCount,
func (c *EFSAccessPointTagsController) listPersistentVolumes() ([]*v1.PersistentVolume, error) {
pvList, err := c.commonClient.KubeInformers.InformersFor("").Core().V1().PersistentVolumes().Lister().List(labels.Everything())
if err != nil {
klog.Errorf("error listing volumes objects: %v", err)
return nil, err
}
pvList := []*v1.PersistentVolume{}
err := wait.ExponentialBackoffWithContext(ctx, backoff, func(ctx context.Context) (bool, error) {
var apiError error
pvList, apiError = c.commonClient.KubeInformers.InformersFor("").Core().V1().PersistentVolumes().Lister().List(labels.Everything())
if apiError != nil {
klog.Errorf("error listing volumes objects: %v", apiError)
return false, nil
}
return true, nil
})
return pvList, err
return pvList, nil
}

// newAndUpdatedTags adds and update existing AWS tags with new resource tags from OpenShift infrastructure
Expand All @@ -351,7 +330,8 @@ func newAndUpdatedTags(resourceTags []configv1.AWSResourceTag) []*efs.Tag {
func filterUpdatableVolumes(volumes []*v1.PersistentVolume, newTagsHash string) []*v1.PersistentVolume {
var pvsToBeUpdated = make([]*v1.PersistentVolume, 0)
for _, volume := range volumes {
if volume.Spec.CSI != nil && volume.Spec.CSI.Driver == efsDriverName {
if volume.Spec.CSI != nil && volume.Spec.CSI.Driver == efsDriverName &&
parseAccessPointID(volume.Spec.CSI.VolumeHandle) != "" {
existingHash := getPVTagHash(volume)
if existingHash == "" || existingHash != newTagsHash {
pvsToBeUpdated = append(pvsToBeUpdated, volume)
Expand Down Expand Up @@ -400,3 +380,17 @@ func computeTagsHash(resourceTags []configv1.AWSResourceTag) string {
hash := sha256.Sum256([]byte(tagsString))
return hex.EncodeToString(hash[:])
}

// parseAccessPointID checks if an Access Point ID is present in the input string.
// It returns the Access Point ID if present, or an empty string if not.
func parseAccessPointID(input string) string {
// Split the input string by "::" delimiter
parts := strings.Split(input, "::")

// Check if there's an Access Point ID after "::"
if len(parts) == 2 && strings.HasPrefix(parts[1], "fsap-") {
return parts[1]
}
// Return an empty string if no Access Point ID is found
return ""
}

0 comments on commit 77f5f8c

Please sign in to comment.