Skip to content

Commit

Permalink
Merge pull request kubernetes#99017 from aojea/automated-cherry-pick-…
Browse files Browse the repository at this point in the history
…of-#98305-upstream-release-1.20

Automated cherry pick of kubernetes#98305: kube-proxy: clear conntrack entries after rules are in place
  • Loading branch information
k8s-ci-robot authored Mar 2, 2021
2 parents 19c6bfe + d059a91 commit 1e3c47e
Show file tree
Hide file tree
Showing 3 changed files with 119 additions and 19 deletions.
1 change: 1 addition & 0 deletions pkg/proxy/iptables/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ go_library(
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/api/discovery/v1beta1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/client-go/tools/record:go_default_library",
Expand Down
40 changes: 24 additions & 16 deletions pkg/proxy/iptables/proxier.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
v1 "k8s.io/api/core/v1"
discovery "k8s.io/api/discovery/v1beta1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/tools/record"
Expand Down Expand Up @@ -846,14 +847,23 @@ func (proxier *Proxier) syncProxyRules() {
serviceUpdateResult := proxy.UpdateServiceMap(proxier.serviceMap, proxier.serviceChanges)
endpointUpdateResult := proxier.endpointsMap.Update(proxier.endpointsChanges)

staleServices := serviceUpdateResult.UDPStaleClusterIP
// We need to detect stale connections to UDP Services so we
// can clean dangling conntrack entries that can blackhole traffic.
conntrackCleanupServiceIPs := serviceUpdateResult.UDPStaleClusterIP
conntrackCleanupServiceNodePorts := sets.NewInt()
// merge stale services gathered from updateEndpointsMap
// an UDP service that changes from 0 to non-0 endpoints is considered stale.
for _, svcPortName := range endpointUpdateResult.StaleServiceNames {
if svcInfo, ok := proxier.serviceMap[svcPortName]; ok && svcInfo != nil && conntrack.IsClearConntrackNeeded(svcInfo.Protocol()) {
klog.V(2).Infof("Stale %s service %v -> %s", strings.ToLower(string(svcInfo.Protocol())), svcPortName, svcInfo.ClusterIP().String())
staleServices.Insert(svcInfo.ClusterIP().String())
conntrackCleanupServiceIPs.Insert(svcInfo.ClusterIP().String())
for _, extIP := range svcInfo.ExternalIPStrings() {
staleServices.Insert(extIP)
conntrackCleanupServiceIPs.Insert(extIP)
}
nodePort := svcInfo.NodePort()
if svcInfo.Protocol() == v1.ProtocolUDP && nodePort != 0 {
klog.V(2).Infof("Stale %s service NodePort %v -> %d", strings.ToLower(string(svcInfo.Protocol())), svcPortName, nodePort)
conntrackCleanupServiceNodePorts.Insert(nodePort)
}
}
}
Expand Down Expand Up @@ -1278,16 +1288,6 @@ func (proxier *Proxier) syncProxyRules() {
klog.Errorf("can't open %s, skipping this nodePort: %v", lp.String(), err)
continue
}
if lp.Protocol == "udp" {
// TODO: We might have multiple services using the same port, and this will clear conntrack for all of them.
// This is very low impact. The NodePort range is intentionally obscure, and unlikely to actually collide with real Services.
// This only affects UDP connections, which are not common.
// See issue: https://github.com/kubernetes/kubernetes/issues/49881
err := conntrack.ClearEntriesForPort(proxier.exec, lp.Port, isIPv6, v1.ProtocolUDP)
if err != nil {
klog.Errorf("Failed to clear udp conntrack for port %d, error: %v", lp.Port, err)
}
}
replacementPortsMap[lp] = socket
}
}
Expand Down Expand Up @@ -1633,14 +1633,22 @@ func (proxier *Proxier) syncProxyRules() {
}

// Finish housekeeping.
// Clear stale conntrack entries for UDP Services, this has to be done AFTER the iptables rules are programmed.
// TODO: these could be made more consistent.
klog.V(4).Infof("Deleting stale services IPs: %v", staleServices.UnsortedList())
for _, svcIP := range staleServices.UnsortedList() {
klog.V(4).Infof("Deleting conntrack stale entries for ClusterIP Services: %v", conntrackCleanupServiceIPs.UnsortedList())
for _, svcIP := range conntrackCleanupServiceIPs.UnsortedList() {
if err := conntrack.ClearEntriesForIP(proxier.exec, svcIP, v1.ProtocolUDP); err != nil {
klog.Errorf("Failed to delete stale service IP %s connections, error: %v", svcIP, err)
}
}
klog.V(4).Infof("Deleting stale endpoint connections: %v", endpointUpdateResult.StaleEndpoints)
klog.V(4).Infof("Deleting conntrack stale entries for NodePorts Services: %v", conntrackCleanupServiceNodePorts.UnsortedList())
for _, nodePort := range conntrackCleanupServiceNodePorts.UnsortedList() {
err := conntrack.ClearEntriesForPort(proxier.exec, nodePort, isIPv6, v1.ProtocolUDP)
if err != nil {
klog.Errorf("Failed to clear udp conntrack on port %d: %v", nodePort, err)
}
}
klog.V(4).Infof("Deleting stale endpoint connections %v", endpointUpdateResult.StaleEndpoints)
proxier.deleteEndpointConnections(endpointUpdateResult.StaleEndpoints)
}

Expand Down
97 changes: 94 additions & 3 deletions pkg/proxy/iptables/proxier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,13 @@ import (
"testing"
"time"

"k8s.io/klog/v2"

"github.com/stretchr/testify/assert"
v1 "k8s.io/api/core/v1"
discovery "k8s.io/api/discovery/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/proxy"
"k8s.io/kubernetes/pkg/proxy/healthcheck"
utilproxy "k8s.io/kubernetes/pkg/proxy/util"
Expand Down Expand Up @@ -454,6 +453,14 @@ func TestDeleteEndpointConnectionsIPv6(t *testing.T) {
}
}

// fakeCloseable implements utilproxy.Closeable
type fakeCloseable struct{}

// Close fakes out the close() used by syncProxyRules to release a local port.
func (f *fakeCloseable) Close() error {
return nil
}

// fakePortOpener implements portOpener.
type fakePortOpener struct {
openPorts []*utilproxy.LocalPort
Expand All @@ -463,7 +470,7 @@ type fakePortOpener struct {
// to lock a local port.
func (f *fakePortOpener) OpenLocalPort(lp *utilproxy.LocalPort, isIPv6 bool) (utilproxy.Closeable, error) {
f.openPorts = append(f.openPorts, lp)
return nil, nil
return &fakeCloseable{}, nil
}

const testHostname = "test-hostname"
Expand Down Expand Up @@ -2682,4 +2689,88 @@ COMMIT
assert.NotEqual(t, expectedIPTablesWithSlice, fp.iptablesData.String())
}

func TestProxierDeleteNodePortStaleUDP(t *testing.T) {
fcmd := fakeexec.FakeCmd{}
fexec := fakeexec.FakeExec{
LookPathFunc: func(cmd string) (string, error) { return cmd, nil },
}
execFunc := func(cmd string, args ...string) exec.Cmd {
return fakeexec.InitFakeCmd(&fcmd, cmd, args...)
}
cmdOutput := "1 flow entries have been deleted"
cmdFunc := func() ([]byte, []byte, error) { return []byte(cmdOutput), nil, nil }

// Delete ClusterIP entries
fcmd.CombinedOutputScript = append(fcmd.CombinedOutputScript, cmdFunc)
fexec.CommandScript = append(fexec.CommandScript, execFunc)
// Delete NodePort entries
fcmd.CombinedOutputScript = append(fcmd.CombinedOutputScript, cmdFunc)
fexec.CommandScript = append(fexec.CommandScript, execFunc)

ipt := iptablestest.NewFake()
fp := NewFakeProxier(ipt, false)
fp.exec = &fexec

svcIP := "10.20.30.41"
svcPort := 80
nodePort := 31201
svcPortName := proxy.ServicePortName{
NamespacedName: makeNSN("ns1", "svc1"),
Port: "p80",
Protocol: v1.ProtocolUDP,
}

makeServiceMap(fp,
makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *v1.Service) {
svc.Spec.ClusterIP = svcIP
svc.Spec.Ports = []v1.ServicePort{{
Name: svcPortName.Port,
Port: int32(svcPort),
Protocol: v1.ProtocolUDP,
NodePort: int32(nodePort),
}}
}),
)
makeEndpointsMap(fp)

fp.syncProxyRules()
if fexec.CommandCalls != 0 {
t.Fatalf("Created service without endpoints must not clear conntrack entries")
}

epIP := "10.180.0.1"
makeEndpointsMap(fp,
makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, func(ept *v1.Endpoints) {
ept.Subsets = []v1.EndpointSubset{{
Addresses: []v1.EndpointAddress{{
IP: epIP,
}},
Ports: []v1.EndpointPort{{
Name: svcPortName.Port,
Port: int32(svcPort),
Protocol: v1.ProtocolUDP,
}},
}}
}),
)

fp.syncProxyRules()
if fexec.CommandCalls != 2 {
t.Fatalf("Updated UDP service with new endpoints must clear UDP entries")
}

// Delete ClusterIP Conntrack entries
expectCommand := fmt.Sprintf("conntrack -D --orig-dst %s -p %s", svcIP, strings.ToLower(string((v1.ProtocolUDP))))
actualCommand := strings.Join(fcmd.CombinedOutputLog[0], " ")
if actualCommand != expectCommand {
t.Errorf("Expected command: %s, but executed %s", expectCommand, actualCommand)
}
// Delete NodePort Conntrack entrie
expectCommand = fmt.Sprintf("conntrack -D -p %s --dport %d", strings.ToLower(string((v1.ProtocolUDP))), nodePort)
actualCommand = strings.Join(fcmd.CombinedOutputLog[1], " ")
if actualCommand != expectCommand {
t.Errorf("Expected command: %s, but executed %s", expectCommand, actualCommand)
}
}

// TODO(thockin): add *more* tests for syncProxyRules() or break it down further and test the pieces.

0 comments on commit 1e3c47e

Please sign in to comment.