diff --git a/pkg/driver/aws-efs/aws_efs.go b/pkg/driver/aws-efs/aws_efs.go index d397b5b65..090d38215 100644 --- a/pkg/driver/aws-efs/aws_efs.go +++ b/pkg/driver/aws-efs/aws_efs.go @@ -115,8 +115,8 @@ func GetAWSEFSOperatorControllerConfig(ctx context.Context, flavour generator.Cl cfg.AddDaemonSetHookBuilders(c, withFIPSDaemonSetHook, withVolumeMetricsDaemonSetHook) cfg.AddCredentialsRequestHook(stsCredentialsRequestHook) - volumeTagController := NewEFSVolumeTagsController(cfg.GetControllerName("EFSVolumeTagsController"), c, c.EventRecorder) - cfg.ExtraControlPlaneControllers = append(cfg.ExtraControlPlaneControllers, volumeTagController) + accessPointsTagController := NewEFSAccessPointTagsController(cfg.GetControllerName("EFSAccessPointTagsController"), c, c.EventRecorder) + cfg.ExtraControlPlaneControllers = append(cfg.ExtraControlPlaneControllers, accessPointsTagController) cfg.DeploymentInformers = append(cfg.DeploymentInformers, c.KubeInformers.InformersFor("").Core().V1().PersistentVolumes().Informer()) return cfg, nil diff --git a/pkg/driver/aws-efs/aws_efs_tags_controller.go b/pkg/driver/aws-efs/aws_efs_tags_controller.go index c38f945a2..3c62ff862 100644 --- a/pkg/driver/aws-efs/aws_efs_tags_controller.go +++ b/pkg/driver/aws-efs/aws_efs_tags_controller.go @@ -5,13 +5,14 @@ import ( "crypto/sha256" "encoding/hex" "fmt" - "k8s.io/apimachinery/pkg/labels" "os" "sort" + "strings" "time" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/klog/v2" @@ -31,7 +32,7 @@ const ( awsEFSSecretNamespace = "openshift-cluster-csi-drivers" awsEFSSecretName = "aws-efs-cloud-credentials" efsDriverName = "efs.csi.aws.com" - tagHashAnnotationKey = "efs.openshift.io/volume-tags-hash" + tagHashAnnotationKey = "efs.openshift.io/access-point-tags-hash" infrastructureName = "cluster" operationDelay = 2 * time.Second @@ -39,18 +40,18 @@ const ( operationRetryCount = 5 ) -type EFSVolumeTagsController struct { +type EFSAccessPointTagsController struct { name string commonClient *clients.Clients eventRecorder events.Recorder } -func NewEFSVolumeTagsController( +func NewEFSAccessPointTagsController( name string, commonClient *clients.Clients, eventRecorder events.Recorder) factory.Controller { - c := &EFSVolumeTagsController{ + c := &EFSAccessPointTagsController{ name: name, commonClient: commonClient, eventRecorder: eventRecorder, @@ -67,9 +68,9 @@ func NewEFSVolumeTagsController( ) } -func (c *EFSVolumeTagsController) Sync(ctx context.Context, syncCtx factory.SyncContext) error { - klog.Infof("EFSVolumeTagsController sync started") - defer klog.Infof("EFSVolumeTagsController sync finished") +func (c *EFSAccessPointTagsController) Sync(ctx context.Context, syncCtx factory.SyncContext) error { + klog.Infof("EFSAccessPointTagsController sync started") + defer klog.Infof("EFSAccessPointTagsController sync finished") opSpec, _, _, err := c.commonClient.OperatorClient.GetOperatorState() if err != nil { @@ -83,15 +84,10 @@ func (c *EFSVolumeTagsController) Sync(ctx context.Context, syncCtx factory.Sync if err != nil { return err } - var infraRegion = "" - if infra.Status.PlatformStatus != nil && infra.Status.PlatformStatus.AWS != nil { - infraRegion = infra.Status.PlatformStatus.AWS.Region - } - efsClient, err := c.getEFSClient(ctx, infraRegion) - if err != nil { - return err + if infra == nil || infra.Status.PlatformStatus == nil || infra.Status.PlatformStatus.AWS == nil { + return nil } - err = c.processInfrastructure(ctx, infra, efsClient) + err = c.processInfrastructure(ctx, infra) if err != nil { return err } @@ -100,7 +96,7 @@ func (c *EFSVolumeTagsController) Sync(ctx context.Context, syncCtx factory.Sync } // getEFSClient retrieves AWS credentials from the secret and creates an AWS EFS client using session.Options -func (c *EFSVolumeTagsController) getEFSClient(ctx context.Context, awsRegion string) (*efs.EFS, error) { +func (c *EFSAccessPointTagsController) getEFSClient(ctx context.Context, awsRegion string) (*efs.EFS, error) { secret, err := c.getEFSCloudCredSecret(ctx) if err != nil { return nil, fmt.Errorf("error retrieving AWS credentials secret: %v", err) @@ -185,29 +181,16 @@ func writeCredentialsToTempFile(data []byte) (string, error) { } // getInfrastructure retrieves the Infrastructure resource in OpenShift -func (c *EFSVolumeTagsController) getInfrastructure(ctx context.Context) (*configv1.Infrastructure, error) { - backoff := wait.Backoff{ - Duration: operationDelay, - Factor: operationBackoffFactor, - Steps: operationRetryCount, +func (c *EFSAccessPointTagsController) getInfrastructure(ctx context.Context) (*configv1.Infrastructure, error) { + infra, err := c.commonClient.ConfigInformers.Config().V1().Infrastructures().Lister().Get(infrastructureName) + if err != nil { + klog.Errorf("error listing infrastructures objects: %v", err) + return nil, err } - infra := &configv1.Infrastructure{} - err := wait.ExponentialBackoffWithContext(ctx, backoff, func(ctx context.Context) (bool, error) { - var apiError error - infra, apiError = c.commonClient.ConfigInformers.Config().V1().Infrastructures().Lister().Get(infrastructureName) - if apiError != nil { - klog.Errorf("error listing infrastructures objects: %v", apiError) - return false, nil - } - if infra != nil { - return true, nil - } - return false, nil - }) return infra, err } -func (c *EFSVolumeTagsController) getEFSCloudCredSecret(ctx context.Context) (*v1.Secret, error) { +func (c *EFSAccessPointTagsController) getEFSCloudCredSecret(ctx context.Context) (*v1.Secret, error) { backoff := wait.Backoff{ Duration: operationDelay, Factor: operationBackoffFactor, @@ -231,11 +214,10 @@ func (c *EFSVolumeTagsController) getEFSCloudCredSecret(ctx context.Context) (*v } // processInfrastructure processes the Infrastructure resource and updates EFS tags -func (c *EFSVolumeTagsController) processInfrastructure(ctx context.Context, infra *configv1.Infrastructure, efsClient *efs.EFS) error { +func (c *EFSAccessPointTagsController) processInfrastructure(ctx context.Context, infra *configv1.Infrastructure) error { if infra.Status.PlatformStatus != nil && infra.Status.PlatformStatus.AWS != nil && infra.Status.PlatformStatus.AWS.ResourceTags != nil { - awsInfra := infra.Status.PlatformStatus.AWS - err := c.fetchPVsAndUpdateTags(ctx, awsInfra.ResourceTags, efsClient) + err := c.fetchPVsAndUpdateTags(ctx, infra) if err != nil { klog.Errorf("Error processing PVs for infrastructure update: %v", err) return err @@ -245,13 +227,13 @@ func (c *EFSVolumeTagsController) processInfrastructure(ctx context.Context, inf } // fetchPVsAndUpdateTags retrieves all PVs and updates the AWS EFS tags in batches of 100 -func (c *EFSVolumeTagsController) fetchPVsAndUpdateTags(ctx context.Context, resourceTags []configv1.AWSResourceTag, efsClient *efs.EFS) error { - pvs, err := c.listPersistentVolumesWithRetry(ctx) +func (c *EFSAccessPointTagsController) fetchPVsAndUpdateTags(ctx context.Context, infra *configv1.Infrastructure) error { + pvs, err := c.listPersistentVolumes() if err != nil { return fmt.Errorf("error fetching PVs: %v", err) } // Compute the hash for the new set of tags - newTagsHash := computeTagsHash(resourceTags) + newTagsHash := computeTagsHash(infra.Status.PlatformStatus.AWS.ResourceTags) pvsToBeUpdated := filterUpdatableVolumes(pvs, newTagsHash) // If there are no volumes to update, return early @@ -259,8 +241,16 @@ func (c *EFSVolumeTagsController) fetchPVsAndUpdateTags(ctx context.Context, res klog.Infof("No volume tags to update as hashes are unchanged") return nil } + var infraRegion = "" + if infra.Status.PlatformStatus != nil && infra.Status.PlatformStatus.AWS != nil { + infraRegion = infra.Status.PlatformStatus.AWS.Region + } + efsClient, err := c.getEFSClient(ctx, infraRegion) + if err != nil { + return err + } for _, volume := range pvsToBeUpdated { - err = c.updateEFSVolumeTags(volume, efsClient, resourceTags) + err = c.updateEFSVolumeTags(volume, efsClient, infra.Status.PlatformStatus.AWS.ResourceTags) if err != nil { klog.Errorf("Error updating volume %s tags: %v", volume.Name, err) c.eventRecorder.Warning("EFSVolumeTagsUpdateFailed", fmt.Sprintf("Failed to update tags for batch %v: %v", volume.Name, err.Error())) @@ -281,15 +271,14 @@ func (c *EFSVolumeTagsController) fetchPVsAndUpdateTags(ctx context.Context, res } // updateEFSTags updates the tags of an AWS EFS volume -func (c *EFSVolumeTagsController) updateEFSVolumeTags(pv *v1.PersistentVolume, efsClient *efs.EFS, resourceTags []configv1.AWSResourceTag) error { +func (c *EFSAccessPointTagsController) updateEFSVolumeTags(pv *v1.PersistentVolume, efsClient *efs.EFS, resourceTags []configv1.AWSResourceTag) error { - // Merge the existing tags with new resource tags - mergedTags := newAndUpdatedTags(resourceTags) + tags := newAndUpdatedTags(resourceTags) // Create or update the tags _, err := efsClient.TagResource(&efs.TagResourceInput{ - ResourceId: aws.String(pv.Spec.CSI.VolumeHandle), - Tags: mergedTags, + ResourceId: aws.String(parseAccessPointID(pv.Spec.CSI.VolumeHandle)), + Tags: tags, }) if err != nil { klog.Errorf("Error updating tags for PV %s: %v", pv.Spec.CSI.VolumeHandle, err) @@ -298,7 +287,7 @@ func (c *EFSVolumeTagsController) updateEFSVolumeTags(pv *v1.PersistentVolume, e return nil } -func (c *EFSVolumeTagsController) updateVolumeWithRetry(ctx context.Context, pv *v1.PersistentVolume) error { +func (c *EFSAccessPointTagsController) updateVolumeWithRetry(ctx context.Context, pv *v1.PersistentVolume) error { backoff := wait.Backoff{ Duration: operationDelay, Factor: operationBackoffFactor, @@ -316,23 +305,13 @@ func (c *EFSVolumeTagsController) updateVolumeWithRetry(ctx context.Context, pv return err } -func (c *EFSVolumeTagsController) listPersistentVolumesWithRetry(ctx context.Context) ([]*v1.PersistentVolume, error) { - backoff := wait.Backoff{ - Duration: operationDelay, - Factor: operationBackoffFactor, - Steps: operationRetryCount, +func (c *EFSAccessPointTagsController) listPersistentVolumes() ([]*v1.PersistentVolume, error) { + pvList, err := c.commonClient.KubeInformers.InformersFor("").Core().V1().PersistentVolumes().Lister().List(labels.Everything()) + if err != nil { + klog.Errorf("error listing volumes objects: %v", err) + return nil, err } - pvList := []*v1.PersistentVolume{} - err := wait.ExponentialBackoffWithContext(ctx, backoff, func(ctx context.Context) (bool, error) { - var apiError error - pvList, apiError = c.commonClient.KubeInformers.InformersFor("").Core().V1().PersistentVolumes().Lister().List(labels.Everything()) - if apiError != nil { - klog.Errorf("error listing volumes objects: %v", apiError) - return false, nil - } - return true, nil - }) - return pvList, err + return pvList, nil } // newAndUpdatedTags adds and update existing AWS tags with new resource tags from OpenShift infrastructure @@ -351,7 +330,8 @@ func newAndUpdatedTags(resourceTags []configv1.AWSResourceTag) []*efs.Tag { func filterUpdatableVolumes(volumes []*v1.PersistentVolume, newTagsHash string) []*v1.PersistentVolume { var pvsToBeUpdated = make([]*v1.PersistentVolume, 0) for _, volume := range volumes { - if volume.Spec.CSI != nil && volume.Spec.CSI.Driver == efsDriverName { + if volume.Spec.CSI != nil && volume.Spec.CSI.Driver == efsDriverName && + parseAccessPointID(volume.Spec.CSI.VolumeHandle) != "" { existingHash := getPVTagHash(volume) if existingHash == "" || existingHash != newTagsHash { pvsToBeUpdated = append(pvsToBeUpdated, volume) @@ -400,3 +380,17 @@ func computeTagsHash(resourceTags []configv1.AWSResourceTag) string { hash := sha256.Sum256([]byte(tagsString)) return hex.EncodeToString(hash[:]) } + +// parseAccessPointID checks if an Access Point ID is present in the input string. +// It returns the Access Point ID if present, or an empty string if not. +func parseAccessPointID(input string) string { + // Split the input string by "::" delimiter + parts := strings.Split(input, "::") + + // Check if there's an Access Point ID after "::" + if len(parts) == 2 && strings.HasPrefix(parts[1], "fsap-") { + return parts[1] + } + // Return an empty string if no Access Point ID is found + return "" +}