Skip to content

Commit

Permalink
Add Dual-Stack support to L4 ILB
Browse files Browse the repository at this point in the history
  • Loading branch information
panslava committed Aug 19, 2022
1 parent fb05af3 commit 1be3ae9
Show file tree
Hide file tree
Showing 19 changed files with 971 additions and 98 deletions.
30 changes: 23 additions & 7 deletions pkg/annotations/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ const (
// ProtocolHTTP2 protocol for a service
ProtocolHTTP2 AppProtocol = "HTTP2"

IPv6Suffix = "-ipv6"
// ServiceStatusPrefix is the prefix used in annotations used to record
// debug information in the Service annotations. This is applicable to L4 ILB services.
ServiceStatusPrefix = "service.kubernetes.io"
Expand All @@ -82,24 +83,39 @@ const (
// UDPForwardingRuleKey is the annotation key used by l4 controller to record
// GCP UDP forwarding rule name.
UDPForwardingRuleKey = ServiceStatusPrefix + "/udp-" + ForwardingRuleResource
// TCPForwardingRuleIPv6Key is the annotation key used by l4 controller to record
// GCP IPv6 TCP forwarding rule name.
TCPForwardingRuleIPv6Key = TCPForwardingRuleKey + IPv6Suffix
// UDPForwardingRuleIPv6Key is the annotation key used by l4 controller to record
// GCP IPv6 UDP forwarding rule name.
UDPForwardingRuleIPv6Key = UDPForwardingRuleKey + IPv6Suffix
// BackendServiceKey is the annotation key used by l4 controller to record
// GCP Backend service name.
BackendServiceKey = ServiceStatusPrefix + "/" + BackendServiceResource
// FirewallRuleKey is the annotation key used by l4 controller to record
// GCP Firewall rule name.
FirewallRuleKey = ServiceStatusPrefix + "/" + FirewallRuleResource
// FirewallRuleIPv6Key is the annotation key used by l4 controller to record
// GCP IPv6 Firewall rule name.
FirewallRuleIPv6Key = FirewallRuleKey + IPv6Suffix
// HealthcheckKey is the annotation key used by l4 controller to record
// GCP Healthcheck name.
HealthcheckKey = ServiceStatusPrefix + "/" + HealthcheckResource
// FirewallRuleForHealthcheckKey is the annotation key used by l4 controller to record
// the firewall rule name that allows healthcheck traffic.
FirewallRuleForHealthcheckKey = ServiceStatusPrefix + "/" + FirewallForHealthcheckResource
ForwardingRuleResource = "forwarding-rule"
BackendServiceResource = "backend-service"
FirewallRuleResource = "firewall-rule"
HealthcheckResource = "healthcheck"
FirewallForHealthcheckResource = "firewall-rule-for-hc"
AddressResource = "address"
FirewallRuleForHealthcheckKey = ServiceStatusPrefix + "/" + FirewallForHealthcheckResource
// FirewallRuleForHealthcheckIPv6Key is the annotation key used by l4 controller to record
// the firewall rule name that allows IPv6 healthcheck traffic.
FirewallRuleForHealthcheckIPv6Key = FirewallRuleForHealthcheckKey + IPv6Suffix
ForwardingRuleResource = "forwarding-rule"
ForwardingRuleIPv6Resource = ForwardingRuleResource + IPv6Suffix
BackendServiceResource = "backend-service"
FirewallRuleResource = "firewall-rule"
FirewallRuleIPv6Resource = FirewallRuleResource + IPv6Suffix
HealthcheckResource = "healthcheck"
FirewallForHealthcheckResource = "firewall-rule-for-hc"
FirewallForHealthcheckIPv6Resource = FirewallRuleForHealthcheckKey + IPv6Suffix
AddressResource = "address"
// TODO(slavik): import this from gce_annotations when it will be merged in k8s
RBSAnnotationKey = "cloud.google.com/l4-rbs"
RBSEnabled = "enabled"
Expand Down
2 changes: 2 additions & 0 deletions pkg/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ var (
EnableTrafficScaling bool
EnableEndpointSlices bool
EnablePinhole bool
EnableL4ILBDualStack bool
EnableMultipleIGs bool
MaxIGSize int
}{
Expand Down Expand Up @@ -249,6 +250,7 @@ L7 load balancing. CSV values accepted. Example: -node-port-ranges=80,8080,400-5
flag.BoolVar(&F.EnableTrafficScaling, "enable-traffic-scaling", false, "Enable support for Service {max-rate-per-endpoint, capacity-scaler}")
flag.BoolVar(&F.EnableEndpointSlices, "enable-endpoint-slices", false, "Enable using Endpoint Slices API instead of Endpoints API")
flag.BoolVar(&F.EnablePinhole, "enable-pinhole", false, "Enable Pinhole firewall feature")
flag.BoolVar(&F.EnableL4ILBDualStack, "enable-l4ilb-ipv6", false, "Enable Handling ipv6 L4 ILB load balancers")
flag.BoolVar(&F.EnableMultipleIGs, "enable-multiple-igs", false, "Enable using multiple unmanaged instance groups")
flag.IntVar(&F.MaxIGSize, "max-ig-size", 1000, "Max number of instances in Instance Group")
flag.DurationVar(&F.MetricsExportInterval, "metrics-export-interval", 10*time.Minute, `Period for calculating and exporting metrics related to state of managed objects.`)
Expand Down
68 changes: 58 additions & 10 deletions pkg/healthchecks/healthchecks_l4.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,10 @@ func L4() *l4HealthChecks {
// Services of different scope (Global vs Regional).

func (l4hc *l4HealthChecks) EnsureL4HealthCheck(svc *corev1.Service, namer namer.L4ResourcesNamer, sharedHC bool, scope meta.KeyType, l4Type utils.L4LBType, nodeNames []string) *EnsureL4HealthCheckResult {
return l4hc.EnsureL4DualStackHealthCheck(svc, namer, sharedHC, scope, l4Type, nodeNames, true, false)
}

func (l4hc *l4HealthChecks) EnsureL4DualStackHealthCheck(svc *corev1.Service, namer namer.L4ResourcesNamer, sharedHC bool, scope meta.KeyType, l4Type utils.L4LBType, nodeNames []string, needsIPv4 bool, needsIPv6 bool) *EnsureL4HealthCheckResult {
namespacedName := types.NamespacedName{Name: svc.Name, Namespace: svc.Namespace}

hcName, hcFwName := namer.L4HealthCheck(svc.Namespace, svc.Name, sharedHC)
Expand All @@ -126,24 +130,57 @@ func (l4hc *l4HealthChecks) EnsureL4HealthCheck(svc *corev1.Service, namer namer
}
}

klog.V(3).Infof("Healthcheck created, ensuring firewall rule %s", hcFwName)
err = l4hc.ensureFirewall(svc, hcFwName, hcPort, sharedHC, nodeNames)
if err != nil {
return &EnsureL4HealthCheckResult{
GceResourceInError: annotations.HealthcheckResource,
Err: err,
hcResult := &EnsureL4HealthCheckResult{
HCName: hcName,
HCLink: hcLink,
}

if needsIPv4 {
klog.V(3).Infof("Healthcheck created, ensuring firewall rule %s", hcFwName)
err = l4hc.ensureFirewall(svc, hcFwName, hcPort, sharedHC, nodeNames)
if err != nil {
return &EnsureL4HealthCheckResult{
GceResourceInError: annotations.FirewallForHealthcheckResource,
Err: err,
}
}
hcResult.HCFirewallRuleName = hcFwName
}

if needsIPv6 {
ipv6HCFWName := namer.L4IPv6HealthCheckFirewall(svc.Namespace, svc.Name, sharedHC)
klog.V(3).Infof("Healthcheck created, ensuring ipv6 firewall rule %s", ipv6HCFWName)
err = l4hc.ensureIPv6Firewall(svc, ipv6HCFWName, hcPort, sharedHC, nodeNames)
if err != nil {
return &EnsureL4HealthCheckResult{
GceResourceInError: annotations.FirewallForHealthcheckIPv6Resource,
Err: err,
}
}
hcResult.HCFirewallRuleIPv6Name = ipv6HCFWName
}
return &EnsureL4HealthCheckResult{
HCName: hcName,
HCLink: hcLink,
HCFirewallRuleName: hcFwName,

return hcResult
}

func (l4hc *l4HealthChecks) ensureIPv6Firewall(svc *corev1.Service, ipv6HCFWName string, hcPort int32, isSharedHC bool, nodeNames []string) error {
hcFWRParams := firewalls.FirewallParams{
PortRanges: []string{strconv.Itoa(int(hcPort))},
SourceRanges: L4ILBIPv6HCRange.StringSlice(),
Protocol: string(corev1.ProtocolTCP),
Name: ipv6HCFWName,
NodeNames: nodeNames,
}
return firewalls.EnsureL4LBFirewallForHc(svc, isSharedHC, &hcFWRParams, l4hc.cloud, l4hc.recorderFactory.Recorder(svc.Namespace))
}

// DeleteHealthCheck deletes health check (and firewall rule) for l4 service. Checks if shared resources are safe to delete.
func (l4hc *l4HealthChecks) DeleteHealthCheck(svc *corev1.Service, namer namer.L4ResourcesNamer, sharedHC bool, scope meta.KeyType, l4Type utils.L4LBType) (string, error) {
return l4hc.DeleteDualStackHealthCheck(svc, namer, sharedHC, scope, l4Type, false)
}

// DeleteDualStackHealthCheck deletes health check (and firewall rule) for l4 service. Checks if shared resources are safe to delete.
func (l4hc *l4HealthChecks) DeleteDualStackHealthCheck(svc *corev1.Service, namer namer.L4ResourcesNamer, sharedHC bool, scope meta.KeyType, l4Type utils.L4LBType, handleIPv6 bool) (string, error) {
hcName, hcFwName := namer.L4HealthCheck(svc.Namespace, svc.Name, sharedHC)
namespacedName := types.NamespacedName{Name: svc.Name, Namespace: svc.Namespace}
klog.V(3).Infof("Trying to delete L4 healthcheck: %s and firewall rule %s from service %s, shared: %v", hcName, hcFwName, namespacedName.String(), sharedHC)
Expand All @@ -164,6 +201,12 @@ func (l4hc *l4HealthChecks) DeleteHealthCheck(svc *corev1.Service, namer namer.L
return "", nil
}
// Health check deleted, now delete the firewall rule
if handleIPv6 {
errResource, err := l4hc.deleteIPv6HealthCheckFirewall(svc, hcName, namer, sharedHC, l4Type)
if err != nil {
return errResource, err
}
}
return l4hc.deleteHealthCheckFirewall(svc, hcName, hcFwName, sharedHC, l4Type)
}

Expand Down Expand Up @@ -256,6 +299,11 @@ func (l4hc *l4HealthChecks) deleteHealthCheckFirewall(svc *corev1.Service, hcNam
return "", nil
}

func (l4hc *l4HealthChecks) deleteIPv6HealthCheckFirewall(svc *corev1.Service, hcName string, namer namer.L4ResourcesNamer, sharedHC bool, l4type utils.L4LBType) (string, error) {
ipv6hcFwName := namer.L4IPv6HealthCheckFirewall(svc.Namespace, svc.Name, sharedHC)
return l4hc.deleteHealthCheckFirewall(svc, hcName, ipv6hcFwName, sharedHC, l4type)
}

func (l4hc *l4HealthChecks) healthCheckFirewallSafeToDelete(hcName string, sharedHC bool, l4Type utils.L4LBType) (bool, error) {
if !sharedHC {
return true, nil
Expand Down
15 changes: 10 additions & 5 deletions pkg/healthchecks/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,19 @@ type HealthChecker interface {
type L4HealthChecks interface {
// EnsureL4HealthCheck creates health check (and firewall rule) for l4 service
EnsureL4HealthCheck(svc *v1.Service, namer namer.L4ResourcesNamer, sharedHC bool, scope meta.KeyType, l4Type utils.L4LBType, nodeNames []string) *EnsureL4HealthCheckResult
// EnsureL4DualStackHealthCheck creates health check (and firewall rule) for l4 service. Handles both IPv4 and IPv6
EnsureL4DualStackHealthCheck(svc *v1.Service, namer namer.L4ResourcesNamer, sharedHC bool, scope meta.KeyType, l4Type utils.L4LBType, nodeNames []string, needsIPv4 bool, needsIPv6 bool) *EnsureL4HealthCheckResult
// DeleteHealthCheck deletes health check (and firewall rule) for l4 service
DeleteHealthCheck(svc *v1.Service, namer namer.L4ResourcesNamer, sharedHC bool, scope meta.KeyType, l4Type utils.L4LBType) (string, error)
// DeleteDualStackHealthCheck deletes health check (and firewall rule) for l4 service, deletes IPv6 if asked
DeleteDualStackHealthCheck(svc *v1.Service, namer namer.L4ResourcesNamer, sharedHC bool, scope meta.KeyType, l4Type utils.L4LBType, handleIPv6 bool) (string, error)
}

type EnsureL4HealthCheckResult struct {
HCName string
HCLink string
HCFirewallRuleName string
GceResourceInError string
Err error
HCName string
HCLink string
HCFirewallRuleName string
HCFirewallRuleIPv6Name string
GceResourceInError string
Err error
}
31 changes: 31 additions & 0 deletions pkg/healthchecks/ipv6.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package healthchecks

import (
"fmt"

"k8s.io/klog/v2"
utilnet "k8s.io/utils/net"
)

const (
l4ELBIPv6HCRangeString = "2600:1901:8001::/48"
l4ILBIPv6HCRangeString = "2600:2d00:1:b029::/64"
)

var (
L4ELBIPv6HCRange utilnet.IPNetSet
L4ILBIPv6HCRange utilnet.IPNetSet
)

func init() {
var err error
L4ELBIPv6HCRange, err = utilnet.ParseIPNets([]string{l4ELBIPv6HCRangeString}...)
if err != nil {
klog.Fatalf(fmt.Sprintf("utilnet.ParseIPNets([]string{%s}...) returned error %v, want nil", l4ELBIPv6HCRangeString, err))
}

L4ILBIPv6HCRange, err = utilnet.ParseIPNets([]string{l4ILBIPv6HCRangeString}...)
if err != nil {
klog.Fatalf(fmt.Sprintf("utilnet.ParseIPNets([]string{%s}...) returned error %v, want nil", l4ILBIPv6HCRangeString, err))
}
}
5 changes: 3 additions & 2 deletions pkg/l4lb/l4controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"k8s.io/ingress-gce/pkg/backends"
"k8s.io/ingress-gce/pkg/context"
"k8s.io/ingress-gce/pkg/controller/translator"
"k8s.io/ingress-gce/pkg/flags"
"k8s.io/ingress-gce/pkg/forwardingrules"
l4metrics "k8s.io/ingress-gce/pkg/l4lb/metrics"
"k8s.io/ingress-gce/pkg/loadbalancers"
Expand Down Expand Up @@ -208,7 +209,7 @@ func (l4c *L4Controller) processServiceCreateOrUpdate(key string, service *v1.Se
}
// Use the same function for both create and updates. If controller crashes and restarts,
// all existing services will show up as Service Adds.
l4 := loadbalancers.NewL4Handler(service, l4c.ctx.Cloud, meta.Regional, l4c.namer, l4c.ctx.Recorder(service.Namespace))
l4 := loadbalancers.NewL4Handler(service, l4c.ctx.Cloud, meta.Regional, l4c.namer, l4c.ctx.Recorder(service.Namespace), flags.F.EnableL4ILBDualStack)
syncResult := l4.EnsureInternalLoadBalancer(nodeNames, service)
// syncResult will not be nil
if syncResult.Error != nil {
Expand Down Expand Up @@ -248,7 +249,7 @@ func (l4c *L4Controller) processServiceCreateOrUpdate(key string, service *v1.Se
}

func (l4c *L4Controller) processServiceDeletion(key string, svc *v1.Service) *loadbalancers.L4ILBSyncResult {
l4 := loadbalancers.NewL4Handler(svc, l4c.ctx.Cloud, meta.Regional, l4c.namer, l4c.ctx.Recorder(svc.Namespace))
l4 := loadbalancers.NewL4Handler(svc, l4c.ctx.Cloud, meta.Regional, l4c.namer, l4c.ctx.Recorder(svc.Namespace), flags.F.EnableL4ILBDualStack)
l4c.ctx.Recorder(svc.Namespace).Eventf(svc, v1.EventTypeNormal, "DeletingLoadBalancer", "Deleting load balancer for %s", key)
result := l4.EnsureInternalLoadBalancerDeleted(svc)
if result.Error != nil {
Expand Down
4 changes: 2 additions & 2 deletions pkg/l4lb/l4netlbcontroller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -877,7 +877,7 @@ func TestHealthCheckWhenExternalTrafficPolicyWasUpdated(t *testing.T) {
// Update ExternalTrafficPolicy to Cluster check if shared HC was created
err = updateAndAssertExternalTrafficPolicy(newSvc, lc, v1.ServiceExternalTrafficPolicyTypeCluster, hcNameShared)
if err != nil {
t.Errorf("Error asserthing shared health check %v", err)
t.Errorf("Error asserting shared health check %v", err)
}
newSvc.DeletionTimestamp = &metav1.Time{}
updateNetLBService(lc, newSvc)
Expand Down Expand Up @@ -1005,7 +1005,7 @@ func TestIsRBSBasedService(t *testing.T) {
func TestIsRBSBasedServiceWithILBServices(t *testing.T) {
controller := newL4NetLBServiceController()
ilbSvc := test.NewL4ILBService(false, 8080)
ilbFrName := loadbalancers.NewL4Handler(ilbSvc, controller.ctx.Cloud, meta.Regional, controller.namer, record.NewFakeRecorder(100)).GetFRName()
ilbFrName := loadbalancers.NewL4Handler(ilbSvc, controller.ctx.Cloud, meta.Regional, controller.namer, record.NewFakeRecorder(100), false).GetFRName()
ilbSvc.Annotations = map[string]string{
annotations.TCPForwardingRuleKey: ilbFrName,
annotations.UDPForwardingRuleKey: ilbFrName,
Expand Down
120 changes: 120 additions & 0 deletions pkg/loadbalancers/forwarding_rules_ipv6.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
package loadbalancers

import (
"fmt"

"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud"
"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
corev1 "k8s.io/api/core/v1"
"k8s.io/ingress-gce/pkg/composite"
"k8s.io/ingress-gce/pkg/events"
"k8s.io/ingress-gce/pkg/utils"
"k8s.io/klog/v2"
"k8s.io/legacy-cloud-providers/gce"
)

func (l *L4) ensureIPv6ForwardingRule(bsLink string, options gce.ILBOptions) (*composite.ForwardingRule, error) {
expectedIPv6FwdRule, err := l.buildExpectedIPv6ForwardingRule(bsLink, options)
if err != nil {
return nil, fmt.Errorf("l.buildExpectedIPv6ForwardingRule(%s, %v) returned error %w, want nil", bsLink, options, err)
}

existingIPv6FwdRule, err := l.forwardingRules.Get(expectedIPv6FwdRule.Name)
if err != nil {
return nil, fmt.Errorf("l.forwardingRules.GetForwardingRule(%s) returned error %w, want nil", expectedIPv6FwdRule.Name, err)
}

if existingIPv6FwdRule != nil {
equal, err := EqualIPv6ForwardingRules(existingIPv6FwdRule, expectedIPv6FwdRule)
if err != nil {
return existingIPv6FwdRule, err
}
if equal {
klog.V(2).Infof("ensureIPv6ForwardingRule: Skipping update of unchanged ipv6 forwarding rule - %s", expectedIPv6FwdRule.Name)
return existingIPv6FwdRule, nil
}

frDiff := cmp.Diff(existingIPv6FwdRule, expectedIPv6FwdRule, cmpopts.IgnoreFields(composite.ForwardingRule{}, "IPAddress"))
klog.V(2).Infof("ensureIPv6ForwardingRule: forwarding rule changed - Existing - %+v\n, New - %+v\n, Diff(-existing, +new) - %s\n. Deleting existing ipv6 forwarding rule.", existingIPv6FwdRule, expectedIPv6FwdRule, frDiff)

err = l.forwardingRules.Delete(existingIPv6FwdRule.Name)
if err != nil {
return nil, err
}
l.recorder.Eventf(l.Service, corev1.EventTypeNormal, events.SyncIngress, "ForwardingRule %q deleted", existingIPv6FwdRule.Name)
}
klog.V(2).Infof("ensureIPv6ForwardingRule: Creating/Recreating forwarding rule - %s", expectedIPv6FwdRule.Name)
err = l.forwardingRules.Create(expectedIPv6FwdRule)
if err != nil {
return nil, err
}

createdFr, err := l.forwardingRules.Get(expectedIPv6FwdRule.Name)
return createdFr, err
}

func (l *L4) buildExpectedIPv6ForwardingRule(bsLink string, options gce.ILBOptions) (*composite.ForwardingRule, error) {
frName := l.getIPv6FRName()

frDesc, err := utils.MakeL4IPv6ForwardingRuleDescription(l.Service)
if err != nil {
return nil, fmt.Errorf("failed to compute description for forwarding rule %s, err: %w", frName, err)
}

subnetworkURL := l.cloud.SubnetworkURL()

if options.SubnetName != "" {
key, err := l.CreateKey(frName)
if err != nil {
return nil, err
}
subnetKey := *key
subnetKey.Name = options.SubnetName
subnetworkURL = cloud.SelfLink(meta.VersionGA, l.cloud.NetworkProjectID(), "subnetworks", &subnetKey)
}

svcPorts := l.Service.Spec.Ports
ports := utils.GetPorts(svcPorts)
protocol := utils.GetProtocol(svcPorts)

fr := &composite.ForwardingRule{
Name: frName,
Description: frDesc,
IPProtocol: string(protocol),
Ports: ports,
LoadBalancingScheme: string(cloud.SchemeInternal),
BackendService: bsLink,
IpVersion: "IPV6",
Network: l.cloud.NetworkURL(),
Subnetwork: subnetworkURL,
AllowGlobalAccess: options.AllowGlobalAccess,
NetworkTier: cloud.NetworkTierPremium.ToGCEValue(),
}
if len(ports) > maxL4ILBPorts {
fr.Ports = nil
fr.AllPorts = true
}

return fr, nil
}

func EqualIPv6ForwardingRules(fr1, fr2 *composite.ForwardingRule) (bool, error) {
id1, err := cloud.ParseResourceURL(fr1.BackendService)
if err != nil {
return false, fmt.Errorf("EqualIPv6ForwardingRules(): failed to parse backend resource URL from FR, err - %w", err)
}
id2, err := cloud.ParseResourceURL(fr2.BackendService)
if err != nil {
return false, fmt.Errorf("EqualIPv6ForwardingRules(): failed to parse resource URL from FR, err - %w", err)
}
return fr1.IPProtocol == fr2.IPProtocol &&
fr1.LoadBalancingScheme == fr2.LoadBalancingScheme &&
utils.EqualStringSets(fr1.Ports, fr2.Ports) &&
id1.Equal(id2) &&
fr1.AllowGlobalAccess == fr2.AllowGlobalAccess &&
fr1.AllPorts == fr2.AllPorts &&
fr1.Subnetwork == fr2.Subnetwork &&
fr1.NetworkTier == fr2.NetworkTier, nil
}
Loading

0 comments on commit 1be3ae9

Please sign in to comment.