From d3b0f3f7db011e2d8e6781954a87d9f20312b8cf Mon Sep 17 00:00:00 2001
From: Antonio Ojea
Date: Tue, 17 Jan 2023 22:07:48 +0000
Subject: [PATCH] add scale test for probes

Change-Id: I1e49943531b569b5e02f82369750a9ca899ae726
---
 pkg/kubelet/prober/scale_test.go | 280 +++++++++++++++++++++++++++++++
 1 file changed, 280 insertions(+)
 create mode 100644 pkg/kubelet/prober/scale_test.go

diff --git a/pkg/kubelet/prober/scale_test.go b/pkg/kubelet/prober/scale_test.go
new file mode 100644
index 0000000000000..91ec87154c26b
--- /dev/null
+++ b/pkg/kubelet/prober/scale_test.go
@@ -0,0 +1,280 @@
+/*
+Copyright 2023 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package prober
+
+import (
+	"context"
+	"fmt"
+	"net"
+	"net/http"
+	"sync"
+	"sync/atomic"
+	"testing"
+	"time"
+
+	v1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/apimachinery/pkg/util/intstr"
+	"k8s.io/client-go/kubernetes/fake"
+	"k8s.io/client-go/tools/record"
+	"k8s.io/kubernetes/pkg/kubelet/configmap"
+	kubepod "k8s.io/kubernetes/pkg/kubelet/pod"
+	podtest "k8s.io/kubernetes/pkg/kubelet/pod/testing"
+	"k8s.io/kubernetes/pkg/kubelet/prober/results"
+	"k8s.io/kubernetes/pkg/kubelet/secret"
+	"k8s.io/kubernetes/pkg/kubelet/status"
+	statustest "k8s.io/kubernetes/pkg/kubelet/status/testing"
+	kubeletutil "k8s.io/kubernetes/pkg/kubelet/util"
+	utilpointer "k8s.io/utils/pointer"
+)
+
+// TCP sockets go through a TIME-WAIT state (60 seconds by default) before being freed,
+// so conntrack entries and ephemeral ports are held for 60 seconds even though the
+// probe itself may have finished in less than 1 second.
+// If probes are created faster than the OS recycles the ports they used, the probes
+// can consume a considerable number of ephemeral ports or conntrack entries.
+// These tests verify that the probes keep working after a sustained period; if the
+// probes do not close their sockets quickly enough, they will start to fail.
+// The test creates a TCP or HTTP server to fake a pod. It creates 1 pod with 600 fake
+// containers and runs one probe for each of these containers (all the probes come
+// from the same process, as they do in the kubelet, and target the same IP:port) to
+// verify that the ephemeral port range is not exhausted.
+
+// The default port range on a normal Linux system has 28232 free ephemeral ports per
+// (srcIP, dstIP, dstPort, proto) tuple: /proc/sys/net/ipv4/ip_local_port_range 32768 60999
+// 1 pod x 600 containers/pod x 1 probe/container x 1 req/sec = 600 req/sec
+// 600 req/sec x 59 sec = 35400 connections
+// The test should run out of ephemeral ports in less than one minute, and connections
+// should then start to fail.
+// Ref: https://github.com/kubernetes/kubernetes/issues/89898#issuecomment-1383207322
+
+func TestTCPPortExhaustion(t *testing.T) {
+	// This test creates a considerable number of connections in a short time
+	// and flakes on constrained environments, thus it is skipped by default.
+	// The test is left for manual verification or experimentation with new
+	// changes to the probes.
+	t.Skip("skipping TCP port exhaustion tests")
+
+	const (
+		numTestPods   = 1
+		numContainers = 600
+	)
+
+	if testing.Short() {
+		t.Skip("skipping TCP port exhaustion in short mode")
+	}
+
+	tests := []struct {
+		name string
+		http bool // it can be tcp or http
+	}{
+		{"TCP", false},
+		{"HTTP", true},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient(), secret.NewFakeManager(), configmap.NewFakeManager())
+			podStartupLatencyTracker := kubeletutil.NewPodStartupLatencyTracker()
+			m := NewManager(
+				status.NewManager(&fake.Clientset{}, podManager, &statustest.FakePodDeletionSafetyProvider{}, podStartupLatencyTracker),
+				results.NewManager(),
+				results.NewManager(),
+				results.NewManager(),
+				nil, // runner
+				&record.FakeRecorder{},
+			).(*manager)
+			defer cleanup(t, m)
+
+			now := time.Now()
+			fakePods := make([]*fakePod, numTestPods)
+			for i := 0; i < numTestPods; i++ {
+				fake, err := newFakePod(tt.http)
+				if err != nil {
+					t.Fatalf("unexpected error creating fake pod: %v", err)
+				}
+				defer fake.stop()
+				handler := fake.probeHandler()
+				fakePods[i] = fake
+
+				pod := v1.Pod{
+					ObjectMeta: metav1.ObjectMeta{
+						UID:       types.UID(fmt.Sprintf("pod%d", i)),
+						Name:      fmt.Sprintf("pod%d", i),
+						Namespace: "test",
+					},
+					Spec: v1.PodSpec{},
+					Status: v1.PodStatus{
+						Phase:  v1.PodPhase(v1.PodReady),
+						PodIPs: []v1.PodIP{{IP: "127.0.0.1"}},
+					},
+				}
+				for j := 0; j < numContainers; j++ {
+					// use only liveness probes for simplicity; their initial state is success
+					pod.Spec.Containers = append(pod.Spec.Containers, v1.Container{
+						Name:          fmt.Sprintf("container%d", j),
+						LivenessProbe: newProbe(handler),
+					})
+					pod.Status.ContainerStatuses = append(pod.Status.ContainerStatuses, v1.ContainerStatus{
+						Name:        fmt.Sprintf("container%d", j),
+						ContainerID: fmt.Sprintf("pod%d://container%d", i, j),
+						State: v1.ContainerState{
+							Running: &v1.ContainerStateRunning{
+								StartedAt: metav1.Now(),
+							},
+						},
+						Started: utilpointer.Bool(true),
+					})
+				}
+				podManager.AddPod(&pod)
+				m.statusManager.SetPodStatus(&pod, pod.Status)
+				m.AddPod(&pod)
+			}
+			t.Logf("Added %d pods with %d containers each in %v", numTestPods, numContainers, time.Since(now))
+
+			ctx, cancel := context.WithTimeout(context.Background(), 59*time.Second)
+			defer cancel()
+			var wg sync.WaitGroup
+
+			wg.Add(1)
+			go func() {
+				defer wg.Done()
+				for {
+					var result results.Update
+					var probeType string
+					select {
+					case result = <-m.startupManager.Updates():
+						probeType = "startup"
+					case result = <-m.livenessManager.Updates():
+						probeType = "liveness"
+					case result = <-m.readinessManager.Updates():
+						probeType = "readiness"
+					case <-ctx.Done():
+						return
+					}
+					switch result.Result.String() {
+					// The test will fail if any of the probes fails
+					case "Failure":
+						t.Errorf("Failure %s on containerID: %v Pod %v", probeType, result.ContainerID, result.PodUID)
+					case "UNKNOWN": // startup probes
+						t.Logf("UNKNOWN state for %v", result)
+					default:
+					}
+				}
+			}()
+			wg.Wait()
+
+			// log the number of connections received in each pod for debugging test failures.
+			for _, pod := range fakePods {
+				n := pod.connections()
+				t.Logf("Number of connections %d", n)
+			}
+
+		})
+	}
+
+}
+
+func newProbe(handler v1.ProbeHandler) *v1.Probe {
+	return &v1.Probe{
+		ProbeHandler:     handler,
+		TimeoutSeconds:   1,
+		PeriodSeconds:    1,
+		SuccessThreshold: 1,
+		FailureThreshold: 3,
+	}
+}
+
+// newFakePod runs a TCP or HTTP server on a random port
+func newFakePod(httpServer bool) (*fakePod, error) {
+	ln, err := net.Listen("tcp", "127.0.0.1:0")
+	if err != nil {
+		return nil, fmt.Errorf("failed to bind: %v", err)
+	}
+	f := &fakePod{ln: ln, http: httpServer}
+
+	// spawn an HTTP server or a TCP server that counts the number of connections received
+	if httpServer {
+		var mu sync.Mutex
+		visitors := map[string]struct{}{}
+		go http.Serve(ln, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+			mu.Lock()
+			defer mu.Unlock()
+			if _, ok := visitors[r.RemoteAddr]; !ok {
+				atomic.AddInt64(&f.numConnection, 1)
+				visitors[r.RemoteAddr] = struct{}{}
+			}
+		}))
+	} else {
+		go func() {
+			for {
+				conn, err := ln.Accept()
+				if err != nil {
+					// exit when the listener is closed
+					return
+				}
+				atomic.AddInt64(&f.numConnection, 1)
+				// handle the request without blocking the accept loop
+				go func(c net.Conn) {
+					defer c.Close()
+					// read but swallow the errors since the probe doesn't send data
+					buffer := make([]byte, 1024)
+					c.Read(buffer)
+					// respond
+					c.Write([]byte("Hi back!\n"))
+				}(conn)

+			}
+		}()
+	}
+	return f, nil
+
+}
+
+type fakePod struct {
+	ln            net.Listener
+	numConnection int64
+	http          bool
+}
+
+func (f *fakePod) probeHandler() v1.ProbeHandler {
+	port := f.ln.Addr().(*net.TCPAddr).Port
+	var handler v1.ProbeHandler
+	if f.http {
+		handler = v1.ProbeHandler{
+			HTTPGet: &v1.HTTPGetAction{
+				Host: "127.0.0.1",
+				Port: intstr.FromInt(port),
+			},
+		}
+	} else {
+		handler = v1.ProbeHandler{
+			TCPSocket: &v1.TCPSocketAction{
+				Host: "127.0.0.1",
+				Port: intstr.FromInt(port),
+			},
+		}
+	}
+	return handler
+}
+
+func (f *fakePod) stop() {
+	f.ln.Close()
+}
+
+func (f *fakePod) connections() int {
+	return int(atomic.LoadInt64(&f.numConnection))
+}
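Since the test skips itself unconditionally, exercising it requires commenting out the t.Skip call first. A minimal manual invocation could then look like the line below; the -timeout value is an assumption chosen to leave headroom over the 59-second probing window, not part of the patch:

    go test -v -run TestTCPPortExhaustion -timeout 150s ./pkg/kubelet/prober/

While it runs, the buildup of TIME-WAIT sockets described in the test comment can be observed on the test machine with a command such as "ss -tan state time-wait | wc -l".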