diff --git a/pkg/proxy/iptables/BUILD b/pkg/proxy/iptables/BUILD index 2bd97cf9eef8a..d4795d93b69fd 100644 --- a/pkg/proxy/iptables/BUILD +++ b/pkg/proxy/iptables/BUILD @@ -25,6 +25,7 @@ go_library( "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/api/discovery/v1beta1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", "//staging/src/k8s.io/client-go/tools/record:go_default_library", diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index 50d13c573085d..bb5ce61b35720 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -36,6 +36,7 @@ import ( v1 "k8s.io/api/core/v1" discovery "k8s.io/api/discovery/v1beta1" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/tools/record" @@ -846,14 +847,23 @@ func (proxier *Proxier) syncProxyRules() { serviceUpdateResult := proxy.UpdateServiceMap(proxier.serviceMap, proxier.serviceChanges) endpointUpdateResult := proxier.endpointsMap.Update(proxier.endpointsChanges) - staleServices := serviceUpdateResult.UDPStaleClusterIP + // We need to detect stale connections to UDP Services so we + // can clean dangling conntrack entries that can blackhole traffic. + conntrackCleanupServiceIPs := serviceUpdateResult.UDPStaleClusterIP + conntrackCleanupServiceNodePorts := sets.NewInt() // merge stale services gathered from updateEndpointsMap + // a UDP service that changes from 0 to non-0 endpoints is considered stale. 
for _, svcPortName := range endpointUpdateResult.StaleServiceNames { if svcInfo, ok := proxier.serviceMap[svcPortName]; ok && svcInfo != nil && conntrack.IsClearConntrackNeeded(svcInfo.Protocol()) { klog.V(2).Infof("Stale %s service %v -> %s", strings.ToLower(string(svcInfo.Protocol())), svcPortName, svcInfo.ClusterIP().String()) - staleServices.Insert(svcInfo.ClusterIP().String()) + conntrackCleanupServiceIPs.Insert(svcInfo.ClusterIP().String()) for _, extIP := range svcInfo.ExternalIPStrings() { - staleServices.Insert(extIP) + conntrackCleanupServiceIPs.Insert(extIP) + } + nodePort := svcInfo.NodePort() + if svcInfo.Protocol() == v1.ProtocolUDP && nodePort != 0 { + klog.V(2).Infof("Stale %s service NodePort %v -> %d", strings.ToLower(string(svcInfo.Protocol())), svcPortName, nodePort) + conntrackCleanupServiceNodePorts.Insert(nodePort) } } } @@ -1278,16 +1288,6 @@ func (proxier *Proxier) syncProxyRules() { klog.Errorf("can't open %s, skipping this nodePort: %v", lp.String(), err) continue } - if lp.Protocol == "udp" { - // TODO: We might have multiple services using the same port, and this will clear conntrack for all of them. - // This is very low impact. The NodePort range is intentionally obscure, and unlikely to actually collide with real Services. - // This only affects UDP connections, which are not common. - // See issue: https://github.com/kubernetes/kubernetes/issues/49881 - err := conntrack.ClearEntriesForPort(proxier.exec, lp.Port, isIPv6, v1.ProtocolUDP) - if err != nil { - klog.Errorf("Failed to clear udp conntrack for port %d, error: %v", lp.Port, err) - } - } replacementPortsMap[lp] = socket } } @@ -1633,14 +1633,22 @@ func (proxier *Proxier) syncProxyRules() { } // Finish housekeeping. + // Clear stale conntrack entries for UDP Services, this has to be done AFTER the iptables rules are programmed. // TODO: these could be made more consistent. 
- klog.V(4).Infof("Deleting stale services IPs: %v", staleServices.UnsortedList()) - for _, svcIP := range staleServices.UnsortedList() { + klog.V(4).Infof("Deleting conntrack stale entries for ClusterIP Services: %v", conntrackCleanupServiceIPs.UnsortedList()) + for _, svcIP := range conntrackCleanupServiceIPs.UnsortedList() { if err := conntrack.ClearEntriesForIP(proxier.exec, svcIP, v1.ProtocolUDP); err != nil { klog.Errorf("Failed to delete stale service IP %s connections, error: %v", svcIP, err) } } - klog.V(4).Infof("Deleting stale endpoint connections: %v", endpointUpdateResult.StaleEndpoints) + klog.V(4).Infof("Deleting conntrack stale entries for NodePorts Services: %v", conntrackCleanupServiceNodePorts.UnsortedList()) + for _, nodePort := range conntrackCleanupServiceNodePorts.UnsortedList() { + err := conntrack.ClearEntriesForPort(proxier.exec, nodePort, isIPv6, v1.ProtocolUDP) + if err != nil { + klog.Errorf("Failed to clear udp conntrack on port %d: %v", nodePort, err) + } + } + klog.V(4).Infof("Deleting stale endpoint connections %v", endpointUpdateResult.StaleEndpoints) proxier.deleteEndpointConnections(endpointUpdateResult.StaleEndpoints) } diff --git a/pkg/proxy/iptables/proxier_test.go b/pkg/proxy/iptables/proxier_test.go index 0cae52809ddce..de2523893dda0 100644 --- a/pkg/proxy/iptables/proxier_test.go +++ b/pkg/proxy/iptables/proxier_test.go @@ -26,14 +26,13 @@ import ( "testing" "time" - "k8s.io/klog/v2" - "github.com/stretchr/testify/assert" v1 "k8s.io/api/core/v1" discovery "k8s.io/api/discovery/v1beta1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/proxy" "k8s.io/kubernetes/pkg/proxy/healthcheck" utilproxy "k8s.io/kubernetes/pkg/proxy/util" @@ -454,6 +453,14 @@ func TestDeleteEndpointConnectionsIPv6(t *testing.T) { } } +// fakeCloseable implements utilproxy.Closeable +type fakeCloseable struct{} + +// Close fakes out the close() 
used by syncProxyRules to release a local port. +func (f *fakeCloseable) Close() error { + return nil +} + // fakePortOpener implements portOpener. type fakePortOpener struct { openPorts []*utilproxy.LocalPort @@ -463,7 +470,7 @@ type fakePortOpener struct { // to lock a local port. func (f *fakePortOpener) OpenLocalPort(lp *utilproxy.LocalPort, isIPv6 bool) (utilproxy.Closeable, error) { f.openPorts = append(f.openPorts, lp) - return nil, nil + return &fakeCloseable{}, nil } const testHostname = "test-hostname" @@ -2682,4 +2689,88 @@ COMMIT assert.NotEqual(t, expectedIPTablesWithSlice, fp.iptablesData.String()) } +func TestProxierDeleteNodePortStaleUDP(t *testing.T) { + fcmd := fakeexec.FakeCmd{} + fexec := fakeexec.FakeExec{ + LookPathFunc: func(cmd string) (string, error) { return cmd, nil }, + } + execFunc := func(cmd string, args ...string) exec.Cmd { + return fakeexec.InitFakeCmd(&fcmd, cmd, args...) + } + cmdOutput := "1 flow entries have been deleted" + cmdFunc := func() ([]byte, []byte, error) { return []byte(cmdOutput), nil, nil } + + // Delete ClusterIP entries + fcmd.CombinedOutputScript = append(fcmd.CombinedOutputScript, cmdFunc) + fexec.CommandScript = append(fexec.CommandScript, execFunc) + // Delete NodePort entries + fcmd.CombinedOutputScript = append(fcmd.CombinedOutputScript, cmdFunc) + fexec.CommandScript = append(fexec.CommandScript, execFunc) + + ipt := iptablestest.NewFake() + fp := NewFakeProxier(ipt, false) + fp.exec = &fexec + + svcIP := "10.20.30.41" + svcPort := 80 + nodePort := 31201 + svcPortName := proxy.ServicePortName{ + NamespacedName: makeNSN("ns1", "svc1"), + Port: "p80", + Protocol: v1.ProtocolUDP, + } + + makeServiceMap(fp, + makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *v1.Service) { + svc.Spec.ClusterIP = svcIP + svc.Spec.Ports = []v1.ServicePort{{ + Name: svcPortName.Port, + Port: int32(svcPort), + Protocol: v1.ProtocolUDP, + NodePort: int32(nodePort), + }} + }), + ) + makeEndpointsMap(fp) + + 
fp.syncProxyRules() + if fexec.CommandCalls != 0 { + t.Fatalf("Created service without endpoints must not clear conntrack entries") + } + + epIP := "10.180.0.1" + makeEndpointsMap(fp, + makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, func(ept *v1.Endpoints) { + ept.Subsets = []v1.EndpointSubset{{ + Addresses: []v1.EndpointAddress{{ + IP: epIP, + }}, + Ports: []v1.EndpointPort{{ + Name: svcPortName.Port, + Port: int32(svcPort), + Protocol: v1.ProtocolUDP, + }}, + }} + }), + ) + + fp.syncProxyRules() + if fexec.CommandCalls != 2 { + t.Fatalf("Updated UDP service with new endpoints must clear UDP entries") + } + + // Delete ClusterIP Conntrack entries + expectCommand := fmt.Sprintf("conntrack -D --orig-dst %s -p %s", svcIP, strings.ToLower(string((v1.ProtocolUDP)))) + actualCommand := strings.Join(fcmd.CombinedOutputLog[0], " ") + if actualCommand != expectCommand { + t.Errorf("Expected command: %s, but executed %s", expectCommand, actualCommand) + } + // Delete NodePort Conntrack entries + expectCommand = fmt.Sprintf("conntrack -D -p %s --dport %d", strings.ToLower(string((v1.ProtocolUDP))), nodePort) + actualCommand = strings.Join(fcmd.CombinedOutputLog[1], " ") + if actualCommand != expectCommand { + t.Errorf("Expected command: %s, but executed %s", expectCommand, actualCommand) + } +} + // TODO(thockin): add *more* tests for syncProxyRules() or break it down further and test the pieces.