Skip to content

Commit

Permalink
Make e2e network client more resilient to errors (#640)
Browse files Browse the repository at this point in the history
  • Loading branch information
TheSpiritXIII authored Oct 19, 2023
1 parent d755898 commit a75efb4
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 7 deletions.
4 changes: 2 additions & 2 deletions e2e/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,11 +103,11 @@ func PortForwardClient(t *testing.T, restConfig *rest.Config, kubeClient client.
return nil, fmt.Errorf("unable to resolve TCP addr: %w", err)
}

pod, err := getPodByIP(ctx, kubeClient, addr.IP)
pod, container, err := PodByAddr(ctx, kubeClient, addr)
if err != nil {
return nil, fmt.Errorf("unable to get pod from IP %s: %w", addr.IP, err)
}
if err := waitUntilPodReady(ctx, t, restConfig, kubeClient, pod); err != nil {
if err := WaitForPodContainerReady(ctx, t, restConfig, kubeClient, pod, container); err != nil {
return nil, fmt.Errorf("failed waiting for pod from IP %s: %w", addr.IP, err)
}
resourceURL := restClient.
Expand Down
60 changes: 55 additions & 5 deletions e2e/podutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,41 @@ func containerStateString(state *corev1.ContainerState) string {
return "running"
}

func isPodReady(ctx context.Context, restConfig *rest.Config, pod *corev1.Pod) error {
func IsPodContainerReady(ctx context.Context, restConfig *rest.Config, pod *corev1.Pod, container string) error {
for _, status := range pod.Status.ContainerStatuses {
if status.Name == container && !status.Ready {
key := client.ObjectKeyFromObject(pod)
return fmt.Errorf("pod %s container %s not ready: %s", key, status.Name, containerStateString(&status.State))
}
return nil
}
key := client.ObjectKeyFromObject(pod)
return fmt.Errorf("no container named %s found in pod %s", container, key)
}

func WaitForPodContainerReady(ctx context.Context, t *testing.T, restConfig *rest.Config, kubeClient client.Client, pod *corev1.Pod, container string) error {
// Prevent doing an extra API lookup by checking first.
var err error
if err = IsPodContainerReady(ctx, restConfig, pod, container); err == nil {
return nil
}
t.Logf("waiting for pod to be ready: %s", err)
if waitErr := wait.Poll(2*time.Second, 30*time.Second, func() (done bool, err error) {
if err = kubeClient.Get(ctx, client.ObjectKeyFromObject(pod), pod); err != nil {
return false, err
}
err = IsPodContainerReady(ctx, restConfig, pod, container)
return err == nil, nil
}); waitErr != nil {
if errors.Is(waitErr, wait.ErrWaitTimeout) {
return err
}
return waitErr
}
return nil
}

func IsPodReady(ctx context.Context, restConfig *rest.Config, pod *corev1.Pod) error {
var errs []error
for _, status := range pod.Status.ContainerStatuses {
if !status.Ready {
Expand All @@ -54,18 +88,18 @@ func isPodReady(ctx context.Context, restConfig *rest.Config, pod *corev1.Pod) e
return errors.Join(errs...)
}

func waitUntilPodReady(ctx context.Context, t *testing.T, restConfig *rest.Config, kubeClient client.Client, pod *corev1.Pod) error {
func WaitForPodReady(ctx context.Context, t *testing.T, restConfig *rest.Config, kubeClient client.Client, pod *corev1.Pod) error {
// Prevent doing an extra API lookup by checking first.
var err error
if err = isPodReady(ctx, restConfig, pod); err == nil {
if err = IsPodReady(ctx, restConfig, pod); err == nil {
return nil
}
t.Logf("waiting for pod to be ready: %s", err)
if waitErr := wait.Poll(2*time.Second, 30*time.Second, func() (done bool, err error) {
if err = kubeClient.Get(ctx, client.ObjectKeyFromObject(pod), pod); err != nil {
return false, err
}
err = isPodReady(ctx, restConfig, pod)
err = IsPodReady(ctx, restConfig, pod)
return err == nil, nil
}); waitErr != nil {
if errors.Is(waitErr, wait.ErrWaitTimeout) {
Expand All @@ -76,7 +110,7 @@ func waitUntilPodReady(ctx context.Context, t *testing.T, restConfig *rest.Confi
return nil
}

func getPodByIP(ctx context.Context, kubeClient client.Client, ip net.IP) (*corev1.Pod, error) {
func PodByIP(ctx context.Context, kubeClient client.Client, ip net.IP) (*corev1.Pod, error) {
var pods corev1.PodList
if err := kubeClient.List(ctx, &pods, &client.ListOptions{
FieldSelector: fields.OneTermEqualSelector("status.podIP", ip.String()),
Expand All @@ -88,3 +122,19 @@ func getPodByIP(ctx context.Context, kubeClient client.Client, ip net.IP) (*core
}
return &pods.Items[0], nil
}

func PodByAddr(ctx context.Context, kubeClient client.Client, addr *net.TCPAddr) (*corev1.Pod, string, error) {
pod, err := PodByIP(ctx, kubeClient, addr.IP)
if err != nil {
return nil, "", err
}
for _, container := range pod.Spec.Containers {
for _, port := range container.Ports {
if int(port.ContainerPort) == addr.Port {
return pod, container.Name, nil
}
}
}
key := client.ObjectKeyFromObject(pod)
return nil, "", fmt.Errorf("unable to find port %d in pod %s", addr.Port, key)
}

0 comments on commit a75efb4

Please sign in to comment.