✨ Reimplement GCP connection draining (#45)
* Reimplement GCP connection draining

* Move notes error

* Fix timeSinceToBeDeletedTaintAdded

* Remove unused code

* Reduce code

* Update Go and fix warnings

* Remove version
dippynark authored Dec 20, 2024
1 parent cf9c545 commit b1cc3bc
Showing 14 changed files with 138 additions and 177 deletions.
5 changes: 2 additions & 3 deletions .github/workflows/ci.yaml
@@ -24,12 +24,11 @@ jobs:
- uses: actions/checkout@v4
- uses: actions/setup-go@v4
with:
go-version: '1.21'
go-version: '1.23.4'
cache: false
- name: golangci-lint
uses: golangci/golangci-lint-action@v3
with:
version: v1.54
args: --timeout=10m
# https://docs.github.com/en/actions/automating-builds-and-tests/building-and-testing-go
test:
@@ -38,7 +37,7 @@ jobs:
- uses: actions/checkout@v4
- uses: actions/setup-go@v4
with:
go-version: '1.21'
go-version: '1.23.4'
- run: go mod download
- name: Verify generated code
run: make verify
2 changes: 1 addition & 1 deletion Dockerfile
@@ -1,4 +1,4 @@
FROM golang:1.21 as build
FROM golang:1.23.4 as build

WORKDIR /go/src/cost-manager

2 changes: 1 addition & 1 deletion go.mod
@@ -1,6 +1,6 @@
module github.com/hsbc/cost-manager

go 1.21.3
go 1.23.4

require (
github.com/go-logr/logr v1.3.0
2 changes: 1 addition & 1 deletion hack/notes/notes.go
@@ -45,8 +45,8 @@ func run(from, to string) error {
if to == "" {
to = "HEAD"
}
var err error
if from == "" {
var err error
from, err = previousTag(to)
if err != nil {
return err
7 changes: 3 additions & 4 deletions pkg/cloudprovider/cloud_provider.go
@@ -20,10 +20,9 @@ type CloudProvider interface {
IsSpotInstance(ctx context.Context, node *corev1.Node) (bool, error)
// DeleteInstance should drain connections from external load balancers to the Node and then
// delete the underlying instance. Implementations can assume that before this function is
// called the Node has already been modified to ensure that the KCCM service controller will
// eventually remove the Node from load balancing although this process may still be in progress
// when this function is called:
// https://kubernetes.io/docs/concepts/architecture/cloud-controller/#service-controller
// called Pods have already been drained from the Node and it has been tainted with
// ToBeDeletedByClusterAutoscaler to fail kube-proxy health checks as described in KEP-3836:
// https://github.com/kubernetes/enhancements/tree/27ef0d9a740ae5058472aac4763483f0e7218c0e/keps/sig-network/3836-kube-proxy-improved-ingress-connectivity-reliability
DeleteInstance(ctx context.Context, node *corev1.Node) error
}

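Note: the updated interface comment assumes Pods have been drained and the Node tainted with ToBeDeletedByClusterAutoscaler before DeleteInstance runs. Below is a minimal Go sketch of how a caller might assert that precondition; the helper name and the inlined taint key constant are illustrative assumptions, not part of this commit.

// Hypothetical sketch, not part of this commit: checking the KEP-3836
// precondition described in the interface comment before calling DeleteInstance.
package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
)

// toBeDeletedTaintKey is the taint the cluster autoscaler adds to a Node it is
// removing; while it is present kube-proxy fails its health checks (KEP-3836).
const toBeDeletedTaintKey = "ToBeDeletedByClusterAutoscaler"

// hasToBeDeletedTaint reports whether the Node carries the NoSchedule
// ToBeDeletedByClusterAutoscaler taint, i.e. whether the DeleteInstance
// precondition from the interface comment holds.
func hasToBeDeletedTaint(node *corev1.Node) bool {
	for _, taint := range node.Spec.Taints {
		if taint.Key == toBeDeletedTaintKey && taint.Effect == corev1.TaintEffectNoSchedule {
			return true
		}
	}
	return false
}

func main() {
	node := &corev1.Node{
		Spec: corev1.NodeSpec{
			Taints: []corev1.Taint{
				{Key: toBeDeletedTaintKey, Value: "1734652800", Effect: corev1.TaintEffectNoSchedule},
			},
		},
	}
	fmt.Println(hasToBeDeletedTaint(node)) // true
}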
142 changes: 39 additions & 103 deletions pkg/cloudprovider/gcp/cloud_provider.go
@@ -3,13 +3,12 @@ package gcp
import (
"context"
"fmt"
"net/http"
"strings"
"strconv"
"time"

"github.com/hsbc/cost-manager/pkg/kubernetes"
"github.com/pkg/errors"
"google.golang.org/api/compute/v1"
"google.golang.org/api/googleapi"
corev1 "k8s.io/api/core/v1"
)

@@ -18,13 +17,6 @@ const (
spotNodeLabelKey = "cloud.google.com/gke-spot"
// https://cloud.google.com/kubernetes-engine/docs/how-to/preemptible-vms#use_nodeselector_to_schedule_pods_on_preemptible_vms
preemptibleNodeLabelKey = "cloud.google.com/gke-preemptible"

// After kube-proxy starts failing its health check GCP load balancers should mark the instance
// as unhealthy within 24 seconds but we wait for slightly longer to give in-flight connections
// time to complete before we delete the underlying instance:
// https://github.com/kubernetes/ingress-gce/blob/2a08b1e4111a21c71455bbb2bcca13349bb6f4c0/pkg/healthchecksl4/healthchecksl4.go#L48
externalLoadBalancerConnectionDrainingPeriod = 30 * time.Second
externalLoadBalancerConnectionDrainingInterval = 5 * time.Second
)

type CloudProvider struct {
@@ -42,9 +34,18 @@ func NewCloudProvider(ctx context.Context) (*CloudProvider, error) {
}, nil
}

// DeleteInstance retrieves the underlying compute instance of the Kubernetes Node, drains any
// connections from GCP load balancers and then deletes it from its managed instance group
// DeleteInstance drains any connections from GCP load balancers, retrieves the underlying compute
// instance of the Kubernetes Node and then deletes it from its managed instance group
func (gcp *CloudProvider) DeleteInstance(ctx context.Context, node *corev1.Node) error {
// GCP Network Load Balancer health checks have an interval of 8 seconds with a timeout of 1
// second and an unhealthy threshold of 3 so we wait for 3 * 8 + 1 = 25 seconds for instances to
// be marked as unhealthy which triggers connection draining. We add an additional 30 seconds
// since this is the connection draining timeout used when GKE subsetting is enabled. We then
// add an additional 5 seconds to allow processing time for the various components involved
// (e.g. GCP probes and kube-proxy):
// https://github.com/kubernetes/ingress-gce/blob/2a08b1e4111a21c71455bbb2bcca13349bb6f4c0/pkg/healthchecksl4/healthchecksl4.go#L42
time.Sleep(time.Minute - timeSinceToBeDeletedTaintAdded(node, time.Now()))

// Retrieve instance details from the provider ID
project, zone, instanceName, err := parseProviderID(node.Spec.ProviderID)
if err != nil {
@@ -73,95 +74,6 @@ func (gcp *CloudProvider) DeleteInstance(ctx context.Context, node *corev1.Node)
return errors.Wrapf(err, "failed to get compute instance: %s/%s/%s", project, zone, instanceName)
}

// Until KEP-3836 has been released we explicitly remove the instance from any instance groups
// managed by the GCP Cloud Controller Manager to trigger connection draining. This is required
// since there seems to be a bug in the GCP Cloud Controller Manager where it does not remove an
// instance from a backend instance group if it is the last instance in the group; in this case
// it updates the backend service to remove the instance group as a backend which does not seem
// to trigger connection draining:
// https://cloud.google.com/load-balancing/docs/enabling-connection-draining
// https://github.com/kubernetes/cloud-provider-gcp/issues/643
instanceGroupsListCall := gcp.computeService.InstanceGroups.List(project, zone)
for {
instanceGroups, err := instanceGroupsListCall.Do()
if err != nil {
return err
}
for _, instanceGroup := range instanceGroups.Items {
// Only consider instance groups managed by the GCP Cloud Controller Manager:
// https://github.com/kubernetes/cloud-provider-gcp/blob/398b1a191aa49b7c67ed5e4677400b73243904e2/providers/gce/gce_loadbalancer_naming.go#L35-L43
// TODO(dippynark): Use the cluster ID:
// https://github.com/kubernetes/cloud-provider-gcp/blob/398b1a191aa49b7c67ed5e4677400b73243904e2/providers/gce/gce_clusterid.go#L43-L50
if !strings.HasPrefix(instanceGroup.Name, "k8s-ig--") {
continue
}
// Ignore empty instance groups
if instanceGroup.Size == 0 {
continue
}
instanceGroupInstances, err := gcp.computeService.InstanceGroups.ListInstances(project, zone, instanceGroup.Name, &compute.InstanceGroupsListInstancesRequest{}).Do()
if err != nil {
return err
}
for _, instanceGroupInstance := range instanceGroupInstances.Items {
if instanceGroupInstance.Instance == instance.SelfLink {
// There is a small chance that the GCP Cloud Controller Manager is currently
// processing an old list of Nodes so that after we remove the instance from its
// instance group the GCP Cloud Controller Manager will add it back. We mitigate
// this by periodically attempting to remove the instance. Once KEP-3836 has
// been released we will not need to remove the instance, we will just need to
// wait for load balancer health checks to mark the instance as unhealthy, so we
// periodically attempt removal for the same length of time as we will need to
// wait so that connection draining works before and after KEP-3836
removeInstance := func() error {
operation, err := gcp.computeService.InstanceGroups.RemoveInstances(project, zone, instanceGroup.Name,
&compute.InstanceGroupsRemoveInstancesRequest{Instances: []*compute.InstanceReference{{Instance: instance.SelfLink}}}).Do()
// Ignore the error if the instance has already been removed
if apiErr, ok := err.(*googleapi.Error); ok && apiErr.Code == http.StatusBadRequest &&
len(apiErr.Errors) == 1 && apiErr.Errors[0].Reason == "memberNotFound" {
return nil
}
if err != nil {
return err
}
err = gcp.waitForZonalComputeOperation(ctx, project, zone, operation.Name)
if err != nil {
return err
}
return nil
}
// Once KEP-3836 has been released we can simply sleep for the connection
// draining period instead of periodically attempting to remove the instance
ctxWithTimeout, cancel := context.WithTimeout(ctx, externalLoadBalancerConnectionDrainingPeriod)
defer cancel()
err := removeInstance()
if err != nil {
return err
}
removeInstanceLoop:
for {
select {
case <-ctxWithTimeout.Done():
break removeInstanceLoop
case <-time.After(externalLoadBalancerConnectionDrainingInterval):
err := removeInstance()
if err != nil {
return err
}
}
}
}
}
}
// Continue if there is another page of results...
if len(instanceGroups.NextPageToken) > 0 {
instanceGroupsListCall.PageToken(instanceGroups.NextPageToken)
continue
}
// ...otherwise we are done
break
}

// Determine the managed instance group that created the instance
managedInstanceGroupName, err := getManagedInstanceGroupFromInstance(instance)
if err != nil {
@@ -178,11 +90,11 @@ func (gcp *CloudProvider) DeleteInstance(ctx context.Context, node *corev1.Node)
if err != nil {
return errors.Wrap(err, "failed to delete managed instance")
}
err = gcp.waitForZonalComputeOperation(ctx, project, zone, r.Name)
err = gcp.waitForZonalComputeOperation(project, zone, r.Name)
if err != nil {
return errors.Wrap(err, "failed to wait for compute operation to complete successfully")
}
err = gcp.waitForManagedInstanceGroupStability(ctx, project, zone, managedInstanceGroupName)
err = gcp.waitForManagedInstanceGroupStability(project, zone, managedInstanceGroupName)
if err != nil {
return errors.Wrap(err, "failed to wait for managed instance group stability")
}
@@ -199,3 +111,27 @@ func (gcp *CloudProvider) IsSpotInstance(ctx context.Context, node *corev1.Node)
}
return node.Labels[spotNodeLabelKey] == "true" || node.Labels[preemptibleNodeLabelKey] == "true", nil
}

func timeSinceToBeDeletedTaintAdded(node *corev1.Node, now time.Time) time.Duration {
// Retrieve taint value
toBeDeletedTaintAddedValue := ""
for _, taint := range node.Spec.Taints {
if taint.Key == kubernetes.ToBeDeletedTaint && taint.Effect == corev1.TaintEffectNoSchedule {
toBeDeletedTaintAddedValue = taint.Value
break
}
}

// Attempt to parse taint value as Unix timestamp
unixTimeSeconds, err := strconv.ParseInt(toBeDeletedTaintAddedValue, 10, 64)
if err != nil {
return 0
}

timeSinceToBeDeletedTaintAdded := now.Sub(time.Unix(unixTimeSeconds, 0))
// Ignore negative durations to avoid waiting for an unbounded amount of time
if timeSinceToBeDeletedTaintAdded < 0 {
return 0
}
return timeSinceToBeDeletedTaintAdded
}
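
Note: the new wait in DeleteInstance is derived from the health-check arithmetic in its comment: 3 failures × 8 s interval + 1 s timeout = 25 s for the instance to be marked unhealthy, plus the 30 s connection draining timeout and 5 s of slack, giving a one-minute budget. Below is a minimal Go sketch of that calculation; the constant names are invented for illustration and only the values come from the commit.

// Illustrative sketch, not part of the diff: how DeleteInstance's one-minute
// connection draining budget is derived, and how the time already spent under
// the ToBeDeletedByClusterAutoscaler taint is subtracted from it.
package main

import (
	"fmt"
	"time"
)

const (
	healthCheckInterval = 8 * time.Second  // GCP Network Load Balancer health check interval
	healthCheckTimeout  = 1 * time.Second  // per-probe timeout
	unhealthyThreshold  = 3                // consecutive failures before an instance is unhealthy
	drainingTimeout     = 30 * time.Second // connection draining timeout used with GKE subsetting
	processingSlack     = 5 * time.Second  // slack for GCP probes and kube-proxy to react
)

func main() {
	detection := time.Duration(unhealthyThreshold)*healthCheckInterval + healthCheckTimeout // 25s
	budget := detection + drainingTimeout + processingSlack

	fmt.Println(budget == time.Minute) // true

	// DeleteInstance only sleeps for whatever remains of the budget, e.g. if the
	// taint was added 40 seconds before DeleteInstance is called:
	elapsed := 40 * time.Second
	fmt.Println(budget - elapsed) // 20s
}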
52 changes: 52 additions & 0 deletions pkg/cloudprovider/gcp/cloud_provider_test.go
@@ -2,7 +2,9 @@ package gcp

import (
"context"
"fmt"
"testing"
"time"

"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"
@@ -80,3 +82,53 @@ func TestIsSpotInstance(t *testing.T) {
})
}
}

func TestTimeSinceToBeDeletedTaintAdded(t *testing.T) {
tests := map[string]struct {
node *corev1.Node
now time.Time
timeSinceToBeDeletedTaintAdded time.Duration
}{
"missingTaint": {
node: &corev1.Node{},
now: time.Now(),
timeSinceToBeDeletedTaintAdded: 0,
},
"recentTaint": {
node: &corev1.Node{
Spec: corev1.NodeSpec{
Taints: []corev1.Taint{
{
Key: "ToBeDeletedByClusterAutoscaler",
Value: fmt.Sprint(time.Date(0, 0, 0, 0, 0, 0, 0, time.UTC).Unix()),
Effect: corev1.TaintEffectNoSchedule,
},
},
},
},
now: time.Date(0, 0, 0, 0, 1, 0, 0, time.UTC),
timeSinceToBeDeletedTaintAdded: time.Minute,
},
"futureTaint": {
node: &corev1.Node{
Spec: corev1.NodeSpec{
Taints: []corev1.Taint{
{
Key: "ToBeDeletedByClusterAutoscaler",
Value: fmt.Sprint(time.Date(0, 0, 0, 0, 1, 0, 0, time.UTC).Unix()),
Effect: corev1.TaintEffectNoSchedule,
},
},
},
},
now: time.Date(0, 0, 0, 0, 0, 0, 0, time.UTC),
timeSinceToBeDeletedTaintAdded: 0,
},
}
for name, test := range tests {
t.Run(name, func(t *testing.T) {
timeSinceToBeDeletedTaintAdded := timeSinceToBeDeletedTaintAdded(test.node, test.now)
require.Equal(t, test.timeSinceToBeDeletedTaintAdded, timeSinceToBeDeletedTaintAdded)
})
}
}
9 changes: 4 additions & 5 deletions pkg/cloudprovider/gcp/compute.go
@@ -1,7 +1,6 @@
package gcp

import (
"context"
"fmt"
"strings"
"time"
@@ -33,7 +32,7 @@ func getManagedInstanceGroupFromInstance(instance *compute.Instance) (string, er
return "", fmt.Errorf("failed to determine managed instance group for instance %s", instance.Name)
}

func (gcp *CloudProvider) waitForManagedInstanceGroupStability(ctx context.Context, project, zone, managedInstanceGroupName string) error {
func (gcp *CloudProvider) waitForManagedInstanceGroupStability(project, zone, managedInstanceGroupName string) error {
for {
r, err := gcp.computeService.InstanceGroupManagers.Get(project, zone, managedInstanceGroupName).Do()
if err != nil {
@@ -46,13 +45,13 @@ func (gcp *CloudProvider) waitForManagedInstanceGroupStability(ctx context.Conte
}
}

func (gcp *CloudProvider) waitForZonalComputeOperation(ctx context.Context, project, zone, operationName string) error {
return waitForComputeOperation(ctx, project, func() (*compute.Operation, error) {
func (gcp *CloudProvider) waitForZonalComputeOperation(project, zone, operationName string) error {
return waitForComputeOperation(func() (*compute.Operation, error) {
return gcp.computeService.ZoneOperations.Get(project, zone, operationName).Do()
})
}

func waitForComputeOperation(ctx context.Context, project string, getOperation func() (*compute.Operation, error)) error {
func waitForComputeOperation(getOperation func() (*compute.Operation, error)) error {
for {
operation, err := getOperation()
if err != nil {
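
Note: the compute.go change removes the unused context parameter from the operation wait helpers; the loop body of waitForComputeOperation is collapsed in this view. Below is a rough Go sketch of the polling pattern such a helper typically follows; the "DONE" status check and the one-second poll interval are assumptions here, not the repository's actual code.

// Rough sketch only: the kind of polling loop waitForComputeOperation performs
// after the context parameter was removed.
package sketch

import (
	"fmt"
	"time"

	"google.golang.org/api/compute/v1"
)

func waitForComputeOperation(getOperation func() (*compute.Operation, error)) error {
	for {
		operation, err := getOperation()
		if err != nil {
			return err
		}
		// Compute operations report the "DONE" status once they have finished,
		// successfully or otherwise
		if operation.Status == "DONE" {
			if operation.Error != nil {
				return fmt.Errorf("compute operation %s failed", operation.Name)
			}
			return nil
		}
		time.Sleep(time.Second)
	}
}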