Skip to content

Commit

Permalink
Merge pull request kubernetes#101813 from melnikalex/automated-cherry…
Browse files Browse the repository at this point in the history
…-pick-of-#101592-upstream-release-1.20

Automated cherry pick of kubernetes#101592: chunk target operatation for aws targetGroup
  • Loading branch information
k8s-ci-robot authored Jun 14, 2021
2 parents 66612c8 + c4abedf commit 1e97120
Show file tree
Hide file tree
Showing 2 changed files with 563 additions and 73 deletions.
183 changes: 110 additions & 73 deletions staging/src/k8s.io/legacy-cloud-providers/aws/aws_loadbalancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ var (
defaultNlbHealthCheckThreshold = int64(3)
defaultHealthCheckPort = "traffic-port"
defaultHealthCheckPath = "/"

// Defaults for ELB Target operations
defaultRegisterTargetsChunkSize = 100
defaultDeregisterTargetsChunkSize = 100
)

func isNLB(annotations map[string]string) bool {
Expand Down Expand Up @@ -563,6 +567,7 @@ func (c *Cloud) deleteListenerV2(listener *elbv2.Listener) error {
// ensureTargetGroup creates a target group with a set of instances.
func (c *Cloud) ensureTargetGroup(targetGroup *elbv2.TargetGroup, serviceName types.NamespacedName, mapping nlbPortMapping, instances []string, vpcID string, tags map[string]string) (*elbv2.TargetGroup, error) {
dirty := false
expectedTargets := c.computeTargetGroupExpectedTargets(instances, mapping.TrafficPort)
if targetGroup == nil {
targetType := "instance"
name := c.buildTargetGroupName(serviceName, mapping.FrontendPort, mapping.TrafficPort, mapping.TrafficProtocol, targetType, mapping)
Expand Down Expand Up @@ -609,86 +614,23 @@ func (c *Cloud) ensureTargetGroup(targetGroup *elbv2.TargetGroup, serviceName ty
}
}

registerInput := &elbv2.RegisterTargetsInput{
TargetGroupArn: result.TargetGroups[0].TargetGroupArn,
Targets: []*elbv2.TargetDescription{},
}
for _, instanceID := range instances {
registerInput.Targets = append(registerInput.Targets, &elbv2.TargetDescription{
Id: aws.String(string(instanceID)),
Port: aws.Int64(mapping.TrafficPort),
})
}

_, err = c.elbv2.RegisterTargets(registerInput)
if err != nil {
return nil, fmt.Errorf("error registering targets for load balancer: %q", err)
tg := result.TargetGroups[0]
tgARN := aws.StringValue(tg.TargetGroupArn)
if err := c.ensureTargetGroupTargets(tgARN, expectedTargets, nil); err != nil {
return nil, err
}

return result.TargetGroups[0], nil
return tg, nil
}

// handle instances in service
{
healthResponse, err := c.elbv2.DescribeTargetHealth(&elbv2.DescribeTargetHealthInput{TargetGroupArn: targetGroup.TargetGroupArn})
tgARN := aws.StringValue(targetGroup.TargetGroupArn)
actualTargets, err := c.obtainTargetGroupActualTargets(tgARN)
if err != nil {
return nil, fmt.Errorf("error describing target group health: %q", err)
}
actualIDs := []string{}
for _, healthDescription := range healthResponse.TargetHealthDescriptions {
if aws.StringValue(healthDescription.TargetHealth.State) == elbv2.TargetHealthStateEnumHealthy {
actualIDs = append(actualIDs, *healthDescription.Target.Id)
} else if healthDescription.TargetHealth.Reason != nil {
switch aws.StringValue(healthDescription.TargetHealth.Reason) {
case elbv2.TargetHealthReasonEnumTargetDeregistrationInProgress:
// We don't need to count this instance in service if it is
// on its way out
default:
actualIDs = append(actualIDs, *healthDescription.Target.Id)
}
}
}

actual := sets.NewString(actualIDs...)
expected := sets.NewString(instances...)

additions := expected.Difference(actual)
removals := actual.Difference(expected)

if len(additions) > 0 {
registerInput := &elbv2.RegisterTargetsInput{
TargetGroupArn: targetGroup.TargetGroupArn,
Targets: []*elbv2.TargetDescription{},
}
for instanceID := range additions {
registerInput.Targets = append(registerInput.Targets, &elbv2.TargetDescription{
Id: aws.String(instanceID),
Port: aws.Int64(mapping.TrafficPort),
})
}
_, err := c.elbv2.RegisterTargets(registerInput)
if err != nil {
return nil, fmt.Errorf("error registering new targets in target group: %q", err)
}
dirty = true
return nil, err
}

if len(removals) > 0 {
deregisterInput := &elbv2.DeregisterTargetsInput{
TargetGroupArn: targetGroup.TargetGroupArn,
Targets: []*elbv2.TargetDescription{},
}
for instanceID := range removals {
deregisterInput.Targets = append(deregisterInput.Targets, &elbv2.TargetDescription{
Id: aws.String(instanceID),
Port: aws.Int64(mapping.TrafficPort),
})
}
_, err := c.elbv2.DeregisterTargets(deregisterInput)
if err != nil {
return nil, fmt.Errorf("error trying to deregister targets in target group: %q", err)
}
dirty = true
if err := c.ensureTargetGroupTargets(tgARN, expectedTargets, actualTargets); err != nil {
return nil, err
}
}

Expand Down Expand Up @@ -738,6 +680,101 @@ func (c *Cloud) ensureTargetGroup(targetGroup *elbv2.TargetGroup, serviceName ty
return targetGroup, nil
}

func (c *Cloud) ensureTargetGroupTargets(tgARN string, expectedTargets []*elbv2.TargetDescription, actualTargets []*elbv2.TargetDescription) error {
targetsToRegister, targetsToDeregister := c.diffTargetGroupTargets(expectedTargets, actualTargets)
if len(targetsToRegister) > 0 {
targetsToRegisterChunks := c.chunkTargetDescriptions(targetsToRegister, defaultRegisterTargetsChunkSize)
for _, targetsChunk := range targetsToRegisterChunks {
req := &elbv2.RegisterTargetsInput{
TargetGroupArn: aws.String(tgARN),
Targets: targetsChunk,
}
if _, err := c.elbv2.RegisterTargets(req); err != nil {
return fmt.Errorf("error trying to register targets in target group: %q", err)
}
}
}
if len(targetsToDeregister) > 0 {
targetsToDeregisterChunks := c.chunkTargetDescriptions(targetsToDeregister, defaultDeregisterTargetsChunkSize)
for _, targetsChunk := range targetsToDeregisterChunks {
req := &elbv2.DeregisterTargetsInput{
TargetGroupArn: aws.String(tgARN),
Targets: targetsChunk,
}
if _, err := c.elbv2.DeregisterTargets(req); err != nil {
return fmt.Errorf("error trying to deregister targets in target group: %q", err)
}
}
}
return nil
}

func (c *Cloud) computeTargetGroupExpectedTargets(instanceIDs []string, port int64) []*elbv2.TargetDescription {
expectedTargets := make([]*elbv2.TargetDescription, 0, len(instanceIDs))
for _, instanceID := range instanceIDs {
expectedTargets = append(expectedTargets, &elbv2.TargetDescription{
Id: aws.String(instanceID),
Port: aws.Int64(port),
})
}
return expectedTargets
}

func (c *Cloud) obtainTargetGroupActualTargets(tgARN string) ([]*elbv2.TargetDescription, error) {
req := &elbv2.DescribeTargetHealthInput{
TargetGroupArn: aws.String(tgARN),
}
resp, err := c.elbv2.DescribeTargetHealth(req)
if err != nil {
return nil, fmt.Errorf("error describing target group health: %q", err)
}
actualTargets := make([]*elbv2.TargetDescription, 0, len(resp.TargetHealthDescriptions))
for _, targetDesc := range resp.TargetHealthDescriptions {
if targetDesc.TargetHealth.Reason != nil && aws.StringValue(targetDesc.TargetHealth.Reason) == elbv2.TargetHealthReasonEnumTargetDeregistrationInProgress {
continue
}
actualTargets = append(actualTargets, targetDesc.Target)
}
return actualTargets, nil
}

// diffTargetGroupTargets computes the targets to register and targets to deregister based on existingTargets and desired instances.
func (c *Cloud) diffTargetGroupTargets(expectedTargets []*elbv2.TargetDescription, actualTargets []*elbv2.TargetDescription) (targetsToRegister []*elbv2.TargetDescription, targetsToDeregister []*elbv2.TargetDescription) {
expectedTargetsByUID := make(map[string]*elbv2.TargetDescription, len(expectedTargets))
for _, target := range expectedTargets {
targetUID := fmt.Sprintf("%v:%v", aws.StringValue(target.Id), aws.Int64Value(target.Port))
expectedTargetsByUID[targetUID] = target
}
actualTargetsByUID := make(map[string]*elbv2.TargetDescription, len(actualTargets))
for _, target := range actualTargets {
targetUID := fmt.Sprintf("%v:%v", aws.StringValue(target.Id), aws.Int64Value(target.Port))
actualTargetsByUID[targetUID] = target
}

expectedTargetsUIDs := sets.StringKeySet(expectedTargetsByUID)
actualTargetsUIDs := sets.StringKeySet(actualTargetsByUID)
for _, targetUID := range expectedTargetsUIDs.Difference(actualTargetsUIDs).List() {
targetsToRegister = append(targetsToRegister, expectedTargetsByUID[targetUID])
}
for _, targetUID := range actualTargetsUIDs.Difference(expectedTargetsUIDs).List() {
targetsToDeregister = append(targetsToDeregister, actualTargetsByUID[targetUID])
}
return targetsToRegister, targetsToDeregister
}

// chunkTargetDescriptions will split slice of TargetDescription into chunks
func (c *Cloud) chunkTargetDescriptions(targets []*elbv2.TargetDescription, chunkSize int) [][]*elbv2.TargetDescription {
var chunks [][]*elbv2.TargetDescription
for i := 0; i < len(targets); i += chunkSize {
end := i + chunkSize
if end > len(targets) {
end = len(targets)
}
chunks = append(chunks, targets[i:end])
}
return chunks
}

// updateInstanceSecurityGroupsForNLB will adjust securityGroup's settings to allow inbound traffic into instances from clientCIDRs and portMappings.
// TIP: if either instances or clientCIDRs or portMappings are nil, then the securityGroup rules for lbName are cleared.
func (c *Cloud) updateInstanceSecurityGroupsForNLB(lbName string, instances map[InstanceID]*ec2.Instance, subnetCIDRs []string, clientCIDRs []string, portMappings []nlbPortMapping) error {
Expand Down
Loading

0 comments on commit 1e97120

Please sign in to comment.