Skip to content

Commit

Permalink
checkpoint feedback + lt tests
Browse files Browse the repository at this point in the history
  • Loading branch information
jmdeal committed Feb 25, 2025
1 parent babc7a2 commit 010638f
Show file tree
Hide file tree
Showing 18 changed files with 318 additions and 98 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ spec:
properties:
id:
description: ID is the capacity reservation id in EC2
pattern: ^cr-.+$
pattern: ^cr-[0-9a-z]+$
type: string
ownerID:
description: Owner is the owner id for the ami.
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -120,4 +120,4 @@ require (
sigs.k8s.io/structured-merge-diff/v4 v4.4.2 // indirect
)

replace sigs.k8s.io/karpenter => github.com/jmdeal/karpenter v0.0.0-20250221104820-4c25410338d8
replace sigs.k8s.io/karpenter => github.com/jmdeal/karpenter v0.0.0-20250225003856-d34d71584c1a
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,8 @@ github.com/imdario/mergo v0.3.16 h1:wwQJbIsHYGMUyLSPrEq1CT16AhnhNJQ51+4fdHUnCl4=
github.com/imdario/mergo v0.3.16/go.mod h1:WBLT9ZmE3lPoWsEzCh9LPo3TiwVN+ZKEjmz+hD27ysY=
github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8=
github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw=
github.com/jmdeal/karpenter v0.0.0-20250221104820-4c25410338d8 h1:K89kW02bTZkegQnJPlOHSTt+a7WXGQOfrt+pP7lBJos=
github.com/jmdeal/karpenter v0.0.0-20250221104820-4c25410338d8/go.mod h1:/FgjYrt+hwAMcvY46hku76st/aeP4KjOib6RLEj312g=
github.com/jmdeal/karpenter v0.0.0-20250225003856-d34d71584c1a h1:r7gPnoafSMfTjVmirGrkQu/3Suo6wiVDSElWyWIscXk=
github.com/jmdeal/karpenter v0.0.0-20250225003856-d34d71584c1a/go.mod h1:/FgjYrt+hwAMcvY46hku76st/aeP4KjOib6RLEj312g=
github.com/jonathan-innis/aws-sdk-go-prometheus v0.1.1 h1:gmpuckrozJ3lfKqSIia9YMGh0caoQmEY7mQP5MsnbTM=
github.com/jonathan-innis/aws-sdk-go-prometheus v0.1.1/go.mod h1:168XvZFghCqo32ISSWnTXwdlMKzEq+x9TqdfswCjkrQ=
github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY=
Expand Down
2 changes: 1 addition & 1 deletion pkg/apis/crds/karpenter.k8s.aws_ec2nodeclasses.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ spec:
properties:
id:
description: ID is the capacity reservation id in EC2
pattern: ^cr-.+$
pattern: ^cr-[0-9a-z]+$
type: string
ownerID:
description: Owner is the owner id for the ami.
Expand Down
2 changes: 1 addition & 1 deletion pkg/apis/v1/ec2nodeclass.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ type CapacityReservationSelectorTerm struct {
// +optional
Tags map[string]string `json:"tags,omitempty"`
// ID is the capacity reservation id in EC2
// +kubebuilder:validation:Pattern:="^cr-.+$"
// +kubebuilder:validation:Pattern:="^cr-[0-9a-z]+$"
// +optional
ID string `json:"id,omitempty"`
// Owner is the owner id for the ami.
Expand Down
3 changes: 3 additions & 0 deletions pkg/cloudprovider/drift.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,9 @@ func (c *CloudProvider) areSecurityGroupsDrifted(ec2Instance *instance.Instance,

// Checks if capacity reservations are drifted, by comparing the capacity reservations persisted to the NodeClass to
// the instance's capacity reservation.
// NOTE: We handle drift dynamically for capacity reservations rather than relying on the offerings inducing drift since
// a reserved instance may fall back to on-demand. Relying on offerings could result in drift occurring before fallback
// would cancel it out.
func (c *CloudProvider) isCapacityReservationDrifted(instance *instance.Instance, nodeClass *v1.EC2NodeClass) cloudprovider.DriftReason {
capacityReservationIDs := sets.New(lo.Map(nodeClass.Status.CapacityReservations, func(cr v1.CapacityReservation, _ int) string { return cr.ID })...)
if instance.CapacityReservationID != "" && !capacityReservationIDs.Has(instance.CapacityReservationID) {
Expand Down
4 changes: 3 additions & 1 deletion pkg/controllers/nodeclaim/capacityreservation/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,9 @@ func (c *Controller) Reconcile(ctx context.Context) (reconcile.Result, error) {
})

ncs := &karpv1.NodeClaimList{}
if err := c.kubeClient.List(ctx, ncs); err != nil {
if err := c.kubeClient.List(ctx, ncs, client.MatchingLabels{
karpv1.NodeRegisteredLabelKey: "true",
}); err != nil {
return reconcile.Result{}, fmt.Errorf("listing nodeclaims, %w", err)
}
updatedNodeClaims := sets.New[string]()
Expand Down
4 changes: 2 additions & 2 deletions pkg/controllers/nodeclass/capacityreservation.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func (c *CapacityReservation) Reconcile(ctx context.Context, nc *v1.EC2NodeClass
errors := []error{}
nc.Status.CapacityReservations = []v1.CapacityReservation{}
for _, r := range reservations {
reservation, err := capacityReservationFromEC2(r)
reservation, err := CapacityReservationFromEC2(r)
if err != nil {
errors = append(errors, err)
continue
Expand All @@ -89,7 +89,7 @@ func (c *CapacityReservation) Reconcile(ctx context.Context, nc *v1.EC2NodeClass
return reconcile.Result{RequeueAfter: c.requeueAfter(reservations...)}, nil
}

func capacityReservationFromEC2(cr *ec2types.CapacityReservation) (v1.CapacityReservation, error) {
func CapacityReservationFromEC2(cr *ec2types.CapacityReservation) (v1.CapacityReservation, error) {
// Guard against new instance match criteria added in the future. See https://github.com/kubernetes-sigs/karpenter/issues/806
// for a similar issue.
if !lo.Contains([]ec2types.InstanceMatchCriteria{
Expand Down
14 changes: 3 additions & 11 deletions pkg/controllers/nodeclass/capacityreservation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
. "sigs.k8s.io/karpenter/pkg/test/expectations"

v1 "github.com/aws/karpenter-provider-aws/pkg/apis/v1"
"github.com/aws/karpenter-provider-aws/pkg/utils"
)

const selfOwnerID = "012345678901"
Expand Down Expand Up @@ -54,7 +55,7 @@ var _ = Describe("NodeClass Capacity Reservation Reconciler", func() {
InstanceMatchCriteria: ec2types.InstanceMatchCriteriaTargeted,
CapacityReservationId: lo.ToPtr("cr-m5.large-1a-2"),
AvailableInstanceCount: lo.ToPtr[int32](10),
Tags: toEC2Tags(discoveryTags),
Tags: utils.MergeTags(discoveryTags),
State: ec2types.CapacityReservationStateActive,
},
{
Expand All @@ -73,7 +74,7 @@ var _ = Describe("NodeClass Capacity Reservation Reconciler", func() {
InstanceMatchCriteria: ec2types.InstanceMatchCriteriaTargeted,
CapacityReservationId: lo.ToPtr("cr-m5.large-1b-2"),
AvailableInstanceCount: lo.ToPtr[int32](15),
Tags: toEC2Tags(discoveryTags),
Tags: utils.MergeTags(discoveryTags),
State: ec2types.CapacityReservationStateActive,
},
},
Expand Down Expand Up @@ -171,12 +172,3 @@ var _ = Describe("NodeClass Capacity Reservation Reconciler", func() {
}),
)
})

func toEC2Tags(tags map[string]string) []ec2types.Tag {
return lo.MapToSlice(tags, func(key, value string) ec2types.Tag {
return ec2types.Tag{
Key: lo.ToPtr(key),
Value: lo.ToPtr(value),
}
})
}
6 changes: 5 additions & 1 deletion pkg/controllers/nodeclass/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,11 @@ func TestAPIs(t *testing.T) {
}

var _ = BeforeSuite(func() {
env = coretest.NewEnvironment(coretest.WithCRDs(test.RemoveNodeClassTagValidation(apis.CRDs)...), coretest.WithCRDs(v1alpha1.CRDs...), coretest.WithFieldIndexers(coretest.NodeClaimNodeClassRefFieldIndexer(ctx)))
env = coretest.NewEnvironment(
coretest.WithCRDs(test.DisableCapacityReservationIDValidation(test.RemoveNodeClassTagValidation(apis.CRDs))...),
coretest.WithCRDs(v1alpha1.CRDs...),
coretest.WithFieldIndexers(coretest.NodeClaimNodeClassRefFieldIndexer(ctx)),
)
ctx = coreoptions.ToContext(ctx, coretest.Options(coretest.OptionsFields{FeatureGates: coretest.FeatureGates{ReservedCapacity: lo.ToPtr(true)}}))
ctx = options.ToContext(ctx, test.Options())
awsEnv = test.NewEnvironment(ctx, env)
Expand Down
47 changes: 18 additions & 29 deletions pkg/fake/ec2api.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@ type EC2Behavior struct {
LaunchTemplates sync.Map
InsufficientCapacityPools atomic.Slice[CapacityPool]
NextError AtomicError

// Tracks the capacity reservations associated with launch templates, if applicable
launchTemplateCapacityReservationIndex sync.Map
}

type EC2API struct {
Expand Down Expand Up @@ -109,6 +112,11 @@ func (e *EC2API) Reset() {
})
e.InsufficientCapacityPools.Reset()
e.NextError.Reset()

e.launchTemplateCapacityReservationIndex.Range(func(k, _ any) bool {
e.launchTemplateCapacityReservationIndex.Delete(k)
return true
})
}

// nolint: gocyclo
Expand Down Expand Up @@ -136,15 +144,6 @@ func (e *EC2API) CreateFleet(_ context.Context, input *ec2.CreateFleetInput, _ .
spotInstanceRequestID = aws.String(test.RandomName())
}

launchTemplates := map[string]*ec2.CreateLaunchTemplateInput{}
for e.CreateLaunchTemplateBehavior.CalledWithInput.Len() > 0 {
lt := e.CreateLaunchTemplateBehavior.CalledWithInput.Pop()
launchTemplates[*lt.LaunchTemplateName] = lt
}
for _, ltInput := range launchTemplates {
e.CreateLaunchTemplateBehavior.CalledWithInput.Add(ltInput)
}

fulfilled := 0
for _, ltc := range input.LaunchTemplateConfigs {
for _, override := range ltc.Overrides {
Expand All @@ -162,34 +161,21 @@ func (e *EC2API) CreateFleet(_ context.Context, input *ec2.CreateFleetInput, _ .
if skipInstance {
continue
}
amiID := lo.ToPtr("")
var capacityReservationID *string
if lt, ok := launchTemplates[lo.FromPtr(ltc.LaunchTemplateSpecification.LaunchTemplateName)]; ok {
amiID = lt.LaunchTemplateData.ImageId
if crs := lt.LaunchTemplateData.CapacityReservationSpecification; crs != nil && crs.CapacityReservationPreference == ec2types.CapacityReservationPreferenceCapacityReservationsOnly {
id := crs.CapacityReservationTarget.CapacityReservationId
if id == nil {
panic("received a launch template targeting capacity reservations without a provided ID")
}
capacityReservationID = id
}
}
if capacityReservationID != nil {

if crID, ok := e.launchTemplateCapacityReservationIndex.Load(*ltc.LaunchTemplateSpecification.LaunchTemplateName); ok {
if cr, ok := lo.Find(e.DescribeCapacityReservationsOutput.Clone().CapacityReservations, func(cr ec2types.CapacityReservation) bool {
return *cr.CapacityReservationId == *capacityReservationID
return *cr.CapacityReservationId == crID.(string)
}); !ok || *cr.AvailableInstanceCount == 0 {
reservationExceededPools = append(reservationExceededPools, CapacityPool{
InstanceType: string(override.InstanceType),
Zone: lo.FromPtr(override.AvailabilityZone),
CapacityType: karpv1.CapacityTypeReserved,
ReservationID: *capacityReservationID,
ReservationID: crID.(string),
})
skipInstance = true
continue
}
}
if skipInstance {
continue
}
amiID := lo.ToPtr("")
if e.CreateLaunchTemplateBehavior.CalledWithInput.Len() > 0 {
lt := e.CreateLaunchTemplateBehavior.CalledWithInput.Pop()
amiID = lt.LaunchTemplateData.ImageId
Expand Down Expand Up @@ -292,6 +278,9 @@ func (e *EC2API) CreateLaunchTemplate(ctx context.Context, input *ec2.CreateLaun
}
launchTemplate := ec2types.LaunchTemplate{LaunchTemplateName: input.LaunchTemplateName}
e.LaunchTemplates.Store(input.LaunchTemplateName, launchTemplate)
if crs := input.LaunchTemplateData.CapacityReservationSpecification; crs != nil && crs.CapacityReservationPreference == ec2types.CapacityReservationPreferenceCapacityReservationsOnly {
e.launchTemplateCapacityReservationIndex.Store(*input.LaunchTemplateName, *crs.CapacityReservationTarget.CapacityReservationId)
}
return &ec2.CreateLaunchTemplateOutput{LaunchTemplate: lo.ToPtr(launchTemplate)}, nil
})
}
Expand Down Expand Up @@ -442,7 +431,7 @@ func (e *EC2API) DescribeLaunchTemplates(_ context.Context, input *ec2.DescribeL
output := &ec2.DescribeLaunchTemplatesOutput{}
e.LaunchTemplates.Range(func(key, value interface{}) bool {
launchTemplate := value.(ec2types.LaunchTemplate)
if lo.Contains(input.LaunchTemplateNames, lo.FromPtr(launchTemplate.LaunchTemplateName)) || len(input.Filters) != 0 && Filter(input.Filters, aws.ToString(launchTemplate.LaunchTemplateId), aws.ToString(launchTemplate.LaunchTemplateName), "", launchTemplate.Tags) {
if lo.Contains(input.LaunchTemplateNames, lo.FromPtr(launchTemplate.LaunchTemplateName)) || len(input.Filters) != 0 && Filter(input.Filters, aws.ToString(launchTemplate.LaunchTemplateId), aws.ToString(launchTemplate.LaunchTemplateName), "", "", launchTemplate.Tags) {
output.LaunchTemplates = append(output.LaunchTemplates, launchTemplate)
}
return true
Expand Down
34 changes: 11 additions & 23 deletions pkg/fake/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,15 +90,15 @@ func SubnetsFromFleetRequest(createFleetInput *ec2.CreateFleetInput) []string {
// Filters are chained with a logical "AND"
func FilterDescribeSecurtyGroups(sgs []ec2types.SecurityGroup, filters []ec2types.Filter) []ec2types.SecurityGroup {
return lo.Filter(sgs, func(group ec2types.SecurityGroup, _ int) bool {
return Filter(filters, *group.GroupId, *group.GroupName, "", group.Tags)
return Filter(filters, *group.GroupId, *group.GroupName, "", "", group.Tags)
})
}

// FilterDescribeSubnets filters the passed in subnets based on the filters passed in.
// Filters are chained with a logical "AND"
func FilterDescribeSubnets(subnets []ec2types.Subnet, filters []ec2types.Filter) []ec2types.Subnet {
return lo.Filter(subnets, func(subnet ec2types.Subnet, _ int) bool {
return Filter(filters, *subnet.SubnetId, "", "", subnet.Tags)
return Filter(filters, *subnet.SubnetId, "", "", "", subnet.Tags)
})
}

Expand All @@ -108,38 +108,26 @@ func FilterDescribeCapacityReservations(crs []ec2types.CapacityReservation, ids
if len(ids) != 0 && !idSet.Has(*cr.CapacityReservationId) {
return false
}
if stateFilter, ok := lo.Find(filters, func(f ec2types.Filter) bool {
return lo.FromPtr(f.Name) == "state"
}); ok {
if !lo.Contains(stateFilter.Values, string(cr.State)) {
return false
}
}
return Filter(lo.Reject(filters, func(f ec2types.Filter, _ int) bool {
return lo.FromPtr(f.Name) == "state"
}), *cr.CapacityReservationId, "", *cr.OwnerId, cr.Tags)
return Filter(filters, *cr.CapacityReservationId, "", *cr.OwnerId, string(cr.State), cr.Tags)
})
}

func FilterDescribeImages(images []ec2types.Image, filters []ec2types.Filter) []ec2types.Image {
return lo.Filter(images, func(image ec2types.Image, _ int) bool {
if stateFilter, ok := lo.Find(filters, func(f ec2types.Filter) bool {
return lo.FromPtr(f.Name) == "state"
}); ok {
if !lo.Contains(stateFilter.Values, string(image.State)) {
return false
}
}
return Filter(lo.Reject(filters, func(f ec2types.Filter, _ int) bool {
return lo.FromPtr(f.Name) == "state"
}), *image.ImageId, *image.Name, "", image.Tags)
return Filter(filters, *image.ImageId, *image.Name, "", string(image.State), image.Tags)
})
}

//nolint:gocyclo
func Filter(filters []ec2types.Filter, id, name, owner string, tags []ec2types.Tag) bool {
func Filter(filters []ec2types.Filter, id, name, owner, state string, tags []ec2types.Tag) bool {
return lo.EveryBy(filters, func(filter ec2types.Filter) bool {
switch filterName := aws.ToString(filter.Name); {
case filterName == "state":
for _, val := range filter.Values {
if state == val {
return true
}
}
case filterName == "subnet-id" || filterName == "group-id" || filterName == "image-id":
for _, val := range filter.Values {
if id == val {
Expand Down
Loading

0 comments on commit 010638f

Please sign in to comment.