diff --git a/Makefile b/Makefile index 0fcce58d5b9b..1d0bac23058b 100644 --- a/Makefile +++ b/Makefile @@ -17,8 +17,9 @@ HELM_OPTS ?= --set serviceAccount.annotations.eks\\.amazonaws\\.com/role-arn=${K --set controller.resources.requests.memory=1Gi \ --set controller.resources.limits.cpu=1 \ --set controller.resources.limits.memory=1Gi \ - --set settings.featureGates.spotToSpotConsolidation=true \ --set settings.featureGates.nodeRepair=true \ + --set settings.featureGates.reservedCapacity=true \ + --set settings.featureGates.spotToSpotConsolidation=true \ --create-namespace # CR for local builds of Karpenter diff --git a/charts/karpenter/templates/deployment.yaml b/charts/karpenter/templates/deployment.yaml index edd2aea0429a..0f2a69e89592 100644 --- a/charts/karpenter/templates/deployment.yaml +++ b/charts/karpenter/templates/deployment.yaml @@ -107,7 +107,7 @@ spec: divisor: "0" resource: limits.memory - name: FEATURE_GATES - value: "SpotToSpotConsolidation={{ .Values.settings.featureGates.spotToSpotConsolidation }},NodeRepair={{ .Values.settings.featureGates.nodeRepair }}" + value: "ReservedCapacity={{ .Values.settings.featureGates.reservedCapacity }},SpotToSpotConsolidation={{ .Values.settings.featureGates.spotToSpotConsolidation }},NodeRepair={{ .Values.settings.featureGates.nodeRepair }}" {{- with .Values.settings.batchMaxDuration }} - name: BATCH_MAX_DURATION value: "{{ . }}" diff --git a/charts/karpenter/values.yaml b/charts/karpenter/values.yaml index bc24a5852ee9..da1680e49beb 100644 --- a/charts/karpenter/values.yaml +++ b/charts/karpenter/values.yaml @@ -184,9 +184,12 @@ settings: # -- Feature Gate configuration values. Feature Gates will follow the same graduation process and requirements as feature gates # in Kubernetes. More information here https://kubernetes.io/docs/reference/command-line-tools-reference/feature-gates/#feature-gates-for-alpha-or-beta-features featureGates: - # -- spotToSpotConsolidation is ALPHA and is disabled by default. 
- # Setting this to true will enable spot replacement consolidation for both single and multi-node consolidation. - spotToSpotConsolidation: false # -- nodeRepair is ALPHA and is disabled by default. # Setting this to true will enable node repair. nodeRepair: false + # -- reservedCapacity is ALPHA and is disabled by default. + # Setting this will enable native on-demand capacity reservation support. + reservedCapacity: false + # -- spotToSpotConsolidation is ALPHA and is disabled by default. + # Setting this to true will enable spot replacement consolidation for both single and multi-node consolidation. + spotToSpotConsolidation: false diff --git a/cmd/controller/main.go b/cmd/controller/main.go index ea7df8a5fd6e..7761bdfebed1 100644 --- a/cmd/controller/main.go +++ b/cmd/controller/main.go @@ -35,6 +35,7 @@ func main() { op.GetClient(), op.AMIProvider, op.SecurityGroupProvider, + op.CapacityReservationProvider, ) cloudProvider := metrics.Decorate(awsCloudProvider) clusterState := state.NewCluster(op.Clock, op.GetClient(), cloudProvider) diff --git a/go.mod b/go.mod index a34dfda10ea6..8574120a5834 100644 --- a/go.mod +++ b/go.mod @@ -98,7 +98,7 @@ require ( github.com/rivo/uniseg v0.4.4 // indirect github.com/robfig/cron/v3 v3.0.1 // indirect github.com/spf13/cobra v1.8.1 // indirect - github.com/spf13/pflag v1.0.5 // indirect + github.com/spf13/pflag v1.0.6 // indirect github.com/x448/float16 v0.8.4 // indirect golang.org/x/net v0.35.0 // indirect golang.org/x/oauth2 v0.23.0 // indirect @@ -106,16 +106,18 @@ require ( golang.org/x/term v0.29.0 // indirect golang.org/x/text v0.22.0 // indirect golang.org/x/time v0.10.0 // indirect - golang.org/x/tools v0.28.0 // indirect + golang.org/x/tools v0.30.0 // indirect gomodules.xyz/jsonpatch/v2 v2.4.0 // indirect - google.golang.org/protobuf v1.36.1 // indirect + google.golang.org/protobuf v1.36.4 // indirect gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/yaml.v3 v3.0.1 // 
indirect - k8s.io/cloud-provider v0.32.1 // indirect + k8s.io/cloud-provider v0.32.2 // indirect k8s.io/component-base v0.32.2 // indirect - k8s.io/csi-translation-lib v0.32.1 // indirect + k8s.io/csi-translation-lib v0.32.2 // indirect k8s.io/kube-openapi v0.0.0-20241105132330-32ad38e42d3f // indirect sigs.k8s.io/json v0.0.0-20241010143419-9aa6b5e7a4b3 // indirect sigs.k8s.io/structured-merge-diff/v4 v4.4.2 // indirect ) + +replace sigs.k8s.io/karpenter => github.com/jmdeal/karpenter v0.0.0-20250221104820-4c25410338d8 diff --git a/go.sum b/go.sum index e114e20ff9ea..61189aeaed90 100644 --- a/go.sum +++ b/go.sum @@ -116,6 +116,8 @@ github.com/imdario/mergo v0.3.16 h1:wwQJbIsHYGMUyLSPrEq1CT16AhnhNJQ51+4fdHUnCl4= github.com/imdario/mergo v0.3.16/go.mod h1:WBLT9ZmE3lPoWsEzCh9LPo3TiwVN+ZKEjmz+hD27ysY= github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= +github.com/jmdeal/karpenter v0.0.0-20250221104820-4c25410338d8 h1:K89kW02bTZkegQnJPlOHSTt+a7WXGQOfrt+pP7lBJos= +github.com/jmdeal/karpenter v0.0.0-20250221104820-4c25410338d8/go.mod h1:/FgjYrt+hwAMcvY46hku76st/aeP4KjOib6RLEj312g= github.com/jonathan-innis/aws-sdk-go-prometheus v0.1.1 h1:gmpuckrozJ3lfKqSIia9YMGh0caoQmEY7mQP5MsnbTM= github.com/jonathan-innis/aws-sdk-go-prometheus v0.1.1/go.mod h1:168XvZFghCqo32ISSWnTXwdlMKzEq+x9TqdfswCjkrQ= github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY= @@ -179,15 +181,16 @@ github.com/rivo/uniseg v0.4.4 h1:8TfxU8dW6PdqD27gjM8MVNuicgxIjxpm4K7x4jp8sis= github.com/rivo/uniseg v0.4.4/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88= github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= -github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8= 
-github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4= +github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII= +github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o= github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/samber/lo v1.49.1 h1:4BIFyVfuQSEpluc7Fua+j1NolZHiEHEpaSEKdsH0tew= github.com/samber/lo v1.49.1/go.mod h1:dO6KHFzUKXgP8LDhU0oI8d2hekjXnGOu0DB8Jecxd6o= github.com/spf13/cobra v1.8.1 h1:e5/vxKd/rZsfSJMUX1agtjeTDf+qv1/JdBF8gg5k9ZM= github.com/spf13/cobra v1.8.1/go.mod h1:wHxEcudfqmLYa8iTfL+OuZPbBZkmvliBWKIezN3kD9Y= -github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= +github.com/spf13/pflag v1.0.6 h1:jFzHGLGAlb3ruxLB8MhbI6A8+AQX/2eW4qeyNZXNp2o= +github.com/spf13/pflag v1.0.6/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= @@ -297,16 +300,16 @@ golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= golang.org/x/tools v0.13.0/go.mod h1:HvlwmtVNQAhOuCjW7xxvovg8wbNq7LwfXh/k7wXUl58= golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d/go.mod h1:aiJjzUbINMkxbQROHiO6hDPo2LHcIPhhQsa9DLh0yGk= -golang.org/x/tools v0.28.0 h1:WuB6qZ4RPCQo5aP3WdKZS7i595EdWqWR8vqJTlwTVK8= -golang.org/x/tools v0.28.0/go.mod h1:dcIOrVd3mfQKTgrDVQHqCPMWy6lnhfhtX3hLXYVLfRw= +golang.org/x/tools v0.30.0 h1:BgcpHewrV5AUp2G9MebG4XPFI1E2W41zU1SaqVA9vJY= +golang.org/x/tools v0.30.0/go.mod h1:c347cR/OJfw5TI+GfX7RUPNMdDRRbjvYTS0jPyvsVtY= 
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= gomodules.xyz/jsonpatch/v2 v2.4.0 h1:Ci3iUJyx9UeRx7CeFN8ARgGbkESwJK+KB9lLcWxY/Zw= gomodules.xyz/jsonpatch/v2 v2.4.0/go.mod h1:AH3dM2RI6uoBZxn3LVrfvJ3E0/9dG4cSrbuBJT4moAY= -google.golang.org/protobuf v1.36.1 h1:yBPeRvTftaleIgM3PZ/WBIZ7XM/eEYAaEyCwvyjq/gk= -google.golang.org/protobuf v1.36.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= +google.golang.org/protobuf v1.36.4 h1:6A3ZDJHn/eNqc1i+IdefRzy/9PokBTPvcqMySR7NNIM= +google.golang.org/protobuf v1.36.4/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= @@ -325,12 +328,12 @@ k8s.io/apimachinery v0.32.2 h1:yoQBR9ZGkA6Rgmhbp/yuT9/g+4lxtsGYwW6dR6BDPLQ= k8s.io/apimachinery v0.32.2/go.mod h1:GpHVgxoKlTxClKcteaeuF1Ul/lDVb74KpZcxcmLDElE= k8s.io/client-go v0.32.2 h1:4dYCD4Nz+9RApM2b/3BtVvBHw54QjMFUl1OLcJG5yOA= k8s.io/client-go v0.32.2/go.mod h1:fpZ4oJXclZ3r2nDOv+Ux3XcJutfrwjKTCHz2H3sww94= -k8s.io/cloud-provider v0.32.1 h1:74rRhnfca3o4CsjjnIp/C3ARVuSmyNsxgWPtH0yc9Z0= -k8s.io/cloud-provider v0.32.1/go.mod h1:GECSanFT+EeZ/ToX3xlasjETzMUI+VFu92zHUDUsGHw= +k8s.io/cloud-provider v0.32.2 h1:8EC+fCYo0r0REczSjOZcVuQPCMxXxCKlgxDbYMrzC30= +k8s.io/cloud-provider v0.32.2/go.mod h1:2s8TeAXhVezp5VISaTxM6vW3yDonOZXoN4Aryz1p1PQ= k8s.io/component-base v0.32.2 
h1:1aUL5Vdmu7qNo4ZsE+569PV5zFatM9hl+lb3dEea2zU= k8s.io/component-base v0.32.2/go.mod h1:PXJ61Vx9Lg+P5mS8TLd7bCIr+eMJRQTyXe8KvkrvJq0= -k8s.io/csi-translation-lib v0.32.1 h1:qqlB+eKiIdUM+GGZfJN/4FMNeuIPIELLxfWfv/LWUYk= -k8s.io/csi-translation-lib v0.32.1/go.mod h1:dc7zXqpUW4FykfAe6TqU32tYewsGhrjI63ZwJWQng3k= +k8s.io/csi-translation-lib v0.32.2 h1:aLzAyaoJUc5rgtLi8Xd4No1tet6UpvUsGIgRoGnPSSE= +k8s.io/csi-translation-lib v0.32.2/go.mod h1:PlOKan6Vc0G6a+giQbm36plJ+E1LH+GPRLAVMQMSMcY= k8s.io/klog/v2 v2.130.1 h1:n9Xl7H1Xvksem4KFG4PYbdQCQxqc/tTUyrgXaOhHSzk= k8s.io/klog/v2 v2.130.1/go.mod h1:3Jpz1GvMt720eyJH1ckRHK1EDfpxISzJ7I9OYgaDtPE= k8s.io/kube-openapi v0.0.0-20241105132330-32ad38e42d3f h1:GA7//TjRY9yWGy1poLzYYJJ4JRdzg3+O6e8I+e+8T5Y= @@ -341,8 +344,6 @@ sigs.k8s.io/controller-runtime v0.20.2 h1:/439OZVxoEc02psi1h4QO3bHzTgu49bb347Xp4 sigs.k8s.io/controller-runtime v0.20.2/go.mod h1:xg2XB0K5ShQzAgsoujxuKN4LNXR2LfwwHsPj7Iaw+XY= sigs.k8s.io/json v0.0.0-20241010143419-9aa6b5e7a4b3 h1:/Rv+M11QRah1itp8VhT6HoVx1Ray9eB4DBr+K+/sCJ8= sigs.k8s.io/json v0.0.0-20241010143419-9aa6b5e7a4b3/go.mod h1:18nIHnGi6636UCz6m8i4DhaJ65T6EruyzmoQqI2BVDo= -sigs.k8s.io/karpenter v1.2.1-0.20250212185021-45f73ec7a790 h1:FXm0rL9jchktDDEqJ9bGhNkpGzauYhXxroMzzvohAO8= -sigs.k8s.io/karpenter v1.2.1-0.20250212185021-45f73ec7a790/go.mod h1:R6cr2+SbbgXtKtiuyRFdZCbqWN2kNTduqshnQRoyOr8= sigs.k8s.io/structured-merge-diff/v4 v4.4.2 h1:MdmvkGuXi/8io6ixD5wud3vOLwc1rj0aNqRlpuvjmwA= sigs.k8s.io/structured-merge-diff/v4 v4.4.2/go.mod h1:N8f93tFZh9U6vpxwRArLiikrE5/2tiu1w1AGfACIGE4= sigs.k8s.io/yaml v1.4.0 h1:Mk1wCc2gy/F0THH0TAp1QYyJNzRm2KCLy3o5ASXVI5E= diff --git a/hack/tools/allocatable_diff/main.go b/hack/tools/allocatable_diff/main.go index d686d9330740..385c5e78598f 100644 --- a/hack/tools/allocatable_diff/main.go +++ b/hack/tools/allocatable_diff/main.go @@ -78,6 +78,7 @@ func main() { op.GetClient(), op.AMIProvider, op.SecurityGroupProvider, + op.CapacityReservationProvider, ) instanceTypes := 
lo.Must(cloudProvider.GetInstanceTypes(ctx, nil)) diff --git a/pkg/apis/v1/ec2nodeclass_status.go b/pkg/apis/v1/ec2nodeclass_status.go index 532fd48f4341..28308900e2bc 100644 --- a/pkg/apis/v1/ec2nodeclass_status.go +++ b/pkg/apis/v1/ec2nodeclass_status.go @@ -74,7 +74,7 @@ type CapacityReservation struct { AvailabilityZone string `json:"availabilityZone"` // The last known available instance count for the capacity reservation. // +required - AvailableInstanceCount int `json:"availableInstanceCount" hash:"ignore"` + AvailableInstanceCount int `json:"availableInstanceCount,omitempty" hash:"ignore"` // The time at which the capacity reservation expires. Once expired, the reserved capacity is released and Karpenter // will no longer be able to launch instances into that reservation. // +optional diff --git a/pkg/cache/unavailableofferings.go b/pkg/cache/unavailableofferings.go index 8efd3a2e5c42..e9b10154de35 100644 --- a/pkg/cache/unavailableofferings.go +++ b/pkg/cache/unavailableofferings.go @@ -53,10 +53,6 @@ func (u *UnavailableOfferings) IsUnavailable(instanceType string, zone, capacity return found } -func (u *UnavailableOfferings) IsReservationUnavailable(reservationID string) bool { - return false -} - // MarkUnavailable communicates recently observed temporary capacity shortages in the provided offerings func (u *UnavailableOfferings) MarkUnavailable(ctx context.Context, unavailableReason, instanceType, zone, capacityType string) { // even if the key is already in the cache, we still need to call Set to extend the cached entry's TTL @@ -65,7 +61,8 @@ func (u *UnavailableOfferings) MarkUnavailable(ctx context.Context, unavailableR "instance-type", instanceType, "zone", zone, "capacity-type", capacityType, - "ttl", UnavailableOfferingsTTL).V(1).Info("removing offering from offerings") + "ttl", UnavailableOfferingsTTL, + ).V(1).Info("removing offering from offerings") u.cache.SetDefault(u.key(instanceType, zone, capacityType), struct{}{}) 
atomic.AddUint64(&u.SeqNum, 1) } @@ -76,7 +73,7 @@ func (u *UnavailableOfferings) MarkUnavailableForFleetErr(ctx context.Context, f u.MarkUnavailable(ctx, lo.FromPtr(fleetErr.ErrorCode), string(instanceType), zone, capacityType) } -func (u *UnavailableOfferings) Delete(instanceType, zone, capacityType string) { +func (u *UnavailableOfferings) DeleteOffering(instanceType, zone, capacityType string) { u.cache.Delete(u.key(instanceType, zone, capacityType)) } @@ -85,6 +82,6 @@ func (u *UnavailableOfferings) Flush() { } // key returns the cache key for all offerings in the cache -func (u *UnavailableOfferings) key(instanceType, zone, capacityType string) string { - return fmt.Sprintf("%s:%s:%s", capacityType, instanceType, zone) +func (*UnavailableOfferings) key(instanceType, zone, capacityType string) string { + return fmt.Sprintf("o:%s:%s:%s", capacityType, instanceType, zone) } diff --git a/pkg/cloudprovider/cloudprovider.go b/pkg/cloudprovider/cloudprovider.go index 6bb838cd67e5..11c32ce04a30 100644 --- a/pkg/cloudprovider/cloudprovider.go +++ b/pkg/cloudprovider/cloudprovider.go @@ -46,6 +46,7 @@ import ( cloudproviderevents "github.com/aws/karpenter-provider-aws/pkg/cloudprovider/events" "github.com/aws/karpenter-provider-aws/pkg/providers/amifamily" + "github.com/aws/karpenter-provider-aws/pkg/providers/capacityreservation" "github.com/aws/karpenter-provider-aws/pkg/providers/instance" "github.com/aws/karpenter-provider-aws/pkg/providers/instancetype" "github.com/aws/karpenter-provider-aws/pkg/providers/securitygroup" @@ -59,21 +60,30 @@ type CloudProvider struct { kubeClient client.Client recorder events.Recorder - instanceTypeProvider instancetype.Provider - instanceProvider instance.Provider - amiProvider amifamily.Provider - securityGroupProvider securitygroup.Provider + instanceTypeProvider instancetype.Provider + instanceProvider instance.Provider + amiProvider amifamily.Provider + securityGroupProvider securitygroup.Provider + capacityReservationProvider 
capacityreservation.Provider } -func New(instanceTypeProvider instancetype.Provider, instanceProvider instance.Provider, recorder events.Recorder, - kubeClient client.Client, amiProvider amifamily.Provider, securityGroupProvider securitygroup.Provider) *CloudProvider { +func New( + instanceTypeProvider instancetype.Provider, + instanceProvider instance.Provider, + recorder events.Recorder, + kubeClient client.Client, + amiProvider amifamily.Provider, + securityGroupProvider securitygroup.Provider, + capacityReservationProvider capacityreservation.Provider, +) *CloudProvider { return &CloudProvider{ - instanceTypeProvider: instanceTypeProvider, - instanceProvider: instanceProvider, - kubeClient: kubeClient, - amiProvider: amiProvider, - securityGroupProvider: securityGroupProvider, - recorder: recorder, + instanceTypeProvider: instanceTypeProvider, + instanceProvider: instanceProvider, + kubeClient: kubeClient, + amiProvider: amiProvider, + securityGroupProvider: securityGroupProvider, + capacityReservationProvider: capacityReservationProvider, + recorder: recorder, } } @@ -111,6 +121,9 @@ func (c *CloudProvider) Create(ctx context.Context, nodeClaim *karpv1.NodeClaim) if err != nil { return nil, fmt.Errorf("creating instance, %w", err) } + if instance.CapacityType == karpv1.CapacityTypeReserved { + c.capacityReservationProvider.MarkLaunched(instance.CapacityReservationID) + } instanceType, _ := lo.Find(instanceTypes, func(i *cloudprovider.InstanceType) bool { return i.Name == string(instance.Type) }) @@ -399,6 +412,9 @@ func (c *CloudProvider) instanceToNodeClaim(i *instance.Instance, instanceType * } } labels[karpv1.CapacityTypeLabelKey] = i.CapacityType + if i.CapacityType == karpv1.CapacityTypeReserved { + labels[cloudprovider.ReservationIDLabel] = i.CapacityReservationID + } if v, ok := i.Tags[karpv1.NodePoolLabelKey]; ok { labels[karpv1.NodePoolLabelKey] = v } diff --git a/pkg/cloudprovider/suite_test.go b/pkg/cloudprovider/suite_test.go index 
55fd0dc8059f..39d84ba1bb7a 100644 --- a/pkg/cloudprovider/suite_test.go +++ b/pkg/cloudprovider/suite_test.go @@ -88,7 +88,7 @@ var _ = BeforeSuite(func() { fakeClock = clock.NewFakeClock(time.Now()) recorder = events.NewRecorder(&record.FakeRecorder{}) cloudProvider = cloudprovider.New(awsEnv.InstanceTypesProvider, awsEnv.InstanceProvider, recorder, - env.Client, awsEnv.AMIProvider, awsEnv.SecurityGroupProvider) + env.Client, awsEnv.AMIProvider, awsEnv.SecurityGroupProvider, awsEnv.CapacityReservationProvider) cluster = state.NewCluster(fakeClock, env.Client, cloudProvider) prov = provisioning.NewProvisioner(env.Client, recorder, cloudProvider, cluster, fakeClock) }) diff --git a/pkg/controllers/controllers.go b/pkg/controllers/controllers.go index 2c17f7013794..74d8058eab4c 100644 --- a/pkg/controllers/controllers.go +++ b/pkg/controllers/controllers.go @@ -34,7 +34,7 @@ import ( controllerspricing "github.com/aws/karpenter-provider-aws/pkg/controllers/providers/pricing" ssminvalidation "github.com/aws/karpenter-provider-aws/pkg/controllers/providers/ssm/invalidation" controllersversion "github.com/aws/karpenter-provider-aws/pkg/controllers/providers/version" - "github.com/aws/karpenter-provider-aws/pkg/providers/capacityreservation" + capacityreservationprovider "github.com/aws/karpenter-provider-aws/pkg/providers/capacityreservation" "github.com/aws/karpenter-provider-aws/pkg/providers/launchtemplate" "github.com/aws/karpenter-provider-aws/pkg/providers/version" @@ -80,7 +80,7 @@ func NewControllers( launchTemplateProvider launchtemplate.Provider, versionProvider *version.DefaultProvider, instanceTypeProvider *instancetype.DefaultProvider, - capacityReservationProvider capacityreservation.Provider, + capacityReservationProvider capacityreservationprovider.Provider, ) []controller.Controller { controllers := []controller.Controller{ nodeclasshash.NewController(kubeClient), diff --git a/pkg/controllers/interruption/suite_test.go 
b/pkg/controllers/interruption/suite_test.go index 042131d03164..cb119a240ff5 100644 --- a/pkg/controllers/interruption/suite_test.go +++ b/pkg/controllers/interruption/suite_test.go @@ -91,7 +91,7 @@ var _ = BeforeSuite(func() { sqsapi = &fake.SQSAPI{} sqsProvider = lo.Must(sqs.NewDefaultProvider(sqsapi, fmt.Sprintf("https://sqs.%s.amazonaws.com/%s/test-cluster", fake.DefaultRegion, fake.DefaultAccount))) cloudProvider := cloudprovider.New(awsEnv.InstanceTypesProvider, awsEnv.InstanceProvider, events.NewRecorder(&record.FakeRecorder{}), - env.Client, awsEnv.AMIProvider, awsEnv.SecurityGroupProvider) + env.Client, awsEnv.AMIProvider, awsEnv.SecurityGroupProvider, awsEnv.CapacityReservationProvider) controller = interruption.NewController(env.Client, cloudProvider, fakeClock, events.NewRecorder(&record.FakeRecorder{}), sqsProvider, unavailableOfferingsCache) }) diff --git a/pkg/controllers/nodeclaim/garbagecollection/suite_test.go b/pkg/controllers/nodeclaim/garbagecollection/suite_test.go index db2b76bf6f2b..2ab7dca2c3f7 100644 --- a/pkg/controllers/nodeclaim/garbagecollection/suite_test.go +++ b/pkg/controllers/nodeclaim/garbagecollection/suite_test.go @@ -65,7 +65,7 @@ var _ = BeforeSuite(func() { env = coretest.NewEnvironment(coretest.WithCRDs(apis.CRDs...), coretest.WithCRDs(v1alpha1.CRDs...)) awsEnv = test.NewEnvironment(ctx, env) cloudProvider = cloudprovider.New(awsEnv.InstanceTypesProvider, awsEnv.InstanceProvider, events.NewRecorder(&record.FakeRecorder{}), - env.Client, awsEnv.AMIProvider, awsEnv.SecurityGroupProvider) + env.Client, awsEnv.AMIProvider, awsEnv.SecurityGroupProvider, awsEnv.CapacityReservationProvider) garbageCollectionController = garbagecollection.NewController(env.Client, cloudProvider) }) diff --git a/pkg/controllers/nodeclaim/tagging/suite_test.go b/pkg/controllers/nodeclaim/tagging/suite_test.go index 5f284108543f..627d1e6c8ddb 100644 --- a/pkg/controllers/nodeclaim/tagging/suite_test.go +++ 
b/pkg/controllers/nodeclaim/tagging/suite_test.go
@@ -64,7 +64,7 @@ var _ = BeforeSuite(func() {
 	ctx = options.ToContext(ctx, test.Options())
 	awsEnv = test.NewEnvironment(ctx, env)
 	cloudProvider := cloudprovider.New(awsEnv.InstanceTypesProvider, awsEnv.InstanceProvider, events.NewRecorder(&record.FakeRecorder{}),
-		env.Client, awsEnv.AMIProvider, awsEnv.SecurityGroupProvider)
+		env.Client, awsEnv.AMIProvider, awsEnv.SecurityGroupProvider, awsEnv.CapacityReservationProvider)
 	taggingController = tagging.NewController(env.Client, cloudProvider, awsEnv.InstanceProvider)
 })
 
 var _ = AfterSuite(func() {
diff --git a/pkg/controllers/nodeclass/capacityreservation.go b/pkg/controllers/nodeclass/capacityreservation.go
index dac3c238ba36..dbcad52186aa 100644
--- a/pkg/controllers/nodeclass/capacityreservation.go
+++ b/pkg/controllers/nodeclass/capacityreservation.go
@@ -98,14 +98,14 @@ func capacityReservationFromEC2(cr *ec2types.CapacityReservation) (v1.CapacityRe
 	}
 	return v1.CapacityReservation{
-		AvailabilityZone:       *cr.AvailabilityZone,
-		AvailableInstanceCount: int(*cr.AvailableInstanceCount),
-		EndTime:                endTime,
-		ID:                     *cr.CapacityReservationId,
-		InstanceMatchCriteria:  string(cr.InstanceMatchCriteria),
-		InstanceType:           *cr.InstanceType,
-		OwnerID:                *cr.OwnerId,
-		TotalInstanceCount:     int(*cr.TotalInstanceCount),
+		AvailabilityZone: *cr.AvailabilityZone,
+		// AvailableInstanceCount is intentionally not populated: availability is now tracked dynamically by the capacity reservation provider.
+		EndTime:               endTime,
+		ID:                    *cr.CapacityReservationId,
+		InstanceMatchCriteria: string(cr.InstanceMatchCriteria),
+		InstanceType:          *cr.InstanceType,
+		OwnerID:               *cr.OwnerId,
+		TotalInstanceCount:    int(*cr.TotalInstanceCount),
 	}, nil
 }
diff --git a/pkg/controllers/providers/instancetype/capacity/suite_test.go b/pkg/controllers/providers/instancetype/capacity/suite_test.go
index 246d1656d71d..b6a6e5f272d7 100644
--- a/pkg/controllers/providers/instancetype/capacity/suite_test.go
+++ b/pkg/controllers/providers/instancetype/capacity/suite_test.go
@@ -80,7 +80,7 @@ var _ =
BeforeSuite(func() { nodeClaim = coretest.NodeClaim() node = coretest.Node() cloudProvider := cloudprovider.New(awsEnv.InstanceTypesProvider, awsEnv.InstanceProvider, events.NewRecorder(&record.FakeRecorder{}), - env.Client, awsEnv.AMIProvider, awsEnv.SecurityGroupProvider) + env.Client, awsEnv.AMIProvider, awsEnv.SecurityGroupProvider, awsEnv.CapacityReservationProvider) controller = controllersinstancetypecapacity.NewController(env.Client, cloudProvider, awsEnv.InstanceTypesProvider) }) diff --git a/pkg/errors/errors.go b/pkg/errors/errors.go index d3dfe9e1c8ea..22d57c569cc3 100644 --- a/pkg/errors/errors.go +++ b/pkg/errors/errors.go @@ -42,6 +42,8 @@ var ( "EntityAlreadyExists", ) + reservationCapacityExceededErrorCode = "ReservationCapacityExceeded" + // unfulfillableCapacityErrorCodes signify that capacity is temporarily unable to be launched unfulfillableCapacityErrorCodes = sets.New[string]( "InsufficientInstanceCapacity", @@ -50,6 +52,7 @@ var ( "UnfulfillableCapacity", "Unsupported", "InsufficientFreeAddressesInSubnet", + reservationCapacityExceededErrorCode, ) ) @@ -135,6 +138,10 @@ func IsUnfulfillableCapacity(err ec2types.CreateFleetError) bool { return unfulfillableCapacityErrorCodes.Has(*err.ErrorCode) } +func IsReservationCapacityExceeded(err ec2types.CreateFleetError) bool { + return *err.ErrorCode == reservationCapacityExceededErrorCode +} + func IsLaunchTemplateNotFound(err error) bool { if err == nil { return false diff --git a/pkg/operator/operator.go b/pkg/operator/operator.go index 398183a7ae63..ac4dfc71d4de 100644 --- a/pkg/operator/operator.go +++ b/pkg/operator/operator.go @@ -23,6 +23,7 @@ import ( "net" "os" "strings" + "time" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/aws/middleware" @@ -183,6 +184,7 @@ func NewOperator(ctx context.Context, operator *operator.Operator) (context.Cont unavailableOfferingsCache, instancetype.NewDefaultResolver(cfg.Region), ) + capacityReservationProvider := 
capacityreservation.NewProvider(ec2api, operator.Clock, cache.New(awscache.DefaultTTL, awscache.DefaultCleanupInterval), cache.New(time.Hour*24, awscache.DefaultCleanupInterval)) instanceProvider := instance.NewDefaultProvider( ctx, cfg.Region, @@ -190,8 +192,8 @@ func NewOperator(ctx context.Context, operator *operator.Operator) (context.Cont unavailableOfferingsCache, subnetProvider, launchTemplateProvider, + capacityReservationProvider, ) - capacityReservationProvider := capacityreservation.NewProvider(ec2api, operator.Clock, cache.New(awscache.DefaultTTL, awscache.DefaultCleanupInterval)) // Setup field indexers on instanceID -- specifically for the interruption controller if options.FromContext(ctx).InterruptionQueue != "" { diff --git a/pkg/providers/amifamily/resolver.go b/pkg/providers/amifamily/resolver.go index 84d53eaa1f7b..e8a9e213558d 100644 --- a/pkg/providers/amifamily/resolver.go +++ b/pkg/providers/amifamily/resolver.go @@ -147,15 +147,6 @@ func (r DefaultResolver) Resolve(nodeClass *v1.EC2NodeClass, nodeClaim *karpv1.N reservationIDs string } paramsToInstanceTypes := lo.GroupBy(instanceTypes, func(it *cloudprovider.InstanceType) launchTemplateParams { - var reservationIDs []string - if capacityType == karpv1.CapacityTypeReserved { - for i := range it.Offerings { - if it.Offerings[i].Requirements.Get(karpv1.CapacityTypeLabelKey).Any() != karpv1.CapacityTypeReserved { - continue - } - reservationIDs = append(reservationIDs, it.Offerings[i].Requirements.Get(cloudprovider.ReservationIDLabel).Any()) - } - } return launchTemplateParams{ efaCount: lo.Ternary( lo.Contains(lo.Keys(nodeClaim.Spec.Resources.Requests), v1.ResourceEFA), @@ -166,7 +157,11 @@ func (r DefaultResolver) Resolve(nodeClass *v1.EC2NodeClass, nodeClaim *karpv1.N // If we're dealing with reserved instances, there's only going to be a single instance per group. This invariant // is due to reservation IDs not being shared across instance types. 
Because of this, we don't need to worry about // ordering in this string. - reservationIDs: strings.Join(reservationIDs, ","), + reservationIDs: lo.Ternary( + capacityType == karpv1.CapacityTypeReserved, + strings.Join(selectReservationIDs(it, nodeClaim), ","), + "", + ), } }) @@ -178,6 +173,25 @@ func (r DefaultResolver) Resolve(nodeClass *v1.EC2NodeClass, nodeClaim *karpv1.N return resolvedTemplates, nil } +// selectReservationIDs filters the set of reservation IDs available on the given instance type to only include those +// that are compatible with the given NodeClaim. Additionally, if there are multiple reservations available in the same +// zone, only the reservation with the greatest availability is selected. This is to address a limitation in the +// CreateFleet interface, where you can only provide one override for a given instance-zone combination. +func selectReservationIDs(it *cloudprovider.InstanceType, nodeClaim *karpv1.NodeClaim) []string { + zonalOfferings := map[string]*cloudprovider.Offering{} + for _, o := range it.Offerings.Available().Compatible(scheduling.NewNodeSelectorRequirementsWithMinValues(nodeClaim.Spec.Requirements...)) { + if o.CapacityType() != karpv1.CapacityTypeReserved { + continue + } + if current, ok := zonalOfferings[o.Zone()]; !ok || current.ReservationCapacity < o.ReservationCapacity { + zonalOfferings[o.Zone()] = o + } + } + return lo.Map(lo.Values(zonalOfferings), func(o *cloudprovider.Offering, _ int) string { + return o.ReservationID() + }) +} + func GetAMIFamily(amiFamily string, options *Options) AMIFamily { switch amiFamily { case v1.AMIFamilyBottlerocket: diff --git a/pkg/providers/capacityreservation/provider.go b/pkg/providers/capacityreservation/provider.go index 753df300cf80..2a136debaaa3 100644 --- a/pkg/providers/capacityreservation/provider.go +++ b/pkg/providers/capacityreservation/provider.go @@ -17,7 +17,6 @@ package capacityreservation import ( "context" "fmt" - "sync" 
"github.com/aws/aws-sdk-go-v2/service/ec2" ec2types "github.com/aws/aws-sdk-go-v2/service/ec2/types" @@ -32,48 +31,50 @@ import ( type Provider interface { List(context.Context, ...v1.CapacityReservationSelectorTerm) ([]*ec2types.CapacityReservation, error) + GetAvailableInstanceCount(string) int + MarkLaunched(string) + MarkTerminated(string) + MarkUnavailable(...string) } type DefaultProvider struct { - sync.RWMutex + availabilityCache - ec2api sdk.EC2API - clk clock.Clock - cache *cache.Cache - cm *pretty.ChangeMonitor + ec2api sdk.EC2API + clk clock.Clock + reservationCache *cache.Cache + cm *pretty.ChangeMonitor } -func NewProvider(ec2api sdk.EC2API, clk clock.Clock, cache *cache.Cache) *DefaultProvider { +func NewProvider(ec2api sdk.EC2API, clk clock.Clock, reservationCache, reservationAvailabilityCache *cache.Cache) *DefaultProvider { return &DefaultProvider{ - ec2api: ec2api, - clk: clk, - cache: cache, - cm: pretty.NewChangeMonitor(), + availabilityCache: availabilityCache{ + cache: reservationAvailabilityCache, + clk: clk, + }, + ec2api: ec2api, + clk: clk, + reservationCache: reservationCache, + cm: pretty.NewChangeMonitor(), } } func (p *DefaultProvider) List(ctx context.Context, selectorTerms ...v1.CapacityReservationSelectorTerm) ([]*ec2types.CapacityReservation, error) { queries := QueriesFromSelectorTerms(selectorTerms...) - reservations, remainingQueries := func() ([]*ec2types.CapacityReservation, []*Query) { - p.RLock() - defer p.RUnlock() - reservations := []*ec2types.CapacityReservation{} - remaining := []*Query{} - for _, query := range queries { - if value, ok := p.cache.Get(query.CacheKey()); ok { - reservations = append(reservations, value.([]*ec2types.CapacityReservation)...) 
- } else { - remaining = append(remaining, query) - } + + var reservations []*ec2types.CapacityReservation + var remainingQueries []*Query + for _, query := range queries { + if value, ok := p.reservationCache.Get(query.CacheKey()); ok { + reservations = append(reservations, value.([]*ec2types.CapacityReservation)...) + } else { + remainingQueries = append(remainingQueries, query) } - return reservations, remaining - }() + } if len(remainingQueries) == 0 { return p.filterReservations(reservations), nil } - p.Lock() - defer p.Unlock() for _, query := range remainingQueries { paginator := ec2.NewDescribeCapacityReservationsPaginator(p.ec2api, query.DescribeCapacityReservationsInput()) for paginator.HasMorePages() { @@ -82,10 +83,14 @@ func (p *DefaultProvider) List(ctx context.Context, selectorTerms ...v1.Capacity return nil, fmt.Errorf("listing capacity reservations, %w", err) } queryReservations := lo.ToSlicePtr(out.CapacityReservations) - p.cache.SetDefault(query.CacheKey(), queryReservations) + p.reservationCache.SetDefault(query.CacheKey(), queryReservations) reservations = append(reservations, queryReservations...) 
+ p.syncAvailability(lo.SliceToMap(queryReservations, func(r *ec2types.CapacityReservation) (string, int) { + return *r.CapacityReservationId, int(*r.AvailableInstanceCount) + })) } } + return p.filterReservations(reservations), nil } diff --git a/pkg/providers/capacityreservation/types.go b/pkg/providers/capacityreservation/types.go index ae8b857e2d0d..bd970b0c33b4 100644 --- a/pkg/providers/capacityreservation/types.go +++ b/pkg/providers/capacityreservation/types.go @@ -16,11 +16,15 @@ package capacityreservation import ( "fmt" + "sync" + "time" "github.com/aws/aws-sdk-go-v2/service/ec2" ec2types "github.com/aws/aws-sdk-go-v2/service/ec2/types" "github.com/mitchellh/hashstructure/v2" + "github.com/patrickmn/go-cache" "github.com/samber/lo" + "k8s.io/utils/clock" v1 "github.com/aws/karpenter-provider-aws/pkg/apis/v1" ) @@ -103,3 +107,79 @@ func (q *Query) tagsFilter() ([]ec2types.Filter, bool) { }), len(q.tags) != 0 } + +type availabilityCache struct { + mu sync.RWMutex + cache *cache.Cache + clk clock.Clock +} + +type availabilityCacheEntry struct { + count int + syncTime time.Time +} + +func (c *availabilityCache) syncAvailability(availability map[string]int) { + now := c.clk.Now() + c.mu.Lock() + defer c.mu.Unlock() + for id, count := range availability { + c.cache.SetDefault(id, &availabilityCacheEntry{ + count: count, + syncTime: now, + }) + } +} + +func (c *availabilityCache) MarkLaunched(reservationID string) { + now := c.clk.Now() + c.mu.Lock() + defer c.mu.Unlock() + entry, ok := c.cache.Get(reservationID) + if !ok { + return + } + // Only count the launch if it occurred before the last sync from EC2. In the worst case, this will lead to us + // overestimating availability if there's an eventual consistency delay with EC2, but we'd rather overestimate than + // underestimate. 
+ if entry.(*availabilityCacheEntry).syncTime.After(now) { + return + } + + if entry.(*availabilityCacheEntry).count != 0 { + entry.(*availabilityCacheEntry).count -= 1 + } +} + +func (c *availabilityCache) MarkTerminated(reservationID string) { + // We don't do a time based comparison for CountTerminated because the reservation becomes available some time between + // the termination call and the instance state transitioning to terminated. This can be a pretty big gap, so a time + // based comparison would have limited value. In the worst case, this can result in us overestimating the available + // capacity, but we'd rather overestimate than underestimate. + c.mu.Lock() + defer c.mu.Unlock() + entry, ok := c.cache.Get(reservationID) + if !ok { + return + } + entry.(*availabilityCacheEntry).count += 1 +} + +func (c *availabilityCache) GetAvailableInstanceCount(reservationID string) int { + c.mu.RLock() + defer c.mu.RUnlock() + entry, ok := c.cache.Get(reservationID) + return lo.Ternary(ok, entry.(*availabilityCacheEntry).count, 0) +} + +func (c *availabilityCache) MarkUnavailable(reservationIDs ...string) { + c.mu.Lock() + defer c.mu.Unlock() + for _, id := range reservationIDs { + entry, ok := c.cache.Get(id) + if !ok { + continue + } + entry.(*availabilityCacheEntry).count = 0 + } +} diff --git a/pkg/providers/instance/instance.go b/pkg/providers/instance/instance.go index 3d66525ecb8e..8a52c8a56ee7 100644 --- a/pkg/providers/instance/instance.go +++ b/pkg/providers/instance/instance.go @@ -43,6 +43,7 @@ import ( "github.com/aws/karpenter-provider-aws/pkg/cache" awserrors "github.com/aws/karpenter-provider-aws/pkg/errors" "github.com/aws/karpenter-provider-aws/pkg/operator/options" + "github.com/aws/karpenter-provider-aws/pkg/providers/capacityreservation" "github.com/aws/karpenter-provider-aws/pkg/providers/launchtemplate" "github.com/aws/karpenter-provider-aws/pkg/providers/subnet" @@ -77,23 +78,32 @@ type Provider interface { } type DefaultProvider struct { - 
region string - ec2api sdk.EC2API - unavailableOfferings *cache.UnavailableOfferings - subnetProvider subnet.Provider - launchTemplateProvider launchtemplate.Provider - ec2Batcher *batcher.EC2API + region string + ec2api sdk.EC2API + unavailableOfferings *cache.UnavailableOfferings + subnetProvider subnet.Provider + launchTemplateProvider launchtemplate.Provider + ec2Batcher *batcher.EC2API + capacityReservationProvider capacityreservation.Provider } -func NewDefaultProvider(ctx context.Context, region string, ec2api sdk.EC2API, unavailableOfferings *cache.UnavailableOfferings, - subnetProvider subnet.Provider, launchTemplateProvider launchtemplate.Provider) *DefaultProvider { +func NewDefaultProvider( + ctx context.Context, + region string, + ec2api sdk.EC2API, + unavailableOfferings *cache.UnavailableOfferings, + subnetProvider subnet.Provider, + launchTemplateProvider launchtemplate.Provider, + capacityReservationProvider capacityreservation.Provider, +) *DefaultProvider { return &DefaultProvider{ - region: region, - ec2api: ec2api, - unavailableOfferings: unavailableOfferings, - subnetProvider: subnetProvider, - launchTemplateProvider: launchTemplateProvider, - ec2Batcher: batcher.EC2(ctx, ec2api), + region: region, + ec2api: ec2api, + unavailableOfferings: unavailableOfferings, + subnetProvider: subnetProvider, + launchTemplateProvider: launchTemplateProvider, + ec2Batcher: batcher.EC2(ctx, ec2api), + capacityReservationProvider: capacityReservationProvider, } } @@ -105,7 +115,9 @@ func (p *DefaultProvider) Create(ctx context.Context, nodeClass *v1.EC2NodeClass } // We filter out non-reserved instances regardless of the min-values settings, since if the launch is eligible for // reserved instances that's all we'll include in our fleet request. 
- instanceTypes = p.filterReservedInstanceTypes(nodeClaim, instanceTypes) + if reqs := scheduling.NewNodeSelectorRequirementsWithMinValues(nodeClaim.Spec.Requirements...); reqs.Get(karpv1.CapacityTypeLabelKey).Has(karpv1.CapacityTypeReserved) { + instanceTypes = p.filterReservedInstanceTypes(reqs, instanceTypes) + } instanceTypes, err := cloudprovider.InstanceTypes(instanceTypes).Truncate(schedulingRequirements, maxInstanceTypes) if err != nil { return nil, cloudprovider.NewCreateError(fmt.Errorf("truncating instance types, %w", err), "InstanceTypeResolutionFailed", "Error truncating instance types based on the passed-in requirements") @@ -119,8 +131,23 @@ func (p *DefaultProvider) Create(ctx context.Context, nodeClass *v1.EC2NodeClass if err != nil { return nil, err } - efaEnabled := lo.Contains(lo.Keys(nodeClaim.Spec.Resources.Requests), v1.ResourceEFA) - return NewInstanceFromFleet(fleetInstance, tags, efaEnabled), nil + + capacityType := p.getCapacityType(nodeClaim, instanceTypes) + var capacityReservation string + if capacityType == karpv1.CapacityTypeReserved { + capacityReservation = p.getCapacityReservationForInstance( + string(fleetInstance.InstanceType), + *fleetInstance.LaunchTemplateAndOverrides.Overrides.AvailabilityZone, + instanceTypes, + ) + } + return NewInstanceFromFleet( + fleetInstance, + tags, + capacityType, + capacityReservation, + lo.Contains(lo.Keys(nodeClaim.Spec.Resources.Requests), v1.ResourceEFA), + ), nil } func (p *DefaultProvider) Get(ctx context.Context, id string) (*Instance, error) { @@ -252,7 +279,7 @@ func (p *DefaultProvider) launchInstance(ctx context.Context, nodeClass *v1.EC2N } return ec2types.CreateFleetInstance{}, cloudprovider.NewCreateError(fmt.Errorf("creating fleet request, %w", err), reason, fmt.Sprintf("Error creating fleet request: %s", message)) } - p.updateUnavailableOfferingsCache(ctx, createFleetOutput.Errors, capacityType) + p.updateUnavailableOfferingsCache(ctx, createFleetOutput.Errors, capacityType, 
instanceTypes) if len(createFleetOutput.Instances) == 0 || len(createFleetOutput.Instances[0].InstanceIds) == 0 { return ec2types.CreateFleetInstance{}, combineFleetErrors(createFleetOutput.Errors) } @@ -265,12 +292,11 @@ func GetCreateFleetInput(nodeClass *v1.EC2NodeClass, capacityType string, tags m Context: nodeClass.Spec.Context, LaunchTemplateConfigs: launchTemplateConfigs, TargetCapacitySpecification: &ec2types.TargetCapacitySpecificationRequest{ - DefaultTargetCapacityType: func() ec2types.DefaultTargetCapacityType { - if capacityType == karpv1.CapacityTypeReserved { - return ec2types.DefaultTargetCapacityType(karpv1.CapacityTypeOnDemand) - } - return ec2types.DefaultTargetCapacityType(capacityType) - }(), + DefaultTargetCapacityType: lo.Ternary( + capacityType == karpv1.CapacityTypeReserved, + ec2types.DefaultTargetCapacityType(karpv1.CapacityTypeOnDemand), + ec2types.DefaultTargetCapacityType(capacityType), + ), TotalTargetCapacity: aws.Int32(1), }, TagSpecifications: []ec2types.TagSpecification{ @@ -319,7 +345,7 @@ func (p *DefaultProvider) getLaunchTemplateConfigs( requirements[karpv1.CapacityTypeLabelKey] = scheduling.NewRequirement(karpv1.CapacityTypeLabelKey, corev1.NodeSelectorOpIn, capacityType) for _, launchTemplate := range launchTemplates { launchTemplateConfig := ec2types.FleetLaunchTemplateConfigRequest{ - Overrides: p.getOverrides(launchTemplate.InstanceTypes, zonalSubnets, requirements, launchTemplate.ImageID), + Overrides: p.getOverrides(launchTemplate.InstanceTypes, zonalSubnets, requirements, launchTemplate.ImageID, launchTemplate.CapacityReservationID), LaunchTemplateSpecification: &ec2types.FleetLaunchTemplateSpecificationRequest{ LaunchTemplateName: aws.String(launchTemplate.Name), Version: aws.String("$Latest"), @@ -337,7 +363,7 @@ func (p *DefaultProvider) getLaunchTemplateConfigs( // getOverrides creates and returns launch template overrides for the cross product of InstanceTypes and subnets (with subnets being constrained by // 
zones and the offerings in InstanceTypes) -func (p *DefaultProvider) getOverrides(instanceTypes []*cloudprovider.InstanceType, zonalSubnets map[string]*subnet.Subnet, reqs scheduling.Requirements, image string) []ec2types.FleetLaunchTemplateOverridesRequest { +func (p *DefaultProvider) getOverrides(instanceTypes []*cloudprovider.InstanceType, zonalSubnets map[string]*subnet.Subnet, reqs scheduling.Requirements, image, capacityReservationID string) []ec2types.FleetLaunchTemplateOverridesRequest { // Unwrap all the offerings to a flat slice that includes a pointer // to the parent instance type name type offeringWithParentName struct { @@ -356,6 +382,9 @@ func (p *DefaultProvider) getOverrides(instanceTypes []*cloudprovider.InstanceTy } var overrides []ec2types.FleetLaunchTemplateOverridesRequest for _, offering := range unwrappedOfferings { + if capacityReservationID != "" && offering.ReservationID() != capacityReservationID { + continue + } if reqs.Compatible(offering.Requirements, scheduling.AllowUndefinedWellKnownLabels) != nil { continue } @@ -375,12 +404,53 @@ func (p *DefaultProvider) getOverrides(instanceTypes []*cloudprovider.InstanceTy return overrides } -func (p *DefaultProvider) updateUnavailableOfferingsCache(ctx context.Context, errors []ec2types.CreateFleetError, capacityType string) { - for _, err := range errors { - if awserrors.IsUnfulfillableCapacity(err) { - p.unavailableOfferings.MarkUnavailableForFleetErr(ctx, err, capacityType) +func (p *DefaultProvider) updateUnavailableOfferingsCache( + ctx context.Context, + errs []ec2types.CreateFleetError, + capacityType string, + instanceTypes []*cloudprovider.InstanceType, +) { + if capacityType != karpv1.CapacityTypeReserved { + for _, err := range errs { + if awserrors.IsUnfulfillableCapacity(err) { + p.unavailableOfferings.MarkUnavailableForFleetErr(ctx, err, capacityType) + } + } + return + } + + reservationIDs := make([]string, 0, len(errs)) + for i := range errs { + id := 
p.getCapacityReservationForInstance( + string(errs[i].LaunchTemplateAndOverrides.Overrides.InstanceType), + lo.FromPtr(errs[i].LaunchTemplateAndOverrides.Overrides.AvailabilityZone), + instanceTypes, + ) + reservationIDs = append(reservationIDs, id) + log.FromContext(ctx).WithValues( + "reason", lo.FromPtr(errs[i].ErrorCode), + "instance-type", errs[i].LaunchTemplateAndOverrides.Overrides.InstanceType, + "zone", lo.FromPtr(errs[i].LaunchTemplateAndOverrides.Overrides.AvailabilityZone), + "capacity-reservation-id", id, + ).V(1).Info("marking capacity reservation unavailable") + } + p.capacityReservationProvider.MarkUnavailable(reservationIDs...) +} + +func (p *DefaultProvider) getCapacityReservationForInstance(instance, zone string, instanceTypes []*cloudprovider.InstanceType) string { + for _, it := range instanceTypes { + if it.Name != instance { + continue + } + for _, o := range it.Offerings { + if o.CapacityType() != karpv1.CapacityTypeReserved || o.Zone() != zone { + continue + } + return o.ReservationID() } } + // note: this is an invariant that the caller must enforce, should not occur at runtime + panic("reservation ID doesn't exist for reserved launch") } // getCapacityType selects the capacity type based on the flexibility of the NodeClaim and the available offerings. @@ -404,19 +474,29 @@ func (p *DefaultProvider) getCapacityType(nodeClaim *karpv1.NodeClaim, instanceT // filterReservedInstanceTypes is used to filter the provided set of instance types to only include those with // available reserved offerings if the nodeclaim is compatible. If there are no available reserved offerings, no // filtering is applied. -func (*DefaultProvider) filterReservedInstanceTypes(nodeClaim *karpv1.NodeClaim, instanceTypes []*cloudprovider.InstanceType) []*cloudprovider.InstanceType { - requirements := scheduling.NewNodeSelectorRequirementsWithMinValues(nodeClaim.Spec.Requirements...) 
- if !requirements.Get(karpv1.CapacityTypeLabelKey).Has(karpv1.CapacityTypeReserved) { +func (*DefaultProvider) filterReservedInstanceTypes(nodeClaimRequirements scheduling.Requirements, instanceTypes []*cloudprovider.InstanceType) []*cloudprovider.InstanceType { + nodeClaimRequirements[karpv1.CapacityTypeLabelKey] = scheduling.NewRequirement(karpv1.CapacityTypeLabelKey, corev1.NodeSelectorOpIn, karpv1.CapacityTypeReserved) + var reservedInstanceTypes []*cloudprovider.InstanceType + for _, it := range instanceTypes { + // We only want to include a single offering per pool (instance type / AZ combo). This is due to a limitation in the + // CreateFleet API, which limits calls to specifying a single override per pool. We'll choose to launch into the pool + // with the most capacity. + zonalOfferings := map[string]*cloudprovider.Offering{} + for _, o := range it.Offerings.Available().Compatible(nodeClaimRequirements) { + if current, ok := zonalOfferings[o.Zone()]; !ok || o.ReservationCapacity > current.ReservationCapacity { + zonalOfferings[o.Zone()] = o + } + } + if len(zonalOfferings) == 0 { + continue + } + it.Offerings = lo.Values(zonalOfferings) + reservedInstanceTypes = append(reservedInstanceTypes, it) + } + if len(reservedInstanceTypes) == 0 { return instanceTypes } - // Constrain the NodeClaim's capacity type requirement to reserved before filtering for offering availability. If we - // don't perform this step, it's possible the only reserved instance available could have an incompatible reservation - // ID. 
- requirements[karpv1.CapacityTypeLabelKey] = scheduling.NewRequirement(karpv1.CapacityTypeLabelKey, corev1.NodeSelectorOpIn, karpv1.CapacityTypeReserved) - reservedInstances := lo.Filter(instanceTypes, func(it *cloudprovider.InstanceType, _ int) bool { - return len(it.Offerings.Available().Compatible(requirements)) != 0 - }) - return lo.Ternary(len(reservedInstances) != 0, reservedInstances, instanceTypes) + return reservedInstanceTypes } // filterInstanceTypes is used to provide filtering on the list of potential instance types to further limit it to those diff --git a/pkg/providers/instance/suite_test.go b/pkg/providers/instance/suite_test.go index 58b3ebdecf63..dd29355697fa 100644 --- a/pkg/providers/instance/suite_test.go +++ b/pkg/providers/instance/suite_test.go @@ -66,7 +66,7 @@ var _ = BeforeSuite(func() { ctx = options.ToContext(ctx, test.Options()) awsEnv = test.NewEnvironment(ctx, env) cloudProvider = cloudprovider.New(awsEnv.InstanceTypesProvider, awsEnv.InstanceProvider, events.NewRecorder(&record.FakeRecorder{}), - env.Client, awsEnv.AMIProvider, awsEnv.SecurityGroupProvider) + env.Client, awsEnv.AMIProvider, awsEnv.SecurityGroupProvider, awsEnv.CapacityReservationProvider) }) var _ = AfterSuite(func() { diff --git a/pkg/providers/instance/types.go b/pkg/providers/instance/types.go index 62fb8cf11408..f8ff74557255 100644 --- a/pkg/providers/instance/types.go +++ b/pkg/providers/instance/types.go @@ -17,7 +17,6 @@ package instance import ( "time" - "github.com/aws/aws-sdk-go-v2/aws" ec2types "github.com/aws/aws-sdk-go-v2/service/ec2/types" "github.com/samber/lo" @@ -43,28 +42,21 @@ type Instance struct { func NewInstance(out ec2types.Instance) *Instance { return &Instance{ - LaunchTime: aws.ToTime(out.LaunchTime), + LaunchTime: lo.FromPtr(out.LaunchTime), State: out.State.Name, - ID: aws.ToString(out.InstanceId), - ImageID: aws.ToString(out.ImageId), + ID: lo.FromPtr(out.InstanceId), + ImageID: lo.FromPtr(out.ImageId), Type: out.InstanceType, - Zone: 
aws.ToString(out.Placement.AvailabilityZone), - CapacityType: func() string { - switch { - case out.SpotInstanceRequestId != nil: - return karpv1.CapacityTypeSpot - case out.CapacityReservationId != nil: - return karpv1.CapacityTypeReserved - default: - return karpv1.CapacityTypeOnDemand - } - }(), + Zone: lo.FromPtr(out.Placement.AvailabilityZone), + CapacityType: lo.If(out.SpotInstanceRequestId != nil, karpv1.CapacityTypeSpot). + ElseIf(out.CapacityReservationId != nil, karpv1.CapacityTypeReserved). + Else(karpv1.CapacityTypeOnDemand), CapacityReservationID: lo.FromPtr(out.CapacityReservationId), SecurityGroupIDs: lo.Map(out.SecurityGroups, func(securitygroup ec2types.GroupIdentifier, _ int) string { - return aws.ToString(securitygroup.GroupId) + return lo.FromPtr(securitygroup.GroupId) }), - SubnetID: aws.ToString(out.SubnetId), - Tags: lo.SliceToMap(out.Tags, func(t ec2types.Tag) (string, string) { return aws.ToString(t.Key), aws.ToString(t.Value) }), + SubnetID: lo.FromPtr(out.SubnetId), + Tags: lo.SliceToMap(out.Tags, func(t ec2types.Tag) (string, string) { return lo.FromPtr(t.Key), lo.FromPtr(t.Value) }), EFAEnabled: lo.ContainsBy(out.NetworkInterfaces, func(item ec2types.InstanceNetworkInterface) bool { return item.InterfaceType != nil && *item.InterfaceType == string(ec2types.NetworkInterfaceTypeEfa) }), @@ -72,17 +64,24 @@ func NewInstance(out ec2types.Instance) *Instance { } -func NewInstanceFromFleet(out ec2types.CreateFleetInstance, tags map[string]string, efaEnabled bool) *Instance { +func NewInstanceFromFleet( + out ec2types.CreateFleetInstance, + tags map[string]string, + capacityType string, + capacityReservationID string, + efaEnabled bool, +) *Instance { return &Instance{ - LaunchTime: time.Now(), // estimate the launch time since we just launched - State: ec2types.InstanceStateNamePending, - ID: out.InstanceIds[0], - ImageID: aws.ToString(out.LaunchTemplateAndOverrides.Overrides.ImageId), - Type: out.InstanceType, - Zone: 
aws.ToString(out.LaunchTemplateAndOverrides.Overrides.AvailabilityZone), - CapacityType: string(out.Lifecycle), - SubnetID: aws.ToString(out.LaunchTemplateAndOverrides.Overrides.SubnetId), - Tags: tags, - EFAEnabled: efaEnabled, + LaunchTime: time.Now(), // estimate the launch time since we just launched + State: ec2types.InstanceStateNamePending, + ID: out.InstanceIds[0], + ImageID: lo.FromPtr(out.LaunchTemplateAndOverrides.Overrides.ImageId), + Type: out.InstanceType, + Zone: lo.FromPtr(out.LaunchTemplateAndOverrides.Overrides.AvailabilityZone), + CapacityType: capacityType, + CapacityReservationID: capacityReservationID, + SubnetID: lo.FromPtr(out.LaunchTemplateAndOverrides.Overrides.SubnetId), + Tags: tags, + EFAEnabled: efaEnabled, } } diff --git a/pkg/providers/instancetype/offering/provider.go b/pkg/providers/instancetype/offering/provider.go index fd91cd4bcd5c..74f3211aeed6 100644 --- a/pkg/providers/instancetype/offering/provider.go +++ b/pkg/providers/instancetype/offering/provider.go @@ -28,6 +28,7 @@ import ( v1 "github.com/aws/karpenter-provider-aws/pkg/apis/v1" awscache "github.com/aws/karpenter-provider-aws/pkg/cache" + "github.com/aws/karpenter-provider-aws/pkg/providers/capacityreservation" "github.com/aws/karpenter-provider-aws/pkg/providers/pricing" ) @@ -36,8 +37,9 @@ type Provider interface { } type DefaultProvider struct { - unavailableOfferings *awscache.UnavailableOfferings - pricingProvider pricing.Provider + unavailableOfferings *awscache.UnavailableOfferings + pricingProvider pricing.Provider + capacityReservationProvider capacityreservation.Provider } func NewDefaultProvider(unavailableOfferingsCache *awscache.UnavailableOfferings, pricingProvider pricing.Provider) *DefaultProvider { @@ -96,10 +98,10 @@ func (p *DefaultProvider) createOfferings( ) cloudprovider.Offerings { itZones := sets.New(it.Requirements.Get(corev1.LabelTopologyZone).Values()...) 
- offerings := []*cloudprovider.Offering{} + var offerings []*cloudprovider.Offering for zone := range allZones { for _, capacityType := range it.Requirements.Get(karpv1.CapacityTypeLabelKey).Values() { - // Reserved capacity types are constructed separately, skip them for now. + // Reserved capacity types are constructed separately if capacityType == karpv1.CapacityTypeReserved { continue } @@ -138,7 +140,6 @@ func (p *DefaultProvider) createOfferings( } reservation := &nodeClass.Status.CapacityReservations[i] - isUnavailable := p.unavailableOfferings.IsReservationUnavailable(reservation.ID) _, hasSubnetZone := subnetZones[reservation.AvailabilityZone] price := 0.0 if odPrice, ok := p.pricingProvider.OnDemandPrice(ec2types.InstanceType(it.Name)); ok { @@ -148,6 +149,7 @@ func (p *DefaultProvider) createOfferings( // users to utilize the instances they're already paying for. price = odPrice / 10_000_000.0 } + reservationCapacity := p.capacityReservationProvider.GetAvailableInstanceCount(reservation.ID) offering := &cloudprovider.Offering{ Requirements: scheduling.NewRequirements( scheduling.NewRequirement(karpv1.CapacityTypeLabelKey, corev1.NodeSelectorOpIn, karpv1.CapacityTypeReserved), @@ -155,8 +157,8 @@ func (p *DefaultProvider) createOfferings( scheduling.NewRequirement(cloudprovider.ReservationIDLabel, corev1.NodeSelectorOpIn, reservation.ID), ), Price: price, - Available: !isUnavailable && itZones.Has(reservation.AvailabilityZone) && hasSubnetZone, - ReservationCapacity: reservation.AvailableInstanceCount, + Available: reservationCapacity != 0 && itZones.Has(reservation.AvailabilityZone) && hasSubnetZone, + ReservationCapacity: reservationCapacity, } if id, ok := subnetZones[reservation.AvailabilityZone]; ok { offering.Requirements.Add(scheduling.NewRequirement(v1.LabelTopologyZoneID, corev1.NodeSelectorOpIn, id)) diff --git a/pkg/providers/instancetype/suite_test.go b/pkg/providers/instancetype/suite_test.go index 77b2343c3cb1..db25bf6b21f6 100644 --- 
a/pkg/providers/instancetype/suite_test.go +++ b/pkg/providers/instancetype/suite_test.go @@ -88,7 +88,7 @@ var _ = BeforeSuite(func() { awsEnv = test.NewEnvironment(ctx, env) fakeClock = &clock.FakeClock{} cloudProvider = cloudprovider.New(awsEnv.InstanceTypesProvider, awsEnv.InstanceProvider, events.NewRecorder(&record.FakeRecorder{}), - env.Client, awsEnv.AMIProvider, awsEnv.SecurityGroupProvider) + env.Client, awsEnv.AMIProvider, awsEnv.SecurityGroupProvider, awsEnv.CapacityReservationProvider) cluster = state.NewCluster(fakeClock, env.Client, cloudProvider) prov = provisioning.NewProvisioner(env.Client, events.NewRecorder(&record.FakeRecorder{}), cloudProvider, cluster, fakeClock) }) @@ -2079,7 +2079,7 @@ var _ = Describe("InstanceTypeProvider", func() { ExpectNotScheduled(ctx, env.Client, pod) // capacity shortage is over - expire the item from the cache and try again awsEnv.EC2API.InsufficientCapacityPools.Set([]fake.CapacityPool{}) - awsEnv.UnavailableOfferingsCache.Delete("inf2.24xlarge", "test-zone-1a", karpv1.CapacityTypeOnDemand) + awsEnv.UnavailableOfferingsCache.DeleteOffering("inf2.24xlarge", "test-zone-1a", karpv1.CapacityTypeOnDemand) ExpectProvisioned(ctx, env.Client, cluster, cloudProvider, prov, pod) node := ExpectScheduled(ctx, env.Client, pod) Expect(node.Labels).To(HaveKeyWithValue(corev1.LabelInstanceTypeStable, "inf2.24xlarge")) diff --git a/pkg/providers/launchtemplate/launchtemplate.go b/pkg/providers/launchtemplate/launchtemplate.go index 49c8a72dc769..d07c37b09bb3 100644 --- a/pkg/providers/launchtemplate/launchtemplate.go +++ b/pkg/providers/launchtemplate/launchtemplate.go @@ -60,9 +60,10 @@ type Provider interface { ResolveClusterCIDR(context.Context) error } type LaunchTemplate struct { - Name string - InstanceTypes []*cloudprovider.InstanceType - ImageID string + Name string + InstanceTypes []*cloudprovider.InstanceType + ImageID string + CapacityReservationID string } type DefaultProvider struct { @@ -134,7 +135,12 @@ func (p 
*DefaultProvider) EnsureAll( if err != nil { return nil, err } - launchTemplates = append(launchTemplates, &LaunchTemplate{Name: *ec2LaunchTemplate.LaunchTemplateName, InstanceTypes: resolvedLaunchTemplate.InstanceTypes, ImageID: resolvedLaunchTemplate.AMIID}) + launchTemplates = append(launchTemplates, &LaunchTemplate{ + Name: *ec2LaunchTemplate.LaunchTemplateName, + InstanceTypes: resolvedLaunchTemplate.InstanceTypes, + ImageID: resolvedLaunchTemplate.AMIID, + CapacityReservationID: resolvedLaunchTemplate.CapacityReservationID, + }) } return launchTemplates, nil } diff --git a/pkg/providers/launchtemplate/suite_test.go b/pkg/providers/launchtemplate/suite_test.go index 596a2146eae8..cc08e153970f 100644 --- a/pkg/providers/launchtemplate/suite_test.go +++ b/pkg/providers/launchtemplate/suite_test.go @@ -99,7 +99,7 @@ var _ = BeforeSuite(func() { fakeClock = &clock.FakeClock{} recorder = events.NewRecorder(&record.FakeRecorder{}) cloudProvider = cloudprovider.New(awsEnv.InstanceTypesProvider, awsEnv.InstanceProvider, recorder, - env.Client, awsEnv.AMIProvider, awsEnv.SecurityGroupProvider) + env.Client, awsEnv.AMIProvider, awsEnv.SecurityGroupProvider, awsEnv.CapacityReservationProvider) cluster = state.NewCluster(fakeClock, env.Client, cloudProvider) prov = provisioning.NewProvisioner(env.Client, recorder, cloudProvider, cluster, fakeClock) }) diff --git a/pkg/test/environment.go b/pkg/test/environment.go index 357af1245341..b8f91f397b31 100644 --- a/pkg/test/environment.go +++ b/pkg/test/environment.go @@ -66,18 +66,19 @@ type Environment struct { PricingAPI *fake.PricingAPI // Cache - EC2Cache *cache.Cache - InstanceTypeCache *cache.Cache - UnavailableOfferingsCache *awscache.UnavailableOfferings - LaunchTemplateCache *cache.Cache - SubnetCache *cache.Cache - AvailableIPAdressCache *cache.Cache - AssociatePublicIPAddressCache *cache.Cache - SecurityGroupCache *cache.Cache - InstanceProfileCache *cache.Cache - SSMCache *cache.Cache - DiscoveredCapacityCache 
*cache.Cache - CapacityReservationCache *cache.Cache + EC2Cache *cache.Cache + InstanceTypeCache *cache.Cache + UnavailableOfferingsCache *awscache.UnavailableOfferings + LaunchTemplateCache *cache.Cache + SubnetCache *cache.Cache + AvailableIPAdressCache *cache.Cache + AssociatePublicIPAddressCache *cache.Cache + SecurityGroupCache *cache.Cache + InstanceProfileCache *cache.Cache + SSMCache *cache.Cache + DiscoveredCapacityCache *cache.Cache + CapacityReservationCache *cache.Cache + CapacityReservationAvailabilityCache *cache.Cache // Providers CapacityReservationProvider *capacityreservation.DefaultProvider @@ -117,6 +118,7 @@ func NewEnvironment(ctx context.Context, env *coretest.Environment) *Environment instanceProfileCache := cache.New(awscache.DefaultTTL, awscache.DefaultCleanupInterval) ssmCache := cache.New(awscache.DefaultTTL, awscache.DefaultCleanupInterval) capacityReservationCache := cache.New(awscache.DefaultTTL, awscache.DefaultCleanupInterval) + capacityReservationAvailabilityCache := cache.New(24*time.Hour, awscache.DefaultCleanupInterval) fakePricingAPI := &fake.PricingAPI{} // Providers @@ -147,6 +149,7 @@ func NewEnvironment(ctx context.Context, env *coretest.Environment) *Environment net.ParseIP("10.0.100.10"), "https://test-cluster", ) + capacityReservationProvider := capacityreservation.NewProvider(ec2api, clock, capacityReservationCache, capacityReservationAvailabilityCache) instanceProvider := instance.NewDefaultProvider( ctx, "", @@ -154,8 +157,8 @@ func NewEnvironment(ctx context.Context, env *coretest.Environment) *Environment unavailableOfferingsCache, subnetProvider, launchTemplateProvider, + capacityReservationProvider, ) - capacityReservationProvider := capacityreservation.NewProvider(ec2api, clock, capacityReservationCache) return &Environment{ Clock: clock, @@ -166,18 +169,19 @@ func NewEnvironment(ctx context.Context, env *coretest.Environment) *Environment IAMAPI: iamapi, PricingAPI: fakePricingAPI, - EC2Cache: ec2Cache, - 
InstanceTypeCache: instanceTypeCache, - LaunchTemplateCache: launchTemplateCache, - SubnetCache: subnetCache, - AvailableIPAdressCache: availableIPAdressCache, - AssociatePublicIPAddressCache: associatePublicIPAddressCache, - SecurityGroupCache: securityGroupCache, - InstanceProfileCache: instanceProfileCache, - UnavailableOfferingsCache: unavailableOfferingsCache, - SSMCache: ssmCache, - DiscoveredCapacityCache: discoveredCapacityCache, - CapacityReservationCache: capacityReservationCache, + EC2Cache: ec2Cache, + InstanceTypeCache: instanceTypeCache, + LaunchTemplateCache: launchTemplateCache, + SubnetCache: subnetCache, + AvailableIPAdressCache: availableIPAdressCache, + AssociatePublicIPAddressCache: associatePublicIPAddressCache, + SecurityGroupCache: securityGroupCache, + InstanceProfileCache: instanceProfileCache, + UnavailableOfferingsCache: unavailableOfferingsCache, + SSMCache: ssmCache, + DiscoveredCapacityCache: discoveredCapacityCache, + CapacityReservationCache: capacityReservationCache, + CapacityReservationAvailabilityCache: capacityReservationAvailabilityCache, CapacityReservationProvider: capacityReservationProvider, InstanceTypesResolver: instanceTypesResolver,