Skip to content

Commit

Permalink
fix: cache ec2nodeclass validation state (#7803)
Browse files Browse the repository at this point in the history
  • Loading branch information
jmdeal authored Feb 27, 2025
1 parent a91eeb7 commit 491e48a
Show file tree
Hide file tree
Showing 15 changed files with 278 additions and 81 deletions.
1 change: 1 addition & 0 deletions cmd/controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ func main() {
op.EventRecorder,
op.UnavailableOfferingsCache,
op.SSMCache,
op.ValidationCache,
cloudProvider,
op.SubnetProvider,
op.SecurityGroupProvider,
Expand Down
6 changes: 3 additions & 3 deletions pkg/cloudprovider/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1186,7 +1186,7 @@ var _ = Describe("CloudProvider", func() {
{SubnetId: aws.String("test-subnet-2"), AvailabilityZone: aws.String("test-zone-1a"), AvailabilityZoneId: aws.String("tstz1-1a"), AvailableIpAddressCount: aws.Int32(100),
Tags: []ec2types.Tag{{Key: aws.String("Name"), Value: aws.String("test-subnet-2")}}},
}})
controller := nodeclass.NewController(ctx, awsEnv.Clock, env.Client, recorder, awsEnv.SubnetProvider, awsEnv.SecurityGroupProvider, awsEnv.AMIProvider, awsEnv.InstanceProfileProvider, awsEnv.LaunchTemplateProvider, awsEnv.CapacityReservationProvider, awsEnv.EC2API)
controller := nodeclass.NewController(ctx, awsEnv.Clock, env.Client, recorder, awsEnv.SubnetProvider, awsEnv.SecurityGroupProvider, awsEnv.AMIProvider, awsEnv.InstanceProfileProvider, awsEnv.LaunchTemplateProvider, awsEnv.CapacityReservationProvider, awsEnv.EC2API, awsEnv.ValidationCache)
ExpectApplied(ctx, env.Client, nodePool, nodeClass)
ExpectObjectReconciled(ctx, env.Client, controller, nodeClass)
pod := coretest.UnschedulablePod(coretest.PodOptions{NodeSelector: map[string]string{corev1.LabelTopologyZone: "test-zone-1a"}})
Expand All @@ -1203,7 +1203,7 @@ var _ = Describe("CloudProvider", func() {
{SubnetId: aws.String("test-subnet-2"), AvailabilityZone: aws.String("test-zone-1a"), AvailabilityZoneId: aws.String("tstz1-1a"), AvailableIpAddressCount: aws.Int32(11),
Tags: []ec2types.Tag{{Key: aws.String("Name"), Value: aws.String("test-subnet-2")}}},
}})
controller := nodeclass.NewController(ctx, awsEnv.Clock, env.Client, recorder, awsEnv.SubnetProvider, awsEnv.SecurityGroupProvider, awsEnv.AMIProvider, awsEnv.InstanceProfileProvider, awsEnv.LaunchTemplateProvider, awsEnv.CapacityReservationProvider, awsEnv.EC2API)
controller := nodeclass.NewController(ctx, awsEnv.Clock, env.Client, recorder, awsEnv.SubnetProvider, awsEnv.SecurityGroupProvider, awsEnv.AMIProvider, awsEnv.InstanceProfileProvider, awsEnv.LaunchTemplateProvider, awsEnv.CapacityReservationProvider, awsEnv.EC2API, awsEnv.ValidationCache)
nodeClass.Spec.Kubelet = &v1.KubeletConfiguration{
MaxPods: aws.Int32(1),
}
Expand Down Expand Up @@ -1244,7 +1244,7 @@ var _ = Describe("CloudProvider", func() {
}})
nodeClass.Spec.SubnetSelectorTerms = []v1.SubnetSelectorTerm{{Tags: map[string]string{"Name": "test-subnet-1"}}}
ExpectApplied(ctx, env.Client, nodePool, nodeClass)
controller := nodeclass.NewController(ctx, awsEnv.Clock, env.Client, recorder, awsEnv.SubnetProvider, awsEnv.SecurityGroupProvider, awsEnv.AMIProvider, awsEnv.InstanceProfileProvider, awsEnv.LaunchTemplateProvider, awsEnv.CapacityReservationProvider, awsEnv.EC2API)
controller := nodeclass.NewController(ctx, awsEnv.Clock, env.Client, recorder, awsEnv.SubnetProvider, awsEnv.SecurityGroupProvider, awsEnv.AMIProvider, awsEnv.InstanceProfileProvider, awsEnv.LaunchTemplateProvider, awsEnv.CapacityReservationProvider, awsEnv.EC2API, awsEnv.ValidationCache)
ExpectObjectReconciled(ctx, env.Client, controller, nodeClass)
podSubnet1 := coretest.UnschedulablePod()
ExpectProvisioned(ctx, env.Client, cluster, cloudProvider, prov, podSubnet1)
Expand Down
3 changes: 2 additions & 1 deletion pkg/controllers/controllers.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ func NewControllers(
recorder events.Recorder,
unavailableOfferings *awscache.UnavailableOfferings,
ssmCache *cache.Cache,
validationCache *cache.Cache,
cloudProvider cloudprovider.CloudProvider,
subnetProvider subnet.Provider,
securityGroupProvider securitygroup.Provider,
Expand All @@ -85,7 +86,7 @@ func NewControllers(
) []controller.Controller {
controllers := []controller.Controller{
nodeclasshash.NewController(kubeClient),
nodeclass.NewController(ctx, clk, kubeClient, recorder, subnetProvider, securityGroupProvider, amiProvider, instanceProfileProvider, launchTemplateProvider, capacityReservationProvider, ec2api),
nodeclass.NewController(ctx, clk, kubeClient, recorder, subnetProvider, securityGroupProvider, amiProvider, instanceProfileProvider, launchTemplateProvider, capacityReservationProvider, ec2api, validationCache),
nodeclaimgarbagecollection.NewController(kubeClient, cloudProvider),
nodeclaimtagging.NewController(kubeClient, cloudProvider, instanceProvider),
controllerspricing.NewController(pricingProvider),
Expand Down
12 changes: 7 additions & 5 deletions pkg/controllers/nodeclass/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
nodeclaimutils "sigs.k8s.io/karpenter/pkg/utils/nodeclaim"
"sigs.k8s.io/karpenter/pkg/utils/result"

"github.com/patrickmn/go-cache"
"github.com/samber/lo"
"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/types"
Expand Down Expand Up @@ -75,6 +76,7 @@ func NewController(
launchTemplateProvider launchtemplate.Provider,
capacityReservationProvider capacityreservation.Provider,
ec2api sdk.EC2API,
validationCache *cache.Cache,
) *Controller {
return &Controller{
kubeClient: kubeClient,
Expand All @@ -84,11 +86,11 @@ func NewController(
reconcilers: []reconcile.TypedReconciler[*v1.EC2NodeClass]{
NewAMIReconciler(amiProvider),
NewCapacityReservationReconciler(clk, capacityReservationProvider),
&Subnet{subnetProvider: subnetProvider},
&SecurityGroup{securityGroupProvider: securityGroupProvider},
&InstanceProfile{instanceProfileProvider: instanceProfileProvider},
&Validation{ec2api: ec2api, amiProvider: amiProvider},
&Readiness{launchTemplateProvider: launchTemplateProvider},
NewSubnetReconciler(subnetProvider),
NewSecurityGroupReconciler(securityGroupProvider),
NewInstanceProfileReconciler(instanceProfileProvider),
NewValidationReconciler(ec2api, amiProvider, validationCache),
NewReadinessReconciler(launchTemplateProvider),
},
}
}
Expand Down
6 changes: 6 additions & 0 deletions pkg/controllers/nodeclass/instanceprofile.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@ type InstanceProfile struct {
instanceProfileProvider instanceprofile.Provider
}

// NewInstanceProfileReconciler builds an InstanceProfile reconciler that uses
// the given provider.
func NewInstanceProfileReconciler(instanceProfileProvider instanceprofile.Provider) *InstanceProfile {
	ip := new(InstanceProfile)
	ip.instanceProfileProvider = instanceProfileProvider
	return ip
}

func (ip *InstanceProfile) Reconcile(ctx context.Context, nodeClass *v1.EC2NodeClass) (reconcile.Result, error) {
if nodeClass.Spec.Role != "" {
name, err := ip.instanceProfileProvider.Create(ctx, nodeClass)
Expand Down
6 changes: 6 additions & 0 deletions pkg/controllers/nodeclass/readiness.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,12 @@ type Readiness struct {
launchTemplateProvider launchtemplate.Provider
}

// NewReadinessReconciler builds a Readiness reconciler that uses the given
// launch template provider.
func NewReadinessReconciler(launchTemplateProvider launchtemplate.Provider) *Readiness {
	r := new(Readiness)
	r.launchTemplateProvider = launchTemplateProvider
	return r
}

func (n Readiness) Reconcile(ctx context.Context, nodeClass *v1.EC2NodeClass) (reconcile.Result, error) {
// A NodeClass that uses AL2023 requires the cluster CIDR for launching nodes.
// To allow Karpenter to be used for Non-EKS clusters, resolving the Cluster CIDR
Expand Down
6 changes: 6 additions & 0 deletions pkg/controllers/nodeclass/securitygroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@ type SecurityGroup struct {
securityGroupProvider securitygroup.Provider
}

// NewSecurityGroupReconciler builds a SecurityGroup reconciler that uses the
// given security group provider.
func NewSecurityGroupReconciler(securityGroupProvider securitygroup.Provider) *SecurityGroup {
	sg := new(SecurityGroup)
	sg.securityGroupProvider = securityGroupProvider
	return sg
}

func (sg *SecurityGroup) Reconcile(ctx context.Context, nodeClass *v1.EC2NodeClass) (reconcile.Result, error) {
securityGroups, err := sg.securityGroupProvider.List(ctx, nodeClass)
if err != nil {
Expand Down
6 changes: 6 additions & 0 deletions pkg/controllers/nodeclass/subnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@ type Subnet struct {
subnetProvider subnet.Provider
}

// NewSubnetReconciler builds a Subnet reconciler that uses the given subnet
// provider.
func NewSubnetReconciler(subnetProvider subnet.Provider) *Subnet {
	s := new(Subnet)
	s.subnetProvider = subnetProvider
	return s
}

func (s *Subnet) Reconcile(ctx context.Context, nodeClass *v1.EC2NodeClass) (reconcile.Result, error) {
subnets, err := s.subnetProvider.List(ctx, nodeClass)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions pkg/controllers/nodeclass/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ var _ = BeforeSuite(func() {
awsEnv.LaunchTemplateProvider,
awsEnv.CapacityReservationProvider,
awsEnv.EC2API,
awsEnv.ValidationCache,
)
})

Expand Down
183 changes: 145 additions & 38 deletions pkg/controllers/nodeclass/validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"context"
"fmt"

"github.com/mitchellh/hashstructure/v2"
"github.com/patrickmn/go-cache"
"github.com/samber/lo"

"sigs.k8s.io/controller-runtime/pkg/reconcile"
Expand All @@ -39,33 +41,60 @@ import (
"github.com/aws/karpenter-provider-aws/pkg/utils"
)

type Validation struct {
ec2api sdk.EC2API
// Reasons set on the ValidationSucceeded status condition when validation does not succeed.
const (
// Authorization reasons: returned by the dry-run validators and used as keys of ValidationConditionMessages.
ConditionReasonCreateFleetAuthFailed = "CreateFleetAuthCheckFailed"
ConditionReasonCreateLaunchTemplateAuthFailed = "CreateLaunchTemplateAuthCheckFailed"
ConditionReasonRunInstancesAuthFailed = "RunInstancesAuthCheckFailed"
// ConditionReasonDependenciesNotReady is set while a required status condition is false or unknown.
ConditionReasonDependenciesNotReady = "DependenciesNotReady"
// ConditionReasonTagValidationFailed is set when utils.GetTags returns an error for the NodeClass.
ConditionReasonTagValidationFailed = "TagValidationFailed"
)

// ValidationConditionMessages maps an authorization-failure condition reason to
// the message set alongside it on the ValidationSucceeded status condition.
// Only the three authorization reasons have entries; any other key yields "".
var ValidationConditionMessages = map[string]string{
ConditionReasonCreateFleetAuthFailed: "Controller isn't authorized to call ec2:CreateFleet",
ConditionReasonCreateLaunchTemplateAuthFailed: "Controller isn't authorized to call ec2:CreateLaunchTemplate",
ConditionReasonRunInstancesAuthFailed: "Controller isn't authorized to call ec2:RunInstances",
}

// Validation reconciles the ValidationSucceeded status condition of an EC2NodeClass.
type Validation struct {
// ec2api is the EC2 client the dry-run authorization checks are issued against.
ec2api sdk.EC2API
amiProvider amifamily.Provider
// cache holds the result of earlier validation runs under the key built by cacheKey:
// "" for success, otherwise the failure reason.
cache *cache.Cache
}

// nolint:gocyclo
func (n Validation) Reconcile(ctx context.Context, nodeClass *v1.EC2NodeClass) (reconcile.Result, error) {
// Tag Validation
if offendingTag, found := lo.FindKeyBy(nodeClass.Spec.Tags, func(k string, v string) bool {
for _, exp := range v1.RestrictedTagPatterns {
if exp.MatchString(k) {
return true
}
}
return false
}); found {
nodeClass.StatusConditions().SetFalse(v1.ConditionTypeValidationSucceeded, "TagValidationFailed",
fmt.Sprintf("%q tag does not pass tag validation requirements", offendingTag))
return reconcile.Result{}, reconcile.TerminalError(fmt.Errorf("%q tag does not pass tag validation requirements", offendingTag))
// NewValidationReconciler builds a Validation reconciler from the EC2 client, the
// AMI provider, and the cache used to store validation results.
func NewValidationReconciler(ec2api sdk.EC2API, amiProvider amifamily.Provider, cache *cache.Cache) *Validation {
return &Validation{
ec2api: ec2api,
amiProvider: amiProvider,
cache: cache,
}
// Auth Validation
if !nodeClass.StatusConditions().Get(v1.ConditionTypeSecurityGroupsReady).IsTrue() || !nodeClass.StatusConditions().Get(v1.ConditionTypeAMIsReady).IsTrue() || !nodeClass.StatusConditions().Get(v1.ConditionTypeInstanceProfileReady).IsTrue() || !nodeClass.StatusConditions().Get(v1.ConditionTypeSubnetsReady).IsTrue() {
nodeClass.StatusConditions().SetFalse(v1.ConditionTypeValidationSucceeded, "DependenciesNotReady", "Waiting for SecurityGroups, AMIs, Subnets and InstanceProfiles to go true")
}

// nolint:gocyclo
func (v *Validation) Reconcile(ctx context.Context, nodeClass *v1.EC2NodeClass) (reconcile.Result, error) {
if _, ok := lo.Find(v.requiredConditions(), func(cond string) bool {
return nodeClass.StatusConditions().Get(cond).IsFalse()
}); ok {
// If any of the required status conditions are false, we know validation will fail regardless of the other values.
nodeClass.StatusConditions().SetFalse(
v1.ConditionTypeValidationSucceeded,
ConditionReasonDependenciesNotReady,
"Awaiting AMI, Instance Profile, Security Group, and Subnet resolution",
)
return reconcile.Result{}, nil
}
if _, ok := lo.Find(v.requiredConditions(), func(cond string) bool {
return nodeClass.StatusConditions().Get(cond).IsUnknown()
}); ok {
// If none of the status conditions are false, but at least one is unknown, we should also consider the validation
// state to be unknown. Once all required conditions collapse to a true or false state, we can test validation.
nodeClass.StatusConditions().SetUnknownWithReason(
v1.ConditionTypeValidationSucceeded,
ConditionReasonDependenciesNotReady,
"Awaiting AMI, Instance Profile, Security Group, and Subnet resolution",
)
return reconcile.Result{}, nil
}

nodeClaim := &karpv1.NodeClaim{
Spec: karpv1.NodeClaimSpec{
NodeClassRef: &karpv1.NodeClassReference{
Expand All @@ -75,38 +104,95 @@ func (n Validation) Reconcile(ctx context.Context, nodeClass *v1.EC2NodeClass) (
}
tags, err := utils.GetTags(nodeClass, nodeClaim, options.FromContext(ctx).ClusterName)
if err != nil {
return reconcile.Result{}, fmt.Errorf("getting tags, %w", err)
nodeClass.StatusConditions().SetFalse(v1.ConditionTypeValidationSucceeded, ConditionReasonTagValidationFailed, err.Error())
return reconcile.Result{}, reconcile.TerminalError(fmt.Errorf("validating tags, %w", err))
}

if val, ok := v.cache.Get(v.cacheKey(nodeClass, tags)); ok {
// We still update the status condition even if it's cached since we may have had a conflict error previously
if val == "" {
nodeClass.StatusConditions().SetTrue(v1.ConditionTypeValidationSucceeded)
} else {
nodeClass.StatusConditions().SetFalse(
v1.ConditionTypeValidationSucceeded,
val.(string),
ValidationConditionMessages[val.(string)],
)
}
return reconcile.Result{}, nil
}
for _, isValid := range []validatorFunc{
v.validateCreateFleetAuthorization,
v.validateCreateLaunchTemplateAuthorization,
v.validateRunInstancesAuthorization,
} {
if failureReason, err := isValid(ctx, nodeClass, nodeClaim, tags); err != nil {
return reconcile.Result{}, err
} else if failureReason != "" {
v.cache.SetDefault(v.cacheKey(nodeClass, tags), failureReason)
nodeClass.StatusConditions().SetFalse(
v1.ConditionTypeValidationSucceeded,
failureReason,
ValidationConditionMessages[failureReason],
)
return reconcile.Result{}, nil
}
}

v.cache.SetDefault(v.cacheKey(nodeClass, tags), "")
nodeClass.StatusConditions().SetTrue(v1.ConditionTypeValidationSucceeded)
return reconcile.Result{}, nil
}

// validatorFunc is one validation check run by Reconcile. A non-empty string is
// the condition reason for a failed check, an empty string means the check
// passed, and a non-nil error aborts reconciliation without caching a result.
type validatorFunc func(context.Context, *v1.EC2NodeClass, *karpv1.NodeClaim, map[string]string) (string, error)

// validateCreateFleetAuthorization issues a dry-run ec2:CreateFleet request built
// from the NodeClass, the on-demand capacity type, the tags and a mock launch
// template config. It returns ConditionReasonCreateFleetAuthFailed when the call
// fails with an error that IgnoreUnauthorizedOperationError filters out, a wrapped
// error for any other error that IgnoreDryRunError does not filter out, and an
// empty reason otherwise. The NodeClaim argument is unused.
func (v *Validation) validateCreateFleetAuthorization(
ctx context.Context,
nodeClass *v1.EC2NodeClass,
_ *karpv1.NodeClaim,
tags map[string]string,
) (reason string, err error) {
createFleetInput := instance.GetCreateFleetInput(nodeClass, string(karpv1.CapacityTypeOnDemand), tags, mockLaunchTemplateConfig())
createFleetInput.DryRun = aws.Bool(true)

if _, err := n.ec2api.CreateFleet(ctx, createFleetInput); awserrors.IgnoreDryRunError(err) != nil {
if _, err := v.ec2api.CreateFleet(ctx, createFleetInput); awserrors.IgnoreDryRunError(err) != nil {
if awserrors.IgnoreUnauthorizedOperationError(err) != nil {
// Dry run should only ever return UnauthorizedOperation or DryRunOperation so if we receive any other error
// it would be an unexpected state
return reconcile.Result{}, fmt.Errorf("unexpected error during CreateFleet validation: %w", err)
return "", fmt.Errorf("validating ec2:CreateFleet authorization, %w", err)
}
nodeClass.StatusConditions().SetFalse(v1.ConditionTypeValidationSucceeded, "CreateFleetAuthCheckFailed", "Controller isn't authorized to call CreateFleet")
return reconcile.Result{}, nil
return ConditionReasonCreateFleetAuthFailed, nil
}
return "", nil
}

// validateCreateLaunchTemplateAuthorization issues a dry-run
// ec2:CreateLaunchTemplate request built from mock launch template options for
// the NodeClaim, NodeClass and tags. It returns
// ConditionReasonCreateLaunchTemplateAuthFailed when the call fails with an error
// that IgnoreUnauthorizedOperationError filters out, a wrapped error for any
// other error that IgnoreDryRunError does not filter out, and an empty reason
// otherwise.
func (v *Validation) validateCreateLaunchTemplateAuthorization(
ctx context.Context,
nodeClass *v1.EC2NodeClass,
nodeClaim *karpv1.NodeClaim,
tags map[string]string,
) (reason string, err error) {
createLaunchTemplateInput := launchtemplate.GetCreateLaunchTemplateInput(ctx, mockOptions(*nodeClaim, nodeClass, tags), corev1.IPv4Protocol, "")
createLaunchTemplateInput.DryRun = aws.Bool(true)

if _, err := n.ec2api.CreateLaunchTemplate(ctx, createLaunchTemplateInput); awserrors.IgnoreDryRunError(err) != nil {
if _, err := v.ec2api.CreateLaunchTemplate(ctx, createLaunchTemplateInput); awserrors.IgnoreDryRunError(err) != nil {
if awserrors.IgnoreUnauthorizedOperationError(err) != nil {
// Dry run should only ever return UnauthorizedOperation or DryRunOperation so if we receive any other error
// it would be an unexpected state
return reconcile.Result{}, fmt.Errorf("unexpected error during CreateLaunchTemplate validation: %w", err)
// NOTE(review): the action is spelled "ec2:CreateLaunchTemplate" (singular) elsewhere in this file;
// the plural "CreateLaunchTemplates" in the message below looks like a typo — confirm and fix.
return "", fmt.Errorf("validating ec2:CreateLaunchTemplates authorization, %w", err)
}
nodeClass.StatusConditions().SetFalse(v1.ConditionTypeValidationSucceeded, "CreateLaunchTemplateAuthCheckFailed", "Controller isn't authorized to call CreateLaunchTemplate")
return reconcile.Result{}, nil
return ConditionReasonCreateLaunchTemplateAuthFailed, nil
}
return "", nil
}

// This should never occur as AMIs should already be resolved during the AMI resolution phase
func (v *Validation) validateRunInstancesAuthorization(
ctx context.Context,
nodeClass *v1.EC2NodeClass,
nodeClaim *karpv1.NodeClaim,
tags map[string]string,
) (reason string, err error) {
// NOTE: Since we've already validated the AMI status condition is true, this should never occur
if len(nodeClass.Status.AMIs) == 0 {
return reconcile.Result{}, fmt.Errorf("no resolved AMIs in status: %w", err)
return "", fmt.Errorf("no resolved amis in status")
}

var instanceType ec2types.InstanceType
Expand Down Expand Up @@ -148,17 +234,37 @@ func (n Validation) Reconcile(ctx context.Context, nodeClass *v1.EC2NodeClass) (
ImageId: lo.ToPtr(nodeClass.Status.AMIs[0].ID),
}

if _, err = n.ec2api.RunInstances(ctx, runInstancesInput); awserrors.IgnoreDryRunError(err) != nil {
if _, err := v.ec2api.RunInstances(ctx, runInstancesInput); awserrors.IgnoreDryRunError(err) != nil {
if awserrors.IgnoreUnauthorizedOperationError(err) != nil {
// Dry run should only ever return UnauthorizedOperation or DryRunOperation so if we receive any other error
// it would be an unexpected state
return reconcile.Result{}, fmt.Errorf("unexpected error during RunInstances validation: %w", err)
return "", fmt.Errorf("validating ec2:RunInstances authorization, %w", err)
}
nodeClass.StatusConditions().SetFalse(v1.ConditionTypeValidationSucceeded, "RunInstancesAuthCheckFailed", "Controller isn't authorized to call RunInstances")
return reconcile.Result{}, nil
return ConditionReasonRunInstancesAuthFailed, nil
}
nodeClass.StatusConditions().SetTrue(v1.ConditionTypeValidationSucceeded)
return reconcile.Result{}, nil
return "", nil
}

// requiredConditions lists the status condition types that Reconcile inspects
// before it attempts validation.
func (*Validation) requiredConditions() []string {
	conditions := make([]string, 0, 4)
	conditions = append(conditions,
		v1.ConditionTypeAMIsReady,
		v1.ConditionTypeInstanceProfileReady,
		v1.ConditionTypeSecurityGroupsReady,
		v1.ConditionTypeSubnetsReady,
	)
	return conditions
}

// cacheKey builds the validation cache key for a NodeClass: its name followed by
// a 16-digit hex hash of the resolved subnets, security groups, AMIs and
// instance profile from status, the metadata options and block device mappings
// from spec, and the tags. Slices are hashed as sets (SlicesAsSets). A hashing
// failure panics via lo.Must.
func (*Validation) cacheKey(nodeClass *v1.EC2NodeClass, tags map[string]string) string {
	inputs := []any{
		nodeClass.Status.Subnets,
		nodeClass.Status.SecurityGroups,
		nodeClass.Status.AMIs,
		nodeClass.Status.InstanceProfile,
		nodeClass.Spec.MetadataOptions,
		nodeClass.Spec.BlockDeviceMappings,
		tags,
	}
	opts := &hashstructure.HashOptions{SlicesAsSets: true}
	digest := lo.Must(hashstructure.Hash(inputs, hashstructure.FormatV2, opts))
	return fmt.Sprintf("%s-%016x", nodeClass.Name, digest)
}

func mockLaunchTemplateConfig() []ec2types.FleetLaunchTemplateConfigRequest {
Expand All @@ -182,6 +288,7 @@ func mockLaunchTemplateConfig() []ec2types.FleetLaunchTemplateConfigRequest {
},
}
}

func mockOptions(nodeClaim karpv1.NodeClaim, nodeClass *v1.EC2NodeClass, tags map[string]string) *amifamily.LaunchTemplate {
return &amifamily.LaunchTemplate{
Options: &amifamily.Options{
Expand Down
Loading

0 comments on commit 491e48a

Please sign in to comment.