E2E: Refactor handling of pods
The concepts of TestPodConfig and TestPod are defined, letting us create
test pods through a single interface and automating the creation of the
UUID data the pods exchange.

As recent modifications have shown, the number of parameters keeps
growing; having a single fixture to create test pods will make E2E tests
easier to write and read.

Depends-On: submariner-io#148
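
For illustration, the new fixture reduces a cross-cluster test to roughly the following (a minimal sketch assembled from the API introduced in this commit; the scheduling values are arbitrary examples):

    listenerPod := f.NewTestPod(&framework.TestPodConfig{
        Type:       framework.ListenerPod,
        Cluster:    framework.ClusterB,
        Scheduling: framework.GatewayNode,
    })
    service := listenerPod.CreateService()
    connectorPod := f.NewTestPod(&framework.TestPodConfig{
        Type:       framework.ConnectorPod,
        Cluster:    framework.ClusterA,
        Scheduling: framework.GatewayNode,
        RemoteIP:   service.Spec.ClusterIP,
    })
    // Config.Data defaults to a fresh UUID per pod, so the test can assert
    // that the listener actually received the connector's data.
    _, exitMessage := listenerPod.WaitForFinishStatus()
    Expect(exitMessage).To(ContainSubstring(connectorPod.Config.Data))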
mangelajo committed Sep 12, 2019
1 parent d04a4f3 commit 6168d2b
Showing 3 changed files with 124 additions and 52 deletions.
29 changes: 17 additions & 12 deletions test/e2e/dataplane/tcp_pod_to_service.go
@@ -1,8 +1,6 @@
package dataplane

import (
"k8s.io/apimachinery/pkg/util/uuid"

"github.com/submariner-io/submariner/test/e2e/framework"

. "github.com/onsi/ginkgo"
@@ -22,30 +20,37 @@ var _ = Describe("[dataplane] Basic Pod to Service tests across clusters without

func testPod2ServiceTCP(f *framework.Framework, leftScheduling framework.TestPodScheduling, rightScheduling framework.TestPodScheduling) {

-listenerUUID := string(uuid.NewUUID())
-connectorUUID := string(uuid.NewUUID())

By("Creating a listener pod in cluster B, which will wait for a handshake over TCP")
-listenerPod := f.CreateTCPCheckListenerPod(framework.ClusterB, rightScheduling, listenerUUID)
+listenerPod := f.NewTestPod(&framework.TestPodConfig{
+Type: framework.ListenerPod,
+Cluster: framework.ClusterB,
+Scheduling: rightScheduling,
+})

By("Pointing a service ClusterIP to the listener pod in cluster B")
-service := f.CreateTCPService(framework.ClusterB, listenerPod.Labels[framework.TestAppLabel], framework.TestPort)
+service := listenerPod.CreateService()
framework.Logf("Service for listener pod has ClusterIP: %v", service.Spec.ClusterIP)

By("Creating a connector pod in cluster A, which will attempt the specific UUID handshake over TCP")
-connectorPod := f.CreateTCPCheckConnectorPod(framework.ClusterA, leftScheduling, service.Spec.ClusterIP, connectorUUID)

+connectorPod := f.NewTestPod(&framework.TestPodConfig{
+Type: framework.ConnectorPod,
+Cluster: framework.ClusterA,
+Scheduling: leftScheduling,
+RemoteIP: service.Spec.ClusterIP,
+})

By("Waiting for the listener pod to exit with code 0, returning what listener sent")
-exitStatusL, exitMessageL := f.WaitForPodFinishStatus(listenerPod, framework.ClusterB)
+exitStatusL, exitMessageL := listenerPod.WaitForFinishStatus()
framework.Logf("Listener output:\n%s", exitMessageL)
Expect(exitStatusL).To(Equal(int32(0)))

By("Waiting for the connector pod to exit with code 0, returning what connector sent")
-exitStatusC, exitMessageC := f.WaitForPodFinishStatus(connectorPod, framework.ClusterA)
+exitStatusC, exitMessageC := connectorPod.WaitForFinishStatus()
framework.Logf("Connector output\n%s", exitMessageC)
Expect(exitStatusC).To(Equal(int32(0)))

By("Verifying what the pods sent to each other contain the right UUIDs")
-Expect(exitMessageL).To(ContainSubstring(connectorUUID))
-Expect(exitMessageC).To(ContainSubstring(listenerUUID))
+Expect(exitMessageL).To(ContainSubstring(connectorPod.Config.Data))
+Expect(exitMessageC).To(ContainSubstring(listenerPod.Config.Data))
}
43 changes: 16 additions & 27 deletions test/e2e/framework/network_pods.go
@@ -9,14 +9,10 @@ import (
. "github.com/onsi/gomega"
)

-const (
-TestPort = 1234
-)

// create a test pod inside the current test namespace on the specified cluster.
// The pod will listen on TestPort over TCP, send sendString over the connection,
// and write the network response in the pod termination log, then exit with 0 status
-func (f *Framework) CreateTCPCheckListenerPod(cluster TestCluster, scheduling TestPodScheduling, sendString string) *v1.Pod {
+func (tp *TestPod) buildTCPCheckListenerPod() {

tcpCheckListenerPod := v1.Pod{
ObjectMeta: metav1.ObjectMeta{
@@ -26,7 +22,7 @@ func (f *Framework) CreateTCPCheckListenerPod(cluster TestCluster, scheduling Te
},
},
Spec: v1.PodSpec{
-Affinity: nodeAffinity(scheduling),
+Affinity: nodeAffinity(tp.Config.Scheduling),
RestartPolicy: v1.RestartPolicyNever,
Containers: []v1.Container{
{
@@ -36,25 +32,26 @@ func (f *Framework) CreateTCPCheckListenerPod(cluster TestCluster, scheduling Te
// resource environments from not sending at least some data before timeout.
Command: []string{"sh", "-c", "for i in $(seq 50); do echo listener says $SEND_STRING; done | nc -l -v -p $LISTEN_PORT -s 0.0.0.0 >/dev/termination-log 2>&1"},
Env: []v1.EnvVar{
{Name: "LISTEN_PORT", Value: strconv.Itoa(TestPort)},
{Name: "SEND_STRING", Value: sendString},
{Name: "LISTEN_PORT", Value: strconv.Itoa(tp.Config.Port)},
{Name: "SEND_STRING", Value: tp.Config.Data},
},
},
},
},
}

-pc := f.ClusterClients[cluster].CoreV1().Pods(f.Namespace)
-tcpListenerPod, err := pc.Create(&tcpCheckListenerPod)
+pc := tp.framework.ClusterClients[tp.Config.Cluster].CoreV1().Pods(tp.framework.Namespace)
+var err error
+tp.Pod, err = pc.Create(&tcpCheckListenerPod)
Expect(err).NotTo(HaveOccurred())
-return f.WaitForPodToBeReady(tcpListenerPod, cluster)
+tp.WaitToBeReady()
}

// create a test pod inside the current test namespace on the specified cluster.
// The pod will connect to remoteIP:TestPort over TCP, send sendString over the
// connection, and write the network response in the pod termination log, then
// exit with 0 status
-func (f *Framework) CreateTCPCheckConnectorPod(cluster TestCluster, scheduling TestPodScheduling, remoteIP string, sendString string) *v1.Pod {
+func (tp *TestPod) buildTCPCheckConnectorPod() {

tcpCheckConnectorPod := v1.Pod{
ObjectMeta: metav1.ObjectMeta{
@@ -64,7 +61,7 @@ func (f *Framework) CreateTCPCheckConnectorPod(cluster TestCluster, scheduling T
},
},
Spec: v1.PodSpec{
-Affinity: nodeAffinity(scheduling),
+Affinity: nodeAffinity(tp.Config.Scheduling),
RestartPolicy: v1.RestartPolicyNever,
Containers: []v1.Container{
{
@@ -74,29 +71,21 @@ func (f *Framework) CreateTCPCheckConnectorPod(cluster TestCluster, scheduling T
// resource environments from not sending at least some data before timeout.
Command: []string{"sh", "-c", "for in in $(seq 50); do echo connector says $SEND_STRING; done | nc -v $REMOTE_IP $REMOTE_PORT -w 5 >/dev/termination-log 2>&1"},
Env: []v1.EnvVar{
{Name: "REMOTE_PORT", Value: strconv.Itoa(TestPort)},
{Name: "SEND_STRING", Value: sendString},
{Name: "REMOTE_IP", Value: remoteIP},
{Name: "REMOTE_PORT", Value: strconv.Itoa(tp.Config.Port)},
{Name: "SEND_STRING", Value: tp.Config.Data},
{Name: "REMOTE_IP", Value: tp.Config.RemoteIP},
},
},
},
},
}

-pc := f.ClusterClients[cluster].CoreV1().Pods(f.Namespace)
-tcpCheckPod, err := pc.Create(&tcpCheckConnectorPod)
+pc := tp.framework.ClusterClients[tp.Config.Cluster].CoreV1().Pods(tp.framework.Namespace)
+var err error
+tp.Pod, err = pc.Create(&tcpCheckConnectorPod)
Expect(err).NotTo(HaveOccurred())
-return tcpCheckPod
}

-type TestPodScheduling int
-
-const (
-InvalidScheduling TestPodScheduling = iota
-GatewayNode
-NonGatewayNode
-)

func nodeAffinity(scheduling TestPodScheduling) *v1.Affinity {

const gatewayLabel = "submariner.io/gateway"
104 changes: 91 additions & 13 deletions test/e2e/framework/pods.go
@@ -6,16 +6,80 @@ import (

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/apimachinery/pkg/util/wait"

. "github.com/onsi/gomega"
)

-func (f *Framework) WaitForPodToBeReady(waitedPod *v1.Pod, cluster TestCluster) *v1.Pod {
+type TestPodType int
+
+const (
+InvalidPodType TestPodType = iota
+ListenerPod
+ConnectorPod
+)
+
+type TestPodScheduling int
+
+const (
+InvalidScheduling TestPodScheduling = iota
+GatewayNode
+NonGatewayNode
+)
+
+type TestPodConfig struct {
+Type TestPodType
+Cluster TestCluster
+Scheduling TestPodScheduling
+Port int
+Data string
+RemoteIP string
+// TODO: namespace, once https://github.com/submariner-io/submariner/pull/141 is merged
+}
+
+type TestPod struct {
+Pod *v1.Pod
+Config *TestPodConfig
+framework *Framework
+}
+
+const (
+TestPort = 1234
+)
+
+func (f *Framework) NewTestPod(config *TestPodConfig) *TestPod {
+
+// check that all necessary details are provided
+Expect(config.Scheduling).ShouldNot(Equal(InvalidScheduling))
+Expect(config.Type).ShouldNot(Equal(InvalidPodType))
+
+// set defaults for unset fields
+if config.Port == 0 {
+config.Port = TestPort
+}
+
+if config.Data == "" {
+config.Data = string(uuid.NewUUID())
+}
+
+testPod := &TestPod{Config: config, framework: f}
+
+switch config.Type {
+case ListenerPod:
+testPod.buildTCPCheckListenerPod()
+case ConnectorPod:
+testPod.buildTCPCheckConnectorPod()
+}
+
+return testPod
+}
+
+func (tp *TestPod) WaitToBeReady() {
var finalPod *v1.Pod
-pc := f.ClusterClients[cluster].CoreV1().Pods(f.Namespace)
+pc := tp.framework.ClusterClients[tp.Config.Cluster].CoreV1().Pods(tp.framework.Namespace)
err := wait.PollImmediate(5*time.Second, 2*time.Minute, func() (bool, error) {
-pod, err := pc.Get(waitedPod.Name, metav1.GetOptions{})
+pod, err := pc.Get(tp.Pod.Name, metav1.GetOptions{})
if err != nil {
if IsTransientError(err) {
Logf("Transient failure when attempting to list pods: %v", err)
@@ -33,33 +97,47 @@ func (f *Framework) WaitForPodToBeReady(waitedPod *v1.Pod, cluster TestCluster)
return true, nil // pod is running
})
Expect(err).NotTo(HaveOccurred())
-return finalPod
+tp.Pod = finalPod
}

-func (f *Framework) WaitForPodFinishStatus(waitedPod *v1.Pod, cluster TestCluster) (terminationCode int32, terminationMessage string) {
-pc := f.ClusterClients[cluster].CoreV1().Pods(f.Namespace)
+func (tp *TestPod) WaitForFinishStatus() (terminationCode int32, terminationMessage string) {
+
+finished := tp.WaitForFinish()
+
+Expect(finished).To(BeTrue())
+
+terminationCode = tp.Pod.Status.ContainerStatuses[0].State.Terminated.ExitCode
+terminationMessage = tp.Pod.Status.ContainerStatuses[0].State.Terminated.Message
+return terminationCode, terminationMessage
+}
+
+func (tp *TestPod) WaitForFinish() bool {
+pc := tp.framework.ClusterClients[tp.Config.Cluster].CoreV1().Pods(tp.framework.Namespace)
err := wait.PollImmediate(5*time.Second, 2*time.Minute, func() (bool, error) {
-pod, err := pc.Get(waitedPod.Name, metav1.GetOptions{})
+var err error
+tp.Pod, err = pc.Get(tp.Pod.Name, metav1.GetOptions{})
if err != nil {
if IsTransientError(err) {
Logf("Transient failure when attempting to list pods: %v", err)
return false, nil // return nil to avoid PollImmediate from stopping
}
return false, err
}
-switch pod.Status.Phase {
+switch tp.Pod.Status.Phase {
case v1.PodSucceeded:
-terminationCode = pod.Status.ContainerStatuses[0].State.Terminated.ExitCode
-terminationMessage = pod.Status.ContainerStatuses[0].State.Terminated.Message
return true, nil
case v1.PodFailed:
-terminationCode = pod.Status.ContainerStatuses[0].State.Terminated.ExitCode
-terminationMessage = pod.Status.ContainerStatuses[0].State.Terminated.Message
return true, nil
default:
return false, nil
}
})

Expect(err).NotTo(HaveOccurred())
-return terminationCode, terminationMessage

+return tp.Pod.Status.Phase == v1.PodSucceeded || tp.Pod.Status.Phase == v1.PodFailed
}

+func (tp *TestPod) CreateService() *v1.Service {
+return tp.framework.CreateTCPService(tp.Config.Cluster, tp.Pod.Labels[TestAppLabel], tp.Config.Port)
+}
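
Since NewTestPod fills in defaults (an unset Port falls back to TestPort, and empty Data becomes a fresh UUID), callers only set those fields when a test needs specific values. A hypothetical override, with values invented purely for illustration:

    pod := f.NewTestPod(&framework.TestPodConfig{
        Type:       framework.ListenerPod,
        Cluster:    framework.ClusterB,
        Scheduling: framework.NonGatewayNode,
        Port:       8080,          // otherwise defaults to TestPort (1234)
        Data:       "fixed-token", // otherwise defaults to a random UUID
    })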
