Skip to content

Commit

Permalink
checkpoint review feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
jmdeal committed Feb 21, 2025
1 parent cc6c3ad commit 5095850
Show file tree
Hide file tree
Showing 7 changed files with 94 additions and 85 deletions.
2 changes: 1 addition & 1 deletion pkg/apis/v1/ec2nodeclass_hash_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ var _ = Describe("Hash", func() {
nodeClass.Spec.AMISelectorTerms = []v1.AMISelectorTerm{{
Tags: map[string]string{"ami-test-key": "ami-test-value"},
}}
nodeClass.Spec.SubnetSelectorTerms = []v1.SubnetSelectorTerm{{
nodeClass.Spec.CapacityReservationSelectorTerms = []v1.CapacityReservationSelectorTerm{{
Tags: map[string]string{"cr-test-key": "cr-test-value"},
}}
updatedHash := nodeClass.Hash()
Expand Down
18 changes: 9 additions & 9 deletions pkg/apis/v1/ec2nodeclass_validation_cel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,15 @@ var _ = Describe("CEL/Validation", func() {
}}
Expect(env.Client.Create(ctx, nc)).To(Succeed())
})
It("should succeed for a valid ownerID", func() {
nc.Spec.CapacityReservationSelectorTerms = []v1.CapacityReservationSelectorTerm{{
OwnerID: "012345678901",
Tags: map[string]string{
"test": "testvalue",
},
}}
Expect(env.Client.Create(ctx, nc)).To(Succeed())
})
It("should fail with a capacity reservation selector on a malformed id", func() {
nc.Spec.CapacityReservationSelectorTerms = []v1.CapacityReservationSelectorTerm{{
ID: "r-12345749",
Expand Down Expand Up @@ -520,15 +529,6 @@ var _ = Describe("CEL/Validation", func() {
}}
Expect(env.Client.Create(ctx, nc)).ToNot(Succeed())
})
It("should succeed for a valid ownerID", func() {
nc.Spec.CapacityReservationSelectorTerms = []v1.CapacityReservationSelectorTerm{{
OwnerID: "012345678901",
Tags: map[string]string{
"test": "testvalue",
},
}}
Expect(env.Client.Create(ctx, nc)).To(Succeed())
})
It("should fail when the ownerID is malformed", func() {
nc.Spec.CapacityReservationSelectorTerms = []v1.CapacityReservationSelectorTerm{{
OwnerID: "01234567890", // OwnerID must be 12 digits, this is 11
Expand Down
9 changes: 7 additions & 2 deletions pkg/controllers/nodeclass/capacityreservation.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ func (c *CapacityReservation) Reconcile(ctx context.Context, nc *v1.EC2NodeClass
return reconcile.Result{}, fmt.Errorf("getting capacity reservations, %w", err)
}
if len(reservations) == 0 {
nc.Status.CapacityReservations = nil
nc.StatusConditions().SetTrue(v1.ConditionTypeCapacityReservationsReady)
return reconcile.Result{RequeueAfter: capacityReservationPollPeriod}, nil
}
Expand Down Expand Up @@ -86,6 +87,8 @@ func (c *CapacityReservation) Reconcile(ctx context.Context, nc *v1.EC2NodeClass
}

func capacityReservationFromEC2(cr *ec2types.CapacityReservation) (v1.CapacityReservation, error) {
// Guard against new instance match criteria added in the future. See https://github.com/kubernetes-sigs/karpenter/issues/806
// for a similar issue.
if !lo.Contains([]ec2types.InstanceMatchCriteria{
ec2types.InstanceMatchCriteriaOpen,
ec2types.InstanceMatchCriteriaTargeted,
Expand All @@ -98,8 +101,7 @@ func capacityReservationFromEC2(cr *ec2types.CapacityReservation) (v1.CapacityRe
}

return v1.CapacityReservation{
AvailabilityZone: *cr.AvailabilityZone,
// AvailableInstanceCount: int(*cr.AvailableInstanceCount),
AvailabilityZone: *cr.AvailabilityZone,
EndTime: endTime,
ID: *cr.CapacityReservationId,
InstanceMatchCriteria: string(cr.InstanceMatchCriteria),
Expand All @@ -109,6 +111,9 @@ func capacityReservationFromEC2(cr *ec2types.CapacityReservation) (v1.CapacityRe
}, nil
}

// requeueAfter determines the duration until the next target reconciliation time based on the provided reservations. If
// any reservations are expected to expire before we would typically requeue, the duration will be based on the
// nearest expiration time.
func (c *CapacityReservation) requeueAfter(reservations ...*ec2types.CapacityReservation) time.Duration {
var next *time.Time
for _, reservation := range reservations {
Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/nodeclass/validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func (n Validation) Reconcile(ctx context.Context, nodeClass *v1.EC2NodeClass) (
return reconcile.Result{}, nil
}

createLaunchTemplateInput := launchtemplate.GetCreateLaunchTemplateInput(mockOptions(*nodeClaim, nodeClass, tags), corev1.IPv4Protocol, "")
createLaunchTemplateInput := launchtemplate.GetCreateLaunchTemplateInput(ctx, mockOptions(*nodeClaim, nodeClass, tags), corev1.IPv4Protocol, "")
createLaunchTemplateInput.DryRun = aws.Bool(true)

if _, err := n.ec2api.CreateLaunchTemplate(ctx, createLaunchTemplateInput); awserrors.IgnoreDryRunError(err) != nil {
Expand Down
39 changes: 22 additions & 17 deletions pkg/providers/capacityreservation/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,11 @@ type DefaultProvider struct {
cm *pretty.ChangeMonitor
}

func NewProvider(ec2api sdk.EC2API, clk clock.Clock, reservationCache, reservationAvailabilityCache *cache.Cache) *DefaultProvider {
func NewProvider(
ec2api sdk.EC2API,
clk clock.Clock,
reservationCache, reservationAvailabilityCache *cache.Cache,
) *DefaultProvider {
return &DefaultProvider{
availabilityCache: availabilityCache{
cache: reservationAvailabilityCache,
Expand All @@ -60,40 +64,41 @@ func NewProvider(ec2api sdk.EC2API, clk clock.Clock, reservationCache, reservati
}

func (p *DefaultProvider) List(ctx context.Context, selectorTerms ...v1.CapacityReservationSelectorTerm) ([]*ec2types.CapacityReservation, error) {
queries := QueriesFromSelectorTerms(selectorTerms...)

var reservations []*ec2types.CapacityReservation
var remainingQueries []*Query
for _, query := range queries {
if value, ok := p.reservationCache.Get(query.CacheKey()); ok {
reservations = append(reservations, value.([]*ec2types.CapacityReservation)...)
} else {
remainingQueries = append(remainingQueries, query)
}
}
if len(remainingQueries) == 0 {
queries := QueriesFromSelectorTerms(selectorTerms...)
reservations, queries = p.resolveCachedQueries(queries...)
if len(queries) == 0 {
return p.filterReservations(reservations), nil
}

for _, query := range remainingQueries {
paginator := ec2.NewDescribeCapacityReservationsPaginator(p.ec2api, query.DescribeCapacityReservationsInput())
for _, q := range queries {
paginator := ec2.NewDescribeCapacityReservationsPaginator(p.ec2api, q.DescribeCapacityReservationsInput())
for paginator.HasMorePages() {
out, err := paginator.NextPage(ctx)
if err != nil {
return nil, fmt.Errorf("listing capacity reservations, %w", err)
}
queryReservations := lo.ToSlicePtr(out.CapacityReservations)
p.reservationCache.SetDefault(query.CacheKey(), queryReservations)
p.reservationCache.SetDefault(q.CacheKey(), queryReservations)
reservations = append(reservations, queryReservations...)
p.syncAvailability(lo.SliceToMap(queryReservations, func(r *ec2types.CapacityReservation) (string, int) {
return *r.CapacityReservationId, int(*r.AvailableInstanceCount)
}))
}
}

return p.filterReservations(reservations), nil
}

func (p *DefaultProvider) resolveCachedQueries(queries ...*Query) (reservations []*ec2types.CapacityReservation, remainingQueries []*Query) {
for _, q := range queries {
if value, ok := p.reservationCache.Get(q.CacheKey()); ok {
reservations = append(reservations, value.([]*ec2types.CapacityReservation)...)
} else {
remainingQueries = append(remainingQueries, q)
}
}
return reservations, remainingQueries
}

// filterReservations removes duplicate and expired reservations
func (p *DefaultProvider) filterReservations(reservations []*ec2types.CapacityReservation) []*ec2types.CapacityReservation {
return lo.Filter(lo.UniqBy(reservations, func(r *ec2types.CapacityReservation) string {
Expand Down
63 changes: 25 additions & 38 deletions pkg/providers/capacityreservation/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,52 +60,39 @@ func (q *Query) CacheKey() string {
}

func (q *Query) DescribeCapacityReservationsInput() *ec2.DescribeCapacityReservationsInput {
filters := []ec2types.Filter{{
Name: lo.ToPtr("state"),
Values: []string{string(ec2types.CapacityReservationStateActive)},
}}
if len(q.ids) != 0 {
return &ec2.DescribeCapacityReservationsInput{
Filters: []ec2types.Filter{lo.Must(q.stateFilter())[0]},
Filters: filters,
CapacityReservationIds: q.ids,
}
}
type filterProvider func() ([]ec2types.Filter, bool)
return &ec2.DescribeCapacityReservationsInput{
Filters: lo.Flatten(lo.FilterMap([]filterProvider{
q.stateFilter,
q.ownerIDFilter,
q.tagsFilter,
}, func(f filterProvider, _ int) ([]ec2types.Filter, bool) {
return f()
})),
if q.ownerID != "" {
filters = append(filters, ec2types.Filter{
Name: lo.ToPtr("owner-id"),
Values: []string{q.ownerID},
})
}
}

func (q *Query) stateFilter() ([]ec2types.Filter, bool) {
return []ec2types.Filter{{
Name: lo.ToPtr("state"),
Values: []string{string(ec2types.CapacityReservationStateActive)},
}}, true
}

func (q *Query) ownerIDFilter() ([]ec2types.Filter, bool) {
return []ec2types.Filter{{
Name: lo.ToPtr("owner-id"),
Values: []string{q.ownerID},
}}, q.ownerID != ""
}

func (q *Query) tagsFilter() ([]ec2types.Filter, bool) {
return lo.MapToSlice(q.tags, func(k, v string) ec2types.Filter {
if v == "*" {
if len(q.tags) != 0 {
filters = append(filters, lo.MapToSlice(q.tags, func(k, v string) ec2types.Filter {
if v == "*" {
return ec2types.Filter{
Name: lo.ToPtr("tag-key"),
Values: []string{k},
}
}
return ec2types.Filter{
Name: lo.ToPtr("tag-key"),
Values: []string{k},
Name: lo.ToPtr(fmt.Sprintf("tag:%s", k)),
Values: []string{v},
}
}
return ec2types.Filter{
Name: lo.ToPtr(fmt.Sprintf("tag:%s", k)),
Values: []string{v},
}
}), len(q.tags) != 0

})...)
}
return &ec2.DescribeCapacityReservationsInput{
Filters: filters,
}
}

type availabilityCache struct {
Expand Down
46 changes: 29 additions & 17 deletions pkg/providers/launchtemplate/launchtemplate.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ import (
"k8s.io/apimachinery/pkg/api/resource"
karpv1 "sigs.k8s.io/karpenter/pkg/apis/v1"

karpoptions "sigs.k8s.io/karpenter/pkg/operator/options"

v1 "github.com/aws/karpenter-provider-aws/pkg/apis/v1"
awserrors "github.com/aws/karpenter-provider-aws/pkg/errors"
"github.com/aws/karpenter-provider-aws/pkg/operator/options"
Expand Down Expand Up @@ -231,7 +233,7 @@ func (p *DefaultProvider) createLaunchTemplate(ctx context.Context, options *ami
if err != nil {
return ec2types.LaunchTemplate{}, err
}
createLaunchTemplateInput := GetCreateLaunchTemplateInput(options, p.ClusterIPFamily, userData)
createLaunchTemplateInput := GetCreateLaunchTemplateInput(ctx, options, p.ClusterIPFamily, userData)
output, err := p.ec2api.CreateLaunchTemplate(ctx, createLaunchTemplateInput)
if err != nil {
return ec2types.LaunchTemplate{}, err
Expand All @@ -241,32 +243,23 @@ func (p *DefaultProvider) createLaunchTemplate(ctx context.Context, options *ami
}

// you need UserData, AmiID, tags, blockdevicemappings, instance profile,
func GetCreateLaunchTemplateInput(options *amifamily.LaunchTemplate, ClusterIPFamily corev1.IPFamily, userData string) *ec2.CreateLaunchTemplateInput {
func GetCreateLaunchTemplateInput(
ctx context.Context,
options *amifamily.LaunchTemplate,
ClusterIPFamily corev1.IPFamily,
userData string,
) *ec2.CreateLaunchTemplateInput {
launchTemplateDataTags := []ec2types.LaunchTemplateTagSpecificationRequest{
{ResourceType: ec2types.ResourceTypeNetworkInterface, Tags: utils.MergeTags(options.Tags)},
}
if options.CapacityType == karpv1.CapacityTypeSpot {
launchTemplateDataTags = append(launchTemplateDataTags, ec2types.LaunchTemplateTagSpecificationRequest{ResourceType: ec2types.ResourceTypeSpotInstancesRequest, Tags: utils.MergeTags(options.Tags)})
}
networkInterfaces := generateNetworkInterfaces(options, ClusterIPFamily)
return &ec2.CreateLaunchTemplateInput{
lt := &ec2.CreateLaunchTemplateInput{
LaunchTemplateName: aws.String(LaunchTemplateName(options)),
LaunchTemplateData: &ec2types.RequestLaunchTemplateData{
BlockDeviceMappings: blockDeviceMappings(options.BlockDeviceMappings),
CapacityReservationSpecification: &ec2types.LaunchTemplateCapacityReservationSpecificationRequest{
CapacityReservationPreference: lo.Ternary(
options.CapacityType == karpv1.CapacityTypeReserved,
ec2types.CapacityReservationPreferenceCapacityReservationsOnly,
ec2types.CapacityReservationPreferenceNone,
),
CapacityReservationTarget: lo.Ternary(
options.CapacityType == karpv1.CapacityTypeReserved,
&ec2types.CapacityReservationTarget{
CapacityReservationId: &options.CapacityReservationID,
},
nil,
),
},
IamInstanceProfile: &ec2types.LaunchTemplateIamInstanceProfileSpecificationRequest{
Name: aws.String(options.InstanceProfile),
},
Expand Down Expand Up @@ -301,6 +294,25 @@ func GetCreateLaunchTemplateInput(options *amifamily.LaunchTemplate, ClusterIPFa
},
},
}
// Gate this specifically since the update to CapacityReservationPreference will opt od / spot launches out of open
// ODCRs, which is a breaking change from the pre-native ODCR support behavior.
if karpoptions.FromContext(ctx).FeatureGates.ReservedCapacity {
lt.LaunchTemplateData.CapacityReservationSpecification = &ec2types.LaunchTemplateCapacityReservationSpecificationRequest{
CapacityReservationPreference: lo.Ternary(
options.CapacityType == karpv1.CapacityTypeReserved,
ec2types.CapacityReservationPreferenceCapacityReservationsOnly,
ec2types.CapacityReservationPreferenceNone,
),
CapacityReservationTarget: lo.Ternary(
options.CapacityType == karpv1.CapacityTypeReserved,
&ec2types.CapacityReservationTarget{
CapacityReservationId: &options.CapacityReservationID,
},
nil,
),
}
}
return lt
}

// generateNetworkInterfaces generates network interfaces for the launch template.
Expand Down

0 comments on commit 5095850

Please sign in to comment.