Skip to content

Commit

Permalink
remaining functional + feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
jmdeal committed Feb 25, 2025
1 parent e3d1cdd commit 9d6c1b0
Show file tree
Hide file tree
Showing 11 changed files with 226 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -690,7 +690,7 @@ spec:
type: string
id:
description: The id for the capacity reservation.
pattern: ^cr-.+$
pattern: ^cr-[0-9a-z]+$
type: string
instanceMatchCriteria:
description: Indicates the type of instance launches the capacity reservation accepts.
Expand Down
2 changes: 1 addition & 1 deletion pkg/apis/crds/karpenter.k8s.aws_ec2nodeclasses.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -687,7 +687,7 @@ spec:
type: string
id:
description: The id for the capacity reservation.
pattern: ^cr-.+$
pattern: ^cr-[0-9a-z]+$
type: string
instanceMatchCriteria:
description: Indicates the type of instance launches the capacity reservation accepts.
Expand Down
2 changes: 1 addition & 1 deletion pkg/apis/v1/ec2nodeclass_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ type CapacityReservation struct {
// +optional
EndTime *metav1.Time `json:"endTime,omitempty" hash:"ignore"`
// The id for the capacity reservation.
// +kubebuilder:validation:Pattern:="^cr-.+$"
// +kubebuilder:validation:Pattern:="^cr-[0-9a-z]+$"
// +required
ID string `json:"id"`
// Indicates the type of instance launches the capacity reservation accepts.
Expand Down
8 changes: 4 additions & 4 deletions pkg/cloudprovider/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -876,11 +876,11 @@ var _ = Describe("CloudProvider", func() {
It("should dynamically drift nodeclaims for capacity reservations", func() {
nodeClass.Status.CapacityReservations = []v1.CapacityReservation{
{
AvailabilityZone: "test-zone-1a",
ID: "cr-foo",
AvailabilityZone: "test-zone-1a",
ID: "cr-foo",
InstanceMatchCriteria: string(ec2types.InstanceMatchCriteriaTargeted),
InstanceType: "m5.large",
OwnerID: "012345678901",
InstanceType: "m5.large",
OwnerID: "012345678901",
},
}
setReservationID := func(id string) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/controllers.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func NewControllers(
ssminvalidation.NewController(ssmCache, amiProvider),
status.NewController[*v1.EC2NodeClass](kubeClient, mgr.GetEventRecorderFor("karpenter"), status.EmitDeprecatedMetrics),
controllersversion.NewController(versionProvider, versionProvider.UpdateVersionWithValidation),
capacityreservation.NewController(),
capacityreservation.NewController(kubeClient, cloudProvider),
}
if options.FromContext(ctx).InterruptionQueue != "" {
sqsapi := servicesqs.NewFromConfig(cfg)
Expand Down
28 changes: 15 additions & 13 deletions pkg/controllers/nodeclaim/capacityreservation/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,11 @@ type Controller struct {
kubeClient client.Client
}

func NewController() *Controller {
return nil
func NewController(kubeClient client.Client, cp cloudprovider.CloudProvider) *Controller {
return &Controller{
cp: cp,
kubeClient: kubeClient,
}
}

func (*Controller) Name() string {
Expand All @@ -58,15 +61,13 @@ func (c *Controller) Register(_ context.Context, m manager.Manager) error {

func (c *Controller) Reconcile(ctx context.Context) (reconcile.Result, error) {
ctx = injection.WithControllerName(ctx, c.Name())

cpNodeClaims, err := c.cp.List(ctx)
if err != nil {
return reconcile.Result{}, fmt.Errorf("listing instance types, %w", err)
}
cpNodeClaimIndex := lo.SliceToMap(cpNodeClaims, func(nc *karpv1.NodeClaim) (string, *karpv1.NodeClaim) {
providerIDsToCPNodeClaims := lo.SliceToMap(cpNodeClaims, func(nc *karpv1.NodeClaim) (string, *karpv1.NodeClaim) {
return nc.Status.ProviderID, nc
})

ncs := &karpv1.NodeClaimList{}
if err := c.kubeClient.List(ctx, ncs, client.MatchingLabels{
karpv1.NodeRegisteredLabelKey: "true",
Expand All @@ -76,7 +77,7 @@ func (c *Controller) Reconcile(ctx context.Context) (reconcile.Result, error) {
updatedNodeClaims := sets.New[string]()
var errs []error
for i := range ncs.Items {
cpNC, ok := cpNodeClaimIndex[ncs.Items[i].Status.ProviderID]
cpNC, ok := providerIDsToCPNodeClaims[ncs.Items[i].Status.ProviderID]
if !ok {
continue
}
Expand All @@ -88,10 +89,11 @@ func (c *Controller) Reconcile(ctx context.Context) (reconcile.Result, error) {
updatedNodeClaims.Insert(ncs.Items[i].Name)
}
}
log.FromContext(ctx).WithValues("NodeClaims", lo.Map(updatedNodeClaims.UnsortedList(), func(name string, _ int) klog.ObjectRef {
return klog.KRef("", name)
})).V(1).Info("updated capacity type for nodeclaims")

if len(updatedNodeClaims) != 0 {
log.FromContext(ctx).WithValues("NodeClaims", lo.Map(updatedNodeClaims.UnsortedList(), func(name string, _ int) klog.ObjectRef {
return klog.KRef("", name)
})).V(1).Info("updated capacity type for nodeclaims")
}
if len(errs) != 0 {
if lo.EveryBy(errs, func(err error) bool { return errors.IsConflict(err) }) {
return reconcile.Result{Requeue: true}, nil
Expand All @@ -118,7 +120,7 @@ func (c *Controller) syncCapacityType(ctx context.Context, capacityType string,
stored := nc.DeepCopy()
nc.Labels[karpv1.CapacityTypeLabelKey] = karpv1.CapacityTypeOnDemand
delete(nc.Labels, cloudprovider.ReservationIDLabel)
if err := c.kubeClient.Patch(ctx, nc, client.MergeFrom(stored)); err != nil {
if err := c.kubeClient.Patch(ctx, nc, client.MergeFrom(stored)); client.IgnoreNotFound(err) != nil {
return false, fmt.Errorf("patching nodeclaim %q, %w", nc.Name, err)
}
}
Expand All @@ -144,8 +146,8 @@ func (c *Controller) syncCapacityType(ctx context.Context, capacityType string,
}
stored := n.DeepCopy()
n.Labels[karpv1.CapacityTypeLabelKey] = karpv1.CapacityTypeOnDemand
delete(nc.Labels, cloudprovider.ReservationIDLabel)
if err := c.kubeClient.Patch(ctx, n, client.MergeFrom(stored)); err != nil {
delete(n.Labels, cloudprovider.ReservationIDLabel)
if err := c.kubeClient.Patch(ctx, n, client.MergeFrom(stored)); client.IgnoreNotFound(err) != nil {
return false, fmt.Errorf("patching node %q, %w", n.Name, err)
}
}
Expand Down
169 changes: 169 additions & 0 deletions pkg/controllers/nodeclaim/capacityreservation/suite_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
/*
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package capacityreservation_test

import (
"context"
"fmt"
"testing"

"github.com/aws/aws-sdk-go-v2/service/ec2"
ec2types "github.com/aws/aws-sdk-go-v2/service/ec2/types"
"github.com/samber/lo"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/record"
karpv1 "sigs.k8s.io/karpenter/pkg/apis/v1"
corecloudprovider "sigs.k8s.io/karpenter/pkg/cloudprovider"
"sigs.k8s.io/karpenter/pkg/events"
coretest "sigs.k8s.io/karpenter/pkg/test"

"github.com/aws/karpenter-provider-aws/pkg/apis"
"github.com/aws/karpenter-provider-aws/pkg/cloudprovider"
"github.com/aws/karpenter-provider-aws/pkg/controllers/nodeclaim/capacityreservation"
"github.com/aws/karpenter-provider-aws/pkg/fake"
"github.com/aws/karpenter-provider-aws/pkg/operator/options"
"github.com/aws/karpenter-provider-aws/pkg/test"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
. "sigs.k8s.io/karpenter/pkg/test/expectations"
"sigs.k8s.io/karpenter/pkg/test/v1alpha1"
. "sigs.k8s.io/karpenter/pkg/utils/testing"
)

var ctx context.Context
var stop context.CancelFunc
var env *coretest.Environment
var awsEnv *test.Environment
var controller *capacityreservation.Controller

func TestAWS(t *testing.T) {
ctx = TestContextWithLogger(t)
RegisterFailHandler(Fail)
RunSpecs(t, "SSM Invalidation Controller")
}

var _ = BeforeSuite(func() {
env = coretest.NewEnvironment(coretest.WithCRDs(apis.CRDs...), coretest.WithCRDs(v1alpha1.CRDs...), coretest.WithFieldIndexers(coretest.NodeProviderIDFieldIndexer(ctx)))
ctx = options.ToContext(ctx, test.Options())
ctx, stop = context.WithCancel(ctx)
awsEnv = test.NewEnvironment(ctx, env)

cloudProvider := cloudprovider.New(awsEnv.InstanceTypesProvider, awsEnv.InstanceProvider, events.NewRecorder(&record.FakeRecorder{}),
env.Client, awsEnv.AMIProvider, awsEnv.SecurityGroupProvider, awsEnv.CapacityReservationProvider)
controller = capacityreservation.NewController(env.Client, cloudProvider)
})

var _ = AfterSuite(func() {
stop()
Expect(env.Stop()).To(Succeed(), "Failed to stop environment")
})

var _ = Describe("Capacity Reservation NodeClaim Controller", func() {
var nodeClaim *karpv1.NodeClaim
var node *corev1.Node
var reservationID string
BeforeEach(func() {
reservationID = "cr-foo"
instance := ec2types.Instance{
ImageId: lo.ToPtr(fake.ImageID()),
InstanceType: ec2types.InstanceType("m5.large"),
SubnetId: lo.ToPtr(fake.SubnetID()),
SpotInstanceRequestId: nil,
State: &ec2types.InstanceState{
Name: ec2types.InstanceStateNameRunning,
},
InstanceId: lo.ToPtr(fake.InstanceID()),
CapacityReservationId: &reservationID,
Placement: &ec2types.Placement{
AvailabilityZone: lo.ToPtr("test-zone-1a"),
},
SecurityGroups: []ec2types.GroupIdentifier{{GroupId: lo.ToPtr(fake.SecurityGroupID())}},
}
awsEnv.EC2API.DescribeInstancesBehavior.Output.Set(&ec2.DescribeInstancesOutput{
Reservations: []ec2types.Reservation{{Instances: []ec2types.Instance{instance}}},
})

nodeClaim = coretest.NodeClaim(karpv1.NodeClaim{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
karpv1.CapacityTypeLabelKey: karpv1.CapacityTypeReserved,
corecloudprovider.ReservationIDLabel: reservationID,
karpv1.NodeRegisteredLabelKey: "true",
},
},
Status: karpv1.NodeClaimStatus{
ProviderID: fmt.Sprintf("aws:///test-zone-1a/%s", *instance.InstanceId),
},
})
node = coretest.NodeClaimLinkedNode(nodeClaim)
})
It("should demote nodeclaims and nodes from reserved to on-demand", func() {
ExpectApplied(ctx, env.Client, nodeClaim, node)
ExpectSingletonReconciled(ctx, controller)

// Since the backing instance is still under a capacity reservation, we shouldn't demote the nodeclaim or node
nodeClaim = ExpectExists(ctx, env.Client, nodeClaim)
Expect(nodeClaim.Labels).To(HaveKeyWithValue(karpv1.CapacityTypeLabelKey, karpv1.CapacityTypeReserved))
Expect(nodeClaim.Labels).To(HaveKeyWithValue(corecloudprovider.ReservationIDLabel, reservationID))
node = ExpectExists(ctx, env.Client, node)
Expect(node.Labels).To(HaveKeyWithValue(karpv1.CapacityTypeLabelKey, karpv1.CapacityTypeReserved))
Expect(node.Labels).To(HaveKeyWithValue(corecloudprovider.ReservationIDLabel, reservationID))

out := awsEnv.EC2API.DescribeInstancesBehavior.Output.Clone()
out.Reservations[0].Instances[0].CapacityReservationId = nil
awsEnv.EC2API.DescribeInstancesBehavior.Output.Set(out)

// Now that the backing instance is no longer part of a capacity reservation, we should demote the resources by
// updating the capacity type to on-demand and removing the reservation ID label.
ExpectSingletonReconciled(ctx, controller)
nodeClaim = ExpectExists(ctx, env.Client, nodeClaim)
Expect(nodeClaim.Labels).To(HaveKeyWithValue(karpv1.CapacityTypeLabelKey, karpv1.CapacityTypeOnDemand))
Expect(nodeClaim.Labels).ToNot(HaveKey(corecloudprovider.ReservationIDLabel))
node = ExpectExists(ctx, env.Client, node)
Expect(node.Labels).To(HaveKeyWithValue(karpv1.CapacityTypeLabelKey, karpv1.CapacityTypeOnDemand))
Expect(node.Labels).ToNot(HaveKey(corecloudprovider.ReservationIDLabel))
})
It("should demote nodes from reserved to on-demand even if their nodeclaim was demoted previously", func() {
out := awsEnv.EC2API.DescribeInstancesBehavior.Output.Clone()
out.Reservations[0].Instances[0].CapacityReservationId = nil
awsEnv.EC2API.DescribeInstancesBehavior.Output.Set(out)

ExpectApplied(ctx, env.Client, nodeClaim)
ExpectSingletonReconciled(ctx, controller)
nodeClaim = ExpectExists(ctx, env.Client, nodeClaim)
Expect(nodeClaim.Labels).To(HaveKeyWithValue(karpv1.CapacityTypeLabelKey, karpv1.CapacityTypeOnDemand))
Expect(nodeClaim.Labels).ToNot(HaveKey(corecloudprovider.ReservationIDLabel))

ExpectApplied(ctx, env.Client, node)
ExpectSingletonReconciled(ctx, controller)
node = ExpectExists(ctx, env.Client, node)
Expect(node.Labels).To(HaveKeyWithValue(karpv1.CapacityTypeLabelKey, karpv1.CapacityTypeOnDemand))
Expect(node.Labels).ToNot(HaveKey(corecloudprovider.ReservationIDLabel))
})
It("should ignore nodeclaims which aren't registered", func() {
out := awsEnv.EC2API.DescribeInstancesBehavior.Output.Clone()
out.Reservations[0].Instances[0].CapacityReservationId = nil
awsEnv.EC2API.DescribeInstancesBehavior.Output.Set(out)
delete(nodeClaim.Labels, karpv1.NodeRegisteredLabelKey)

ExpectApplied(ctx, env.Client, nodeClaim)
ExpectSingletonReconciled(ctx, controller)
nodeClaim = ExpectExists(ctx, env.Client, nodeClaim)
Expect(nodeClaim.Labels).To(HaveKeyWithValue(karpv1.CapacityTypeLabelKey, karpv1.CapacityTypeReserved))
Expect(nodeClaim.Labels).To(HaveKeyWithValue(corecloudprovider.ReservationIDLabel, reservationID))
})
})
13 changes: 6 additions & 7 deletions pkg/fake/ec2api.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,11 @@ type EC2Behavior struct {
CreateLaunchTemplateBehavior MockedFunction[ec2.CreateLaunchTemplateInput, ec2.CreateLaunchTemplateOutput]
CalledWithDescribeImagesInput AtomicPtrSlice[ec2.DescribeImagesInput]
Instances sync.Map
LaunchTemplates sync.Map
InsufficientCapacityPools atomic.Slice[CapacityPool]
NextError AtomicError

// Tracks the capacity reservations associated with launch templates, if applicable
launchTemplateCapacityReservationIndex sync.Map
LaunchTemplates sync.Map
launchTemplatesToCapacityReservations sync.Map // map[lt-name]cr-id
}

type EC2API struct {
Expand Down Expand Up @@ -113,8 +112,8 @@ func (e *EC2API) Reset() {
e.InsufficientCapacityPools.Reset()
e.NextError.Reset()

e.launchTemplateCapacityReservationIndex.Range(func(k, _ any) bool {
e.launchTemplateCapacityReservationIndex.Delete(k)
e.launchTemplatesToCapacityReservations.Range(func(k, _ any) bool {
e.launchTemplatesToCapacityReservations.Delete(k)
return true
})
}
Expand Down Expand Up @@ -162,7 +161,7 @@ func (e *EC2API) CreateFleet(_ context.Context, input *ec2.CreateFleetInput, _ .
continue
}

if crID, ok := e.launchTemplateCapacityReservationIndex.Load(*ltc.LaunchTemplateSpecification.LaunchTemplateName); ok {
if crID, ok := e.launchTemplatesToCapacityReservations.Load(*ltc.LaunchTemplateSpecification.LaunchTemplateName); ok {
if cr, ok := lo.Find(e.DescribeCapacityReservationsOutput.Clone().CapacityReservations, func(cr ec2types.CapacityReservation) bool {
return *cr.CapacityReservationId == crID.(string)
}); !ok || *cr.AvailableInstanceCount == 0 {
Expand Down Expand Up @@ -279,7 +278,7 @@ func (e *EC2API) CreateLaunchTemplate(ctx context.Context, input *ec2.CreateLaun
launchTemplate := ec2types.LaunchTemplate{LaunchTemplateName: input.LaunchTemplateName}
e.LaunchTemplates.Store(input.LaunchTemplateName, launchTemplate)
if crs := input.LaunchTemplateData.CapacityReservationSpecification; crs != nil && crs.CapacityReservationPreference == ec2types.CapacityReservationPreferenceCapacityReservationsOnly {
e.launchTemplateCapacityReservationIndex.Store(*input.LaunchTemplateName, *crs.CapacityReservationTarget.CapacityReservationId)
e.launchTemplatesToCapacityReservations.Store(*input.LaunchTemplateName, *crs.CapacityReservationTarget.CapacityReservationId)
}
return &ec2.CreateLaunchTemplateOutput{LaunchTemplate: lo.ToPtr(launchTemplate)}, nil
})
Expand Down
25 changes: 20 additions & 5 deletions pkg/providers/capacityreservation/suite_test.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,17 @@
/*
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package capacityreservation_test

import (
Expand All @@ -6,14 +20,15 @@ import (

"github.com/aws/aws-sdk-go-v2/service/ec2"
ec2types "github.com/aws/aws-sdk-go-v2/service/ec2/types"
"github.com/samber/lo"
coreoptions "sigs.k8s.io/karpenter/pkg/operator/options"
coretest "sigs.k8s.io/karpenter/pkg/test"

"github.com/aws/karpenter-provider-aws/pkg/apis"
v1 "github.com/aws/karpenter-provider-aws/pkg/apis/v1"
"github.com/aws/karpenter-provider-aws/pkg/operator/options"
"github.com/aws/karpenter-provider-aws/pkg/test"
"github.com/aws/karpenter-provider-aws/pkg/utils"
"github.com/samber/lo"
coreoptions "sigs.k8s.io/karpenter/pkg/operator/options"
coretest "sigs.k8s.io/karpenter/pkg/test"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
Expand Down Expand Up @@ -58,7 +73,7 @@ var _ = Describe("Capacity Reservation Provider", func() {
InstanceMatchCriteria: ec2types.InstanceMatchCriteriaTargeted,
CapacityReservationId: lo.ToPtr("cr-m5.large-1a-1"),
AvailableInstanceCount: lo.ToPtr[int32](10),
Tags: utils.MergeTags(discoveryTags),
Tags: utils.MergeTags(discoveryTags),
State: ec2types.CapacityReservationStateActive,
},
{
Expand All @@ -68,7 +83,7 @@ var _ = Describe("Capacity Reservation Provider", func() {
InstanceMatchCriteria: ec2types.InstanceMatchCriteriaTargeted,
CapacityReservationId: lo.ToPtr("cr-m5.large-1a-2"),
AvailableInstanceCount: lo.ToPtr[int32](15),
Tags: utils.MergeTags(discoveryTags),
Tags: utils.MergeTags(discoveryTags),
State: ec2types.CapacityReservationStateActive,
},
}
Expand Down
Loading

0 comments on commit 9d6c1b0

Please sign in to comment.