Skip to content

Commit

Permalink
Support collecting logs from K8s API as fallback for supportbundle (#…
Browse files Browse the repository at this point in the history
…3659)

When the normal supportbundle api fails for some nodes or the controller,
use the kubernetes api instead to collect Pods' logs. Also, in both cases,
clusterinfo will always be collected first.

Signed-off-by: Hang Yan <yhang@vmware.com>
  • Loading branch information
hangyan authored Feb 6, 2025
1 parent 10566ca commit a51535a
Show file tree
Hide file tree
Showing 4 changed files with 387 additions and 36 deletions.
196 changes: 180 additions & 16 deletions pkg/antctl/raw/supportbundle/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,14 @@ import (
"golang.org/x/sync/errgroup"
"golang.org/x/time/rate"
"gopkg.in/yaml.v2"

corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
k8sruntime "k8s.io/apimachinery/pkg/runtime"
utilerror "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/klog/v2"
Expand All @@ -47,6 +51,8 @@ import (
systemv1beta1 "antrea.io/antrea/pkg/apis/system/v1beta1"
antrea "antrea.io/antrea/pkg/client/clientset/versioned"
systemclientset "antrea.io/antrea/pkg/client/clientset/versioned/typed/system/v1beta1"
"antrea.io/antrea/pkg/util/compress"
"antrea.io/antrea/pkg/util/k8s"
)

const (
Expand Down Expand Up @@ -581,6 +587,20 @@ func controllerRemoteRunE(cmd *cobra.Command, args []string) error {
return fmt.Errorf("failed to create clientset: %w", err)
}

if err := os.MkdirAll(option.dir, 0700); err != nil {
return fmt.Errorf("error when creating output dir: %w", err)
}

f, err := os.Create(filepath.Join(option.dir, "clusterinfo"))
if err != nil {
return err
}
defer f.Close()
err = getClusterInfo(f, k8sClientset)
if err != nil {
return err
}

var controllerClient systemclientset.SupportBundleInterface
var agentClients map[string]systemclientset.SupportBundleInterface

Expand Down Expand Up @@ -625,29 +645,17 @@ func controllerRemoteRunE(cmd *cobra.Command, args []string) error {
return fmt.Errorf("no matched Nodes found to collect agent bundles")
}

if err := os.MkdirAll(option.dir, 0700|os.ModeDir); err != nil {
return fmt.Errorf("error when creating output dir: %w", err)
}
amount := len(agentClients) * 2
if controllerClient != nil {
amount += 2
}
bar := barTmpl.Start(amount)
defer bar.Finish()
defer bar.Set("prefix", "Finish ")
f, err := os.Create(filepath.Join(option.dir, "clusterinfo"))
if err != nil {
return err
}
defer f.Close()
err = getClusterInfo(f, k8sClientset)
if err != nil {
return err
}

results := requestAll(ctx, agentClients, controllerClient, bar)
results = downloadAll(ctx, agentClients, controllerClient, dir, bar, results)
return processResults(results, dir)
return processResults(ctx, antreaClientset, k8sClientset, results, dir)
}

func genErrorMsg(resultMap map[string]error) string {
Expand All @@ -659,8 +667,9 @@ func genErrorMsg(resultMap map[string]error) string {
}

// processResults will output the failed nodes and their reasons if any. If no data was collected,
// error is returned, otherwise will return nil.
func processResults(resultMap map[string]error, dir string) error {
// error is returned, otherwise will return nil. For failed nodes and controller, will also trying to get logs from
// kubernetes api.
func processResults(ctx context.Context, antreaClientset antrea.Interface, k8sClient kubernetes.Interface, resultMap map[string]error, dir string) error {
resultStr := ""
var failedNodes []string
allFailed := true
Expand All @@ -676,7 +685,8 @@ func processResults(resultMap map[string]error, dir string) error {
}
}

if resultMap[""] != nil {
controllerFailed := resultMap[""] != nil
if controllerFailed {
fmt.Println("Controller Info Failed Reason: " + resultMap[""].Error())
}

Expand All @@ -689,9 +699,163 @@ func processResults(resultMap map[string]error, dir string) error {
err = writeFailedNodes(dir, failedNodes)
}

// download logs from kubernetes api
if failedNodes != nil {
if err = downloadFallbackAgentBundleFromKubernetes(ctx, antreaClientset, k8sClient, failedNodes, dir); err != nil {
fmt.Println("Failed to download agent bundle from kubernetes api: " + err.Error())
} else {
allFailed = false
}
}
if controllerFailed {
if err = downloadFallbackControllerBundleFromKubernetes(ctx, antreaClientset, k8sClient, dir); err != nil {
fmt.Println("Failed to download controller bundle from kubernetes api: " + err.Error())
} else {
allFailed = false
}
}

if allFailed {
return fmt.Errorf("no data was collected: %s", genErrorMsg(resultMap))
} else {
return err
}
}

func downloadFallbackControllerBundleFromKubernetes(ctx context.Context, antreaClientset antrea.Interface, k8sClient kubernetes.Interface, dir string) error {
tmpDir, err := afero.TempDir(defaultFS, "", "bundle_tmp_")
if err != nil {
return err
}
defer defaultFS.RemoveAll(tmpDir)

var podRef *corev1.ObjectReference
if err := func() error {
controllerInfo, err := antreaClientset.CrdV1beta1().AntreaControllerInfos().Get(ctx, v1beta1.AntreaControllerInfoResourceName, metav1.GetOptions{})
if err != nil {
return err
}
podRef = &controllerInfo.PodRef
data, err := yaml.Marshal(controllerInfo)
if err != nil {
return err
}
if err := afero.WriteFile(defaultFS, filepath.Join(dir, "controllerinfo"), data, 0644); err != nil {
return err
}
return nil
}(); err != nil {
return err
}
if podRef == nil {
return fmt.Errorf("no podRef found in AntreaControllerInfo")
}
pod, err := k8sClient.CoreV1().Pods(podRef.Namespace).Get(ctx, podRef.Name, metav1.GetOptions{})
if err != nil {
return err
}
if err := downloadPodLogs(ctx, k8sClient, pod.Namespace, pod.Name, k8s.GetPodContainerNames(pod), tmpDir); err != nil {
return err
}
return packPodBundle(pod, dir, tmpDir)
}

func downloadFallbackAgentBundleFromKubernetes(ctx context.Context, antreaClientset antrea.Interface, k8sClient kubernetes.Interface, failedNodes []string, dir string) error {
agentInfoList, err := antreaClientset.CrdV1beta1().AntreaAgentInfos().List(ctx, metav1.ListOptions{ResourceVersion: "0"})
if err != nil {
return err
}

agentInfoMap := map[string]v1beta1.AntreaAgentInfo{}
for _, agentInfo := range agentInfoList.Items {
agentInfoMap[agentInfo.Name] = agentInfo
}
pods, err := k8sClient.CoreV1().Pods("kube-system").List(ctx, metav1.ListOptions{
ResourceVersion: "0",
LabelSelector: "app=antrea,component=antrea-agent",
})
if err != nil {
return err
}
failedNodeSet := sets.NewString(failedNodes...)
var errors []error
for _, pod := range pods.Items {
if !failedNodeSet.Has(pod.Spec.NodeName) {
continue
}
if err := func() error {
tmpDir, err := afero.TempDir(defaultFS, "", "bundle_tmp_")
if err != nil {
return err
}
defer defaultFS.RemoveAll(tmpDir)
if agentInfo, ok := agentInfoMap[pod.Spec.NodeName]; ok {
data, err := yaml.Marshal(agentInfo)
if err != nil {
return err
}
if err = afero.WriteFile(defaultFS, filepath.Join(tmpDir, "agentinfo"), data, 0644); err != nil {
return err
}
}
err = downloadPodLogs(ctx, k8sClient, pod.Namespace, pod.Name, k8s.GetPodContainerNames(&pod), tmpDir)
if err != nil {
return err
}
return packPodBundle(&pod, dir, tmpDir)
}(); err != nil {
errors = append(errors, err)
}
}
return utilerror.NewAggregate(errors)
}

func packPodBundle(pod *corev1.Pod, dir string, bundleDir string) error {
prefix := "agent_"
if strings.Contains(pod.Name, "controller") {
prefix = "controller_"
}
gzFileName := filepath.Join(dir, prefix+pod.Spec.NodeName+".tar.gz")
f, err := defaultFS.Create(gzFileName)
if err != nil {
return err
}
defer f.Close()
_, err = compress.PackDir(defaultFS, bundleDir, f)
return err
}

func downloadPodLogs(ctx context.Context, k8sClient kubernetes.Interface, namespace string, podName string, containers []string, dir string) error {
downloadContainerLogs := func(containerName string) error {
containerDirName, _ := strings.CutPrefix(containerName, "antrea-")
containerLogDir := filepath.Join(dir, "logs", containerDirName)
err := os.MkdirAll(containerLogDir, 0755)
if err != nil {
return err
}
fileName := filepath.Join(containerLogDir, containerName+".log")
f, err := defaultFS.Create(fileName)
if err != nil {
return err
}
defer f.Close()
logOption := &corev1.PodLogOptions{
Container: containerName,
}
logs := k8sClient.CoreV1().Pods(namespace).GetLogs(podName, logOption)
logStream, err := logs.Stream(ctx)
if err != nil {
return err
}

if _, err = io.Copy(f, logStream); err != nil {
return err
}
return logStream.Close()
}
var errors []error
for _, containerName := range containers {
errors = append(errors, downloadContainerLogs(containerName))
}
return utilerror.NewAggregate(errors)
}
Loading

0 comments on commit a51535a

Please sign in to comment.