From a51535a84e5a1d3cd608c01b98cfc9598af11ff0 Mon Sep 17 00:00:00 2001 From: Hang Yan Date: Fri, 7 Feb 2025 02:05:28 +0800 Subject: [PATCH] Support collecting logs from K8s API as fallback for supportbundle (#3659) When the normal supportbundle api fails for some nodes or the controller, use the kubernetes api instead to collect Pods' logs. Also, in both cases, clusterinfo will always be collected first. Signed-off-by: Hang Yan --- pkg/antctl/raw/supportbundle/command.go | 196 +++++++++++++++++-- pkg/antctl/raw/supportbundle/command_test.go | 151 ++++++++++++-- pkg/util/compress/compress.go | 64 ++++++ pkg/util/k8s/pod.go | 12 ++ 4 files changed, 387 insertions(+), 36 deletions(-) diff --git a/pkg/antctl/raw/supportbundle/command.go b/pkg/antctl/raw/supportbundle/command.go index d5787929dd2..6feb9dfd2e3 100644 --- a/pkg/antctl/raw/supportbundle/command.go +++ b/pkg/antctl/raw/supportbundle/command.go @@ -33,10 +33,14 @@ import ( "golang.org/x/sync/errgroup" "golang.org/x/time/rate" "gopkg.in/yaml.v2" + + corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" k8sruntime "k8s.io/apimachinery/pkg/runtime" + utilerror "k8s.io/apimachinery/pkg/util/errors" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "k8s.io/klog/v2" @@ -47,6 +51,8 @@ import ( systemv1beta1 "antrea.io/antrea/pkg/apis/system/v1beta1" antrea "antrea.io/antrea/pkg/client/clientset/versioned" systemclientset "antrea.io/antrea/pkg/client/clientset/versioned/typed/system/v1beta1" + "antrea.io/antrea/pkg/util/compress" + "antrea.io/antrea/pkg/util/k8s" ) const ( @@ -581,6 +587,20 @@ func controllerRemoteRunE(cmd *cobra.Command, args []string) error { return fmt.Errorf("failed to create clientset: %w", err) } + if err := os.MkdirAll(option.dir, 0700); err != nil { + return fmt.Errorf("error when creating output dir: %w", err) + } + + f, err := os.Create(filepath.Join(option.dir, "clusterinfo")) + if err != nil { + return err + } + defer f.Close() + err = getClusterInfo(f, k8sClientset) + if err != nil { + return err + } + var controllerClient systemclientset.SupportBundleInterface var agentClients map[string]systemclientset.SupportBundleInterface @@ -625,9 +645,6 @@ func controllerRemoteRunE(cmd *cobra.Command, args []string) error { return fmt.Errorf("no matched Nodes found to collect agent bundles") } - if err := os.MkdirAll(option.dir, 0700|os.ModeDir); err != nil { - return fmt.Errorf("error when creating output dir: %w", err) - } amount := len(agentClients) * 2 if controllerClient != nil { amount += 2 @@ -635,19 +652,10 @@ func controllerRemoteRunE(cmd *cobra.Command, args []string) error { bar := barTmpl.Start(amount) defer bar.Finish() defer bar.Set("prefix", "Finish ") - f, err := os.Create(filepath.Join(option.dir, "clusterinfo")) - if err != nil { - return err - } - defer f.Close() - err = getClusterInfo(f, k8sClientset) - if err != nil { - return err - } results := requestAll(ctx, agentClients, controllerClient, bar) results = downloadAll(ctx, agentClients, controllerClient, dir, bar, results) - return processResults(results, dir) + return processResults(ctx, antreaClientset, k8sClientset, results, dir) } func genErrorMsg(resultMap map[string]error) string { @@ -659,8 +667,9 @@ func genErrorMsg(resultMap map[string]error) string { } // processResults will output the failed nodes and their reasons if any. If no data was collected, -// error is returned, otherwise will return nil. -func processResults(resultMap map[string]error, dir string) error { +// error is returned, otherwise will return nil. For failed nodes and controller, will also trying to get logs from +// kubernetes api. +func processResults(ctx context.Context, antreaClientset antrea.Interface, k8sClient kubernetes.Interface, resultMap map[string]error, dir string) error { resultStr := "" var failedNodes []string allFailed := true @@ -676,7 +685,8 @@ func processResults(resultMap map[string]error, dir string) error { } } - if resultMap[""] != nil { + controllerFailed := resultMap[""] != nil + if controllerFailed { fmt.Println("Controller Info Failed Reason: " + resultMap[""].Error()) } @@ -689,9 +699,163 @@ func processResults(resultMap map[string]error, dir string) error { err = writeFailedNodes(dir, failedNodes) } + // download logs from kubernetes api + if failedNodes != nil { + if err = downloadFallbackAgentBundleFromKubernetes(ctx, antreaClientset, k8sClient, failedNodes, dir); err != nil { + fmt.Println("Failed to download agent bundle from kubernetes api: " + err.Error()) + } else { + allFailed = false + } + } + if controllerFailed { + if err = downloadFallbackControllerBundleFromKubernetes(ctx, antreaClientset, k8sClient, dir); err != nil { + fmt.Println("Failed to download controller bundle from kubernetes api: " + err.Error()) + } else { + allFailed = false + } + } + if allFailed { return fmt.Errorf("no data was collected: %s", genErrorMsg(resultMap)) } else { return err } } + +func downloadFallbackControllerBundleFromKubernetes(ctx context.Context, antreaClientset antrea.Interface, k8sClient kubernetes.Interface, dir string) error { + tmpDir, err := afero.TempDir(defaultFS, "", "bundle_tmp_") + if err != nil { + return err + } + defer defaultFS.RemoveAll(tmpDir) + + var podRef *corev1.ObjectReference + if err := func() error { + controllerInfo, err := antreaClientset.CrdV1beta1().AntreaControllerInfos().Get(ctx, v1beta1.AntreaControllerInfoResourceName, metav1.GetOptions{}) + if err != nil { + return err + } + podRef = &controllerInfo.PodRef + data, err := yaml.Marshal(controllerInfo) + if err != nil { + return err + } + if err := afero.WriteFile(defaultFS, filepath.Join(dir, "controllerinfo"), data, 0644); err != nil { + return err + } + return nil + }(); err != nil { + return err + } + if podRef == nil { + return fmt.Errorf("no podRef found in AntreaControllerInfo") + } + pod, err := k8sClient.CoreV1().Pods(podRef.Namespace).Get(ctx, podRef.Name, metav1.GetOptions{}) + if err != nil { + return err + } + if err := downloadPodLogs(ctx, k8sClient, pod.Namespace, pod.Name, k8s.GetPodContainerNames(pod), tmpDir); err != nil { + return err + } + return packPodBundle(pod, dir, tmpDir) +} + +func downloadFallbackAgentBundleFromKubernetes(ctx context.Context, antreaClientset antrea.Interface, k8sClient kubernetes.Interface, failedNodes []string, dir string) error { + agentInfoList, err := antreaClientset.CrdV1beta1().AntreaAgentInfos().List(ctx, metav1.ListOptions{ResourceVersion: "0"}) + if err != nil { + return err + } + + agentInfoMap := map[string]v1beta1.AntreaAgentInfo{} + for _, agentInfo := range agentInfoList.Items { + agentInfoMap[agentInfo.Name] = agentInfo + } + pods, err := k8sClient.CoreV1().Pods("kube-system").List(ctx, metav1.ListOptions{ + ResourceVersion: "0", + LabelSelector: "app=antrea,component=antrea-agent", + }) + if err != nil { + return err + } + failedNodeSet := sets.NewString(failedNodes...) + var errors []error + for _, pod := range pods.Items { + if !failedNodeSet.Has(pod.Spec.NodeName) { + continue + } + if err := func() error { + tmpDir, err := afero.TempDir(defaultFS, "", "bundle_tmp_") + if err != nil { + return err + } + defer defaultFS.RemoveAll(tmpDir) + if agentInfo, ok := agentInfoMap[pod.Spec.NodeName]; ok { + data, err := yaml.Marshal(agentInfo) + if err != nil { + return err + } + if err = afero.WriteFile(defaultFS, filepath.Join(tmpDir, "agentinfo"), data, 0644); err != nil { + return err + } + } + err = downloadPodLogs(ctx, k8sClient, pod.Namespace, pod.Name, k8s.GetPodContainerNames(&pod), tmpDir) + if err != nil { + return err + } + return packPodBundle(&pod, dir, tmpDir) + }(); err != nil { + errors = append(errors, err) + } + } + return utilerror.NewAggregate(errors) +} + +func packPodBundle(pod *corev1.Pod, dir string, bundleDir string) error { + prefix := "agent_" + if strings.Contains(pod.Name, "controller") { + prefix = "controller_" + } + gzFileName := filepath.Join(dir, prefix+pod.Spec.NodeName+".tar.gz") + f, err := defaultFS.Create(gzFileName) + if err != nil { + return err + } + defer f.Close() + _, err = compress.PackDir(defaultFS, bundleDir, f) + return err +} + +func downloadPodLogs(ctx context.Context, k8sClient kubernetes.Interface, namespace string, podName string, containers []string, dir string) error { + downloadContainerLogs := func(containerName string) error { + containerDirName, _ := strings.CutPrefix(containerName, "antrea-") + containerLogDir := filepath.Join(dir, "logs", containerDirName) + err := os.MkdirAll(containerLogDir, 0755) + if err != nil { + return err + } + fileName := filepath.Join(containerLogDir, containerName+".log") + f, err := defaultFS.Create(fileName) + if err != nil { + return err + } + defer f.Close() + logOption := &corev1.PodLogOptions{ + Container: containerName, + } + logs := k8sClient.CoreV1().Pods(namespace).GetLogs(podName, logOption) + logStream, err := logs.Stream(ctx) + if err != nil { + return err + } + + if _, err = io.Copy(f, logStream); err != nil { + return err + } + return logStream.Close() + } + var errors []error + for _, containerName := range containers { + errors = append(errors, downloadContainerLogs(containerName)) + } + return utilerror.NewAggregate(errors) +} diff --git a/pkg/antctl/raw/supportbundle/command_test.go b/pkg/antctl/raw/supportbundle/command_test.go index cbca643e7f2..5d7ae2ec61d 100644 --- a/pkg/antctl/raw/supportbundle/command_test.go +++ b/pkg/antctl/raw/supportbundle/command_test.go @@ -1,4 +1,4 @@ -// Copyright 2023 Antrea Authors +// copyright 2023 Antrea Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -20,6 +20,7 @@ import ( "errors" "fmt" "path/filepath" + "strings" "testing" "time" @@ -41,6 +42,7 @@ import ( fakeclientset "antrea.io/antrea/pkg/client/clientset/versioned/fake" "antrea.io/antrea/pkg/client/clientset/versioned/scheme" systemclientset "antrea.io/antrea/pkg/client/clientset/versioned/typed/system/v1beta1" + "antrea.io/antrea/pkg/util/compress" ) var ( @@ -59,6 +61,10 @@ var ( Kind: "Node", Name: "node-1", }, + PodRef: v1.ObjectReference{ + Name: "antrea-controller-1", + Namespace: "kube-system", + }, } node1 = v1.Node{ ObjectMeta: metav1.ObjectMeta{ @@ -76,7 +82,11 @@ var ( } node2 = v1.Node{ ObjectMeta: metav1.ObjectMeta{ - Name: "node-1", + Name: "node-1", + ResourceVersion: "0", + }, + Status: v1.NodeStatus{ + Addresses: []v1.NodeAddress{}, }, } node3 = v1.Node{ @@ -95,7 +105,7 @@ var ( } agentInfo1 = &v1beta1.AntreaAgentInfo{ ObjectMeta: metav1.ObjectMeta{ - Name: "antrea-agent-1", + Name: "node-1", }, APIPort: 0, PodRef: v1.ObjectReference{ @@ -108,7 +118,7 @@ var ( } agentInfo2 = &v1beta1.AntreaAgentInfo{ ObjectMeta: metav1.ObjectMeta{ - Name: "antrea-agent-2", + Name: "node-2", }, APIPort: 0, PodRef: v1.ObjectReference{ @@ -119,6 +129,63 @@ var ( Name: "node-3", }, } + controllerPod = &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "antrea-controller-1", + Namespace: "kube-system", + Labels: map[string]string{ + "app": "antrea", + "component": "antrea-controller", + }, + }, + Spec: v1.PodSpec{ + NodeName: "node-1", + Containers: []v1.Container{ + { + Name: "antrea-controller", + }, + }, + }, + } + pod1 = &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "antrea-agent-1", + Namespace: "kube-system", + Labels: map[string]string{ + "app": "antrea", + "component": "antrea-agent", + }, + }, + Spec: v1.PodSpec{ + NodeName: "node-1", + Containers: []v1.Container{ + { + Name: "antrea-agent", + }, + { + Name: "antrea-ovs", + }, + }, + }, + } + pod2 = &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "antrea-agent-2", + Namespace: "kube-system", + Labels: map[string]string{ + "app": "antrea", + "component": "antrea-agent", + }, + }, + Spec: v1.PodSpec{ + NodeName: "node-2", + Containers: []v1.Container{ + { + Name: "antrea-agent", + }, + }, + }, + } nameList = []string{"node-1", "node-3"} ) @@ -320,9 +387,9 @@ func TestProcessResults(t *testing.T) { option.dir = path }() tests := []struct { - name string - resultMap map[string]error - expectedErr string + name string + resultMap map[string]error + expectFileList map[string][]string }{ { name: "All nodes failed", @@ -331,7 +398,20 @@ func TestProcessResults(t *testing.T) { "node-1": fmt.Errorf("error-1"), "node-2": fmt.Errorf("error-2"), }, - expectedErr: "no data was collected:", + expectFileList: map[string][]string{ + "": { + filepath.Join("logs", "controller", "antrea-controller.log"), + }, + "node-1": { + "agentinfo", + filepath.Join("logs", "ovs", "antrea-ovs.log"), + filepath.Join("logs", "agent", "antrea-agent.log"), + }, + "node-2": { + "agentinfo", + filepath.Join("logs", "agent", "antrea-agent.log"), + }, + }, }, { name: "Not all nodes failed", @@ -340,28 +420,59 @@ func TestProcessResults(t *testing.T) { "node-1": fmt.Errorf("error-1"), "node-2": nil, }, + expectFileList: map[string][]string{ + "": { + filepath.Join("logs", "controller", "antrea-controller.log"), + }, + "node-1": { + "agentinfo", + filepath.Join("logs", "ovs", "antrea-ovs.log"), + filepath.Join("logs", "agent", "antrea-agent.log"), + }, + }, }, } + defaultFS = afero.NewMemMapFs() + defer func() { + defaultFS = afero.NewOsFs() + option.dir = "" + }() for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - defaultFS = afero.NewMemMapFs() + option.dir = strings.ReplaceAll(tt.name, " ", "-") defaultFS.MkdirAll(option.dir, 0755) - defer func() { - defaultFS = afero.NewOsFs() - }() - - err := processResults(tt.resultMap, option.dir) - if tt.expectedErr != "" { - require.ErrorContains(t, err, tt.expectedErr) - } else { - require.NoError(t, err) - } - // Both test cases above have failed Nodes, hence this file should always be created/ + antreaInterface := fakeclientset.NewSimpleClientset(&controllerInfo, agentInfo1, agentInfo2) + k8sClient := fake.NewSimpleClientset(controllerPod, pod1, pod2) + require.NoError(t, processResults(context.TODO(), antreaInterface, k8sClient, tt.resultMap, option.dir)) b, err := afero.ReadFile(defaultFS, filepath.Join(option.dir, "failed_nodes")) require.NoError(t, err) data := string(b) + ok, checkErr := afero.Exists(defaultFS, filepath.Join(option.dir, "controllerinfo")) + require.NoError(t, checkErr) + assert.True(t, ok) + for node, err := range tt.resultMap { + tgzFileName := fmt.Sprintf("agent_%s.tar.gz", node) + if node == "" { + tgzFileName = "controller_node-1.tar.gz" + } + if err != nil { + // fallback path to retrieve data from kubernetes API instead of Antrea API. + ok, checkErr := afero.Exists(defaultFS, filepath.Join(option.dir, tgzFileName)) + require.NoError(t, checkErr) + require.True(t, ok, "expected bundle file %s not found", tgzFileName) + + unpackError := compress.UnpackDir(defaultFS, filepath.Join(option.dir, tgzFileName), option.dir) + require.NoError(t, unpackError) + files, _ := tt.expectFileList[node] + for _, expectFileName := range files { + ok, checkErr = afero.Exists(defaultFS, filepath.Join(option.dir, expectFileName)) + require.NoError(t, checkErr) + assert.True(t, ok, "expected bundle file %s for %s not found", expectFileName, node) + } + + } if node == "" { continue } diff --git a/pkg/util/compress/compress.go b/pkg/util/compress/compress.go index 3567c5d6faf..0991bd90be7 100644 --- a/pkg/util/compress/compress.go +++ b/pkg/util/compress/compress.go @@ -18,6 +18,8 @@ import ( "archive/tar" "compress/gzip" "crypto/sha256" + "errors" + "fmt" "io" "os" "path/filepath" @@ -26,6 +28,68 @@ import ( "github.com/spf13/afero" ) +// Sanitize archive file pathing from "G305: Zip Slip vulnerability" +func sanitizeArchivePath(d, t string) (string, error) { + v := filepath.Join(d, t) + if strings.HasPrefix(v, filepath.Clean(d)) { + return v, nil + } + return "", fmt.Errorf("%s: %s", "content filepath is tainted", t) +} + +func UnpackDir(fs afero.Fs, fileName string, targetDir string) error { + file, err := fs.Open(fileName) + if err != nil { + return err + } + defer file.Close() + + reader, err := gzip.NewReader(file) + if err != nil { + return err + } + defer reader.Close() + tarReader := tar.NewReader(reader) + + for true { + header, err := tarReader.Next() + if err == io.EOF { + break + } + if err != nil { + return err + } + targetPath, err := sanitizeArchivePath(targetDir, header.Name) + if err != nil { + return err + } + switch header.Typeflag { + case tar.TypeDir: + if err := fs.Mkdir(targetPath, 0755); err != nil { + return err + } + case tar.TypeReg: + outFile, err := fs.Create(targetPath) + defer outFile.Close() + if err != nil { + return err + } + for { + // to resolve G110: Potential DoS vulnerability via decompression bomb + if _, err := io.CopyN(outFile, tarReader, 1024); err != nil { + if err == io.EOF { + break + } + return err + } + } + default: + return errors.New("unknown type found when reading tgz file") + } + } + return nil +} + func PackDir(fs afero.Fs, dir string, writer io.Writer) ([]byte, error) { hash := sha256.New() gzWriter := gzip.NewWriter(io.MultiWriter(hash, writer)) diff --git a/pkg/util/k8s/pod.go b/pkg/util/k8s/pod.go index f14c2a73a56..e171d0940d5 100644 --- a/pkg/util/k8s/pod.go +++ b/pkg/util/k8s/pod.go @@ -20,3 +20,15 @@ import v1 "k8s.io/api/core/v1" func IsPodTerminated(pod *v1.Pod) bool { return pod.Status.Phase == v1.PodFailed || pod.Status.Phase == v1.PodSucceeded } + +// GetPodContainersNames returns all the container names in a Pod, including init containers. +func GetPodContainerNames(pod *v1.Pod) []string { + var names []string + for _, c := range pod.Spec.InitContainers { + names = append(names, c.Name) + } + for _, c := range pod.Spec.Containers { + names = append(names, c.Name) + } + return names +}