diff --git a/charts/karpenter-crd/templates/karpenter.k8s.aws_ec2nodeclasses.yaml b/charts/karpenter-crd/templates/karpenter.k8s.aws_ec2nodeclasses.yaml index 6021dac17fde..0289a71a179a 100644 --- a/charts/karpenter-crd/templates/karpenter.k8s.aws_ec2nodeclasses.yaml +++ b/charts/karpenter-crd/templates/karpenter.k8s.aws_ec2nodeclasses.yaml @@ -247,7 +247,7 @@ spec: properties: id: description: ID is the capacity reservation id in EC2 - pattern: ^cr-.+$ + pattern: ^cr-[0-9a-z]+$ type: string ownerID: description: Owner is the owner id for the ami. diff --git a/go.mod b/go.mod index 8574120a5834..95e05a2b4e0d 100644 --- a/go.mod +++ b/go.mod @@ -120,4 +120,4 @@ require ( sigs.k8s.io/structured-merge-diff/v4 v4.4.2 // indirect ) -replace sigs.k8s.io/karpenter => github.com/jmdeal/karpenter v0.0.0-20250221104820-4c25410338d8 +replace sigs.k8s.io/karpenter => github.com/jmdeal/karpenter v0.0.0-20250225003856-d34d71584c1a diff --git a/go.sum b/go.sum index 61189aeaed90..2ebb3e64c252 100644 --- a/go.sum +++ b/go.sum @@ -116,8 +116,8 @@ github.com/imdario/mergo v0.3.16 h1:wwQJbIsHYGMUyLSPrEq1CT16AhnhNJQ51+4fdHUnCl4= github.com/imdario/mergo v0.3.16/go.mod h1:WBLT9ZmE3lPoWsEzCh9LPo3TiwVN+ZKEjmz+hD27ysY= github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= -github.com/jmdeal/karpenter v0.0.0-20250221104820-4c25410338d8 h1:K89kW02bTZkegQnJPlOHSTt+a7WXGQOfrt+pP7lBJos= -github.com/jmdeal/karpenter v0.0.0-20250221104820-4c25410338d8/go.mod h1:/FgjYrt+hwAMcvY46hku76st/aeP4KjOib6RLEj312g= +github.com/jmdeal/karpenter v0.0.0-20250225003856-d34d71584c1a h1:r7gPnoafSMfTjVmirGrkQu/3Suo6wiVDSElWyWIscXk= +github.com/jmdeal/karpenter v0.0.0-20250225003856-d34d71584c1a/go.mod h1:/FgjYrt+hwAMcvY46hku76st/aeP4KjOib6RLEj312g= github.com/jonathan-innis/aws-sdk-go-prometheus v0.1.1 h1:gmpuckrozJ3lfKqSIia9YMGh0caoQmEY7mQP5MsnbTM= 
github.com/jonathan-innis/aws-sdk-go-prometheus v0.1.1/go.mod h1:168XvZFghCqo32ISSWnTXwdlMKzEq+x9TqdfswCjkrQ= github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY= diff --git a/pkg/apis/crds/karpenter.k8s.aws_ec2nodeclasses.yaml b/pkg/apis/crds/karpenter.k8s.aws_ec2nodeclasses.yaml index 09e845096d29..d8680a337e0f 100644 --- a/pkg/apis/crds/karpenter.k8s.aws_ec2nodeclasses.yaml +++ b/pkg/apis/crds/karpenter.k8s.aws_ec2nodeclasses.yaml @@ -244,7 +244,7 @@ spec: properties: id: description: ID is the capacity reservation id in EC2 - pattern: ^cr-.+$ + pattern: ^cr-[0-9a-z]+$ type: string ownerID: description: Owner is the owner id for the ami. diff --git a/pkg/apis/v1/ec2nodeclass.go b/pkg/apis/v1/ec2nodeclass.go index 717cee67a7e9..fdc76e33458f 100644 --- a/pkg/apis/v1/ec2nodeclass.go +++ b/pkg/apis/v1/ec2nodeclass.go @@ -184,7 +184,7 @@ type CapacityReservationSelectorTerm struct { // +optional Tags map[string]string `json:"tags,omitempty"` // ID is the capacity reservation id in EC2 - // +kubebuilder:validation:Pattern:="^cr-.+$" + // +kubebuilder:validation:Pattern:="^cr-[0-9a-z]+$" // +optional ID string `json:"id,omitempty"` // Owner is the owner id for the ami. diff --git a/pkg/cloudprovider/drift.go b/pkg/cloudprovider/drift.go index 276e6ce7c26d..dd04549f4b9c 100644 --- a/pkg/cloudprovider/drift.go +++ b/pkg/cloudprovider/drift.go @@ -128,6 +128,9 @@ func (c *CloudProvider) areSecurityGroupsDrifted(ec2Instance *instance.Instance, // Checks if capacity reservations are drifted, by comparing the capacity reservations persisted to the NodeClass to // the instance's capacity reservation. +// NOTE: We handle drift dynamically for capacity reservations rather than relying on the offerings inducing drift since +// a reserved instance may fall back to on-demand. Relying on offerings could result in drift occurring before fallback +// would cancel it out. 
func (c *CloudProvider) isCapacityReservationDrifted(instance *instance.Instance, nodeClass *v1.EC2NodeClass) cloudprovider.DriftReason { capacityReservationIDs := sets.New(lo.Map(nodeClass.Status.CapacityReservations, func(cr v1.CapacityReservation, _ int) string { return cr.ID })...) if instance.CapacityReservationID != "" && !capacityReservationIDs.Has(instance.CapacityReservationID) { diff --git a/pkg/controllers/nodeclaim/capacityreservation/controller.go b/pkg/controllers/nodeclaim/capacityreservation/controller.go index 32efa849791d..ed1821748a77 100644 --- a/pkg/controllers/nodeclaim/capacityreservation/controller.go +++ b/pkg/controllers/nodeclaim/capacityreservation/controller.go @@ -68,7 +68,9 @@ func (c *Controller) Reconcile(ctx context.Context) (reconcile.Result, error) { }) ncs := &karpv1.NodeClaimList{} - if err := c.kubeClient.List(ctx, ncs); err != nil { + if err := c.kubeClient.List(ctx, ncs, client.MatchingLabels{ + karpv1.NodeRegisteredLabelKey: "true", + }); err != nil { return reconcile.Result{}, fmt.Errorf("listing nodeclaims, %w", err) } updatedNodeClaims := sets.New[string]() diff --git a/pkg/controllers/nodeclass/capacityreservation.go b/pkg/controllers/nodeclass/capacityreservation.go index 7f9d3b149807..a7ee60275b9e 100644 --- a/pkg/controllers/nodeclass/capacityreservation.go +++ b/pkg/controllers/nodeclass/capacityreservation.go @@ -72,7 +72,7 @@ func (c *CapacityReservation) Reconcile(ctx context.Context, nc *v1.EC2NodeClass errors := []error{} nc.Status.CapacityReservations = []v1.CapacityReservation{} for _, r := range reservations { - reservation, err := capacityReservationFromEC2(r) + reservation, err := CapacityReservationFromEC2(r) if err != nil { errors = append(errors, err) continue @@ -89,7 +89,7 @@ func (c *CapacityReservation) Reconcile(ctx context.Context, nc *v1.EC2NodeClass return reconcile.Result{RequeueAfter: c.requeueAfter(reservations...)}, nil } -func capacityReservationFromEC2(cr *ec2types.CapacityReservation) 
(v1.CapacityReservation, error) { +func CapacityReservationFromEC2(cr *ec2types.CapacityReservation) (v1.CapacityReservation, error) { // Guard against new instance match criteria added in the future. See https://github.com/kubernetes-sigs/karpenter/issues/806 // for a similar issue. if !lo.Contains([]ec2types.InstanceMatchCriteria{ diff --git a/pkg/controllers/nodeclass/capacityreservation_test.go b/pkg/controllers/nodeclass/capacityreservation_test.go index a8d115cd1212..f8909b2b8cf9 100644 --- a/pkg/controllers/nodeclass/capacityreservation_test.go +++ b/pkg/controllers/nodeclass/capacityreservation_test.go @@ -25,6 +25,7 @@ import ( . "sigs.k8s.io/karpenter/pkg/test/expectations" v1 "github.com/aws/karpenter-provider-aws/pkg/apis/v1" + "github.com/aws/karpenter-provider-aws/pkg/utils" ) const selfOwnerID = "012345678901" @@ -54,7 +55,7 @@ var _ = Describe("NodeClass Capacity Reservation Reconciler", func() { InstanceMatchCriteria: ec2types.InstanceMatchCriteriaTargeted, CapacityReservationId: lo.ToPtr("cr-m5.large-1a-2"), AvailableInstanceCount: lo.ToPtr[int32](10), - Tags: toEC2Tags(discoveryTags), + Tags: utils.MergeTags(discoveryTags), State: ec2types.CapacityReservationStateActive, }, { @@ -73,7 +74,7 @@ var _ = Describe("NodeClass Capacity Reservation Reconciler", func() { InstanceMatchCriteria: ec2types.InstanceMatchCriteriaTargeted, CapacityReservationId: lo.ToPtr("cr-m5.large-1b-2"), AvailableInstanceCount: lo.ToPtr[int32](15), - Tags: toEC2Tags(discoveryTags), + Tags: utils.MergeTags(discoveryTags), State: ec2types.CapacityReservationStateActive, }, }, @@ -171,12 +172,3 @@ var _ = Describe("NodeClass Capacity Reservation Reconciler", func() { }), ) }) - -func toEC2Tags(tags map[string]string) []ec2types.Tag { - return lo.MapToSlice(tags, func(key, value string) ec2types.Tag { - return ec2types.Tag{ - Key: lo.ToPtr(key), - Value: lo.ToPtr(value), - } - }) -} diff --git a/pkg/controllers/nodeclass/suite_test.go 
b/pkg/controllers/nodeclass/suite_test.go index 5a7af82be21a..7bf0e2f4506b 100644 --- a/pkg/controllers/nodeclass/suite_test.go +++ b/pkg/controllers/nodeclass/suite_test.go @@ -61,7 +61,11 @@ func TestAPIs(t *testing.T) { } var _ = BeforeSuite(func() { - env = coretest.NewEnvironment(coretest.WithCRDs(test.RemoveNodeClassTagValidation(apis.CRDs)...), coretest.WithCRDs(v1alpha1.CRDs...), coretest.WithFieldIndexers(coretest.NodeClaimNodeClassRefFieldIndexer(ctx))) + env = coretest.NewEnvironment( + coretest.WithCRDs(test.DisableCapacityReservationIDValidation(test.RemoveNodeClassTagValidation(apis.CRDs))...), + coretest.WithCRDs(v1alpha1.CRDs...), + coretest.WithFieldIndexers(coretest.NodeClaimNodeClassRefFieldIndexer(ctx)), + ) ctx = coreoptions.ToContext(ctx, coretest.Options(coretest.OptionsFields{FeatureGates: coretest.FeatureGates{ReservedCapacity: lo.ToPtr(true)}})) ctx = options.ToContext(ctx, test.Options()) awsEnv = test.NewEnvironment(ctx, env) diff --git a/pkg/fake/ec2api.go b/pkg/fake/ec2api.go index 112c441aed07..194a1c2954fa 100644 --- a/pkg/fake/ec2api.go +++ b/pkg/fake/ec2api.go @@ -68,6 +68,9 @@ type EC2Behavior struct { LaunchTemplates sync.Map InsufficientCapacityPools atomic.Slice[CapacityPool] NextError AtomicError + + // Tracks the capacity reservations associated with launch templates, if applicable + launchTemplateCapacityReservationIndex sync.Map } type EC2API struct { @@ -109,6 +112,11 @@ func (e *EC2API) Reset() { }) e.InsufficientCapacityPools.Reset() e.NextError.Reset() + + e.launchTemplateCapacityReservationIndex.Range(func(k, _ any) bool { + e.launchTemplateCapacityReservationIndex.Delete(k) + return true + }) } // nolint: gocyclo @@ -136,15 +144,6 @@ func (e *EC2API) CreateFleet(_ context.Context, input *ec2.CreateFleetInput, _ . 
spotInstanceRequestID = aws.String(test.RandomName()) } - launchTemplates := map[string]*ec2.CreateLaunchTemplateInput{} - for e.CreateLaunchTemplateBehavior.CalledWithInput.Len() > 0 { - lt := e.CreateLaunchTemplateBehavior.CalledWithInput.Pop() - launchTemplates[*lt.LaunchTemplateName] = lt - } - for _, ltInput := range launchTemplates { - e.CreateLaunchTemplateBehavior.CalledWithInput.Add(ltInput) - } - fulfilled := 0 for _, ltc := range input.LaunchTemplateConfigs { for _, override := range ltc.Overrides { @@ -162,34 +161,21 @@ func (e *EC2API) CreateFleet(_ context.Context, input *ec2.CreateFleetInput, _ . if skipInstance { continue } - amiID := lo.ToPtr("") - var capacityReservationID *string - if lt, ok := launchTemplates[lo.FromPtr(ltc.LaunchTemplateSpecification.LaunchTemplateName)]; ok { - amiID = lt.LaunchTemplateData.ImageId - if crs := lt.LaunchTemplateData.CapacityReservationSpecification; crs != nil && crs.CapacityReservationPreference == ec2types.CapacityReservationPreferenceCapacityReservationsOnly { - id := crs.CapacityReservationTarget.CapacityReservationId - if id == nil { - panic("received a launch template targeting capacity reservations without a provided ID") - } - capacityReservationID = id - } - } - if capacityReservationID != nil { + + if crID, ok := e.launchTemplateCapacityReservationIndex.Load(*ltc.LaunchTemplateSpecification.LaunchTemplateName); ok { if cr, ok := lo.Find(e.DescribeCapacityReservationsOutput.Clone().CapacityReservations, func(cr ec2types.CapacityReservation) bool { - return *cr.CapacityReservationId == *capacityReservationID + return *cr.CapacityReservationId == crID.(string) }); !ok || *cr.AvailableInstanceCount == 0 { reservationExceededPools = append(reservationExceededPools, CapacityPool{ InstanceType: string(override.InstanceType), Zone: lo.FromPtr(override.AvailabilityZone), CapacityType: karpv1.CapacityTypeReserved, - ReservationID: *capacityReservationID, + ReservationID: crID.(string), }) - skipInstance = true 
+ continue } } - if skipInstance { - continue - } + amiID := lo.ToPtr("") if e.CreateLaunchTemplateBehavior.CalledWithInput.Len() > 0 { lt := e.CreateLaunchTemplateBehavior.CalledWithInput.Pop() amiID = lt.LaunchTemplateData.ImageId @@ -292,6 +278,9 @@ func (e *EC2API) CreateLaunchTemplate(ctx context.Context, input *ec2.CreateLaun } launchTemplate := ec2types.LaunchTemplate{LaunchTemplateName: input.LaunchTemplateName} e.LaunchTemplates.Store(input.LaunchTemplateName, launchTemplate) + if crs := input.LaunchTemplateData.CapacityReservationSpecification; crs != nil && crs.CapacityReservationPreference == ec2types.CapacityReservationPreferenceCapacityReservationsOnly { + e.launchTemplateCapacityReservationIndex.Store(*input.LaunchTemplateName, *crs.CapacityReservationTarget.CapacityReservationId) + } return &ec2.CreateLaunchTemplateOutput{LaunchTemplate: lo.ToPtr(launchTemplate)}, nil }) } @@ -442,7 +431,7 @@ func (e *EC2API) DescribeLaunchTemplates(_ context.Context, input *ec2.DescribeL output := &ec2.DescribeLaunchTemplatesOutput{} e.LaunchTemplates.Range(func(key, value interface{}) bool { launchTemplate := value.(ec2types.LaunchTemplate) - if lo.Contains(input.LaunchTemplateNames, lo.FromPtr(launchTemplate.LaunchTemplateName)) || len(input.Filters) != 0 && Filter(input.Filters, aws.ToString(launchTemplate.LaunchTemplateId), aws.ToString(launchTemplate.LaunchTemplateName), "", launchTemplate.Tags) { + if lo.Contains(input.LaunchTemplateNames, lo.FromPtr(launchTemplate.LaunchTemplateName)) || len(input.Filters) != 0 && Filter(input.Filters, aws.ToString(launchTemplate.LaunchTemplateId), aws.ToString(launchTemplate.LaunchTemplateName), "", "", launchTemplate.Tags) { output.LaunchTemplates = append(output.LaunchTemplates, launchTemplate) } return true diff --git a/pkg/fake/utils.go b/pkg/fake/utils.go index 7f1bc6170b9e..539d778c689d 100644 --- a/pkg/fake/utils.go +++ b/pkg/fake/utils.go @@ -90,7 +90,7 @@ func SubnetsFromFleetRequest(createFleetInput 
*ec2.CreateFleetInput) []string { // Filters are chained with a logical "AND" func FilterDescribeSecurtyGroups(sgs []ec2types.SecurityGroup, filters []ec2types.Filter) []ec2types.SecurityGroup { return lo.Filter(sgs, func(group ec2types.SecurityGroup, _ int) bool { - return Filter(filters, *group.GroupId, *group.GroupName, "", group.Tags) + return Filter(filters, *group.GroupId, *group.GroupName, "", "", group.Tags) }) } @@ -98,7 +98,7 @@ func FilterDescribeSecurtyGroups(sgs []ec2types.SecurityGroup, filters []ec2type // Filters are chained with a logical "AND" func FilterDescribeSubnets(subnets []ec2types.Subnet, filters []ec2types.Filter) []ec2types.Subnet { return lo.Filter(subnets, func(subnet ec2types.Subnet, _ int) bool { - return Filter(filters, *subnet.SubnetId, "", "", subnet.Tags) + return Filter(filters, *subnet.SubnetId, "", "", "", subnet.Tags) }) } @@ -108,38 +108,26 @@ func FilterDescribeCapacityReservations(crs []ec2types.CapacityReservation, ids if len(ids) != 0 && !idSet.Has(*cr.CapacityReservationId) { return false } - if stateFilter, ok := lo.Find(filters, func(f ec2types.Filter) bool { - return lo.FromPtr(f.Name) == "state" - }); ok { - if !lo.Contains(stateFilter.Values, string(cr.State)) { - return false - } - } - return Filter(lo.Reject(filters, func(f ec2types.Filter, _ int) bool { - return lo.FromPtr(f.Name) == "state" - }), *cr.CapacityReservationId, "", *cr.OwnerId, cr.Tags) + return Filter(filters, *cr.CapacityReservationId, "", *cr.OwnerId, string(cr.State), cr.Tags) }) } func FilterDescribeImages(images []ec2types.Image, filters []ec2types.Filter) []ec2types.Image { return lo.Filter(images, func(image ec2types.Image, _ int) bool { - if stateFilter, ok := lo.Find(filters, func(f ec2types.Filter) bool { - return lo.FromPtr(f.Name) == "state" - }); ok { - if !lo.Contains(stateFilter.Values, string(image.State)) { - return false - } - } - return Filter(lo.Reject(filters, func(f ec2types.Filter, _ int) bool { - return lo.FromPtr(f.Name) == 
"state" - }), *image.ImageId, *image.Name, "", image.Tags) + return Filter(filters, *image.ImageId, *image.Name, "", string(image.State), image.Tags) }) } //nolint:gocyclo -func Filter(filters []ec2types.Filter, id, name, owner string, tags []ec2types.Tag) bool { +func Filter(filters []ec2types.Filter, id, name, owner, state string, tags []ec2types.Tag) bool { return lo.EveryBy(filters, func(filter ec2types.Filter) bool { switch filterName := aws.ToString(filter.Name); { + case filterName == "state": + for _, val := range filter.Values { + if state == val { + return true + } + } case filterName == "subnet-id" || filterName == "group-id" || filterName == "image-id": for _, val := range filter.Values { if id == val { diff --git a/pkg/providers/capacityreservation/suite_test.go b/pkg/providers/capacityreservation/suite_test.go new file mode 100644 index 000000000000..620b6669223c --- /dev/null +++ b/pkg/providers/capacityreservation/suite_test.go @@ -0,0 +1,109 @@ +package capacityreservation_test + +import ( + "context" + "testing" + + "github.com/aws/aws-sdk-go-v2/service/ec2" + ec2types "github.com/aws/aws-sdk-go-v2/service/ec2/types" + "github.com/aws/karpenter-provider-aws/pkg/apis" + v1 "github.com/aws/karpenter-provider-aws/pkg/apis/v1" + "github.com/aws/karpenter-provider-aws/pkg/operator/options" + "github.com/aws/karpenter-provider-aws/pkg/test" + "github.com/aws/karpenter-provider-aws/pkg/utils" + "github.com/samber/lo" + coreoptions "sigs.k8s.io/karpenter/pkg/operator/options" + coretest "sigs.k8s.io/karpenter/pkg/test" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + "sigs.k8s.io/karpenter/pkg/test/v1alpha1" + . 
"sigs.k8s.io/karpenter/pkg/utils/testing" +) + +var ctx context.Context +var env *coretest.Environment +var awsEnv *test.Environment + +func TestAPIs(t *testing.T) { + ctx = TestContextWithLogger(t) + RegisterFailHandler(Fail) + RunSpecs(t, "EC2NodeClass") +} + +var _ = BeforeSuite(func() { + env = coretest.NewEnvironment( + coretest.WithCRDs(test.DisableCapacityReservationIDValidation(test.RemoveNodeClassTagValidation(apis.CRDs))...), + coretest.WithCRDs(v1alpha1.CRDs...), + ) + ctx = coreoptions.ToContext(ctx, coretest.Options(coretest.OptionsFields{FeatureGates: coretest.FeatureGates{ReservedCapacity: lo.ToPtr(true)}})) + ctx = options.ToContext(ctx, test.Options()) + awsEnv = test.NewEnvironment(ctx, env) +}) + +// NOTE: Tests for different selector terms can be found in the nodeclass reconciler tests +var _ = Describe("Capacity Reservation Provider", func() { + var discoveryTags map[string]string + var reservations map[string]int + + BeforeEach(func() { + discoveryTags = map[string]string{ + "karpenter.sh/discovery": "test", + } + crs := []ec2types.CapacityReservation{ + { + AvailabilityZone: lo.ToPtr("test-zone-1a"), + InstanceType: lo.ToPtr("m5.large"), + OwnerId: lo.ToPtr("012345678901"), + InstanceMatchCriteria: ec2types.InstanceMatchCriteriaTargeted, + CapacityReservationId: lo.ToPtr("cr-m5.large-1a-1"), + AvailableInstanceCount: lo.ToPtr[int32](10), + Tags: utils.MergeTags(discoveryTags), + State: ec2types.CapacityReservationStateActive, + }, + { + AvailabilityZone: lo.ToPtr("test-zone-1a"), + InstanceType: lo.ToPtr("m5.large"), + OwnerId: lo.ToPtr("012345678901"), + InstanceMatchCriteria: ec2types.InstanceMatchCriteriaTargeted, + CapacityReservationId: lo.ToPtr("cr-m5.large-1a-2"), + AvailableInstanceCount: lo.ToPtr[int32](15), + Tags: utils.MergeTags(discoveryTags), + State: ec2types.CapacityReservationStateActive, + }, + } + awsEnv.EC2API.DescribeCapacityReservationsOutput.Set(&ec2.DescribeCapacityReservationsOutput{ + CapacityReservations: crs, + }) 
+ reservations = make(map[string]int) + for _, cr := range crs { + reservations[*cr.CapacityReservationId] = int(*cr.AvailableInstanceCount) + } + }) + Context("Availability Cache", func() { + It("should sync availability cache when listing reservations", func() { + crs, err := awsEnv.CapacityReservationProvider.List(ctx, v1.CapacityReservationSelectorTerm{ + Tags: discoveryTags, + }) + Expect(err).ToNot(HaveOccurred()) + Expect(crs).To(HaveLen(2)) + for id, count := range reservations { + Expect(awsEnv.CapacityReservationProvider.GetAvailableInstanceCount(id)).To(Equal(count)) + } + }) + It("should decrement availability when reservation is marked as launched", func() { + awsEnv.CapacityReservationProvider.SetAvailableInstanceCount("cr-test", 5) + awsEnv.CapacityReservationProvider.MarkLaunched("cr-test-2") + Expect(awsEnv.CapacityReservationProvider.GetAvailableInstanceCount("cr-test")).To(Equal(5)) + awsEnv.CapacityReservationProvider.MarkLaunched("cr-test") + Expect(awsEnv.CapacityReservationProvider.GetAvailableInstanceCount("cr-test")).To(Equal(4)) + }) + It("should increment availability when reservation is marked as terminated", func() { + awsEnv.CapacityReservationProvider.SetAvailableInstanceCount("cr-test", 5) + awsEnv.CapacityReservationProvider.MarkTerminated("cr-test-2") + Expect(awsEnv.CapacityReservationProvider.GetAvailableInstanceCount("cr-test")).To(Equal(5)) + awsEnv.CapacityReservationProvider.MarkTerminated("cr-test") + Expect(awsEnv.CapacityReservationProvider.GetAvailableInstanceCount("cr-test")).To(Equal(6)) + }) + }) +}) diff --git a/pkg/providers/instance/suite_test.go b/pkg/providers/instance/suite_test.go index 9146d9b96a0a..1c7e633a3369 100644 --- a/pkg/providers/instance/suite_test.go +++ b/pkg/providers/instance/suite_test.go @@ -63,7 +63,7 @@ func TestAWS(t *testing.T) { } var _ = BeforeSuite(func() { - env = coretest.NewEnvironment(coretest.WithCRDs(apis.CRDs...), coretest.WithCRDs(v1alpha1.CRDs...)) + env = 
coretest.NewEnvironment(coretest.WithCRDs(test.DisableCapacityReservationIDValidation(apis.CRDs)...), coretest.WithCRDs(v1alpha1.CRDs...)) ctx = coreoptions.ToContext(ctx, coretest.Options(coretest.OptionsFields{FeatureGates: coretest.FeatureGates{ReservedCapacity: lo.ToPtr(true)}})) ctx = options.ToContext(ctx, test.Options()) awsEnv = test.NewEnvironment(ctx, env) diff --git a/pkg/providers/instancetype/offering/provider.go b/pkg/providers/instancetype/offering/provider.go index e68e8ea06801..3bbcfc6b47ad 100644 --- a/pkg/providers/instancetype/offering/provider.go +++ b/pkg/providers/instancetype/offering/provider.go @@ -128,7 +128,7 @@ func (p *DefaultProvider) createOfferings( var offerings []*cloudprovider.Offering itZones := sets.New(it.Requirements.Get(corev1.LabelTopologyZone).Values()...) - if ofs, ok := p.cache.Get(p.cacheKeyFromInstanceType(it, subnetZones)); ok { + if ofs, ok := p.cache.Get(p.cacheKeyFromInstanceType(it)); ok { offerings = append(offerings, ofs.([]*cloudprovider.Offering)...) 
} else { var cachedOfferings []*cloudprovider.Offering @@ -139,7 +139,6 @@ func (p *DefaultProvider) createOfferings( continue } isUnavailable := p.unavailableOfferings.IsUnavailable(ec2types.InstanceType(it.Name), zone, capacityType) - _, hasSubnetZone := subnetZones[zone] var price float64 var hasPrice bool switch capacityType { @@ -157,7 +156,7 @@ func (p *DefaultProvider) createOfferings( scheduling.NewRequirement(cloudprovider.ReservationIDLabel, corev1.NodeSelectorOpDoesNotExist), ), Price: price, - Available: !isUnavailable && hasPrice && itZones.Has(zone) && hasSubnetZone, + Available: !isUnavailable && hasPrice && itZones.Has(zone), } if id, ok := subnetZones[zone]; ok { offering.Requirements.Add(scheduling.NewRequirement(v1.LabelTopologyZoneID, corev1.NodeSelectorOpIn, id)) @@ -166,7 +165,7 @@ func (p *DefaultProvider) createOfferings( offerings = append(cachedOfferings, offering) } } - p.cache.SetDefault(p.cacheKeyFromInstanceType(it, subnetZones), cachedOfferings) + p.cache.SetDefault(p.cacheKeyFromInstanceType(it), cachedOfferings) offerings = append(offerings, cachedOfferings...) } if !options.FromContext(ctx).FeatureGates.ReservedCapacity { @@ -178,8 +177,6 @@ func (p *DefaultProvider) createOfferings( continue } reservation := &nodeClass.Status.CapacityReservations[i] - - _, hasSubnetZone := subnetZones[reservation.AvailabilityZone] price := 0.0 if odPrice, ok := p.pricingProvider.OnDemandPrice(ec2types.InstanceType(it.Name)); ok { // Divide the on-demand price by a sufficiently large constant. 
This allows us to treat the reservation as "free", @@ -196,7 +193,7 @@ func (p *DefaultProvider) createOfferings( scheduling.NewRequirement(cloudprovider.ReservationIDLabel, corev1.NodeSelectorOpIn, reservation.ID), ), Price: price, - Available: reservationCapacity != 0 && itZones.Has(reservation.AvailabilityZone) && hasSubnetZone, + Available: reservationCapacity != 0 && itZones.Has(reservation.AvailabilityZone), ReservationCapacity: reservationCapacity, } if id, ok := subnetZones[reservation.AvailabilityZone]; ok { @@ -207,27 +204,21 @@ func (p *DefaultProvider) createOfferings( return offerings } -func (p *DefaultProvider) cacheKeyFromInstanceType(it *cloudprovider.InstanceType, subnetZones map[string]string) string { +func (p *DefaultProvider) cacheKeyFromInstanceType(it *cloudprovider.InstanceType) string { zonesHash, _ := hashstructure.Hash( it.Requirements.Get(corev1.LabelTopologyZone).Values(), hashstructure.FormatV2, &hashstructure.HashOptions{SlicesAsSets: true}, ) - subnetZonesHash, _ := hashstructure.Hash( - subnetZones, - hashstructure.FormatV2, - &hashstructure.HashOptions{SlicesAsSets: true}, - ) capacityTypesHash, _ := hashstructure.Hash( it.Requirements.Get(karpv1.CapacityTypeLabelKey).Values(), hashstructure.FormatV2, &hashstructure.HashOptions{SlicesAsSets: true}, ) return fmt.Sprintf( - "%s-%016x-%016x-%016x-%d", + "%s-%016x-%016x-%d", it.Name, zonesHash, - subnetZonesHash, capacityTypesHash, p.unavailableOfferings.SeqNum, ) diff --git a/pkg/providers/instancetype/types.go b/pkg/providers/instancetype/types.go index 65a0eb0ea8fb..7e9178981cfb 100644 --- a/pkg/providers/instancetype/types.go +++ b/pkg/providers/instancetype/types.go @@ -28,6 +28,7 @@ import ( "github.com/samber/lo" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" + "k8s.io/apimachinery/pkg/util/sets" karpv1 "sigs.k8s.io/karpenter/pkg/apis/v1" v1 "github.com/aws/karpenter-provider-aws/pkg/apis/v1" @@ -123,8 +124,8 @@ func NewInstanceType( ctx context.Context, 
info ec2types.InstanceTypeInfo, region string, - zones []string, - zonesToZoneIDs map[string]string, + offeringZones []string, + subnetZonesToZoneIDs map[string]string, blockDeviceMappings []*v1.BlockDeviceMapping, instanceStorePolicy *v1.InstanceStorePolicy, maxPods *int32, @@ -139,7 +140,7 @@ func NewInstanceType( amiFamily := amifamily.GetAMIFamily(amiFamilyType, &amifamily.Options{}) it := &cloudprovider.InstanceType{ Name: string(info.InstanceType), - Requirements: computeRequirements(info, region, zones, zonesToZoneIDs, amiFamily, capacityReservations), + Requirements: computeRequirements(info, region, offeringZones, subnetZonesToZoneIDs, amiFamily, capacityReservations), Capacity: computeCapacity(ctx, info, amiFamily, blockDeviceMappings, instanceStorePolicy, maxPods, podsPerCore), Overhead: &cloudprovider.InstanceTypeOverhead{ KubeReserved: kubeReservedResources(cpu(info), pods(ctx, info, amiFamily, maxPods, podsPerCore), ENILimitedPods(ctx, info), amiFamily, kubeReserved), @@ -157,8 +158,8 @@ func NewInstanceType( func computeRequirements( info ec2types.InstanceTypeInfo, region string, - zones []string, - zonesToZoneIDs map[string]string, + offeringZones []string, + subnetZonesToZoneIDs map[string]string, amiFamily amifamily.AMIFamily, capacityReservations []v1.CapacityReservation, ) scheduling.Requirements { @@ -172,12 +173,15 @@ func computeRequirements( capacityTypes = append(capacityTypes, karpv1.CapacityTypeReserved) } + // Available zones is the set intersection between zones where the instance type is available, and zones which are + // available via the provided EC2NodeClass. 
+ availableZones := sets.New(offeringZones...).Intersection(sets.New(lo.Keys(subnetZonesToZoneIDs)...)) requirements := scheduling.NewRequirements( // Well Known Upstream scheduling.NewRequirement(corev1.LabelInstanceTypeStable, corev1.NodeSelectorOpIn, string(info.InstanceType)), scheduling.NewRequirement(corev1.LabelArchStable, corev1.NodeSelectorOpIn, getArchitecture(info)), scheduling.NewRequirement(corev1.LabelOSStable, corev1.NodeSelectorOpIn, getOS(info, amiFamily)...), - scheduling.NewRequirement(corev1.LabelTopologyZone, corev1.NodeSelectorOpIn, zones...), + scheduling.NewRequirement(corev1.LabelTopologyZone, corev1.NodeSelectorOpIn, availableZones.UnsortedList()...), scheduling.NewRequirement(corev1.LabelTopologyRegion, corev1.NodeSelectorOpIn, region), scheduling.NewRequirement(corev1.LabelWindowsBuild, corev1.NodeSelectorOpDoesNotExist), // Well Known to Karpenter @@ -206,8 +210,8 @@ func computeRequirements( ) // Only add zone-id label when available in offerings. It may not be available if a user has upgraded from a // previous version of Karpenter w/o zone-id support and the nodeclass subnet status has not yet updated. 
- if zoneIDs := lo.FilterMap(zones, func(zone string, _ int) (string, bool) { - id, ok := zonesToZoneIDs[zone] + if zoneIDs := lo.FilterMap(availableZones.UnsortedList(), func(zone string, _ int) (string, bool) { + id, ok := subnetZonesToZoneIDs[zone] return id, ok }); len(zoneIDs) != 0 { requirements.Add(scheduling.NewRequirement(v1.LabelTopologyZoneID, corev1.NodeSelectorOpIn, zoneIDs...)) diff --git a/pkg/providers/launchtemplate/suite_test.go b/pkg/providers/launchtemplate/suite_test.go index c969455ee5fd..71d2c5966c69 100644 --- a/pkg/providers/launchtemplate/suite_test.go +++ b/pkg/providers/launchtemplate/suite_test.go @@ -90,7 +90,7 @@ func TestAWS(t *testing.T) { } var _ = BeforeSuite(func() { - env = coretest.NewEnvironment(coretest.WithCRDs(apis.CRDs...), coretest.WithCRDs(v1alpha1.CRDs...)) + env = coretest.NewEnvironment(coretest.WithCRDs(test.DisableCapacityReservationIDValidation(apis.CRDs)...), coretest.WithCRDs(v1alpha1.CRDs...)) ctx = coreoptions.ToContext(ctx, coretest.Options(coretest.OptionsFields{FeatureGates: coretest.FeatureGates{ReservedCapacity: lo.ToPtr(true)}})) ctx = options.ToContext(ctx, test.Options()) ctx, stop = context.WithCancel(ctx) @@ -2300,6 +2300,129 @@ essential = true ) }) }) + It("should generate a unique launch template per capacity reservation", func() { + crs := []ec2types.CapacityReservation{ + { + AvailabilityZone: lo.ToPtr("test-zone-1a"), + InstanceType: lo.ToPtr("m5.large"), + OwnerId: lo.ToPtr("012345678901"), + InstanceMatchCriteria: ec2types.InstanceMatchCriteriaTargeted, + CapacityReservationId: lo.ToPtr("cr-m5.large-1a-1"), + AvailableInstanceCount: lo.ToPtr[int32](10), + State: ec2types.CapacityReservationStateActive, + }, + { + AvailabilityZone: lo.ToPtr("test-zone-1a"), + InstanceType: lo.ToPtr("m5.large"), + OwnerId: lo.ToPtr("012345678901"), + InstanceMatchCriteria: ec2types.InstanceMatchCriteriaTargeted, + CapacityReservationId: lo.ToPtr("cr-m5.large-1a-2"), + AvailableInstanceCount: lo.ToPtr[int32](15), 
+ State: ec2types.CapacityReservationStateActive, + }, + { + AvailabilityZone: lo.ToPtr("test-zone-1b"), + InstanceType: lo.ToPtr("m5.large"), + OwnerId: lo.ToPtr("012345678901"), + InstanceMatchCriteria: ec2types.InstanceMatchCriteriaTargeted, + CapacityReservationId: lo.ToPtr("cr-m5.large-1b-1"), + AvailableInstanceCount: lo.ToPtr[int32](10), + State: ec2types.CapacityReservationStateActive, + }, + { + AvailabilityZone: lo.ToPtr("test-zone-1b"), + InstanceType: lo.ToPtr("m5.xlarge"), + OwnerId: lo.ToPtr("012345678901"), + InstanceMatchCriteria: ec2types.InstanceMatchCriteriaTargeted, + CapacityReservationId: lo.ToPtr("cr-m5.xlarge-1b-1"), + AvailableInstanceCount: lo.ToPtr[int32](15), + State: ec2types.CapacityReservationStateActive, + }, + } + awsEnv.EC2API.DescribeCapacityReservationsOutput.Set(&ec2.DescribeCapacityReservationsOutput{ + CapacityReservations: crs, + }) + for _, cr := range crs { + nodeClass.Status.CapacityReservations = append(nodeClass.Status.CapacityReservations, lo.Must(nodeclass.CapacityReservationFromEC2(&cr))) + awsEnv.CapacityReservationProvider.SetAvailableInstanceCount(*cr.CapacityReservationId, int(*cr.AvailableInstanceCount)) + } + + nodePool.Spec.Template.Spec.Requirements = []karpv1.NodeSelectorRequirementWithMinValues{{NodeSelectorRequirement: corev1.NodeSelectorRequirement{ + Key: karpv1.CapacityTypeLabelKey, + Operator: corev1.NodeSelectorOpIn, + Values: []string{karpv1.CapacityTypeReserved}, + }}} + pod := coretest.UnschedulablePod() + ExpectApplied(ctx, env.Client, pod, nodePool, nodeClass) + ExpectProvisioned(ctx, env.Client, cluster, cloudProvider, prov, pod) + ExpectScheduled(ctx, env.Client, pod) + + launchTemplates := map[string]*ec2.CreateLaunchTemplateInput{} + for awsEnv.EC2API.CreateLaunchTemplateBehavior.CalledWithInput.Len() != 0 { + lt := awsEnv.EC2API.CreateLaunchTemplateBehavior.CalledWithInput.Pop() + launchTemplates[*lt.LaunchTemplateName] = lt + } + // We should have created 3 launch templates, rather than 4 
since we only create 1 launch template per capacity pool
+			Expect(launchTemplates).To(HaveLen(3))
+			reservationIDs := lo.Uniq(lo.Map(lo.Values(launchTemplates), func(input *ec2.CreateLaunchTemplateInput, _ int) string {
+				return *input.LaunchTemplateData.CapacityReservationSpecification.CapacityReservationTarget.CapacityReservationId
+			}))
+			Expect(reservationIDs).To(HaveLen(3))
+			Expect(reservationIDs).To(ConsistOf(
+				// We don't include the m5.large offering in 1a because we select the zonal offering with the highest capacity
+				"cr-m5.large-1a-2",
+				"cr-m5.large-1b-1",
+				"cr-m5.xlarge-1b-1",
+			))
+			for _, input := range launchTemplates {
+				Expect(input.LaunchTemplateData.CapacityReservationSpecification.CapacityReservationPreference).To(Equal(ec2types.CapacityReservationPreferenceCapacityReservationsOnly))
+			}
+
+			// Validate that we generate one override per launch template, and the override is for the instance pool associated
+			// with the capacity reservation.
+			Expect(awsEnv.EC2API.CreateFleetBehavior.CalledWithInput.Len()).ToNot(Equal(0))
+			createFleetInput := awsEnv.EC2API.CreateFleetBehavior.CalledWithInput.Pop()
+			Expect(createFleetInput.LaunchTemplateConfigs).To(HaveLen(3))
+			for _, ltc := range createFleetInput.LaunchTemplateConfigs {
+				Expect(ltc.Overrides).To(HaveLen(1))
+				Expect(launchTemplates).To(HaveKey(*ltc.LaunchTemplateSpecification.LaunchTemplateName))
+				lt := launchTemplates[*ltc.LaunchTemplateSpecification.LaunchTemplateName]
+				cr, ok := lo.Find(crs, func(cr ec2types.CapacityReservation) bool {
+					return *cr.CapacityReservationId == *lt.LaunchTemplateData.CapacityReservationSpecification.CapacityReservationTarget.CapacityReservationId
+				})
+				Expect(ok).To(BeTrue())
+				Expect(*ltc.Overrides[0].AvailabilityZone).To(Equal(*cr.AvailabilityZone))
+				Expect(ltc.Overrides[0].InstanceType).To(Equal(ec2types.InstanceType(*cr.InstanceType)))
+			}
+		})
+		DescribeTable(
+			"should set the capacity reservation specification according to the capacity 
reservation feature flag", + func(enabled bool) { + coreoptions.FromContext(ctx).FeatureGates.ReservedCapacity = enabled + + pod := coretest.UnschedulablePod() + ExpectApplied(ctx, env.Client, pod, nodePool, nodeClass) + ExpectProvisioned(ctx, env.Client, cluster, cloudProvider, prov, pod) + ExpectScheduled(ctx, env.Client, pod) + + var launchTemplates []*ec2.CreateLaunchTemplateInput + for awsEnv.EC2API.CreateLaunchTemplateBehavior.CalledWithInput.Len() != 0 { + launchTemplates = append(launchTemplates, awsEnv.EC2API.CreateLaunchTemplateBehavior.CalledWithInput.Pop()) + } + for _, input := range launchTemplates { + crs := input.LaunchTemplateData.CapacityReservationSpecification + if !enabled { + Expect(crs).To(BeNil()) + } else { + Expect(*crs).To(Equal(ec2types.LaunchTemplateCapacityReservationSpecificationRequest{ + CapacityReservationPreference: ec2types.CapacityReservationPreferenceNone, + })) + } + } + }, + Entry("enabled", true), + Entry("disabled", false), + ) }) // ExpectTags verifies that the expected tags are a subset of the tags found diff --git a/pkg/test/utils.go b/pkg/test/utils.go index 4e4adebd5752..17b7dc075f2f 100644 --- a/pkg/test/utils.go +++ b/pkg/test/utils.go @@ -31,3 +31,18 @@ func RemoveNodeClassTagValidation(crds []*apiextensionsv1.CustomResourceDefiniti } return crds } + +// DisableCapacityReservationIDValidation updates the regex validation used for capacity reservation IDs to allow any +// string after the "cr-" prefix. This enables us to embed useful debugging information in the reservation ID, such as +// the instance type and zone. 
+func DisableCapacityReservationIDValidation(crds []*apiextensionsv1.CustomResourceDefinition) []*apiextensionsv1.CustomResourceDefinition { + for _, crd := range crds { + if crd.Name != "ec2nodeclasses.karpenter.k8s.aws" { + continue + } + idProps := crd.Spec.Versions[0].Schema.OpenAPIV3Schema.Properties["spec"].Properties["capacityReservationSelectorTerms"].Items.Schema.Properties["id"] + idProps.Pattern = `^cr-.+$` + crd.Spec.Versions[0].Schema.OpenAPIV3Schema.Properties["spec"].Properties["capacityReservationSelectorTerms"].Items.Schema.Properties["id"] = idProps + } + return crds +}