Skip to content

Commit

Permalink
checkpoint review feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
jmdeal committed Feb 21, 2025
1 parent cc6c3ad commit 2e27014
Show file tree
Hide file tree
Showing 17 changed files with 112 additions and 140 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -679,9 +679,6 @@ spec:
availabilityZone:
description: The availability zone the capacity reservation is available in.
type: string
availableInstanceCount:
description: The last known available instance count for the capacity reservation.
type: integer
endTime:
description: |-
The time at which the capacity reservation expires. Once expired, the reserved capacity is released and Karpenter
Expand All @@ -705,17 +702,12 @@ spec:
description: The ID of the AWS account that owns the capacity reservation.
pattern: ^[0-9]{12}$
type: string
totalInstanceCount:
description: The total instance count for the capacity reservation.
type: integer
required:
- availabilityZone
- availableInstanceCount
- id
- instanceMatchCriteria
- instanceType
- ownerID
- totalInstanceCount
type: object
type: array
conditions:
Expand Down
8 changes: 0 additions & 8 deletions pkg/apis/crds/karpenter.k8s.aws_ec2nodeclasses.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -676,9 +676,6 @@ spec:
availabilityZone:
description: The availability zone the capacity reservation is available in.
type: string
availableInstanceCount:
description: The last known available instance count for the capacity reservation.
type: integer
endTime:
description: |-
The time at which the capacity reservation expires. Once expired, the reserved capacity is released and Karpenter
Expand All @@ -702,17 +699,12 @@ spec:
description: The ID of the AWS account that owns the capacity reservation.
pattern: ^[0-9]{12}$
type: string
totalInstanceCount:
description: The total instance count for the capacity reservation.
type: integer
required:
- availabilityZone
- availableInstanceCount
- id
- instanceMatchCriteria
- instanceType
- ownerID
- totalInstanceCount
type: object
type: array
conditions:
Expand Down
2 changes: 1 addition & 1 deletion pkg/apis/v1/ec2nodeclass_hash_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ var _ = Describe("Hash", func() {
nodeClass.Spec.AMISelectorTerms = []v1.AMISelectorTerm{{
Tags: map[string]string{"ami-test-key": "ami-test-value"},
}}
nodeClass.Spec.SubnetSelectorTerms = []v1.SubnetSelectorTerm{{
nodeClass.Spec.CapacityReservationSelectorTerms = []v1.CapacityReservationSelectorTerm{{
Tags: map[string]string{"cr-test-key": "cr-test-value"},
}}
updatedHash := nodeClass.Hash()
Expand Down
6 changes: 0 additions & 6 deletions pkg/apis/v1/ec2nodeclass_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,6 @@ type CapacityReservation struct {
// The availability zone the capacity reservation is available in.
// +required
AvailabilityZone string `json:"availabilityZone"`
// The last known available instance count for the capacity reservation.
// +required
AvailableInstanceCount int `json:"availableInstanceCount,omitempty" hash:"ignore"`
// The time at which the capacity reservation expires. Once expired, the reserved capacity is released and Karpenter
// will no longer be able to launch instances into that reservation.
// +optional
Expand All @@ -94,9 +91,6 @@ type CapacityReservation struct {
// +kubebuilder:validation:Pattern:="^[0-9]{12}$"
// +required
OwnerID string `json:"ownerID"`
// The total instance count for the capacity reservation.
// +required
TotalInstanceCount int `json:"totalInstanceCount" hash:"ignore"`
}

// EC2NodeClassStatus contains the resolved state of the EC2NodeClass
Expand Down
18 changes: 9 additions & 9 deletions pkg/apis/v1/ec2nodeclass_validation_cel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,15 @@ var _ = Describe("CEL/Validation", func() {
}}
Expect(env.Client.Create(ctx, nc)).To(Succeed())
})
It("should succeed for a valid ownerID", func() {
nc.Spec.CapacityReservationSelectorTerms = []v1.CapacityReservationSelectorTerm{{
OwnerID: "012345678901",
Tags: map[string]string{
"test": "testvalue",
},
}}
Expect(env.Client.Create(ctx, nc)).To(Succeed())
})
It("should fail with a capacity reservation selector on a malformed id", func() {
nc.Spec.CapacityReservationSelectorTerms = []v1.CapacityReservationSelectorTerm{{
ID: "r-12345749",
Expand Down Expand Up @@ -520,15 +529,6 @@ var _ = Describe("CEL/Validation", func() {
}}
Expect(env.Client.Create(ctx, nc)).ToNot(Succeed())
})
It("should succeed for a valid ownerID", func() {
nc.Spec.CapacityReservationSelectorTerms = []v1.CapacityReservationSelectorTerm{{
OwnerID: "012345678901",
Tags: map[string]string{
"test": "testvalue",
},
}}
Expect(env.Client.Create(ctx, nc)).To(Succeed())
})
It("should fail when the ownerID is malformed", func() {
nc.Spec.CapacityReservationSelectorTerms = []v1.CapacityReservationSelectorTerm{{
OwnerID: "01234567890", // OwnerID must be 12 digits, this is 11
Expand Down
15 changes: 7 additions & 8 deletions pkg/cache/unavailableofferings.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,32 +48,31 @@ func NewUnavailableOfferings() *UnavailableOfferings {
}

// IsUnavailable returns true if the offering appears in the cache
func (u *UnavailableOfferings) IsUnavailable(instanceType string, zone, capacityType string) bool {
func (u *UnavailableOfferings) IsUnavailable(instanceType ec2types.InstanceType, zone, capacityType string) bool {
_, found := u.cache.Get(u.key(instanceType, zone, capacityType))
return found
}

// MarkUnavailable communicates recently observed temporary capacity shortages in the provided offerings
func (u *UnavailableOfferings) MarkUnavailable(ctx context.Context, unavailableReason, instanceType, zone, capacityType string) {
func (u *UnavailableOfferings) MarkUnavailable(ctx context.Context, unavailableReason string, instanceType ec2types.InstanceType, zone, capacityType string) {
// even if the key is already in the cache, we still need to call Set to extend the cached entry's TTL
log.FromContext(ctx).WithValues(
"reason", unavailableReason,
"instance-type", instanceType,
"zone", zone,
"capacity-type", capacityType,
"ttl", UnavailableOfferingsTTL,
).V(1).Info("removing offering from offerings")
"ttl", UnavailableOfferingsTTL).V(1).Info("removing offering from offerings")
u.cache.SetDefault(u.key(instanceType, zone, capacityType), struct{}{})
atomic.AddUint64(&u.SeqNum, 1)
}

func (u *UnavailableOfferings) MarkUnavailableForFleetErr(ctx context.Context, fleetErr ec2types.CreateFleetError, capacityType string) {
instanceType := fleetErr.LaunchTemplateAndOverrides.Overrides.InstanceType
zone := aws.ToString(fleetErr.LaunchTemplateAndOverrides.Overrides.AvailabilityZone)
u.MarkUnavailable(ctx, lo.FromPtr(fleetErr.ErrorCode), string(instanceType), zone, capacityType)
u.MarkUnavailable(ctx, lo.FromPtr(fleetErr.ErrorCode), instanceType, zone, capacityType)
}

func (u *UnavailableOfferings) DeleteOffering(instanceType, zone, capacityType string) {
func (u *UnavailableOfferings) Delete(instanceType ec2types.InstanceType, zone string, capacityType string) {
u.cache.Delete(u.key(instanceType, zone, capacityType))
}

Expand All @@ -82,6 +81,6 @@ func (u *UnavailableOfferings) Flush() {
}

// key returns the cache key for all offerings in the cache
func (*UnavailableOfferings) key(instanceType, zone, capacityType string) string {
return fmt.Sprintf("o:%s:%s:%s", capacityType, instanceType, zone)
func (u *UnavailableOfferings) key(instanceType ec2types.InstanceType, zone string, capacityType string) string {
return fmt.Sprintf("%s:%s:%s", capacityType, instanceType, zone)
}
3 changes: 2 additions & 1 deletion pkg/controllers/interruption/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"sigs.k8s.io/karpenter/pkg/cloudprovider"
"sigs.k8s.io/karpenter/pkg/metrics"

ec2types "github.com/aws/aws-sdk-go-v2/service/ec2/types"
sqstypes "github.com/aws/aws-sdk-go-v2/service/sqs/types"
"github.com/awslabs/operatorpkg/singleton"
"go.uber.org/multierr"
Expand Down Expand Up @@ -207,7 +208,7 @@ func (c *Controller) handleNodeClaim(ctx context.Context, msg messages.Message,
zone := nodeClaim.Labels[corev1.LabelTopologyZone]
instanceType := nodeClaim.Labels[corev1.LabelInstanceTypeStable]
if zone != "" && instanceType != "" {
c.unavailableOfferingsCache.MarkUnavailable(ctx, string(msg.Kind()), instanceType, zone, karpv1.CapacityTypeSpot)
c.unavailableOfferingsCache.MarkUnavailable(ctx, string(msg.Kind()), ec2types.InstanceType(instanceType), zone, karpv1.CapacityTypeSpot)
}
}
if action != NoAction {
Expand Down
10 changes: 7 additions & 3 deletions pkg/controllers/nodeclass/capacityreservation.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ func (c *CapacityReservation) Reconcile(ctx context.Context, nc *v1.EC2NodeClass
return reconcile.Result{}, fmt.Errorf("getting capacity reservations, %w", err)
}
if len(reservations) == 0 {
nc.Status.CapacityReservations = nil
nc.StatusConditions().SetTrue(v1.ConditionTypeCapacityReservationsReady)
return reconcile.Result{RequeueAfter: capacityReservationPollPeriod}, nil
}
Expand Down Expand Up @@ -86,6 +87,8 @@ func (c *CapacityReservation) Reconcile(ctx context.Context, nc *v1.EC2NodeClass
}

func capacityReservationFromEC2(cr *ec2types.CapacityReservation) (v1.CapacityReservation, error) {
// Guard against new instance match criteria added in the future. See https://github.com/kubernetes-sigs/karpenter/issues/806
// for a similar issue.
if !lo.Contains([]ec2types.InstanceMatchCriteria{
ec2types.InstanceMatchCriteriaOpen,
ec2types.InstanceMatchCriteriaTargeted,
Expand All @@ -98,17 +101,18 @@ func capacityReservationFromEC2(cr *ec2types.CapacityReservation) (v1.CapacityRe
}

return v1.CapacityReservation{
AvailabilityZone: *cr.AvailabilityZone,
// AvailableInstanceCount: int(*cr.AvailableInstanceCount),
AvailabilityZone: *cr.AvailabilityZone,
EndTime: endTime,
ID: *cr.CapacityReservationId,
InstanceMatchCriteria: string(cr.InstanceMatchCriteria),
InstanceType: *cr.InstanceType,
OwnerID: *cr.OwnerId,
TotalInstanceCount: int(*cr.TotalInstanceCount),
}, nil
}

// requeueAfter determines the duration until the next target reconciliation time based on the provided reservations. If
// any reservations are expected to expire before we would typically requeue, the duration will be based on the
// nearest expiration time.
func (c *CapacityReservation) requeueAfter(reservations ...*ec2types.CapacityReservation) time.Duration {
var next *time.Time
for _, reservation := range reservations {
Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/nodeclass/validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func (n Validation) Reconcile(ctx context.Context, nodeClass *v1.EC2NodeClass) (
return reconcile.Result{}, nil
}

createLaunchTemplateInput := launchtemplate.GetCreateLaunchTemplateInput(mockOptions(*nodeClaim, nodeClass, tags), corev1.IPv4Protocol, "")
createLaunchTemplateInput := launchtemplate.GetCreateLaunchTemplateInput(ctx, mockOptions(*nodeClaim, nodeClass, tags), corev1.IPv4Protocol, "")
createLaunchTemplateInput.DryRun = aws.Bool(true)

if _, err := n.ec2api.CreateLaunchTemplate(ctx, createLaunchTemplateInput); awserrors.IgnoreDryRunError(err) != nil {
Expand Down
23 changes: 3 additions & 20 deletions pkg/providers/amifamily/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,9 @@ func (r DefaultResolver) Resolve(nodeClass *v1.EC2NodeClass, nodeClaim *karpv1.N
// ordering in this string.
reservationIDs: lo.Ternary(
capacityType == karpv1.CapacityTypeReserved,
strings.Join(selectReservationIDs(it, nodeClaim), ","),
strings.Join(lo.FilterMap(it.Offerings, func(o *cloudprovider.Offering, _ int) (string, bool) {
return o.ReservationID(), o.CapacityType() == karpv1.CapacityTypeReserved
}), ","),
"",
),
}
Expand All @@ -173,25 +175,6 @@ func (r DefaultResolver) Resolve(nodeClass *v1.EC2NodeClass, nodeClaim *karpv1.N
return resolvedTemplates, nil
}

// selectReservationIDs returns the IDs of the reserved offerings on the given instance type that should be used for
// the given NodeClaim. Only offerings that are available, compatible with the NodeClaim's requirements, and of the
// reserved capacity type are considered. At most one reservation is returned per zone: when several reservations
// share a zone, the one with the largest ReservationCapacity is kept (the first one encountered wins a tie). This
// works around a limitation in the CreateFleet interface, which accepts only one override for a given instance-zone
// combination. The order of the returned IDs is unspecified because they are collected from a map.
func selectReservationIDs(it *cloudprovider.InstanceType, nodeClaim *karpv1.NodeClaim) []string {
	candidates := it.Offerings.Available().Compatible(scheduling.NewNodeSelectorRequirementsWithMinValues(nodeClaim.Spec.Requirements...))

	// Track the best reserved offering seen so far for each zone.
	bestByZone := map[string]*cloudprovider.Offering{}
	for _, offering := range candidates {
		if offering.CapacityType() != karpv1.CapacityTypeReserved {
			continue
		}
		best, seen := bestByZone[offering.Zone()]
		if seen && best.ReservationCapacity >= offering.ReservationCapacity {
			// The zone already has a reservation with at least as much capacity.
			continue
		}
		bestByZone[offering.Zone()] = offering
	}

	ids := make([]string, 0, len(bestByZone))
	for _, offering := range bestByZone {
		ids = append(ids, offering.ReservationID())
	}
	return ids
}

func GetAMIFamily(amiFamily string, options *Options) AMIFamily {
switch amiFamily {
case v1.AMIFamilyBottlerocket:
Expand Down
39 changes: 22 additions & 17 deletions pkg/providers/capacityreservation/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,11 @@ type DefaultProvider struct {
cm *pretty.ChangeMonitor
}

func NewProvider(ec2api sdk.EC2API, clk clock.Clock, reservationCache, reservationAvailabilityCache *cache.Cache) *DefaultProvider {
func NewProvider(
ec2api sdk.EC2API,
clk clock.Clock,
reservationCache, reservationAvailabilityCache *cache.Cache,
) *DefaultProvider {
return &DefaultProvider{
availabilityCache: availabilityCache{
cache: reservationAvailabilityCache,
Expand All @@ -60,40 +64,41 @@ func NewProvider(ec2api sdk.EC2API, clk clock.Clock, reservationCache, reservati
}

func (p *DefaultProvider) List(ctx context.Context, selectorTerms ...v1.CapacityReservationSelectorTerm) ([]*ec2types.CapacityReservation, error) {
queries := QueriesFromSelectorTerms(selectorTerms...)

var reservations []*ec2types.CapacityReservation
var remainingQueries []*Query
for _, query := range queries {
if value, ok := p.reservationCache.Get(query.CacheKey()); ok {
reservations = append(reservations, value.([]*ec2types.CapacityReservation)...)
} else {
remainingQueries = append(remainingQueries, query)
}
}
if len(remainingQueries) == 0 {
queries := QueriesFromSelectorTerms(selectorTerms...)
reservations, queries = p.resolveCachedQueries(queries...)
if len(queries) == 0 {
return p.filterReservations(reservations), nil
}

for _, query := range remainingQueries {
paginator := ec2.NewDescribeCapacityReservationsPaginator(p.ec2api, query.DescribeCapacityReservationsInput())
for _, q := range queries {
paginator := ec2.NewDescribeCapacityReservationsPaginator(p.ec2api, q.DescribeCapacityReservationsInput())
for paginator.HasMorePages() {
out, err := paginator.NextPage(ctx)
if err != nil {
return nil, fmt.Errorf("listing capacity reservations, %w", err)
}
queryReservations := lo.ToSlicePtr(out.CapacityReservations)
p.reservationCache.SetDefault(query.CacheKey(), queryReservations)
p.reservationCache.SetDefault(q.CacheKey(), queryReservations)
reservations = append(reservations, queryReservations...)
p.syncAvailability(lo.SliceToMap(queryReservations, func(r *ec2types.CapacityReservation) (string, int) {
return *r.CapacityReservationId, int(*r.AvailableInstanceCount)
}))
}
}

return p.filterReservations(reservations), nil
}

// resolveCachedQueries partitions the given queries by whether the reservation cache holds an entry for their cache
// key. Reservations stored for cached queries are appended to reservations; queries with no cache entry are returned
// in remainingQueries, in their original order. Cached values are type-asserted to []*ec2types.CapacityReservation,
// so an entry of any other type will panic.
func (p *DefaultProvider) resolveCachedQueries(queries ...*Query) (reservations []*ec2types.CapacityReservation, remainingQueries []*Query) {
	for _, query := range queries {
		cached, hit := p.reservationCache.Get(query.CacheKey())
		if !hit {
			remainingQueries = append(remainingQueries, query)
			continue
		}
		reservations = append(reservations, cached.([]*ec2types.CapacityReservation)...)
	}
	return reservations, remainingQueries
}

// filterReservations removes duplicate and expired reservations
func (p *DefaultProvider) filterReservations(reservations []*ec2types.CapacityReservation) []*ec2types.CapacityReservation {
return lo.Filter(lo.UniqBy(reservations, func(r *ec2types.CapacityReservation) string {
Expand Down
Loading

0 comments on commit 2e27014

Please sign in to comment.