Commit

Added New Retry worker
anirudhAgniRedhat committed Dec 16, 2024
1 parent 13d121b commit 6c87a0e
Showing 3 changed files with 276 additions and 62 deletions.
104 changes: 42 additions & 62 deletions pkg/driver/aws-efs/aws_efs_tags_controller.go
@@ -14,7 +14,7 @@ import (
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"

"github.com/aws/aws-sdk-go/aws"
@@ -35,19 +35,14 @@ const (
efsDriverName = "efs.csi.aws.com"
tagHashAnnotationKey = "efs.openshift.io/access-point-tags-hash"
infrastructureName = "cluster"

operationDelay = 2 * time.Second
operationBackoffFactor = 1.2
operationRetryCount = 5

awsTagsRequestRateLimit = 10
awsTagsRequestBurstSize = 1
)

type EFSAccessPointTagsController struct {
name string
commonClient *clients.Clients
eventRecorder events.Recorder
failedQueue workqueue.TypedRateLimitingInterface[string]
rateLimiter *rate.Limiter
}

func NewEFSAccessPointTagsController(
@@ -59,13 +54,15 @@ func NewEFSAccessPointTagsController(
name: name,
commonClient: commonClient,
eventRecorder: eventRecorder,
failedQueue: workqueue.NewTypedRateLimitingQueue[string](workqueue.NewTypedItemExponentialFailureRateLimiter[string](10*time.Second, 100*time.Hour)),
rateLimiter: rate.NewLimiter(rate.Limit(10), 100),
}
return factory.New().WithSync(
c.Sync,
).ResyncEvery(
20*time.Minute,
).WithInformers(
c.commonClient.ConfigInformers.Config().V1().Infrastructures().Informer(),
).WithPostStartHooks(
c.startFailedQueueWorker,
).ToController(
name,
eventRecorder,
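
The startFailedQueueWorker hook wired in above is collapsed in this diff view. As a rough sketch only (assuming the usual Get/Done/Forget drain loop for a typed rate-limiting queue, not necessarily the committed implementation), such a post-start worker might look like:

func (c *EFSAccessPointTagsController) startFailedQueueWorker(ctx context.Context, syncCtx factory.SyncContext) error {
	go func() {
		for {
			volumeName, shutdown := c.failedQueue.Get()
			if shutdown {
				return // the queue was shut down, stop the worker
			}
			// Re-attempt tagging for this volume (details omitted); call
			// c.failedQueue.Forget(volumeName) on success, or
			// c.failedQueue.AddRateLimited(volumeName) to schedule another retry.
			c.failedQueue.Done(volumeName)
		}
	}()
	<-ctx.Done()
	c.failedQueue.ShutDown()
	return nil
}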
@@ -84,7 +81,7 @@ func (c *EFSAccessPointTagsController) Sync(ctx context.Context, syncCtx factory
return nil
}

infra, err := c.getInfrastructure(ctx)
infra, err := c.getInfrastructure()
if err != nil {
return err
}
@@ -185,7 +182,7 @@ func writeCredentialsToTempFile(data []byte) (string, error) {
}

// getInfrastructure retrieves the Infrastructure resource in OpenShift
func (c *EFSAccessPointTagsController) getInfrastructure(ctx context.Context) (*configv1.Infrastructure, error) {
func (c *EFSAccessPointTagsController) getInfrastructure() (*configv1.Infrastructure, error) {
infra, err := c.commonClient.ConfigInformers.Config().V1().Infrastructures().Lister().Get(infrastructureName)
if err != nil {
klog.Errorf("error listing infrastructures objects: %v", err)
@@ -195,26 +192,12 @@ func (c *EFSAccessPointTagsController) getInfrastructure(ctx context.Context) (*
}

func (c *EFSAccessPointTagsController) getEFSCloudCredSecret(ctx context.Context) (*v1.Secret, error) {
backoff := wait.Backoff{
Duration: operationDelay,
Factor: operationBackoffFactor,
Steps: operationRetryCount,
}
var awsCreds *v1.Secret
err := wait.ExponentialBackoffWithContext(ctx, backoff, func(ctx context.Context) (bool, error) {
var apiError error
awsCreds, apiError = c.commonClient.KubeClient.CoreV1().Secrets(awsEFSSecretNamespace).Get(ctx, awsEFSSecretName, metav1.GetOptions{})
if apiError != nil {
klog.Errorf("error getting secret object: %v", apiError)
return false, nil
}
if awsCreds != nil {
return true, nil
}
return false, nil
})
return awsCreds, err

awsCreds, err := c.commonClient.KubeClient.CoreV1().Secrets(awsEFSSecretNamespace).Get(ctx, awsEFSSecretName, metav1.GetOptions{})
if err != nil {
klog.Errorf("error getting secret object: %v", err)
return nil, err
}
return awsCreds, nil
}

// processInfrastructure processes the Infrastructure resource and updates EFS tags
@@ -238,7 +221,7 @@ func (c *EFSAccessPointTagsController) fetchPVsAndUpdateTags(ctx context.Context
}
// Compute the hash for the new set of tags
newTagsHash := computeTagsHash(infra.Status.PlatformStatus.AWS.ResourceTags)
pvsToBeUpdated := filterUpdatableVolumes(pvs, newTagsHash)
pvsToBeUpdated := c.filterUpdatableVolumes(pvs, newTagsHash)

// If there are no volumes to update, return early
if len(pvsToBeUpdated) == 0 {
@@ -254,28 +237,22 @@ func (c *EFSAccessPointTagsController) fetchPVsAndUpdateTags(ctx context.Context
return err
}

limiter := rate.NewLimiter(rate.Limit(awsTagsRequestRateLimit), awsTagsRequestBurstSize)

for _, volume := range pvsToBeUpdated {
err = limiter.Wait(context.Background())
if err != nil {
klog.Errorf("Error waiting for rate limiter: %v", err)
return err
}

err = c.updateEFSAccessPointTags(volume, efsClient, infra.Status.PlatformStatus.AWS.ResourceTags)
err = c.updateEFSAccessPointTags(ctx, volume, efsClient, infra.Status.PlatformStatus.AWS.ResourceTags)
if err != nil {
klog.Errorf("Error updating volume's AccessPoint %s tags: %v", volume.Name, err)
c.eventRecorder.Warning("EFSAccessPointTagsUpdateFailed", fmt.Sprintf("Failed to update tags for volume %v: %v", volume.Name, err.Error()))
c.failedQueue.AddRateLimited(volume.Name)
continue
}
// Set the new tag hash annotation in the PV object
updatedPv := setPVTagHash(volume, newTagsHash)

// Update the PV with the new annotations
err = c.updateVolumeWithRetry(ctx, updatedPv)
err = c.updateVolume(ctx, updatedPv)
if err != nil {
klog.Errorf("Error updating PV annotations for volume %s: %v", volume.Name, err)
c.failedQueue.AddRateLimited(volume.Name)
continue
}
klog.Infof("Successfully updated PV annotations and access points tags for volume %s", volume.Name)
@@ -284,12 +261,18 @@ func (c *EFSAccessPointTagsController) fetchPVsAndUpdateTags(ctx context.Context
}

// updateEFSTags updates the tags of an AWS EFS Access Points
func (c *EFSAccessPointTagsController) updateEFSAccessPointTags(pv *v1.PersistentVolume, efsClient *efs.EFS, resourceTags []configv1.AWSResourceTag) error {
func (c *EFSAccessPointTagsController) updateEFSAccessPointTags(ctx context.Context, pv *v1.PersistentVolume, efsClient *efs.EFS, resourceTags []configv1.AWSResourceTag) error {

err := c.rateLimiter.Wait(ctx)
if err != nil {
klog.Errorf("Error waiting for rate limiter: %v", err)
return err
}

tags := newAndUpdatedTags(resourceTags)

// Create or update the tags
_, err := efsClient.TagResource(&efs.TagResourceInput{
_, err = efsClient.TagResource(&efs.TagResourceInput{
ResourceId: aws.String(parseAccessPointID(pv.Spec.CSI.VolumeHandle)),
Tags: tags,
})
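
The shared c.rateLimiter consumed above is a golang.org/x/time/rate token bucket created in the constructor with rate.NewLimiter(rate.Limit(10), 100): a sustained 10 requests per second with a burst capacity of 100. A minimal standalone sketch of the same pattern:

package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/time/rate"
)

func main() {
	// Token bucket: refills at 10 tokens/second, holds at most 100.
	limiter := rate.NewLimiter(rate.Limit(10), 100)
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	for i := 0; i < 3; i++ {
		// Wait blocks until a token is available or the context is done.
		if err := limiter.Wait(ctx); err != nil {
			fmt.Println("rate limiter wait aborted:", err)
			return
		}
		fmt.Println("request", i, "allowed")
	}
}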
@@ -300,21 +283,12 @@ func (c *EFSAccessPointTagsController) updateEFSAccessPointTags(pv *v1.Persisten
return nil
}

func (c *EFSAccessPointTagsController) updateVolumeWithRetry(ctx context.Context, pv *v1.PersistentVolume) error {
backoff := wait.Backoff{
Duration: operationDelay,
Factor: operationBackoffFactor,
Steps: operationRetryCount,
}
err := wait.ExponentialBackoffWithContext(ctx, backoff, func(ctx context.Context) (bool, error) {
var apiError error
_, apiError = c.commonClient.KubeClient.CoreV1().PersistentVolumes().Update(ctx, pv, metav1.UpdateOptions{})
if apiError != nil {
klog.Errorf("error updating volume object %s: %v", pv.Name, apiError)
return false, nil
}
return true, nil
})
func (c *EFSAccessPointTagsController) updateVolume(ctx context.Context, pv *v1.PersistentVolume) error {
_, err := c.commonClient.KubeClient.CoreV1().PersistentVolumes().Update(ctx, pv, metav1.UpdateOptions{})
if err != nil {
klog.Errorf("Error updating PV %s: %v", pv.Name, err)
return err
}
return nil
}

Expand All @@ -340,11 +314,11 @@ func newAndUpdatedTags(resourceTags []configv1.AWSResourceTag) []*efs.Tag {
return tags
}

func filterUpdatableVolumes(volumes []*v1.PersistentVolume, newTagsHash string) []*v1.PersistentVolume {
func (c *EFSAccessPointTagsController) filterUpdatableVolumes(volumes []*v1.PersistentVolume, newTagsHash string) []*v1.PersistentVolume {
var pvsToBeUpdated = make([]*v1.PersistentVolume, 0)
for _, volume := range volumes {
if volume.Spec.CSI != nil && volume.Spec.CSI.Driver == efsDriverName &&
parseAccessPointID(volume.Spec.CSI.VolumeHandle) != "" {
parseAccessPointID(volume.Spec.CSI.VolumeHandle) != "" && !c.isVolumeInFailedQueue(volume.Name) {
existingHash := getPVTagHash(volume)
if existingHash == "" || existingHash != newTagsHash {
pvsToBeUpdated = append(pvsToBeUpdated, volume)
Expand All @@ -354,6 +328,12 @@ func filterUpdatableVolumes(volumes []*v1.PersistentVolume, newTagsHash string)
return pvsToBeUpdated
}

// isVolumeInFailedQueue checks if a volume name is currently in the failed queue
func (c *EFSAccessPointTagsController) isVolumeInFailedQueue(volumeName string) bool {
// NumRequeues is non-zero from the first AddRateLimited until the retry
// worker calls Forget, so volumes with a pending retry are skipped here.
return c.failedQueue.NumRequeues(volumeName) > 0
}

// setPVTagHash stores the hash in the PV annotations.
func setPVTagHash(pv *v1.PersistentVolume, hash string) *v1.PersistentVolume {

117 changes: 117 additions & 0 deletions pkg/driver/aws-efs/aws_efs_tags_controller_test.go
@@ -0,0 +1,117 @@
package aws_efs

import (
	"crypto/sha256"
	"encoding/hex"
	"reflect"
	"testing"
	"time"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/service/efs"
	configv1 "github.com/openshift/api/config/v1"
	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/util/workqueue"
)

func TestNewAndUpdatedTags(t *testing.T) {
resourceTags := []configv1.AWSResourceTag{
{Key: "env", Value: "prod"},
{Key: "app", Value: "myapp"},
}

expectedTags := []*efs.Tag{
{Key: aws.String("env"), Value: aws.String("prod")},
{Key: aws.String("app"), Value: aws.String("myapp")},
}

tags := newAndUpdatedTags(resourceTags)
if !reflect.DeepEqual(tags, expectedTags) {
t.Errorf("Expected %v, but got %v", expectedTags, tags)
}
}

func TestComputeTagsHash(t *testing.T) {
resourceTags := []configv1.AWSResourceTag{
{Key: "env", Value: "prod"},
{Key: "app", Value: "myapp"},
}

// The expected hash is the SHA-256 of the "key=value;" pairs concatenated in sorted key order
concatenated := "app=myapp;env=prod;"
hash := sha256.Sum256([]byte(concatenated))
expectedHash := hex.EncodeToString(hash[:])

computedHash := computeTagsHash(resourceTags)
if computedHash != expectedHash {
t.Errorf("Expected hash %v, but got %v", expectedHash, computedHash)
}
}
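
computeTagsHash itself is not shown in this diff. Based on the expectations above, one plausible reconstruction (hypothetical, named computeTagsHashSketch to avoid claiming it matches the committed code; it assumes additional sort and strings imports) builds "key=value;" pairs, sorts them, and hex-encodes the SHA-256 digest:

func computeTagsHashSketch(resourceTags []configv1.AWSResourceTag) string {
	pairs := make([]string, 0, len(resourceTags))
	for _, t := range resourceTags {
		pairs = append(pairs, t.Key+"="+t.Value+";")
	}
	sort.Strings(pairs) // deterministic order regardless of input order
	sum := sha256.Sum256([]byte(strings.Join(pairs, "")))
	return hex.EncodeToString(sum[:])
}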

func TestFilterUpdatableVolumes(t *testing.T) {
pvs := []*v1.PersistentVolume{
{
ObjectMeta: metav1.ObjectMeta{
Annotations: map[string]string{
tagHashAnnotationKey: "oldHash",
},
},
Spec: v1.PersistentVolumeSpec{
PersistentVolumeSource: v1.PersistentVolumeSource{
CSI: &v1.CSIPersistentVolumeSource{
Driver: efsDriverName,
VolumeHandle: "fs-abcd::fsap-abcd",
},
},
},
},
{
ObjectMeta: metav1.ObjectMeta{
Annotations: map[string]string{
tagHashAnnotationKey: "newHash",
},
},
Spec: v1.PersistentVolumeSpec{
PersistentVolumeSource: v1.PersistentVolumeSource{
CSI: &v1.CSIPersistentVolumeSource{
Driver: efsDriverName,
VolumeHandle: "fs-abcd::fsap-abcd",
},
},
},
},
}

newTagsHash := "newHash"
updatable := filterUpdatableVolumes(pvs, newTagsHash)

if len(updatable) != 1 {
t.Errorf("Expected 1 updatable volume, but got %v", len(updatable))
}
if updatable[0].Annotations[tagHashAnnotationKey] != "oldHash" {
t.Errorf("Expected annotation 'oldHash', but got %v", updatable[0].Annotations[tagHashAnnotationKey])
}
}

func TestParseAccessPointID(t *testing.T) {
volumeHandle := "fs-0123456789abcdef/12345678"
expectedID := "12345678"
accessPointID := parseAccessPointID(volumeHandle)
if accessPointID != expectedID {
t.Errorf("Expected access point ID %v, but got %v", expectedID, accessPointID)
}
}

func TestSetPVTagHash(t *testing.T) {
pv := &v1.PersistentVolume{
ObjectMeta: metav1.ObjectMeta{
Annotations: map[string]string{},
},
}

newHash := "newHash"
updatedPV := setPVTagHash(pv, newHash)

if updatedPV.Annotations[tagHashAnnotationKey] != newHash {
t.Errorf("Expected annotation %v, but got %v", newHash, updatedPV.Annotations[tagHashAnnotationKey])
}
}
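
A natural companion test, not part of this commit, would exercise the failed-queue bookkeeping directly. A sketch assuming the standard AddRateLimited/Forget semantics of client-go's workqueue (and the time/workqueue imports shown in the import block above):

func TestIsVolumeInFailedQueue(t *testing.T) {
	c := &EFSAccessPointTagsController{
		failedQueue: workqueue.NewTypedRateLimitingQueue[string](
			workqueue.NewTypedItemExponentialFailureRateLimiter[string](10*time.Millisecond, time.Second)),
	}
	if c.isVolumeInFailedQueue("pv-1") {
		t.Errorf("expected pv-1 to start outside the failed queue")
	}
	c.failedQueue.AddRateLimited("pv-1")
	if !c.isVolumeInFailedQueue("pv-1") {
		t.Errorf("expected pv-1 in the failed queue after AddRateLimited")
	}
	c.failedQueue.Forget("pv-1")
	if c.isVolumeInFailedQueue("pv-1") {
		t.Errorf("expected Forget to clear pv-1 from the failed queue")
	}
}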