From 18abbbe46702051060bba2954ef3edf28016bc0b Mon Sep 17 00:00:00 2001 From: Fatih Arslan Date: Thu, 1 Aug 2019 13:32:28 +0300 Subject: [PATCH] cloudprovider: add DigitalOcean cloud provider --- .../cloudprovider/builder/builder_all.go | 5 +- .../builder/builder_digitalocean.go | 42 ++ .../cloudprovider/cloud_provider.go | 2 + .../cloudprovider/digitalocean/OWNERS | 6 + .../cloudprovider/digitalocean/README.md | 58 +++ .../digitalocean_cloud_provider.go | 193 +++++++++ .../digitalocean_cloud_provider_test.go | 220 ++++++++++ .../digitalocean/digitalocean_manager.go | 240 +++++++++++ .../digitalocean/digitalocean_manager_test.go | 316 +++++++++++++++ .../digitalocean/digitalocean_node_group.go | 281 +++++++++++++ .../digitalocean_node_group_test.go | 383 ++++++++++++++++++ hack/boilerplate/boilerplate.py | 1 + hack/verify-gofmt.sh | 1 + hack/verify-golint.sh | 2 +- hack/verify-spelling.sh | 2 +- 15 files changed, 1749 insertions(+), 3 deletions(-) create mode 100644 cluster-autoscaler/cloudprovider/builder/builder_digitalocean.go create mode 100644 cluster-autoscaler/cloudprovider/digitalocean/OWNERS create mode 100644 cluster-autoscaler/cloudprovider/digitalocean/README.md create mode 100644 cluster-autoscaler/cloudprovider/digitalocean/digitalocean_cloud_provider.go create mode 100644 cluster-autoscaler/cloudprovider/digitalocean/digitalocean_cloud_provider_test.go create mode 100644 cluster-autoscaler/cloudprovider/digitalocean/digitalocean_manager.go create mode 100644 cluster-autoscaler/cloudprovider/digitalocean/digitalocean_manager_test.go create mode 100644 cluster-autoscaler/cloudprovider/digitalocean/digitalocean_node_group.go create mode 100644 cluster-autoscaler/cloudprovider/digitalocean/digitalocean_node_group_test.go diff --git a/cluster-autoscaler/cloudprovider/builder/builder_all.go b/cluster-autoscaler/cloudprovider/builder/builder_all.go index 4846c817b270..52eacc033936 100644 --- a/cluster-autoscaler/cloudprovider/builder/builder_all.go +++ b/cluster-autoscaler/cloudprovider/builder/builder_all.go @@ -1,4 +1,4 @@ -// +build !gce,!aws,!azure,!kubemark,!alicloud,!magnum +// +build !gce,!aws,!azure,!kubemark,!alicloud,!magnum,!digitalocean /* Copyright 2018 The Kubernetes Authors. @@ -24,6 +24,7 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/aws" "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/azure" "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/baiducloud" + "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/digitalocean" "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/gce" "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/magnum" "k8s.io/autoscaler/cluster-autoscaler/config" @@ -54,6 +55,8 @@ func buildCloudProvider(opts config.AutoscalingOptions, do cloudprovider.NodeGro return alicloud.BuildAlicloud(opts, do, rl) case cloudprovider.BaiducloudProviderName: return baiducloud.BuildBaiducloud(opts, do, rl) + case cloudprovider.DigitalOceanProviderName: + return digitalocean.BuildDigitalOcean(opts, do, rl) case cloudprovider.MagnumProviderName: return magnum.BuildMagnum(opts, do, rl) } diff --git a/cluster-autoscaler/cloudprovider/builder/builder_digitalocean.go b/cluster-autoscaler/cloudprovider/builder/builder_digitalocean.go new file mode 100644 index 000000000000..78156f94c262 --- /dev/null +++ b/cluster-autoscaler/cloudprovider/builder/builder_digitalocean.go @@ -0,0 +1,42 @@ +// +build digitalocean + +/* +Copyright 2019 The Kubernetes Authors. 
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package builder
+
+import (
+	"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
+	"k8s.io/autoscaler/cluster-autoscaler/cloudprovider/digitalocean"
+	"k8s.io/autoscaler/cluster-autoscaler/config"
+)
+
+// AvailableCloudProviders supported by the digitalocean cloud provider builder.
+var AvailableCloudProviders = []string{
+	digitalocean.ProviderName,
+}
+
+// DefaultCloudProvider for do-only build is DigitalOcean.
+const DefaultCloudProvider = digitalocean.ProviderName
+
+func buildCloudProvider(opts config.AutoscalingOptions, do cloudprovider.NodeGroupDiscoveryOptions, rl *cloudprovider.ResourceLimiter) cloudprovider.CloudProvider {
+	switch opts.CloudProviderName {
+	case digitalocean.ProviderName:
+		return digitalocean.BuildDigitalOcean(opts, do, rl)
+	}
+
+	return nil
+}
diff --git a/cluster-autoscaler/cloudprovider/cloud_provider.go b/cluster-autoscaler/cloudprovider/cloud_provider.go
index b27c1739e84d..d9d50d04b098 100644
--- a/cluster-autoscaler/cloudprovider/cloud_provider.go
+++ b/cluster-autoscaler/cloudprovider/cloud_provider.go
@@ -35,6 +35,8 @@ const (
 	AwsProviderName = "aws"
 	// BaiducloudProviderName gets the provider name of baiducloud
 	BaiducloudProviderName = "baiducloud"
+	// DigitalOceanProviderName gets the provider name of digitalocean
+	DigitalOceanProviderName = "digitalocean"
 	// GceProviderName gets the provider name of gce
 	GceProviderName = "gce"
 	// MagnumProviderName gets the provider name of magnum
diff --git a/cluster-autoscaler/cloudprovider/digitalocean/OWNERS b/cluster-autoscaler/cloudprovider/digitalocean/OWNERS
new file mode 100644
index 000000000000..2524de50b4e4
--- /dev/null
+++ b/cluster-autoscaler/cloudprovider/digitalocean/OWNERS
@@ -0,0 +1,6 @@
+approvers:
+- andrewsykim
+reviewers:
+- andrewsykim
+
+
diff --git a/cluster-autoscaler/cloudprovider/digitalocean/README.md b/cluster-autoscaler/cloudprovider/digitalocean/README.md
new file mode 100644
index 000000000000..8be2e47414b9
--- /dev/null
+++ b/cluster-autoscaler/cloudprovider/digitalocean/README.md
@@ -0,0 +1,58 @@
+# Cluster Autoscaler for DigitalOcean
+
+The cluster autoscaler for DigitalOcean scales worker nodes within any
+specified DigitalOcean Kubernetes cluster's node pool. It is part of the DOKS
+offering and can be enabled or disabled dynamically for an existing cluster.
+
+# Configuration
+
+The `cluster-autoscaler` dynamically runs based on tags associated with node
+pools. These are the current valid tags:
+
+```
+k8s-cluster-autoscaler-enabled:true
+k8s-cluster-autoscaler-min:3
+k8s-cluster-autoscaler-max:10
+```
+
+The syntax is of the form `key:value`.
+
+* If the `k8s-cluster-autoscaler-enabled` tag is absent or **not** set to
+  `true`, the `cluster-autoscaler` will not process the node pool.
+* To set the minimum number of nodes, use `k8s-cluster-autoscaler-min`.
+* To set the maximum number of nodes, use `k8s-cluster-autoscaler-max`.
+
+If you don't set the minimum and maximum tags, node pools will have the
+following default limits:
+
+```
+minimum number of nodes: 1
+maximum number of nodes: 200
+```
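+
+Beyond the node pool tags, the autoscaler itself reads a small JSON cloud
+config (see `Config` in `digitalocean_manager.go`), typically passed to the
+binary via `--cloud-provider=digitalocean --cloud-config=<path>`. A minimal
+sketch, with placeholder values in angle brackets, might look like this:
+
+```
+{
+  "cluster_id": "<DOKS cluster ID>",
+  "token": "<DigitalOcean API access token>",
+  "url": "https://api.digitalocean.com/v2"
+}
+```
+
+The `url` field is optional; when empty, the manager falls back to the public
+DigitalOcean API endpoint.
+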
+# Development
+
+Make sure you're inside the root path of the
+[autoscaler repository](https://github.com/kubernetes/autoscaler).
+
+1.) Build the `cluster-autoscaler` binary:
+
+```
+make build-binary
+```
+
+2.) Build the docker image:
+
+```
+docker build -t digitalocean/cluster-autoscaler:dev .
+```
+
+3.) Push the docker image to Docker Hub:
+
+```
+docker push digitalocean/cluster-autoscaler:dev
+```
diff --git a/cluster-autoscaler/cloudprovider/digitalocean/digitalocean_cloud_provider.go b/cluster-autoscaler/cloudprovider/digitalocean/digitalocean_cloud_provider.go
new file mode 100644
index 000000000000..4e08857809fa
--- /dev/null
+++ b/cluster-autoscaler/cloudprovider/digitalocean/digitalocean_cloud_provider.go
@@ -0,0 +1,193 @@
+/*
+Copyright 2019 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package digitalocean
+
+import (
+	"io"
+	"os"
+
+	apiv1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/resource"
+	"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
+	"k8s.io/autoscaler/cluster-autoscaler/config"
+	"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
+	"k8s.io/klog"
+)
+
+var _ cloudprovider.CloudProvider = (*digitaloceanCloudProvider)(nil)
+
+const (
+	// GPULabel is the label added to nodes with GPU resource.
+	GPULabel = "cloud.digitalocean.com/gpu-node"
+)
+
+// digitaloceanCloudProvider implements CloudProvider interface.
+type digitaloceanCloudProvider struct {
+	manager         *Manager
+	resourceLimiter *cloudprovider.ResourceLimiter
+}
+
+func newDigitalOceanCloudProvider(manager *Manager, rl *cloudprovider.ResourceLimiter) (*digitaloceanCloudProvider, error) {
+	if err := manager.Refresh(); err != nil {
+		return nil, err
+	}
+
+	return &digitaloceanCloudProvider{
+		manager:         manager,
+		resourceLimiter: rl,
+	}, nil
+}
+
+// Name returns name of the cloud provider.
+func (d *digitaloceanCloudProvider) Name() string {
+	return cloudprovider.DigitalOceanProviderName
+}
+
+// NodeGroups returns all node groups configured for this cloud provider.
+func (d *digitaloceanCloudProvider) NodeGroups() []cloudprovider.NodeGroup {
+	nodeGroups := make([]cloudprovider.NodeGroup, len(d.manager.nodeGroups))
+	for i, ng := range d.manager.nodeGroups {
+		nodeGroups[i] = ng
+	}
+	return nodeGroups
+}
+
+// NodeGroupForNode returns the node group for the given node, nil if the node
+// should not be processed by cluster autoscaler, or non-nil error if such
+// occurred. Must be implemented.
+func (d *digitaloceanCloudProvider) NodeGroupForNode(node *apiv1.Node) (cloudprovider.NodeGroup, error) {
+	nodeID, ok := node.Labels[nodeIDLabel]
+	if !ok {
+		// CA creates fake node objects to represent upcoming VMs that haven't
+		// registered as nodes yet.
They have node.Spec.ProviderID set. Use + // that as nodeID. + nodeID = node.Spec.ProviderID + } + + klog.V(5).Infof("checking nodegroup for node ID: %q", nodeID) + + // NOTE(arslan): the number of node groups per cluster is usually very + // small. So even though this looks like quadratic runtime, it's OK to + // proceed with this. + for _, group := range d.manager.nodeGroups { + klog.V(5).Infof("iterating over node group %q", group.Id()) + nodes, err := group.Nodes() + if err != nil { + return nil, err + } + + for _, node := range nodes { + klog.V(6).Infof("checking node have: %q want: %q", node.Id, nodeID) + if node.Id != nodeID { + continue + } + + return group, nil + } + } + + // there is no "ErrNotExist" error, so we have to return a nil error + return nil, nil +} + +// Pricing returns pricing model for this cloud provider or error if not +// available. Implementation optional. +func (d *digitaloceanCloudProvider) Pricing() (cloudprovider.PricingModel, errors.AutoscalerError) { + return nil, cloudprovider.ErrNotImplemented +} + +// GetAvailableMachineTypes get all machine types that can be requested from +// the cloud provider. Implementation optional. +func (d *digitaloceanCloudProvider) GetAvailableMachineTypes() ([]string, error) { + return []string{}, nil +} + +// NewNodeGroup builds a theoretical node group based on the node definition +// provided. The node group is not automatically created on the cloud provider +// side. The node group is not returned by NodeGroups() until it is created. +// Implementation optional. +func (d *digitaloceanCloudProvider) NewNodeGroup( + machineType string, + labels map[string]string, + systemLabels map[string]string, + taints []apiv1.Taint, + extraResources map[string]resource.Quantity, +) (cloudprovider.NodeGroup, error) { + return nil, cloudprovider.ErrNotImplemented +} + +// GetResourceLimiter returns struct containing limits (max, min) for +// resources (cores, memory etc.). +func (d *digitaloceanCloudProvider) GetResourceLimiter() (*cloudprovider.ResourceLimiter, error) { + return d.resourceLimiter, nil +} + +// GPULabel returns the label added to nodes with GPU resource. +func (d *digitaloceanCloudProvider) GPULabel() string { + return GPULabel +} + +// GetAvailableGPUTypes return all available GPU types cloud provider supports. +func (d *digitaloceanCloudProvider) GetAvailableGPUTypes() map[string]struct{} { + return nil +} + +// Cleanup cleans up open resources before the cloud provider is destroyed, +// i.e. go routines etc. +func (d *digitaloceanCloudProvider) Cleanup() error { + return nil +} + +// Refresh is called before every main loop and can be used to dynamically +// update cloud provider state. In particular the list of node groups returned +// by NodeGroups() can change as a result of CloudProvider.Refresh(). +func (d *digitaloceanCloudProvider) Refresh() error { + klog.V(4).Info("Refreshing node group cache") + return d.manager.Refresh() +} + +// BuildDigitalOcean builds the DigitalOcean cloud provider. 
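+// It reads the optional cloud config file referenced by opts.CloudConfig
+// (the --cloud-config flag) and exits via klog.Fatalf if either the manager
+// or the provider cannot be constructed.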
+func BuildDigitalOcean( + opts config.AutoscalingOptions, + do cloudprovider.NodeGroupDiscoveryOptions, + rl *cloudprovider.ResourceLimiter, +) cloudprovider.CloudProvider { + var configFile io.ReadCloser + if opts.CloudConfig != "" { + var err error + configFile, err = os.Open(opts.CloudConfig) + if err != nil { + klog.Fatalf("Couldn't open cloud provider configuration %s: %#v", opts.CloudConfig, err) + } + defer configFile.Close() + } + + manager, err := newManager(configFile) + if err != nil { + klog.Fatalf("Failed to create DigitalOcean manager: %v", err) + } + + // the cloud provider automatically uses all node pools in DigitalOcean. + // This means we don't use the cloudprovider.NodeGroupDiscoveryOptions + // flags (which can be set via '--node-group-auto-discovery' or '-nodes') + provider, err := newDigitalOceanCloudProvider(manager, rl) + if err != nil { + klog.Fatalf("Failed to create DigitalOcean cloud provider: %v", err) + } + + return provider +} diff --git a/cluster-autoscaler/cloudprovider/digitalocean/digitalocean_cloud_provider_test.go b/cluster-autoscaler/cloudprovider/digitalocean/digitalocean_cloud_provider_test.go new file mode 100644 index 000000000000..3084d29e7944 --- /dev/null +++ b/cluster-autoscaler/cloudprovider/digitalocean/digitalocean_cloud_provider_test.go @@ -0,0 +1,220 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package digitalocean + +import ( + "bytes" + "context" + "testing" + + "github.com/stretchr/testify/assert" + + apiv1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" + "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/digitalocean/godo" +) + +func testCloudProvider(t *testing.T, client *doClientMock) *digitaloceanCloudProvider { + cfg := `{"cluster_id": "123456", "token": "123-123-123", "url": "https://api.digitalocean.com/v2", "version": "dev"}` + + manager, err := newManager(bytes.NewBufferString(cfg)) + assert.NoError(t, err) + rl := &cloudprovider.ResourceLimiter{} + + // fill the test provider with some example + if client == nil { + client = &doClientMock{} + ctx := context.Background() + + client.On("ListNodePools", ctx, manager.clusterID, nil).Return( + []*godo.KubernetesNodePool{ + { + ID: "1", + Nodes: []*godo.KubernetesNode{ + {ID: "1", Status: &godo.KubernetesNodeStatus{State: "running"}}, + {ID: "2", Status: &godo.KubernetesNodeStatus{State: "running"}}, + }, + Tags: []string{ + "k8s-cluster-autoscaler-enabled:true", + }, + }, + { + ID: "2", + Nodes: []*godo.KubernetesNode{ + {ID: "3", Status: &godo.KubernetesNodeStatus{State: "deleting"}}, + {ID: "4", Status: &godo.KubernetesNodeStatus{State: "running"}}, + }, + Tags: []string{ + "k8s-cluster-autoscaler-enabled:true", + }, + }, + { + ID: "3", + Nodes: []*godo.KubernetesNode{ + {ID: "5", Status: &godo.KubernetesNodeStatus{State: "provisioning"}}, + {ID: "6", Status: &godo.KubernetesNodeStatus{State: "running"}}, + }, + Tags: []string{ + "k8s-cluster-autoscaler-enabled:true", + }, + }, + { + ID: "4", + Nodes: []*godo.KubernetesNode{ + {ID: "7", Status: &godo.KubernetesNodeStatus{State: "draining"}}, + {ID: "8", Status: &godo.KubernetesNodeStatus{State: "running"}}, + }, + Tags: []string{ + "k8s-cluster-autoscaler-enabled:true", + }, + }, + }, + &godo.Response{}, + nil, + ).Once() + } + + manager.client = client + + provider, err := newDigitalOceanCloudProvider(manager, rl) + assert.NoError(t, err) + return provider + +} + +func TestNewDigitalOceanCloudProvider(t *testing.T) { + t.Run("success", func(t *testing.T) { + _ = testCloudProvider(t, nil) + }) +} + +func TestDigitalOceanCloudProvider_Name(t *testing.T) { + provider := testCloudProvider(t, nil) + + t.Run("success", func(t *testing.T) { + name := provider.Name() + assert.Equal(t, cloudprovider.DigitalOceanProviderName, name, "provider name doesn't match") + }) +} + +func TestDigitalOceanCloudProvider_NodeGroups(t *testing.T) { + provider := testCloudProvider(t, nil) + + t.Run("success", func(t *testing.T) { + nodes := provider.NodeGroups() + assert.Equal(t, len(nodes), 4, "number of nodes do not match") + }) + + t.Run("zero groups", func(t *testing.T) { + provider.manager.nodeGroups = []*NodeGroup{} + nodes := provider.NodeGroups() + assert.Equal(t, len(nodes), 0, "number of nodes do not match") + }) +} + +func TestDigitalOceanCloudProvider_NodeGroupForNode(t *testing.T) { + clusterID := "123456" + + t.Run("success", func(t *testing.T) { + client := &doClientMock{} + ctx := context.Background() + + client.On("ListNodePools", ctx, clusterID, nil).Return( + []*godo.KubernetesNodePool{ + { + ID: "1", + Nodes: []*godo.KubernetesNode{ + {ID: "2", Status: &godo.KubernetesNodeStatus{State: "deleting"}}, + {ID: "3", Status: &godo.KubernetesNodeStatus{State: "running"}}, + }, + Tags: []string{ + "k8s-cluster-autoscaler-enabled:true", + }, + }, + { + ID: "2", + Nodes: []*godo.KubernetesNode{ + {ID: 
"4", Status: &godo.KubernetesNodeStatus{State: "provisioning"}}, + {ID: "5", Status: &godo.KubernetesNodeStatus{State: "draining"}}, + }, + Tags: []string{ + "k8s-cluster-autoscaler-enabled:true", + }, + }, + }, + &godo.Response{}, + nil, + ).Once() + + provider := testCloudProvider(t, client) + + // let's get the nodeGroup for the node with ID 4 + node := &apiv1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + nodeIDLabel: "4", + }, + }, + } + + nodeGroup, err := provider.NodeGroupForNode(node) + assert.NoError(t, err) + assert.NotNil(t, nodeGroup) + assert.Equal(t, nodeGroup.Id(), "2", "node group ID does not match") + }) + + t.Run("node does not exist", func(t *testing.T) { + client := &doClientMock{} + ctx := context.Background() + + client.On("ListNodePools", ctx, clusterID, nil).Return( + []*godo.KubernetesNodePool{ + { + ID: "1", + Nodes: []*godo.KubernetesNode{ + {ID: "2", Status: &godo.KubernetesNodeStatus{State: "deleting"}}, + {ID: "3", Status: &godo.KubernetesNodeStatus{State: "running"}}, + }, + }, + { + ID: "2", + Nodes: []*godo.KubernetesNode{ + {ID: "4", Status: &godo.KubernetesNodeStatus{State: "provisioning"}}, + {ID: "5", Status: &godo.KubernetesNodeStatus{State: "draining"}}, + }, + }, + }, + &godo.Response{}, + nil, + ).Once() + + provider := testCloudProvider(t, client) + + node := &apiv1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + nodeIDLabel: "7", + }, + }, + } + + nodeGroup, err := provider.NodeGroupForNode(node) + assert.NoError(t, err) + assert.Nil(t, nodeGroup) + }) +} diff --git a/cluster-autoscaler/cloudprovider/digitalocean/digitalocean_manager.go b/cluster-autoscaler/cloudprovider/digitalocean/digitalocean_manager.go new file mode 100644 index 000000000000..87f5a1f09bf7 --- /dev/null +++ b/cluster-autoscaler/cloudprovider/digitalocean/digitalocean_manager.go @@ -0,0 +1,240 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package digitalocean + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "io" + "io/ioutil" + "strconv" + "strings" + + "golang.org/x/oauth2" + + "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/digitalocean/godo" + "k8s.io/klog" +) + +const ( + tagPrefix = "k8s-cluster-autoscaler-" + tagEnabled = tagPrefix + "enabled:" + tagMin = tagPrefix + "min:" + tagMax = tagPrefix + "max:" +) + +var ( + version = "dev" +) + +type nodeGroupClient interface { + // ListNodePools lists all the node pools found in a Kubernetes cluster. + ListNodePools(ctx context.Context, clusterID string, opts *godo.ListOptions) ([]*godo.KubernetesNodePool, *godo.Response, error) + + // UpdateNodePool updates the details of an existing node pool. + UpdateNodePool(ctx context.Context, clusterID, poolID string, req *godo.KubernetesNodePoolUpdateRequest) (*godo.KubernetesNodePool, *godo.Response, error) + + // DeleteNode deletes a specific node in a node pool. 
+ DeleteNode(ctx context.Context, clusterID, poolID, nodeID string, req *godo.KubernetesNodeDeleteRequest) (*godo.Response, error) +} + +// Manager handles DigitalOcean communication and data caching of +// node groups (node pools in DOKS) +type Manager struct { + client nodeGroupClient + clusterID string + nodeGroups []*NodeGroup +} + +// Config is the configuration of the DigitalOcean cloud provider +type Config struct { + // ClusterID is the id associated with the cluster where DigitalOcean + // Cluster Autoscaler is running. + ClusterID string `json:"cluster_id"` + + // Token is the User's Access Token associated with the cluster where + // DigitalOcean Cluster Autoscaler is running. + Token string `json:"token"` + + // URL points to DigitalOcean API. If empty, defaults to + // https://api.digitalocean.com/ + URL string `json:"url"` +} + +func newManager(configReader io.Reader) (*Manager, error) { + cfg := &Config{} + if configReader != nil { + body, err := ioutil.ReadAll(configReader) + if err != nil { + return nil, err + } + err = json.Unmarshal(body, cfg) + if err != nil { + return nil, err + } + } + + if cfg.Token == "" { + return nil, errors.New("access token is not provided") + } + if cfg.ClusterID == "" { + return nil, errors.New("cluster ID is not provided") + } + + tokenSource := oauth2.StaticTokenSource(&oauth2.Token{ + AccessToken: cfg.Token, + }) + oauthClient := oauth2.NewClient(context.Background(), tokenSource) + + opts := []godo.ClientOpt{} + if cfg.URL != "" { + opts = append(opts, godo.SetBaseURL(cfg.URL)) + } + + opts = append(opts, godo.SetUserAgent("cluster-autoscaler-digitalocean/"+version)) + + doClient, err := godo.New(oauthClient, opts...) + if err != nil { + return nil, fmt.Errorf("couldn't initialize DigitalOcean client: %s", err) + } + + m := &Manager{ + client: doClient.Kubernetes, + clusterID: cfg.ClusterID, + nodeGroups: make([]*NodeGroup, 0), + } + + return m, nil +} + +// Refresh refreshes the cache holding the nodegroups. This is called by the CA +// based on the `--scan-interval`. By default it's 10 seconds. +func (m *Manager) Refresh() error { + ctx := context.Background() + nodePools, _, err := m.client.ListNodePools(ctx, m.clusterID, nil) + if err != nil { + return err + } + + var group []*NodeGroup + for _, nodePool := range nodePools { + spec, err := parseTags(nodePool.Tags) + if err != nil { + // we should not return an error here, because one misconfigured + // node pool shouldn't bring down the whole cluster-autoscaler + klog.V(4).Infof("skipping misconfigured node pool: %q name: %s tags: %+v err: %s", + nodePool.ID, nodePool.Name, nodePool.Tags, err) + continue + } + + if !spec.enabled { + continue + } + + minSize := minNodePoolSize + if spec.min != 0 { + minSize = spec.min + } + + maxSize := maxNodePoolSize + if spec.max != 0 { + maxSize = spec.max + } + + klog.V(4).Infof("adding node pool: %q name: %s min: %d max: %d", + nodePool.ID, nodePool.Name, minSize, maxSize) + + group = append(group, &NodeGroup{ + id: nodePool.ID, + clusterID: m.clusterID, + client: m.client, + nodePool: nodePool, + minSize: minSize, + maxSize: maxSize, + }) + } + + if len(group) == 0 { + klog.V(4).Info("cluster-autoscaler is disabled. 
no node pools are configured") + } + + m.nodeGroups = group + return nil +} + +// nodeSpec defines a custom specification for a given node +type nodeSpec struct { + min int + max int + enabled bool +} + +// parseTags parses a list of tags from a DigitalOcean node pool +func parseTags(tags []string) (*nodeSpec, error) { + spec := &nodeSpec{} + + for _, tag := range tags { + if !strings.HasPrefix(tag, tagPrefix) { + continue + } + + splitted := strings.Split(strings.TrimPrefix(tag, tagPrefix), ":") + if len(splitted) != 2 { + return nil, fmt.Errorf("malformed tag: %q", tag) + } + + key, value := splitted[0], splitted[1] + + switch key { + case "enabled": + if value == "true" { + spec.enabled = true + } + case "min": + min, err := strconv.Atoi(value) + if err != nil { + return nil, fmt.Errorf("invalid minimum nodes: %q", value) + } + + if min <= 0 { + return nil, fmt.Errorf("minimum nodes: %d can't be set to zero or less", min) + } + + spec.min = min + case "max": + max, err := strconv.Atoi(value) + if err != nil { + return nil, fmt.Errorf("invalid maximum nodes: %q", value) + } + + if max <= 0 { + return nil, fmt.Errorf("maximum nodes: %d can't be set to zero or less", max) + } + + spec.max = max + } + } + + if spec.min != 0 && spec.max != 0 && spec.min > spec.max { + return nil, fmt.Errorf("minimum nodes: %d can't be higher than maximum nodes: %d", + spec.min, spec.max) + } + + return spec, nil +} diff --git a/cluster-autoscaler/cloudprovider/digitalocean/digitalocean_manager_test.go b/cluster-autoscaler/cloudprovider/digitalocean/digitalocean_manager_test.go new file mode 100644 index 000000000000..676738986e27 --- /dev/null +++ b/cluster-autoscaler/cloudprovider/digitalocean/digitalocean_manager_test.go @@ -0,0 +1,316 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package digitalocean + +import ( + "bytes" + "context" + "errors" + "testing" + + "github.com/stretchr/testify/assert" + "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/digitalocean/godo" +) + +func TestNewManager(t *testing.T) { + t.Run("success", func(t *testing.T) { + cfg := `{"cluster_id": "123456", "token": "123-123-123", "url": "https://api.digitalocean.com/v2", "version": "dev"}` + + manager, err := newManager(bytes.NewBufferString(cfg)) + assert.NoError(t, err) + assert.Equal(t, manager.clusterID, "123456", "cluster ID does not match") + }) + + t.Run("empty token", func(t *testing.T) { + cfg := `{"cluster_id": "123456", "token": "", "url": "https://api.digitalocean.com/v2", "version": "dev"}` + + _, err := newManager(bytes.NewBufferString(cfg)) + assert.EqualError(t, err, errors.New("access token is not provided").Error()) + }) + + t.Run("empty cluster ID", func(t *testing.T) { + cfg := `{"cluster_id": "", "token": "123-123-123", "url": "https://api.digitalocean.com/v2", "version": "dev"}` + + _, err := newManager(bytes.NewBufferString(cfg)) + assert.EqualError(t, err, errors.New("cluster ID is not provided").Error()) + }) +} + +func TestDigitalOceanManager_Refresh(t *testing.T) { + t.Run("success", func(t *testing.T) { + cfg := `{"cluster_id": "123456", "token": "123-123-123", "url": "https://api.digitalocean.com/v2", "version": "dev"}` + + manager, err := newManager(bytes.NewBufferString(cfg)) + assert.NoError(t, err) + + client := &doClientMock{} + ctx := context.Background() + + client.On("ListNodePools", ctx, manager.clusterID, nil).Return( + []*godo.KubernetesNodePool{ + { + ID: "1", + Nodes: []*godo.KubernetesNode{ + {ID: "1", Status: &godo.KubernetesNodeStatus{State: "running"}}, + {ID: "2", Status: &godo.KubernetesNodeStatus{State: "running"}}, + }, + Tags: []string{ + "k8s-cluster-autoscaler-enabled:true", + }, + }, + { + ID: "2", + Nodes: []*godo.KubernetesNode{ + {ID: "3", Status: &godo.KubernetesNodeStatus{State: "deleting"}}, + {ID: "4", Status: &godo.KubernetesNodeStatus{State: "running"}}, + }, + Tags: []string{ + "k8s-cluster-autoscaler-enabled:true", + }, + }, + { + ID: "3", + Nodes: []*godo.KubernetesNode{ + {ID: "5", Status: &godo.KubernetesNodeStatus{State: "provisioning"}}, + {ID: "6", Status: &godo.KubernetesNodeStatus{State: "running"}}, + }, + Tags: []string{ + "k8s-cluster-autoscaler-enabled:true", + }, + }, + { + ID: "4", + Nodes: []*godo.KubernetesNode{ + {ID: "7", Status: &godo.KubernetesNodeStatus{State: "draining"}}, + {ID: "8", Status: &godo.KubernetesNodeStatus{State: "running"}}, + }, + Tags: []string{ + "k8s-cluster-autoscaler-enabled:true", + }, + }, + }, + &godo.Response{}, + nil, + ).Once() + + manager.client = client + err = manager.Refresh() + assert.NoError(t, err) + assert.Equal(t, len(manager.nodeGroups), 4, "number of nodes do not match") + }) + +} + +func TestDigitalOceanManager_RefreshWithNodeSpec(t *testing.T) { + t.Run("success", func(t *testing.T) { + cfg := `{"cluster_id": "123456", "token": "123-123-123", "url": "https://api.digitalocean.com/v2", "version": "dev"}` + + manager, err := newManager(bytes.NewBufferString(cfg)) + assert.NoError(t, err) + + client := &doClientMock{} + ctx := context.Background() + + client.On("ListNodePools", ctx, manager.clusterID, nil).Return( + []*godo.KubernetesNodePool{ + { + ID: "1", + Nodes: []*godo.KubernetesNode{ + {ID: "1", Status: &godo.KubernetesNodeStatus{State: "running"}}, + {ID: "2", Status: &godo.KubernetesNodeStatus{State: "running"}}, + }, + Tags: []string{ + 
"k8s-cluster-autoscaler-enabled:true", + "k8s-cluster-autoscaler-min:3", + "k8s-cluster-autoscaler-max:10", + }, + }, + { + ID: "2", + Nodes: []*godo.KubernetesNode{ + {ID: "3", Status: &godo.KubernetesNodeStatus{State: "running"}}, + {ID: "4", Status: &godo.KubernetesNodeStatus{State: "running"}}, + }, + Tags: []string{ + "k8s-cluster-autoscaler-enabled:true", + "k8s-cluster-autoscaler-min:5", + "k8s-cluster-autoscaler-max:20", + }, + }, + { + // this node pool doesn't have any min and max tags, + // therefore this should get assigned the default minimum + // and maximum defaults + ID: "3", + Nodes: []*godo.KubernetesNode{ + {ID: "5", Status: &godo.KubernetesNodeStatus{State: "running"}}, + {ID: "6", Status: &godo.KubernetesNodeStatus{State: "running"}}, + }, + Tags: []string{ + "k8s-cluster-autoscaler-enabled:true", + }, + }, + }, + &godo.Response{}, + nil, + ).Once() + + manager.client = client + err = manager.Refresh() + assert.NoError(t, err) + assert.Equal(t, len(manager.nodeGroups), 3, "number of nodes do not match") + + // first node group + assert.Equal(t, manager.nodeGroups[0].minSize, 3, "minimum node for first group does not match") + assert.Equal(t, manager.nodeGroups[0].maxSize, 10, "maximum node for first group does not match") + + // second node group + assert.Equal(t, manager.nodeGroups[1].minSize, 5, "minimum node for second group does not match") + assert.Equal(t, manager.nodeGroups[1].maxSize, 20, "maximum node for second group does not match") + + // third node group + assert.Equal(t, manager.nodeGroups[2].minSize, minNodePoolSize, "minimum node for third group should match the default") + assert.Equal(t, manager.nodeGroups[2].maxSize, maxNodePoolSize, "maximum node for third group should match the default") + }) +} + +func Test_parseTags(t *testing.T) { + cases := []struct { + name string + tags []string + want *nodeSpec + wantErr bool + }{ + { + name: "good config (single)", + tags: []string{ + "k8s-cluster-autoscaler-enabled:true", + "k8s-cluster-autoscaler-min:3", + "k8s-cluster-autoscaler-max:10", + }, + want: &nodeSpec{ + min: 3, + max: 10, + enabled: true, + }, + }, + { + name: "good config (disabled)", + tags: []string{ + "k8s-cluster-autoscaler-min:3", + "k8s-cluster-autoscaler-max:10", + }, + want: &nodeSpec{ + min: 3, + max: 10, + }, + }, + { + name: "good config (disabled with no values)", + tags: []string{}, + want: &nodeSpec{}, + }, + { + name: "good config - empty tags", + tags: []string{""}, + want: &nodeSpec{}, + }, + { + name: "bad tags - malformed", + tags: []string{ + "k8s-cluster-autoscaler-enabled:true", + "k8s-cluster-autoscaler-min=3", + "k8s-cluster-autoscaler-max=10", + }, + wantErr: true, + }, + { + name: "bad tags - no numerical min node size", + tags: []string{ + "k8s-cluster-autoscaler-enabled:true", + "k8s-cluster-autoscaler-min:three", + "k8s-cluster-autoscaler-max:10", + }, + wantErr: true, + }, + { + name: "bad tags - no numerical max node size", + tags: []string{ + "k8s-cluster-autoscaler-enabled:true", + "k8s-cluster-autoscaler-min:3", + "k8s-cluster-autoscaler-max:ten", + }, + wantErr: true, + }, + { + name: "bad tags - min is higher than max", + tags: []string{ + "k8s-cluster-autoscaler-enabled:true", + "k8s-cluster-autoscaler-min:5", + "k8s-cluster-autoscaler-max:4", + }, + wantErr: true, + }, + { + name: "bad tags - max is set to zero", + tags: []string{ + "k8s-cluster-autoscaler-enabled:true", + "k8s-cluster-autoscaler-min:5", + "k8s-cluster-autoscaler-max:0", + }, + wantErr: true, + }, + { + name: "bad tags - max is set to 
negative, no min", + tags: []string{ + "k8s-cluster-autoscaler-enabled:true", + "k8s-cluster-autoscaler-max:-5", + }, + wantErr: true, + }, + { + // TODO(arslan): remove this once we support zero count node pools on our end + name: "bad tags - min is set to zero", + tags: []string{ + "k8s-cluster-autoscaler-enabled:true", + "k8s-cluster-autoscaler-min:0", + "k8s-cluster-autoscaler-max:5", + }, + wantErr: true, + }, + } + + for _, ts := range cases { + ts := ts + + t.Run(ts.name, func(t *testing.T) { + got, err := parseTags(ts.tags) + if ts.wantErr && err == nil { + assert.Error(t, err) + return + } + + if ts.wantErr { + return + } + + assert.NoError(t, err) + assert.Equal(t, ts.want, got, "\ngot: %#v\nwant: %#v", got, ts.want) + }) + } + +} diff --git a/cluster-autoscaler/cloudprovider/digitalocean/digitalocean_node_group.go b/cluster-autoscaler/cloudprovider/digitalocean/digitalocean_node_group.go new file mode 100644 index 000000000000..dc36bb021f29 --- /dev/null +++ b/cluster-autoscaler/cloudprovider/digitalocean/digitalocean_node_group.go @@ -0,0 +1,281 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package digitalocean + +import ( + "context" + "errors" + "fmt" + + apiv1 "k8s.io/api/core/v1" + "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" + "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/digitalocean/godo" + schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo" +) + +const ( + // These are internal DO values, not publicly available and configurable at + // this point. + minNodePoolSize = 1 + maxNodePoolSize = 200 + + doksLabelNamespace = "doks.digitalocean.com" + nodeIDLabel = doksLabelNamespace + "/node-id" +) + +var ( + // ErrNodePoolNotExist is return if no node pool exists for a given cluster ID + ErrNodePoolNotExist = errors.New("node pool does not exist") +) + +// NodeGroup implements cloudprovider.NodeGroup interface. NodeGroup contains +// configuration info and functions to control a set of nodes that have the +// same capacity and set of labels. +type NodeGroup struct { + id string + clusterID string + client nodeGroupClient + nodePool *godo.KubernetesNodePool + + minSize int + maxSize int +} + +// MaxSize returns maximum size of the node group. +func (n *NodeGroup) MaxSize() int { + return n.maxSize +} + +// MinSize returns minimum size of the node group. +func (n *NodeGroup) MinSize() int { + return n.minSize +} + +// TargetSize returns the current target size of the node group. It is possible +// that the number of nodes in Kubernetes is different at the moment but should +// be equal to Size() once everything stabilizes (new nodes finish startup and +// registration or removed nodes are deleted completely). Implementation +// required. +func (n *NodeGroup) TargetSize() (int, error) { + return n.nodePool.Count, nil +} + +// IncreaseSize increases the size of the node group. To delete a node you need +// to explicitly name it and use DeleteNode. This function should wait until +// node group size is updated. 
Implementation required.
+func (n *NodeGroup) IncreaseSize(delta int) error {
+	if delta <= 0 {
+		return fmt.Errorf("delta must be positive, have: %d", delta)
+	}
+
+	targetSize := n.nodePool.Count + delta
+
+	if targetSize >= n.MaxSize() {
+		return fmt.Errorf("size increase is too large. current: %d desired: %d max: %d",
+			n.nodePool.Count, targetSize, n.MaxSize())
+	}
+
+	req := &godo.KubernetesNodePoolUpdateRequest{
+		Count: targetSize,
+	}
+
+	ctx := context.Background()
+	updatedNodePool, _, err := n.client.UpdateNodePool(ctx, n.clusterID, n.id, req)
+	if err != nil {
+		return err
+	}
+
+	if updatedNodePool.Count != targetSize {
+		return fmt.Errorf("couldn't increase size to %d (delta: %d). Current size is: %d",
+			targetSize, delta, updatedNodePool.Count)
+	}
+
+	// update internal cache
+	n.nodePool.Count = targetSize
+	return nil
+}
+
+// DeleteNodes deletes nodes from this node group (and also decreasing the size
+// of the node group with that). Error is returned either on failure or if the
+// given node doesn't belong to this node group. This function should wait
+// until node group size is updated. Implementation required.
+func (n *NodeGroup) DeleteNodes(nodes []*apiv1.Node) error {
+	ctx := context.Background()
+	for _, node := range nodes {
+		nodeID, ok := node.Labels[nodeIDLabel]
+		if !ok {
+			// CA creates fake node objects to represent upcoming VMs that haven't
+			// registered as nodes yet. They have node.Spec.ProviderID set. Use
+			// that as nodeID.
+			nodeID = node.Spec.ProviderID
+		}
+
+		_, err := n.client.DeleteNode(ctx, n.clusterID, n.id, nodeID, nil)
+		if err != nil {
+			return fmt.Errorf("deleting node failed for cluster: %q node pool: %q node: %q: %s",
+				n.clusterID, n.id, nodeID, err)
+		}
+
+		// decrement the count by one after a successful delete
+		n.nodePool.Count--
+	}
+
+	return nil
+}
+
+// DecreaseTargetSize decreases the target size of the node group. This function
+// doesn't permit to delete any existing node and can be used only to reduce the
+// request for new nodes that have not been yet fulfilled. Delta should be negative.
+// It is assumed that cloud provider will not delete the existing nodes when there
+// is an option to just decrease the target. Implementation required.
+func (n *NodeGroup) DecreaseTargetSize(delta int) error {
+	if delta >= 0 {
+		return fmt.Errorf("delta must be negative, have: %d", delta)
+	}
+
+	targetSize := n.nodePool.Count + delta
+	if targetSize <= n.MinSize() {
+		return fmt.Errorf("size decrease is too small. current: %d desired: %d min: %d",
+			n.nodePool.Count, targetSize, n.MinSize())
+	}
+
+	req := &godo.KubernetesNodePoolUpdateRequest{
+		Count: targetSize,
+	}
+
+	ctx := context.Background()
+	updatedNodePool, _, err := n.client.UpdateNodePool(ctx, n.clusterID, n.id, req)
+	if err != nil {
+		return err
+	}
+
+	if updatedNodePool.Count != targetSize {
+		return fmt.Errorf("couldn't decrease size to %d (delta: %d). Current size is: %d",
+			targetSize, delta, updatedNodePool.Count)
+	}
+
+	// update internal cache
+	n.nodePool.Count = targetSize
+	return nil
+}
+
+// Id returns an unique identifier of the node group.
+func (n *NodeGroup) Id() string {
+	return n.id
+}
+
+// Debug returns a string containing all information regarding this node group.
+func (n *NodeGroup) Debug() string {
+	return fmt.Sprintf("cluster ID: %s (min:%d max:%d)", n.Id(), n.MinSize(), n.MaxSize())
+}
+
+// Nodes returns a list of all nodes that belong to this node group.
It is +// required that Instance objects returned by this method have Id field set. +// Other fields are optional. +func (n *NodeGroup) Nodes() ([]cloudprovider.Instance, error) { + if n.nodePool == nil { + return nil, errors.New("node pool instance is not created") + } + + //TODO(arslan): after increasing a node pool, the number of nodes is not + //anymore equal to the cache here. We should return a placeholder node for + //that. As an example PR check this out: + //https://github.com/kubernetes/autoscaler/pull/2235 + return toInstances(n.nodePool.Nodes), nil +} + +// TemplateNodeInfo returns a schedulernodeinfo.NodeInfo structure of an empty +// (as if just started) node. This will be used in scale-up simulations to +// predict what would a new node look like if a node group was expanded. The +// returned NodeInfo is expected to have a fully populated Node object, with +// all of the labels, capacity and allocatable information as well as all pods +// that are started on the node by default, using manifest (most likely only +// kube-proxy). Implementation optional. +func (n *NodeGroup) TemplateNodeInfo() (*schedulernodeinfo.NodeInfo, error) { + return nil, cloudprovider.ErrNotImplemented +} + +// Exist checks if the node group really exists on the cloud provider side. +// Allows to tell the theoretical node group from the real one. Implementation +// required. +func (n *NodeGroup) Exist() bool { + return n.nodePool != nil +} + +// Create creates the node group on the cloud provider side. Implementation +// optional. +func (n *NodeGroup) Create() (cloudprovider.NodeGroup, error) { + return nil, cloudprovider.ErrNotImplemented +} + +// Delete deletes the node group on the cloud provider side. This will be +// executed only for autoprovisioned node groups, once their size drops to 0. +// Implementation optional. +func (n *NodeGroup) Delete() error { + return cloudprovider.ErrNotImplemented +} + +// Autoprovisioned returns true if the node group is autoprovisioned. An +// autoprovisioned group was created by CA and can be deleted when scaled to 0. 
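+// DOKS node pools are created and managed outside of the autoscaler (Create
+// and NewNodeGroup are not implemented), so this always returns false.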
+func (n *NodeGroup) Autoprovisioned() bool { + return false +} + +// toInstances converts a slice of *godo.KubernetesNode to +// cloudprovider.Instance +func toInstances(nodes []*godo.KubernetesNode) []cloudprovider.Instance { + instances := make([]cloudprovider.Instance, len(nodes)) + for i, nd := range nodes { + instances[i] = toInstance(nd) + } + return instances +} + +// toInstance converts the given *godo.KubernetesNode to a +// cloudprovider.Instance +func toInstance(node *godo.KubernetesNode) cloudprovider.Instance { + return cloudprovider.Instance{ + Id: node.ID, + Status: toInstanceStatus(node.Status), + } +} + +// toInstanceStatus converts the given *godo.KubernetesNodeStatus to a +// cloudprovider.InstanceStatus +func toInstanceStatus(nodeState *godo.KubernetesNodeStatus) *cloudprovider.InstanceStatus { + if nodeState == nil { + return nil + } + + st := &cloudprovider.InstanceStatus{} + switch nodeState.State { + case "provisioning": + st.State = cloudprovider.InstanceCreating + case "running": + st.State = cloudprovider.InstanceRunning + case "draining", "deleting": + st.State = cloudprovider.InstanceDeleting + default: + st.ErrorInfo = &cloudprovider.InstanceErrorInfo{ + ErrorClass: cloudprovider.OtherErrorClass, + ErrorCode: "no-code-digitalocean", + ErrorMessage: nodeState.Message, + } + } + + return st +} diff --git a/cluster-autoscaler/cloudprovider/digitalocean/digitalocean_node_group_test.go b/cluster-autoscaler/cloudprovider/digitalocean/digitalocean_node_group_test.go new file mode 100644 index 000000000000..605acbad3b66 --- /dev/null +++ b/cluster-autoscaler/cloudprovider/digitalocean/digitalocean_node_group_test.go @@ -0,0 +1,383 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package digitalocean + +import ( + "context" + "errors" + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + apiv1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" + "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/digitalocean/godo" +) + +func TestNodeGroup_TargetSize(t *testing.T) { + t.Run("success", func(t *testing.T) { + numberOfNodes := 3 + + client := &doClientMock{} + ng := testNodeGroup(client, &godo.KubernetesNodePool{ + Count: numberOfNodes, + }) + + size, err := ng.TargetSize() + assert.NoError(t, err) + assert.Equal(t, numberOfNodes, size, "target size is not correct") + }) +} + +func TestNodeGroup_IncreaseSize(t *testing.T) { + ctx := context.Background() + + t.Run("success", func(t *testing.T) { + numberOfNodes := 3 + client := &doClientMock{} + ng := testNodeGroup(client, &godo.KubernetesNodePool{ + Count: numberOfNodes, + }) + + delta := 2 + + client.On("UpdateNodePool", + ctx, + ng.clusterID, + ng.id, + &godo.KubernetesNodePoolUpdateRequest{ + Count: numberOfNodes + delta, + }, + ).Return( + &godo.KubernetesNodePool{Count: numberOfNodes + delta}, + &godo.Response{}, + nil, + ).Once() + + err := ng.IncreaseSize(delta) + assert.NoError(t, err) + }) + + t.Run("negative increase", func(t *testing.T) { + numberOfNodes := 3 + delta := -1 + client := &doClientMock{} + ng := testNodeGroup(client, &godo.KubernetesNodePool{ + Count: numberOfNodes, + }) + err := ng.IncreaseSize(delta) + + exp := fmt.Errorf("delta must be positive, have: %d", delta) + assert.EqualError(t, err, exp.Error(), "size increase must be positive") + }) + + t.Run("zero increase", func(t *testing.T) { + numberOfNodes := 3 + delta := 0 + client := &doClientMock{} + ng := testNodeGroup(client, &godo.KubernetesNodePool{ + Count: numberOfNodes, + }) + + exp := fmt.Errorf("delta must be positive, have: %d", delta) + + err := ng.IncreaseSize(delta) + assert.EqualError(t, err, exp.Error(), "size increase must be positive") + }) + + t.Run("large increase above maximum", func(t *testing.T) { + numberOfNodes := 195 + delta := 10 + + client := &doClientMock{} + ng := testNodeGroup(client, &godo.KubernetesNodePool{ + Count: numberOfNodes, + }) + + exp := fmt.Errorf("size increase is too large. 
current: %d desired: %d max: %d",
+			numberOfNodes, numberOfNodes+delta, ng.MaxSize())
+
+		err := ng.IncreaseSize(delta)
+		assert.EqualError(t, err, exp.Error(), "size increase is too large")
+	})
+}
+
+func TestNodeGroup_DecreaseTargetSize(t *testing.T) {
+	ctx := context.Background()
+
+	t.Run("success", func(t *testing.T) {
+		numberOfNodes := 5
+		client := &doClientMock{}
+		ng := testNodeGroup(client, &godo.KubernetesNodePool{
+			Count: numberOfNodes,
+		})
+
+		delta := -2
+
+		client.On("UpdateNodePool",
+			ctx,
+			ng.clusterID,
+			ng.id,
+			&godo.KubernetesNodePoolUpdateRequest{
+				Count: numberOfNodes + delta,
+			},
+		).Return(
+			&godo.KubernetesNodePool{Count: numberOfNodes + delta},
+			&godo.Response{},
+			nil,
+		).Once()
+
+		err := ng.DecreaseTargetSize(delta)
+		assert.NoError(t, err)
+	})
+
+	t.Run("positive decrease", func(t *testing.T) {
+		numberOfNodes := 5
+		client := &doClientMock{}
+		ng := testNodeGroup(client, &godo.KubernetesNodePool{
+			Count: numberOfNodes,
+		})
+
+		delta := 1
+		err := ng.DecreaseTargetSize(delta)
+
+		exp := fmt.Errorf("delta must be negative, have: %d", delta)
+		assert.EqualError(t, err, exp.Error(), "size decrease must be negative")
+	})
+
+	t.Run("zero decrease", func(t *testing.T) {
+		numberOfNodes := 5
+		client := &doClientMock{}
+		ng := testNodeGroup(client, &godo.KubernetesNodePool{
+			Count: numberOfNodes,
+		})
+
+		delta := 0
+		exp := fmt.Errorf("delta must be negative, have: %d", delta)
+
+		err := ng.DecreaseTargetSize(delta)
+		assert.EqualError(t, err, exp.Error(), "size decrease must be negative")
+	})
+
+	t.Run("small decrease below minimum", func(t *testing.T) {
+		delta := -2
+		numberOfNodes := 3
+		client := &doClientMock{}
+		ng := testNodeGroup(client, &godo.KubernetesNodePool{
+			Count: numberOfNodes,
+		})
+
+		exp := fmt.Errorf("size decrease is too small. current: %d desired: %d min: %d",
+			numberOfNodes, numberOfNodes+delta, ng.MinSize())
+		err := ng.DecreaseTargetSize(delta)
+		assert.EqualError(t, err, exp.Error(), "size decrease is too small")
+	})
+}
+
+func TestNodeGroup_DeleteNodes(t *testing.T) {
+	ctx := context.Background()
+
+	t.Run("success", func(t *testing.T) {
+		client := &doClientMock{}
+		ng := testNodeGroup(client, &godo.KubernetesNodePool{
+			Count: 3,
+		})
+
+		nodes := []*apiv1.Node{
+			{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{nodeIDLabel: "1"}}},
+			{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{nodeIDLabel: "2"}}},
+			{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{nodeIDLabel: "3"}}},
+		}
+
+		// this should be called three times (the number of nodes)
+		client.On("DeleteNode", ctx, ng.clusterID, ng.id, "1", nil).Return(&godo.Response{}, nil).Once()
+		client.On("DeleteNode", ctx, ng.clusterID, ng.id, "2", nil).Return(&godo.Response{}, nil).Once()
+		client.On("DeleteNode", ctx, ng.clusterID, ng.id, "3", nil).Return(&godo.Response{}, nil).Once()
+
+		err := ng.DeleteNodes(nodes)
+		assert.NoError(t, err)
+	})
+
+	t.Run("client deleting node fails", func(t *testing.T) {
+		client := &doClientMock{}
+		ng := testNodeGroup(client, &godo.KubernetesNodePool{Count: 3})
+
+		nodes := []*apiv1.Node{
+			{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{nodeIDLabel: "1"}}},
+			{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{nodeIDLabel: "2"}}},
+			{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{nodeIDLabel: "3"}}},
+		}
+
+		// the client is called twice: the first call succeeds, but the second one
+		// fails with a random error. In this case DeleteNodes() should return
+		// immediately.
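+		// Only two DeleteNode expectations are registered: the error returned
+		// for node "2" aborts the loop, so node "3" is never attempted.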
+ client.On("DeleteNode", ctx, ng.clusterID, ng.id, "1", nil). + Return(&godo.Response{}, nil).Once() + client.On("DeleteNode", ctx, ng.clusterID, ng.id, "2", nil). + Return(&godo.Response{}, errors.New("random error")).Once() + + err := ng.DeleteNodes(nodes) + assert.Error(t, err) + }) +} + +func TestNodeGroup_Nodes(t *testing.T) { + t.Run("success", func(t *testing.T) { + client := &doClientMock{} + ng := testNodeGroup(client, &godo.KubernetesNodePool{ + Nodes: []*godo.KubernetesNode{ + { + ID: "1", + Status: &godo.KubernetesNodeStatus{ + State: "provisioning", + }, + }, + { + ID: "2", + Status: &godo.KubernetesNodeStatus{ + State: "running", + }, + }, + { + ID: "3", + Status: &godo.KubernetesNodeStatus{ + State: "deleting", + }, + }, + { + ID: "4", + Status: &godo.KubernetesNodeStatus{ + State: "unknown", + Message: "some-message", + }, + }, + { + // no status + ID: "5", + }, + }, + Count: 5, + }) + + exp := []cloudprovider.Instance{ + { + Id: "1", + Status: &cloudprovider.InstanceStatus{ + State: cloudprovider.InstanceCreating, + }, + }, + { + Id: "2", + Status: &cloudprovider.InstanceStatus{ + State: cloudprovider.InstanceRunning, + }, + }, + { + Id: "3", + Status: &cloudprovider.InstanceStatus{ + State: cloudprovider.InstanceDeleting, + }, + }, + { + Id: "4", + Status: &cloudprovider.InstanceStatus{ + ErrorInfo: &cloudprovider.InstanceErrorInfo{ + ErrorClass: cloudprovider.OtherErrorClass, + ErrorCode: "no-code-digitalocean", + ErrorMessage: "some-message", + }, + }, + }, + { + Id: "5", + }, + } + + nodes, err := ng.Nodes() + assert.NoError(t, err) + assert.Equal(t, exp, nodes, "nodes do not match") + }) + + t.Run("failure (nil node pool)", func(t *testing.T) { + client := &doClientMock{} + ng := testNodeGroup(client, nil) + + _, err := ng.Nodes() + assert.Error(t, err, "Nodes() should return an error") + }) +} + +func TestNodeGroup_Debug(t *testing.T) { + t.Run("success", func(t *testing.T) { + client := &doClientMock{} + ng := testNodeGroup(client, &godo.KubernetesNodePool{Count: 3}) + + d := ng.Debug() + exp := "cluster ID: 1 (min:1 max:200)" + assert.Equal(t, exp, d, "debug string do not match") + }) +} + +func TestNodeGroup_Exist(t *testing.T) { + t.Run("success", func(t *testing.T) { + client := &doClientMock{} + ng := testNodeGroup(client, &godo.KubernetesNodePool{Count: 3}) + + exist := ng.Exist() + assert.Equal(t, true, exist, "node pool should exist") + }) + + t.Run("failure", func(t *testing.T) { + client := &doClientMock{} + ng := testNodeGroup(client, nil) + + exist := ng.Exist() + assert.Equal(t, false, exist, "node pool should not exist") + }) +} + +func testNodeGroup(client nodeGroupClient, np *godo.KubernetesNodePool) *NodeGroup { + return &NodeGroup{ + id: "1", + clusterID: "1", + client: client, + nodePool: np, + minSize: minNodePoolSize, + maxSize: maxNodePoolSize, + } +} + +type doClientMock struct { + mock.Mock +} + +func (m *doClientMock) ListNodePools(ctx context.Context, clusterID string, opts *godo.ListOptions) ([]*godo.KubernetesNodePool, *godo.Response, error) { + args := m.Called(ctx, clusterID, nil) + return args.Get(0).([]*godo.KubernetesNodePool), args.Get(1).(*godo.Response), args.Error(2) +} + +func (m *doClientMock) UpdateNodePool(ctx context.Context, clusterID, poolID string, req *godo.KubernetesNodePoolUpdateRequest) (*godo.KubernetesNodePool, *godo.Response, error) { + args := m.Called(ctx, clusterID, poolID, req) + return args.Get(0).(*godo.KubernetesNodePool), args.Get(1).(*godo.Response), args.Error(2) +} + +func (m *doClientMock) 
DeleteNode(ctx context.Context, clusterID, poolID, nodeID string, req *godo.KubernetesNodeDeleteRequest) (*godo.Response, error) { + args := m.Called(ctx, clusterID, poolID, nodeID, nil) + return args.Get(0).(*godo.Response), args.Error(1) +} diff --git a/hack/boilerplate/boilerplate.py b/hack/boilerplate/boilerplate.py index 7ebaab9ed2ba..664a6b8100f0 100755 --- a/hack/boilerplate/boilerplate.py +++ b/hack/boilerplate/boilerplate.py @@ -148,6 +148,7 @@ def file_extension(filename): skipped_dirs = ['Godeps', 'third_party', '_gopath', '_output', '.git', 'cluster/env.sh', "vendor", "test/e2e/generated/bindata.go", "hack/boilerplate/test", "pkg/generated/bindata.go", + "cluster-autoscaler/cloudprovider/digitalocean/godo", "cluster-autoscaler/cloudprovider/magnum/gophercloud"] # list all the files contain 'DO NOT EDIT', but are not generated diff --git a/hack/verify-gofmt.sh b/hack/verify-gofmt.sh index afe554a74795..d585c7dbfb4e 100755 --- a/hack/verify-gofmt.sh +++ b/hack/verify-gofmt.sh @@ -35,6 +35,7 @@ find_files() { -o -wholename '*/Godeps/*' \ -o -wholename '*/vendor/*' \ -o -wholename './cluster-autoscaler/cloudprovider/magnum/gophercloud/*' \ + -o -wholename './cluster-autoscaler/cloudprovider/digitalocean/godo/*' \ \) -prune \ \) -name '*.go' } diff --git a/hack/verify-golint.sh b/hack/verify-golint.sh index 553db956a137..ca6ddc623e27 100755 --- a/hack/verify-golint.sh +++ b/hack/verify-golint.sh @@ -22,7 +22,7 @@ KUBE_ROOT=$(dirname "${BASH_SOURCE}")/.. cd "${KUBE_ROOT}" GOLINT=${GOLINT:-"golint"} -PACKAGES=($(go list ./... | grep -v /vendor/ | grep -v vertical-pod-autoscaler/pkg/client | grep -v vertical-pod-autoscaler/pkg/apis | grep -v cluster-autoscaler/cloudprovider/magnum/gophercloud)) +PACKAGES=($(go list ./... | grep -v /vendor/ | grep -v vertical-pod-autoscaler/pkg/client | grep -v vertical-pod-autoscaler/pkg/apis | grep -v cluster-autoscaler/cloudprovider/magnum/gophercloud | grep -v cluster-autoscaler/cloudprovider/digitalocean/godo)) bad_files=() for package in "${PACKAGES[@]}"; do out=$("${GOLINT}" -min_confidence=0.9 "${package}") diff --git a/hack/verify-spelling.sh b/hack/verify-spelling.sh index d867be58f1cd..40b06b438014 100755 --- a/hack/verify-spelling.sh +++ b/hack/verify-spelling.sh @@ -23,4 +23,4 @@ DIR=$(dirname $0) go install ${DIR}/../../../github.com/client9/misspell/cmd/misspell # Spell checking -git ls-files --full-name | grep -v -e vendor | grep -v cluster-autoscaler/cloudprovider/magnum/gophercloud | xargs misspell -error -o stderr +git ls-files --full-name | grep -v -e vendor | grep -v cluster-autoscaler/cloudprovider/magnum/gophercloud| grep -v cluster-autoscaler/cloudprovider/digitalocean/godo | xargs misspell -error -o stderr