diff --git a/pkg/apis/v1/ec2nodeclass_hash_test.go b/pkg/apis/v1/ec2nodeclass_hash_test.go index a523cfd8189b..87ad4de0b5af 100644 --- a/pkg/apis/v1/ec2nodeclass_hash_test.go +++ b/pkg/apis/v1/ec2nodeclass_hash_test.go @@ -193,7 +193,7 @@ var _ = Describe("Hash", func() { nodeClass.Spec.AMISelectorTerms = []v1.AMISelectorTerm{{ Tags: map[string]string{"ami-test-key": "ami-test-value"}, }} - nodeClass.Spec.SubnetSelectorTerms = []v1.SubnetSelectorTerm{{ + nodeClass.Spec.CapacityReservationSelectorTerms = []v1.CapacityReservationSelectorTerm{{ Tags: map[string]string{"cr-test-key": "cr-test-value"}, }} updatedHash := nodeClass.Hash() diff --git a/pkg/apis/v1/ec2nodeclass_validation_cel_test.go b/pkg/apis/v1/ec2nodeclass_validation_cel_test.go index 898ba9dc7ad1..bbf844aea15c 100644 --- a/pkg/apis/v1/ec2nodeclass_validation_cel_test.go +++ b/pkg/apis/v1/ec2nodeclass_validation_cel_test.go @@ -450,6 +450,15 @@ var _ = Describe("CEL/Validation", func() { }} Expect(env.Client.Create(ctx, nc)).To(Succeed()) }) + It("should succeed for a valid ownerID", func() { + nc.Spec.CapacityReservationSelectorTerms = []v1.CapacityReservationSelectorTerm{{ + OwnerID: "012345678901", + Tags: map[string]string{ + "test": "testvalue", + }, + }} + Expect(env.Client.Create(ctx, nc)).To(Succeed()) + }) It("should fail with a capacity reservation selector on a malformed id", func() { nc.Spec.CapacityReservationSelectorTerms = []v1.CapacityReservationSelectorTerm{{ ID: "r-12345749", @@ -520,15 +529,6 @@ var _ = Describe("CEL/Validation", func() { }} Expect(env.Client.Create(ctx, nc)).ToNot(Succeed()) }) - It("should succeed for a valid ownerID", func() { - nc.Spec.CapacityReservationSelectorTerms = []v1.CapacityReservationSelectorTerm{{ - OwnerID: "012345678901", - Tags: map[string]string{ - "test": "testvalue", - }, - }} - Expect(env.Client.Create(ctx, nc)).To(Succeed()) - }) It("should fail when the ownerID is malformed", func() { nc.Spec.CapacityReservationSelectorTerms = []v1.CapacityReservationSelectorTerm{{ OwnerID: "01234567890", // OwnerID must be 12 digits, this is 11 diff --git a/pkg/controllers/nodeclass/capacityreservation.go b/pkg/controllers/nodeclass/capacityreservation.go index dbcad52186aa..6e6477f8720d 100644 --- a/pkg/controllers/nodeclass/capacityreservation.go +++ b/pkg/controllers/nodeclass/capacityreservation.go @@ -56,6 +56,7 @@ func (c *CapacityReservation) Reconcile(ctx context.Context, nc *v1.EC2NodeClass return reconcile.Result{}, fmt.Errorf("getting capacity reservations, %w", err) } if len(reservations) == 0 { + nc.Status.CapacityReservations = nil nc.StatusConditions().SetTrue(v1.ConditionTypeCapacityReservationsReady) return reconcile.Result{RequeueAfter: capacityReservationPollPeriod}, nil } @@ -86,6 +87,8 @@ func (c *CapacityReservation) Reconcile(ctx context.Context, nc *v1.EC2NodeClass } func capacityReservationFromEC2(cr *ec2types.CapacityReservation) (v1.CapacityReservation, error) { + // Guard against new instance match criteria added in the future. See https://github.com/kubernetes-sigs/karpenter/issues/806 + // for a similar issue. if !lo.Contains([]ec2types.InstanceMatchCriteria{ ec2types.InstanceMatchCriteriaOpen, ec2types.InstanceMatchCriteriaTargeted, @@ -98,8 +101,7 @@ func capacityReservationFromEC2(cr *ec2types.CapacityReservation) (v1.CapacityRe } return v1.CapacityReservation{ - AvailabilityZone: *cr.AvailabilityZone, - // AvailableInstanceCount: int(*cr.AvailableInstanceCount), + AvailabilityZone: *cr.AvailabilityZone, EndTime: endTime, ID: *cr.CapacityReservationId, InstanceMatchCriteria: string(cr.InstanceMatchCriteria), @@ -109,6 +111,9 @@ func capacityReservationFromEC2(cr *ec2types.CapacityReservation) (v1.CapacityRe }, nil } +// requeueAfter determines the duration until the next target reconciliation time based on the provided reservations. If +// any reservations are expected to expire before we would typically requeue, the duration will be based on the +// nearest expiration time. func (c *CapacityReservation) requeueAfter(reservations ...*ec2types.CapacityReservation) time.Duration { var next *time.Time for _, reservation := range reservations { diff --git a/pkg/controllers/nodeclass/validation.go b/pkg/controllers/nodeclass/validation.go index 8131b66047dc..3621e2044b89 100644 --- a/pkg/controllers/nodeclass/validation.go +++ b/pkg/controllers/nodeclass/validation.go @@ -91,7 +91,7 @@ func (n Validation) Reconcile(ctx context.Context, nodeClass *v1.EC2NodeClass) ( return reconcile.Result{}, nil } - createLaunchTemplateInput := launchtemplate.GetCreateLaunchTemplateInput(mockOptions(*nodeClaim, nodeClass, tags), corev1.IPv4Protocol, "") + createLaunchTemplateInput := launchtemplate.GetCreateLaunchTemplateInput(ctx, mockOptions(*nodeClaim, nodeClass, tags), corev1.IPv4Protocol, "") createLaunchTemplateInput.DryRun = aws.Bool(true) if _, err := n.ec2api.CreateLaunchTemplate(ctx, createLaunchTemplateInput); awserrors.IgnoreDryRunError(err) != nil { diff --git a/pkg/providers/capacityreservation/provider.go b/pkg/providers/capacityreservation/provider.go index 2a136debaaa3..e16f5f867b8a 100644 --- a/pkg/providers/capacityreservation/provider.go +++ b/pkg/providers/capacityreservation/provider.go @@ -46,7 +46,11 @@ type DefaultProvider struct { cm *pretty.ChangeMonitor } -func NewProvider(ec2api sdk.EC2API, clk clock.Clock, reservationCache, reservationAvailabilityCache *cache.Cache) *DefaultProvider { +func NewProvider( + ec2api sdk.EC2API, + clk clock.Clock, + reservationCache, reservationAvailabilityCache *cache.Cache, +) *DefaultProvider { return &DefaultProvider{ availabilityCache: availabilityCache{ cache: reservationAvailabilityCache, @@ -60,40 +64,41 @@ func NewProvider(ec2api sdk.EC2API, clk clock.Clock, reservationCache, reservati } func (p *DefaultProvider) List(ctx context.Context, selectorTerms ...v1.CapacityReservationSelectorTerm) ([]*ec2types.CapacityReservation, error) { - queries := QueriesFromSelectorTerms(selectorTerms...) - var reservations []*ec2types.CapacityReservation - var remainingQueries []*Query - for _, query := range queries { - if value, ok := p.reservationCache.Get(query.CacheKey()); ok { - reservations = append(reservations, value.([]*ec2types.CapacityReservation)...) - } else { - remainingQueries = append(remainingQueries, query) - } - } - if len(remainingQueries) == 0 { + queries := QueriesFromSelectorTerms(selectorTerms...) + reservations, queries = p.resolveCachedQueries(queries...) + if len(queries) == 0 { return p.filterReservations(reservations), nil } - - for _, query := range remainingQueries { - paginator := ec2.NewDescribeCapacityReservationsPaginator(p.ec2api, query.DescribeCapacityReservationsInput()) + for _, q := range queries { + paginator := ec2.NewDescribeCapacityReservationsPaginator(p.ec2api, q.DescribeCapacityReservationsInput()) for paginator.HasMorePages() { out, err := paginator.NextPage(ctx) if err != nil { return nil, fmt.Errorf("listing capacity reservations, %w", err) } queryReservations := lo.ToSlicePtr(out.CapacityReservations) - p.reservationCache.SetDefault(query.CacheKey(), queryReservations) + p.reservationCache.SetDefault(q.CacheKey(), queryReservations) reservations = append(reservations, queryReservations...) p.syncAvailability(lo.SliceToMap(queryReservations, func(r *ec2types.CapacityReservation) (string, int) { return *r.CapacityReservationId, int(*r.AvailableInstanceCount) })) } } - return p.filterReservations(reservations), nil } +func (p *DefaultProvider) resolveCachedQueries(queries ...*Query) (reservations []*ec2types.CapacityReservation, remainingQueries []*Query) { + for _, q := range queries { + if value, ok := p.reservationCache.Get(q.CacheKey()); ok { + reservations = append(reservations, value.([]*ec2types.CapacityReservation)...) + } else { + remainingQueries = append(remainingQueries, q) + } + } + return reservations, remainingQueries +} + // filterReservations removes duplicate and expired reservations func (p *DefaultProvider) filterReservations(reservations []*ec2types.CapacityReservation) []*ec2types.CapacityReservation { return lo.Filter(lo.UniqBy(reservations, func(r *ec2types.CapacityReservation) string { diff --git a/pkg/providers/capacityreservation/types.go b/pkg/providers/capacityreservation/types.go index bd970b0c33b4..d5ec2c0c5461 100644 --- a/pkg/providers/capacityreservation/types.go +++ b/pkg/providers/capacityreservation/types.go @@ -60,52 +60,39 @@ func (q *Query) CacheKey() string { } func (q *Query) DescribeCapacityReservationsInput() *ec2.DescribeCapacityReservationsInput { + filters := []ec2types.Filter{{ + Name: lo.ToPtr("state"), + Values: []string{string(ec2types.CapacityReservationStateActive)}, + }} if len(q.ids) != 0 { return &ec2.DescribeCapacityReservationsInput{ - Filters: []ec2types.Filter{lo.Must(q.stateFilter())[0]}, + Filters: filters, CapacityReservationIds: q.ids, } } - type filterProvider func() ([]ec2types.Filter, bool) - return &ec2.DescribeCapacityReservationsInput{ - Filters: lo.Flatten(lo.FilterMap([]filterProvider{ - q.stateFilter, - q.ownerIDFilter, - q.tagsFilter, - }, func(f filterProvider, _ int) ([]ec2types.Filter, bool) { - return f() - })), + if q.ownerID != "" { + filters = append(filters, ec2types.Filter{ + Name: lo.ToPtr("owner-id"), + Values: []string{q.ownerID}, + }) } -} - -func (q *Query) stateFilter() ([]ec2types.Filter, bool) { - return []ec2types.Filter{{ - Name: lo.ToPtr("state"), - Values: []string{string(ec2types.CapacityReservationStateActive)}, - }}, true -} - -func (q *Query) ownerIDFilter() ([]ec2types.Filter, bool) { - return []ec2types.Filter{{ - Name: lo.ToPtr("owner-id"), - Values: []string{q.ownerID}, - }}, q.ownerID != "" -} - -func (q *Query) tagsFilter() ([]ec2types.Filter, bool) { - return lo.MapToSlice(q.tags, func(k, v string) ec2types.Filter { - if v == "*" { + if len(q.tags) != 0 { + filters = append(filters, lo.MapToSlice(q.tags, func(k, v string) ec2types.Filter { + if v == "*" { + return ec2types.Filter{ + Name: lo.ToPtr("tag-key"), + Values: []string{k}, + } + } return ec2types.Filter{ - Name: lo.ToPtr("tag-key"), - Values: []string{k}, + Name: lo.ToPtr(fmt.Sprintf("tag:%s", k)), + Values: []string{v}, } - } - return ec2types.Filter{ - Name: lo.ToPtr(fmt.Sprintf("tag:%s", k)), - Values: []string{v}, - } - }), len(q.tags) != 0 - + })...) + } + return &ec2.DescribeCapacityReservationsInput{ + Filters: filters, + } } type availabilityCache struct { diff --git a/pkg/providers/launchtemplate/launchtemplate.go b/pkg/providers/launchtemplate/launchtemplate.go index d07c37b09bb3..103173b7c72a 100644 --- a/pkg/providers/launchtemplate/launchtemplate.go +++ b/pkg/providers/launchtemplate/launchtemplate.go @@ -38,6 +38,8 @@ import ( "k8s.io/apimachinery/pkg/api/resource" karpv1 "sigs.k8s.io/karpenter/pkg/apis/v1" + karpoptions "sigs.k8s.io/karpenter/pkg/operator/options" + v1 "github.com/aws/karpenter-provider-aws/pkg/apis/v1" awserrors "github.com/aws/karpenter-provider-aws/pkg/errors" "github.com/aws/karpenter-provider-aws/pkg/operator/options" @@ -231,7 +233,7 @@ func (p *DefaultProvider) createLaunchTemplate(ctx context.Context, options *ami if err != nil { return ec2types.LaunchTemplate{}, err } - createLaunchTemplateInput := GetCreateLaunchTemplateInput(options, p.ClusterIPFamily, userData) + createLaunchTemplateInput := GetCreateLaunchTemplateInput(ctx, options, p.ClusterIPFamily, userData) output, err := p.ec2api.CreateLaunchTemplate(ctx, createLaunchTemplateInput) if err != nil { return ec2types.LaunchTemplate{}, err @@ -241,7 +243,12 @@ func (p *DefaultProvider) createLaunchTemplate(ctx context.Context, options *ami } // you need UserData, AmiID, tags, blockdevicemappings, instance profile, -func GetCreateLaunchTemplateInput(options *amifamily.LaunchTemplate, ClusterIPFamily corev1.IPFamily, userData string) *ec2.CreateLaunchTemplateInput { +func GetCreateLaunchTemplateInput( + ctx context.Context, + options *amifamily.LaunchTemplate, + ClusterIPFamily corev1.IPFamily, + userData string, +) *ec2.CreateLaunchTemplateInput { launchTemplateDataTags := []ec2types.LaunchTemplateTagSpecificationRequest{ {ResourceType: ec2types.ResourceTypeNetworkInterface, Tags: utils.MergeTags(options.Tags)}, } @@ -249,24 +256,10 @@ func GetCreateLaunchTemplateInput(options *amifamily.LaunchTemplate, ClusterIPFa launchTemplateDataTags = append(launchTemplateDataTags, ec2types.LaunchTemplateTagSpecificationRequest{ResourceType: ec2types.ResourceTypeSpotInstancesRequest, Tags: utils.MergeTags(options.Tags)}) } networkInterfaces := generateNetworkInterfaces(options, ClusterIPFamily) - return &ec2.CreateLaunchTemplateInput{ + lt := &ec2.CreateLaunchTemplateInput{ LaunchTemplateName: aws.String(LaunchTemplateName(options)), LaunchTemplateData: &ec2types.RequestLaunchTemplateData{ BlockDeviceMappings: blockDeviceMappings(options.BlockDeviceMappings), - CapacityReservationSpecification: &ec2types.LaunchTemplateCapacityReservationSpecificationRequest{ - CapacityReservationPreference: lo.Ternary( - options.CapacityType == karpv1.CapacityTypeReserved, - ec2types.CapacityReservationPreferenceCapacityReservationsOnly, - ec2types.CapacityReservationPreferenceNone, - ), - CapacityReservationTarget: lo.Ternary( - options.CapacityType == karpv1.CapacityTypeReserved, - &ec2types.CapacityReservationTarget{ - CapacityReservationId: &options.CapacityReservationID, - }, - nil, - ), - }, IamInstanceProfile: &ec2types.LaunchTemplateIamInstanceProfileSpecificationRequest{ Name: aws.String(options.InstanceProfile), }, @@ -301,6 +294,25 @@ func GetCreateLaunchTemplateInput(options *amifamily.LaunchTemplate, ClusterIPFa }, }, } + // Gate this specifically since the update to CapacityReservationPreference will opt od / spot launches out of open + // ODCRs, which is a breaking change from the pre-native ODCR support behavior. + if karpoptions.FromContext(ctx).FeatureGates.ReservedCapacity { + lt.LaunchTemplateData.CapacityReservationSpecification = &ec2types.LaunchTemplateCapacityReservationSpecificationRequest{ + CapacityReservationPreference: lo.Ternary( + options.CapacityType == karpv1.CapacityTypeReserved, + ec2types.CapacityReservationPreferenceCapacityReservationsOnly, + ec2types.CapacityReservationPreferenceNone, + ), + CapacityReservationTarget: lo.Ternary( + options.CapacityType == karpv1.CapacityTypeReserved, + &ec2types.CapacityReservationTarget{ + CapacityReservationId: &options.CapacityReservationID, + }, + nil, + ), + } + } + return lt } // generateNetworkInterfaces generates network interfaces for the launch template.