Skip to content

Commit

Permalink
testutils: add cluster.WaitForPods() (#3875)
Browse files Browse the repository at this point in the history
  • Loading branch information
rmfitzpatrick authored Nov 9, 2023
1 parent cb8e08e commit 568d638
Show file tree
Hide file tree
Showing 5 changed files with 84 additions and 121 deletions.
30 changes: 4 additions & 26 deletions tests/general/discoverymode/k8s_observer_discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,33 +67,11 @@ func TestK8sObserver(t *testing.T) {
sout, serr, err = cluster.Apply(ds.Render(t))
require.NoError(t, err, "stdout: %s, stderr: %s", sout, serr)

require.Eventually(t, func() bool {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
rPod, err := cluster.Clientset.CoreV1().Pods(namespace.Name).Get(ctx, redisName, metav1.GetOptions{})
require.NoError(t, err)
tc.Logger.Debug(fmt.Sprintf("redis is: %s\n", rPod.Status.Phase))
return rPod.Status.Phase == corev1.PodRunning
}, 5*time.Minute, 1*time.Second)
cluster.WaitForPods(redisName, namespace.Name, 5*time.Minute)

var collectorPodName string
require.Eventually(t, func() bool {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
dsPods, err := cluster.Clientset.CoreV1().Pods(namespace.Name).List(ctx, metav1.ListOptions{
LabelSelector: fmt.Sprintf("name = %s", ds.Name),
})
require.NoError(t, err)
if len(dsPods.Items) > 0 {
collectorPod := dsPods.Items[0]
tc.Logger.Debug(fmt.Sprintf("collector is: %s\n", collectorPod.Status.Phase))
cPod, err := cluster.Clientset.CoreV1().Pods(collectorPod.Namespace).Get(ctx, collectorPod.Name, metav1.GetOptions{})
collectorPodName = cPod.Name
require.NoError(t, err)
return cPod.Status.Phase == corev1.PodRunning
}
return false
}, 5*time.Minute, 1*time.Second)
pods := cluster.WaitForPods(ds.Name, namespace.Name, 5*time.Minute)
require.Len(t, pods, 1)
collectorPodName := pods[0].Name

expectedMetrics := tc.ResourceMetrics("k8s-observer-smart-agent-redis.yaml")
require.NoError(t, tc.OTLPReceiverSink.AssertAllMetricsReceived(t, *expectedMetrics, 30*time.Second))
Expand Down
44 changes: 3 additions & 41 deletions tests/receivers/discovery/k8s_observer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,22 +85,7 @@ func TestDiscoveryReceiverWithK8sObserverProvidesEndpointLogs(t *testing.T) {
sout, serr, err = cluster.Apply(manifests.RenderAll(t, clusterRole, clusterRoleBinding, ds))
require.NoError(t, err, "stdout: %s, stderr: %s", sout, serr)

require.Eventually(t, func() bool {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
dsPods, err := cluster.Clientset.CoreV1().Pods(namespace.Name).List(ctx, metav1.ListOptions{
LabelSelector: fmt.Sprintf("name = %s", ds.Name),
})
require.NoError(t, err)
if len(dsPods.Items) > 0 {
collectorPod := dsPods.Items[0]
tc.Logger.Debug(fmt.Sprintf("collector is: %s\n", collectorPod.Status.Phase))
cPod, err := cluster.Clientset.CoreV1().Pods(collectorPod.Namespace).Get(ctx, collectorPod.Name, metav1.GetOptions{})
require.NoError(t, err)
return cPod.Status.Phase == corev1.PodRunning
}
return false
}, 5*time.Minute, 1*time.Second)
cluster.WaitForPods(ds.Name, namespace.Name, 5*time.Minute)

expectedResourceLogs := tc.ResourceLogs("k8s_observer_endpoints.yaml")
require.NoError(t, tc.OTLPReceiverSink.AssertAllLogsReceived(t, *expectedResourceLogs, 30*time.Second))
Expand Down Expand Up @@ -135,31 +120,8 @@ func TestDiscoveryReceiverWithK8sObserverAndSmartAgentRedisReceiverProvideStatus
sout, serr, err = cluster.Apply(manifests.RenderAll(t, clusterRole, clusterRoleBinding, ds))
require.NoError(t, err, "stdout: %s, stderr: %s", sout, serr)

require.Eventually(t, func() bool {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
rPod, err := cluster.Clientset.CoreV1().Pods(namespace.Name).Get(ctx, redis, metav1.GetOptions{})
require.NoError(t, err)
tc.Logger.Debug(fmt.Sprintf("redis is: %s\n", rPod.Status.Phase))
return rPod.Status.Phase == corev1.PodRunning
}, 5*time.Minute, 1*time.Second)

require.Eventually(t, func() bool {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
dsPods, err := cluster.Clientset.CoreV1().Pods(namespace.Name).List(ctx, metav1.ListOptions{
LabelSelector: fmt.Sprintf("name = %s", ds.Name),
})
require.NoError(t, err)
if len(dsPods.Items) > 0 {
collectorPod := dsPods.Items[0]
tc.Logger.Debug(fmt.Sprintf("collector is: %s\n", collectorPod.Status.Phase))
cPod, err := cluster.Clientset.CoreV1().Pods(collectorPod.Namespace).Get(ctx, collectorPod.Name, metav1.GetOptions{})
require.NoError(t, err)
return cPod.Status.Phase == corev1.PodRunning
}
return false
}, 5*time.Minute, 1*time.Second)
cluster.WaitForPods(redis, namespace.Name, 5*time.Minute)
cluster.WaitForPods(ds.Name, namespace.Name, 5*time.Minute)

expectedResourceLogs := tc.ResourceLogs("k8s_observer_smart_agent_redis_statuses.yaml")
require.NoError(t, tc.OTLPReceiverSink.AssertAllLogsReceived(t, *expectedResourceLogs, 30*time.Second))
Expand Down
31 changes: 4 additions & 27 deletions tests/receivers/smartagent/collectd-mysql/bundled_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,25 +73,9 @@ func TestK8sObserver(t *testing.T) {
sout, serr, err = cluster.Apply(manifests.RenderAll(t, clusterRole, clusterRoleBinding, ds))
require.NoError(t, err, "stdout: %s, stderr: %s", sout, serr)

var collectorName string
// wait for collector to run
require.Eventually(t, func() bool {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
dsPods, err := cluster.Clientset.CoreV1().Pods(namespace.Name).List(ctx, metav1.ListOptions{
LabelSelector: fmt.Sprintf("name = %s", ds.Name),
})
require.NoError(t, err)
if len(dsPods.Items) > 0 {
collectorPod := dsPods.Items[0]
tc.Logger.Debug(fmt.Sprintf("collector is: %s\n", collectorPod.Status.Phase))
cPod, err := cluster.Clientset.CoreV1().Pods(collectorPod.Namespace).Get(ctx, collectorPod.Name, metav1.GetOptions{})
require.NoError(t, err)
collectorName = cPod.Name
return cPod.Status.Phase == corev1.PodRunning
}
return false
}, 5*time.Minute, 1*time.Second)
pods := cluster.WaitForPods(ds.Name, namespace.Name, 5*time.Minute)
require.Len(t, pods, 1)
collectorName := pods[0].Name

expectedMetrics := tc.ResourceMetrics("all.yaml")
require.NoError(t, tc.OTLPReceiverSink.AssertAllMetricsReceived(t, *expectedMetrics, 30*time.Second))
Expand Down Expand Up @@ -149,14 +133,7 @@ func (cluster testCluster) createMySQL(name, namespace, serviceAccount string) s
)
require.NoError(cluster.Testcase, err)

require.Eventually(cluster.Testcase, func() bool {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
rPod, err := cluster.Clientset.CoreV1().Pods(namespace).Get(ctx, mysql.Name, metav1.GetOptions{})
require.NoError(cluster.Testcase, err)
cluster.Testcase.Logger.Debug(fmt.Sprintf("mysql is: %s\n", rPod.Status.Phase))
return rPod.Status.Phase == corev1.PodRunning
}, 5*time.Minute, 1*time.Second)
cluster.WaitForPods(mysql.Name, namespace, 5*time.Minute)

require.Eventually(cluster.Testcase, func() bool {
stdOut, _, err := cluster.Kubectl("logs", "-n", namespace, mysql.Name)
Expand Down
31 changes: 4 additions & 27 deletions tests/receivers/smartagent/postgresql/bundled_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,25 +76,9 @@ func TestK8sObserver(t *testing.T) {
sout, serr, err = cluster.Apply(manifests.RenderAll(t, configMap, ds))
require.NoError(t, err, "stdout: %s, stderr: %s", sout, serr)

var collectorName string
// wait for collector to run
require.Eventually(t, func() bool {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
dsPods, err := cluster.Clientset.CoreV1().Pods(namespace.Name).List(ctx, metav1.ListOptions{
LabelSelector: fmt.Sprintf("name = %s", ds.Name),
})
require.NoError(t, err)
if len(dsPods.Items) > 0 {
collectorPod := dsPods.Items[0]
tc.Logger.Debug(fmt.Sprintf("collector is: %s\n", collectorPod.Status.Phase))
cPod, err := cluster.Clientset.CoreV1().Pods(collectorPod.Namespace).Get(ctx, collectorPod.Name, metav1.GetOptions{})
require.NoError(t, err)
collectorName = cPod.Name
return cPod.Status.Phase == corev1.PodRunning
}
return false
}, 5*time.Minute, 1*time.Second)
pods := cluster.WaitForPods(ds.Name, namespace.Name, 5*time.Minute)
require.Len(t, pods, 1)
collectorName := pods[0].Name

expectedMetrics := tc.ResourceMetrics("all.yaml")
require.NoError(t, tc.OTLPReceiverSink.AssertAllMetricsReceived(t, *expectedMetrics, 30*time.Second))
Expand Down Expand Up @@ -247,14 +231,7 @@ func (cluster testCluster) createPostgres(name, namespace, serviceAccount string
)
require.NoError(cluster.Testcase, err)

require.Eventually(cluster.Testcase, func() bool {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
rPod, err := cluster.Clientset.CoreV1().Pods(namespace).Get(ctx, postgres.Name, metav1.GetOptions{})
require.NoError(cluster.Testcase, err)
cluster.Testcase.Logger.Debug(fmt.Sprintf("postgres is: %s\n", rPod.Status.Phase))
return rPod.Status.Phase == corev1.PodRunning
}, 5*time.Minute, 1*time.Second)
cluster.WaitForPods(postgres.Name, namespace, 5*time.Minute)
return string(postgres.UID)
}

Expand Down
69 changes: 69 additions & 0 deletions tests/testutils/kubeutils/kind_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"fmt"
"io"
"os"
"regexp"
"strings"
"text/template"
"time"
Expand All @@ -30,6 +31,8 @@ import (
docker "github.com/docker/docker/client"
"github.com/stretchr/testify/require"
"go.uber.org/multierr"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/cli-runtime/pkg/genericclioptions"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
Expand Down Expand Up @@ -96,6 +99,13 @@ func (k *KindCluster) Create() {
require.NoError(k.Testcase, err)
k.Clientset, err = kubernetes.NewForConfig(restConfig)
require.NoError(k.Testcase, err)

for _, pod := range []string{
"kube-apiserver", "kube-controller-manager", "kube-scheduler", "etcd-cluster", "coredns", "kindnet", "kube-proxy",
} {
k.WaitForPods(pod, "kube-system", 2*time.Minute)
}
k.WaitForPods("local-path-provisioner", "local-path-storage", 2*time.Minute)
}

func (k *KindCluster) Teardown() {
Expand Down Expand Up @@ -157,6 +167,65 @@ func (k KindCluster) Delete(manifests string) (stdOut, stdErr bytes.Buffer, err
return k.runKubectl(bytes.NewReader([]byte(manifests)), "delete", "-f", k.tmpManifestFile(manifests))
}

func (k KindCluster) WaitForPods(podNameRegex, namespaceName string, duration time.Duration) []*corev1.Pod {
return k.WaitForPodFn(podNameRegex, namespaceName, duration, func(pod *corev1.Pod) bool {
k.Testcase.Logger.Debug(fmt.Sprintf("%s status %v", pod.Name, pod.Status))
for _, cond := range pod.Status.Conditions {
if cond.Type == corev1.PodReady {
return cond.Status == corev1.ConditionTrue
}
}
return false
})
}

func (k KindCluster) WaitForPodFn(podNameRegex, namespaceName string, duration time.Duration, podFn func(*corev1.Pod) bool) []*corev1.Pod {
namePattern, e := regexp.Compile(podNameRegex)
require.NoError(k.Testcase, e)

var running []*corev1.Pod
require.Eventually(k.Testcase, func() bool {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
pods, err := k.Clientset.CoreV1().Pods(namespaceName).List(ctx, metav1.ListOptions{})
require.NoError(k.Testcase, err)
cancel()

matches := map[string]struct{}{}

for i := range pods.Items {
pod := pods.Items[i]
if namePattern.MatchString(pod.Name) {
matches[pod.Name] = struct{}{}
}
}

if len(matches) == 0 {
return false
}

var matched []*corev1.Pod
for name := range matches {
ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
pod, err := k.Clientset.CoreV1().Pods(namespaceName).Get(ctx, name, metav1.GetOptions{})
require.NoError(k.Testcase, err)
cancel()
if podFn(pod) {
delete(matches, name)
matched = append(matched, pod)
}
}

if len(matches) == 0 {
running = matched
return true
}
return false
}, duration, time.Second)

return running
}

func (k KindCluster) runKubectl(stdin io.Reader, args ...string) (stdOut, stdErr bytes.Buffer, err error) {
stdOut = bytes.Buffer{}
stdErr = bytes.Buffer{}
Expand Down

0 comments on commit 568d638

Please sign in to comment.