From d059a916c37f9b265967c5503810cc5306644c9e Mon Sep 17 00:00:00 2001 From: Antonio Ojea Date: Tue, 2 Feb 2021 09:46:00 +0100 Subject: [PATCH] kube-proxy: clear conntrack entries after rules are in place Clear conntrack entries for UDP NodePorts, this has to be done AFTER the iptables rules are programmed. It can happen that traffic to the NodePort hits the host before the iptables rules are programmed this will create an stale entry in conntrack that will blackhole the traffic, so we need to clear it ONLY when the service has endpoints. --- pkg/proxy/iptables/BUILD | 1 + pkg/proxy/iptables/proxier.go | 40 +++++++----- pkg/proxy/iptables/proxier_test.go | 97 +++++++++++++++++++++++++++++- 3 files changed, 119 insertions(+), 19 deletions(-) diff --git a/pkg/proxy/iptables/BUILD b/pkg/proxy/iptables/BUILD index 2bd97cf9eef8a..d4795d93b69fd 100644 --- a/pkg/proxy/iptables/BUILD +++ b/pkg/proxy/iptables/BUILD @@ -25,6 +25,7 @@ go_library( "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/api/discovery/v1beta1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", "//staging/src/k8s.io/client-go/tools/record:go_default_library", diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index 50d13c573085d..bb5ce61b35720 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -36,6 +36,7 @@ import ( v1 "k8s.io/api/core/v1" discovery "k8s.io/api/discovery/v1beta1" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/tools/record" @@ -846,14 +847,23 @@ func (proxier *Proxier) syncProxyRules() { serviceUpdateResult := proxy.UpdateServiceMap(proxier.serviceMap, proxier.serviceChanges) endpointUpdateResult := proxier.endpointsMap.Update(proxier.endpointsChanges) - staleServices := serviceUpdateResult.UDPStaleClusterIP + // We need to detect stale connections to UDP Services so we + // can clean dangling conntrack entries that can blackhole traffic. + conntrackCleanupServiceIPs := serviceUpdateResult.UDPStaleClusterIP + conntrackCleanupServiceNodePorts := sets.NewInt() // merge stale services gathered from updateEndpointsMap + // an UDP service that changes from 0 to non-0 endpoints is considered stale. for _, svcPortName := range endpointUpdateResult.StaleServiceNames { if svcInfo, ok := proxier.serviceMap[svcPortName]; ok && svcInfo != nil && conntrack.IsClearConntrackNeeded(svcInfo.Protocol()) { klog.V(2).Infof("Stale %s service %v -> %s", strings.ToLower(string(svcInfo.Protocol())), svcPortName, svcInfo.ClusterIP().String()) - staleServices.Insert(svcInfo.ClusterIP().String()) + conntrackCleanupServiceIPs.Insert(svcInfo.ClusterIP().String()) for _, extIP := range svcInfo.ExternalIPStrings() { - staleServices.Insert(extIP) + conntrackCleanupServiceIPs.Insert(extIP) + } + nodePort := svcInfo.NodePort() + if svcInfo.Protocol() == v1.ProtocolUDP && nodePort != 0 { + klog.V(2).Infof("Stale %s service NodePort %v -> %d", strings.ToLower(string(svcInfo.Protocol())), svcPortName, nodePort) + conntrackCleanupServiceNodePorts.Insert(nodePort) } } } @@ -1278,16 +1288,6 @@ func (proxier *Proxier) syncProxyRules() { klog.Errorf("can't open %s, skipping this nodePort: %v", lp.String(), err) continue } - if lp.Protocol == "udp" { - // TODO: We might have multiple services using the same port, and this will clear conntrack for all of them. - // This is very low impact. The NodePort range is intentionally obscure, and unlikely to actually collide with real Services. - // This only affects UDP connections, which are not common. - // See issue: https://github.com/kubernetes/kubernetes/issues/49881 - err := conntrack.ClearEntriesForPort(proxier.exec, lp.Port, isIPv6, v1.ProtocolUDP) - if err != nil { - klog.Errorf("Failed to clear udp conntrack for port %d, error: %v", lp.Port, err) - } - } replacementPortsMap[lp] = socket } } @@ -1633,14 +1633,22 @@ func (proxier *Proxier) syncProxyRules() { } // Finish housekeeping. + // Clear stale conntrack entries for UDP Services, this has to be done AFTER the iptables rules are programmed. // TODO: these could be made more consistent. - klog.V(4).Infof("Deleting stale services IPs: %v", staleServices.UnsortedList()) - for _, svcIP := range staleServices.UnsortedList() { + klog.V(4).Infof("Deleting conntrack stale entries for ClusterIP Services: %v", conntrackCleanupServiceIPs.UnsortedList()) + for _, svcIP := range conntrackCleanupServiceIPs.UnsortedList() { if err := conntrack.ClearEntriesForIP(proxier.exec, svcIP, v1.ProtocolUDP); err != nil { klog.Errorf("Failed to delete stale service IP %s connections, error: %v", svcIP, err) } } - klog.V(4).Infof("Deleting stale endpoint connections: %v", endpointUpdateResult.StaleEndpoints) + klog.V(4).Infof("Deleting conntrack stale entries for NodePorts Services: %v", conntrackCleanupServiceNodePorts.UnsortedList()) + for _, nodePort := range conntrackCleanupServiceNodePorts.UnsortedList() { + err := conntrack.ClearEntriesForPort(proxier.exec, nodePort, isIPv6, v1.ProtocolUDP) + if err != nil { + klog.Errorf("Failed to clear udp conntrack on port %d: %v", nodePort, err) + } + } + klog.V(4).Infof("Deleting stale endpoint connections %v", endpointUpdateResult.StaleEndpoints) proxier.deleteEndpointConnections(endpointUpdateResult.StaleEndpoints) } diff --git a/pkg/proxy/iptables/proxier_test.go b/pkg/proxy/iptables/proxier_test.go index 0cae52809ddce..de2523893dda0 100644 --- a/pkg/proxy/iptables/proxier_test.go +++ b/pkg/proxy/iptables/proxier_test.go @@ -26,14 +26,13 @@ import ( "testing" "time" - "k8s.io/klog/v2" - "github.com/stretchr/testify/assert" v1 "k8s.io/api/core/v1" discovery "k8s.io/api/discovery/v1beta1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/proxy" "k8s.io/kubernetes/pkg/proxy/healthcheck" utilproxy "k8s.io/kubernetes/pkg/proxy/util" @@ -454,6 +453,14 @@ func TestDeleteEndpointConnectionsIPv6(t *testing.T) { } } +// fakeCloseable implements utilproxy.Closeable +type fakeCloseable struct{} + +// Close fakes out the close() used by syncProxyRules to release a local port. +func (f *fakeCloseable) Close() error { + return nil +} + // fakePortOpener implements portOpener. type fakePortOpener struct { openPorts []*utilproxy.LocalPort @@ -463,7 +470,7 @@ type fakePortOpener struct { // to lock a local port. func (f *fakePortOpener) OpenLocalPort(lp *utilproxy.LocalPort, isIPv6 bool) (utilproxy.Closeable, error) { f.openPorts = append(f.openPorts, lp) - return nil, nil + return &fakeCloseable{}, nil } const testHostname = "test-hostname" @@ -2682,4 +2689,88 @@ COMMIT assert.NotEqual(t, expectedIPTablesWithSlice, fp.iptablesData.String()) } +func TestProxierDeleteNodePortStaleUDP(t *testing.T) { + fcmd := fakeexec.FakeCmd{} + fexec := fakeexec.FakeExec{ + LookPathFunc: func(cmd string) (string, error) { return cmd, nil }, + } + execFunc := func(cmd string, args ...string) exec.Cmd { + return fakeexec.InitFakeCmd(&fcmd, cmd, args...) + } + cmdOutput := "1 flow entries have been deleted" + cmdFunc := func() ([]byte, []byte, error) { return []byte(cmdOutput), nil, nil } + + // Delete ClusterIP entries + fcmd.CombinedOutputScript = append(fcmd.CombinedOutputScript, cmdFunc) + fexec.CommandScript = append(fexec.CommandScript, execFunc) + // Delete NodePort entries + fcmd.CombinedOutputScript = append(fcmd.CombinedOutputScript, cmdFunc) + fexec.CommandScript = append(fexec.CommandScript, execFunc) + + ipt := iptablestest.NewFake() + fp := NewFakeProxier(ipt, false) + fp.exec = &fexec + + svcIP := "10.20.30.41" + svcPort := 80 + nodePort := 31201 + svcPortName := proxy.ServicePortName{ + NamespacedName: makeNSN("ns1", "svc1"), + Port: "p80", + Protocol: v1.ProtocolUDP, + } + + makeServiceMap(fp, + makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *v1.Service) { + svc.Spec.ClusterIP = svcIP + svc.Spec.Ports = []v1.ServicePort{{ + Name: svcPortName.Port, + Port: int32(svcPort), + Protocol: v1.ProtocolUDP, + NodePort: int32(nodePort), + }} + }), + ) + makeEndpointsMap(fp) + + fp.syncProxyRules() + if fexec.CommandCalls != 0 { + t.Fatalf("Created service without endpoints must not clear conntrack entries") + } + + epIP := "10.180.0.1" + makeEndpointsMap(fp, + makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, func(ept *v1.Endpoints) { + ept.Subsets = []v1.EndpointSubset{{ + Addresses: []v1.EndpointAddress{{ + IP: epIP, + }}, + Ports: []v1.EndpointPort{{ + Name: svcPortName.Port, + Port: int32(svcPort), + Protocol: v1.ProtocolUDP, + }}, + }} + }), + ) + + fp.syncProxyRules() + if fexec.CommandCalls != 2 { + t.Fatalf("Updated UDP service with new endpoints must clear UDP entries") + } + + // Delete ClusterIP Conntrack entries + expectCommand := fmt.Sprintf("conntrack -D --orig-dst %s -p %s", svcIP, strings.ToLower(string((v1.ProtocolUDP)))) + actualCommand := strings.Join(fcmd.CombinedOutputLog[0], " ") + if actualCommand != expectCommand { + t.Errorf("Expected command: %s, but executed %s", expectCommand, actualCommand) + } + // Delete NodePort Conntrack entrie + expectCommand = fmt.Sprintf("conntrack -D -p %s --dport %d", strings.ToLower(string((v1.ProtocolUDP))), nodePort) + actualCommand = strings.Join(fcmd.CombinedOutputLog[1], " ") + if actualCommand != expectCommand { + t.Errorf("Expected command: %s, but executed %s", expectCommand, actualCommand) + } +} + // TODO(thockin): add *more* tests for syncProxyRules() or break it down further and test the pieces.