Skip to content

Commit

Permalink
E2E scheduling of pods to GW/NonGW nodes
Browse files Browse the repository at this point in the history
Before this patch, E2E didn't have any control of where pods
landed, this made the testing of different scenarios random.

With this patch we ensure the testing of the datapath when
traffic needs to transit from a worker node to a gateway node.

Cluster type has been converted to an enumeration as part of this
commit.
  • Loading branch information
mangelajo committed Sep 18, 2019
1 parent 9aad038 commit af73357
Show file tree
Hide file tree
Showing 5 changed files with 132 additions and 51 deletions.
115 changes: 73 additions & 42 deletions test/e2e/dataplane/tcp_pod_to_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"strings"

"github.com/submariner-io/submariner/test/e2e/framework"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/uuid"

Expand All @@ -14,59 +13,91 @@ import (

var _ = Describe("[dataplane] Basic Pod to Service tests across clusters without discovery", func() {

var (
listenerPod, connectorPod *v1.Pod
listenerUUID, connectorUUID string
exitStatusC, exitStatusL int32
exitMessageC, exitMessageL string
)

f := framework.NewDefaultFramework("dataplane-p2s-nd")

BeforeEach(func() {
It("Should be able to perform a Pod to Service TCP connection and exchange data between different clusters NonGW to NonGW", func() {
testPod2ServiceTCP(f, framework.NonGatewayNode, framework.NonGatewayNode)
})

listenerUUID = string(uuid.NewUUID())
connectorUUID = string(uuid.NewUUID())
It("Should be able to perform a Pod to Service TCP connection and exchange data between different clusters GW to GW", func() {
testPod2ServiceTCP(f, framework.GatewayNode, framework.GatewayNode)
})

By("Creating a listener pod in cluster B, which will wait for a handshake over TCP")
listenerPod = f.CreateTCPCheckListenerPod(framework.ClusterB, listenerUUID)
It("Should preserve the source IP (GW to GW node)", func() {
testPod2ServiceTCPIPPreservation(f, framework.GatewayNode, framework.GatewayNode)
})

By("Pointing a service ClusterIP to the listener pod in cluster B")
service := f.CreateTCPService(framework.ClusterB, listenerPod.Labels[framework.TestAppLabel], framework.TestPort)
framework.Logf("Service for listener pod has ClusterIP: %v", service.Spec.ClusterIP)
It("Should preserve the source IP (NonGW to NonGW node)", func() {
testPod2ServiceTCPIPPreservation(f, framework.NonGatewayNode, framework.NonGatewayNode)
})
})

By("Creating a connector pod in cluster A, which will attempt the specific UUID handshake over TCP")
connectorPod = f.CreateTCPCheckConnectorPod(framework.ClusterA, listenerPod, service.Spec.ClusterIP, connectorUUID)
func testPod2ServiceTCP(f *framework.Framework, leftScheduling framework.TestPodScheduling, rightScheduling framework.TestPodScheduling) {

By("Waiting for the listener pod to exit with code 0, returning what listener sent")
exitStatusL, exitMessageL = f.WaitForPodFinishStatus(listenerPod, framework.ClusterB)
framework.Logf("Listener output:\n%s", keepLines(exitMessageL, 3))
Expect(exitStatusL).To(Equal(int32(0)))
listenerUUID := string(uuid.NewUUID())
connectorUUID := string(uuid.NewUUID())

By("Waiting for the connector pod to exit with code 0, returning what connector sent")
exitStatusC, exitMessageC = f.WaitForPodFinishStatus(connectorPod, framework.ClusterA)
framework.Logf("Connector output\n%s", keepLines(exitMessageC, 2))
Expect(exitStatusC).To(Equal(int32(0)))
})
By("Creating a listener pod in cluster B, which will wait for a handshake over TCP")
listenerPod := f.CreateTCPCheckListenerPod(framework.ClusterB, rightScheduling, listenerUUID)

It("Should be able to perform a Pod to Service TCP connection and exchange data between different clusters", func() {
By("Verifying what the pods sent to each other contain the right UUIDs")
Expect(exitMessageL).To(ContainSubstring(connectorUUID))
Expect(exitMessageC).To(ContainSubstring(listenerUUID))
})
By("Pointing a service ClusterIP to the listener pod in cluster B")
service := f.CreateTCPService(framework.ClusterB, listenerPod.Labels[framework.TestAppLabel], framework.TestPort)
framework.Logf("Service for listener pod has ClusterIP: %v", service.Spec.ClusterIP)

It("Should preserve the source IP", func() {
By("Retrieving updated connector pod information, including PodIP")
pc := f.ClusterClients[framework.ClusterA].CoreV1().Pods(f.Namespace)
connectorPod, err := pc.Get(connectorPod.Name, metav1.GetOptions{})
Expect(err).ShouldNot(HaveOccurred())
By("Creating a connector pod in cluster A, which will attempt the specific UUID handshake over TCP")
connectorPod := f.CreateTCPCheckConnectorPod(framework.ClusterA, leftScheduling, service.Spec.ClusterIP, connectorUUID)

framework.Logf("Connector pod has IP: %s", connectorPod.Status.PodIP)
By("Verifying the output of listener pod which must contain the source IP")
Expect(exitMessageL).To(ContainSubstring(connectorPod.Status.PodIP))
})
By("Waiting for the listener pod to exit with code 0, returning what listener sent")
exitStatusL, exitMessageL := f.WaitForPodFinishStatus(listenerPod, framework.ClusterB)
framework.Logf("Listener output:\n%s", keepLines(exitMessageL, 3))
Expect(exitStatusL).To(Equal(int32(0)))

})
By("Waiting for the connector pod to exit with code 0, returning what connector sent")
exitStatusC, exitMessageC := f.WaitForPodFinishStatus(connectorPod, framework.ClusterA)
framework.Logf("Connector output\n%s", keepLines(exitMessageC, 2))
Expect(exitStatusC).To(Equal(int32(0)))

By("Verifying what the pods sent to each other contain the right UUIDs")
Expect(exitMessageL).Should(ContainSubstring(connectorUUID))
Expect(exitMessageC).Should(ContainSubstring(listenerUUID))
}

func testPod2ServiceTCPIPPreservation(f *framework.Framework, leftScheduling framework.TestPodScheduling, rightScheduling framework.TestPodScheduling) {

// TODO(mangelajo): remove the repetition of this function, work being
// already done in PR: https://github.com/submariner-io/submariner/pull/149
listenerUUID := string(uuid.NewUUID())
connectorUUID := string(uuid.NewUUID())

By("Creating a listener pod in cluster B, which will wait for a handshake over TCP")
listenerPod := f.CreateTCPCheckListenerPod(framework.ClusterB, rightScheduling, listenerUUID)

By("Pointing a service ClusterIP to the listener pod in cluster B")
service := f.CreateTCPService(framework.ClusterB, listenerPod.Labels[framework.TestAppLabel], framework.TestPort)
framework.Logf("Service for listener pod has ClusterIP: %v", service.Spec.ClusterIP)

By("Creating a connector pod in cluster A, which will attempt the specific UUID handshake over TCP")
connectorPod := f.CreateTCPCheckConnectorPod(framework.ClusterA, leftScheduling, service.Spec.ClusterIP, connectorUUID)

By("Waiting for the listener pod to exit with code 0, returning what listener sent")
exitStatusL, exitMessageL := f.WaitForPodFinishStatus(listenerPod, framework.ClusterB)
framework.Logf("Listener output:\n%s", keepLines(exitMessageL, 3))
Expect(exitStatusL).To(Equal(int32(0)))

By("Waiting for the connector pod to exit with code 0, returning what connector sent")
exitStatusC, exitMessageC := f.WaitForPodFinishStatus(connectorPod, framework.ClusterA)
framework.Logf("Connector output\n%s", keepLines(exitMessageC, 2))
Expect(exitStatusC).To(Equal(int32(0)))

By("Retrieving updated connector pod information, including PodIP")
pc := f.ClusterClients[framework.ClusterA].CoreV1().Pods(f.Namespace)
connectorPod, err := pc.Get(connectorPod.Name, metav1.GetOptions{})
Expect(err).ShouldNot(HaveOccurred())

framework.Logf("Connector pod has IP: %s", connectorPod.Status.PodIP)
By("Verifying the output of listener pod which must contain the source IP")
Expect(exitMessageL).To(ContainSubstring(connectorPod.Status.PodIP))
}

func keepLines(output string, n int) string {
lines := strings.Split(output, "\n")
Expand Down
10 changes: 6 additions & 4 deletions test/e2e/framework/framework.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,12 @@ const (
PollInterval = 100 * time.Millisecond
)

type ClusterIndex int

const (
ClusterA = 0
ClusterB = 1
ClusterC = 2
ClusterA ClusterIndex = iota
ClusterB
ClusterC
)

// Framework supports common operations used by e2e tests; it will keep a client & a namespace for you.
Expand Down Expand Up @@ -107,7 +109,7 @@ func (f *Framework) BeforeEach() {
}

for idx, clientSet := range f.ClusterClients {
switch idx {
switch ClusterIndex(idx) {
case ClusterA: // On the first cluster we let k8s generate a name for the namespace
namespace := generateNamespace(clientSet, f.BaseName, namespaceLabels)
f.Namespace = namespace.GetName()
Expand Down
52 changes: 50 additions & 2 deletions test/e2e/framework/network_pods.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ const (
// create a test pod inside the current test namespace on the specified cluster.
// The pod will listen on TestPort over TCP, send sendString over the connection,
// and write the network response in the pod termination log, then exit with 0 status
func (f *Framework) CreateTCPCheckListenerPod(cluster int, sendString string) *v1.Pod {
func (f *Framework) CreateTCPCheckListenerPod(cluster ClusterIndex, scheduling TestPodScheduling, sendString string) *v1.Pod {

tcpCheckListenerPod := v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Expand All @@ -26,6 +26,7 @@ func (f *Framework) CreateTCPCheckListenerPod(cluster int, sendString string) *v
},
},
Spec: v1.PodSpec{
Affinity: nodeAffinity(scheduling),
RestartPolicy: v1.RestartPolicyNever,
Containers: []v1.Container{
{
Expand Down Expand Up @@ -53,7 +54,7 @@ func (f *Framework) CreateTCPCheckListenerPod(cluster int, sendString string) *v
// The pod will connect to remoteIP:TestPort over TCP, send sendString over the
// connection, and write the network response in the pod termination log, then
// exit with 0 status
func (f *Framework) CreateTCPCheckConnectorPod(cluster int, remoteCheckPod *v1.Pod, remoteIP string, sendString string) *v1.Pod {
func (f *Framework) CreateTCPCheckConnectorPod(cluster ClusterIndex, scheduling TestPodScheduling, remoteIP string, sendString string) *v1.Pod {

tcpCheckConnectorPod := v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Expand All @@ -63,6 +64,7 @@ func (f *Framework) CreateTCPCheckConnectorPod(cluster int, remoteCheckPod *v1.P
},
},
Spec: v1.PodSpec{
Affinity: nodeAffinity(scheduling),
RestartPolicy: v1.RestartPolicyNever,
Containers: []v1.Container{
{
Expand All @@ -86,3 +88,49 @@ func (f *Framework) CreateTCPCheckConnectorPod(cluster int, remoteCheckPod *v1.P
Expect(err).NotTo(HaveOccurred())
return tcpCheckPod
}

type TestPodScheduling int

const (
InvalidScheduling TestPodScheduling = iota
GatewayNode
NonGatewayNode
)

func nodeAffinity(scheduling TestPodScheduling) *v1.Affinity {

const gatewayLabel = "submariner.io/gateway"

var nodeSelReqs []v1.NodeSelectorRequirement

switch scheduling {
case GatewayNode:
nodeSelReqs = addNodeSelectorRequirement(nodeSelReqs, gatewayLabel, v1.NodeSelectorOpIn, []string{"true"})

case NonGatewayNode:
nodeSelReqs = addNodeSelectorRequirement(nodeSelReqs, gatewayLabel, v1.NodeSelectorOpDoesNotExist, nil)
nodeSelReqs = addNodeSelectorRequirement(nodeSelReqs, gatewayLabel, v1.NodeSelectorOpNotIn, []string{"true"})
}

affinity := v1.Affinity{
NodeAffinity: &v1.NodeAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
NodeSelectorTerms: []v1.NodeSelectorTerm{
{
MatchExpressions: nodeSelReqs,
},
},
},
},
}
return &affinity
}

func addNodeSelectorRequirement(nodeSelReqs []v1.NodeSelectorRequirement, label string,
op v1.NodeSelectorOperator, values []string) []v1.NodeSelectorRequirement {
return append(nodeSelReqs, v1.NodeSelectorRequirement{
Key: label,
Operator: v1.NodeSelectorOpNotIn,
Values: []string{"true"},
})
}
4 changes: 2 additions & 2 deletions test/e2e/framework/pods.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
. "github.com/onsi/gomega"
)

func (f *Framework) WaitForPodToBeReady(waitedPod *v1.Pod, cluster int) *v1.Pod {
func (f *Framework) WaitForPodToBeReady(waitedPod *v1.Pod, cluster ClusterIndex) *v1.Pod {
var finalPod *v1.Pod
pc := f.ClusterClients[cluster].CoreV1().Pods(f.Namespace)
err := wait.PollImmediate(5*time.Second, 2*time.Minute, func() (bool, error) {
Expand All @@ -36,7 +36,7 @@ func (f *Framework) WaitForPodToBeReady(waitedPod *v1.Pod, cluster int) *v1.Pod
return finalPod
}

func (f *Framework) WaitForPodFinishStatus(waitedPod *v1.Pod, cluster int) (terminationCode int32, terminationMessage string) {
func (f *Framework) WaitForPodFinishStatus(waitedPod *v1.Pod, cluster ClusterIndex) (terminationCode int32, terminationMessage string) {
pc := f.ClusterClients[cluster].CoreV1().Pods(f.Namespace)
err := wait.PollImmediate(5*time.Second, 2*time.Minute, func() (bool, error) {
pod, err := pc.Get(waitedPod.Name, metav1.GetOptions{})
Expand Down
2 changes: 1 addition & 1 deletion test/e2e/framework/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ const (
TestAppLabel = "test-app"
)

func (f *Framework) CreateTCPService(cluster int, selectorName string, port int) *v1.Service {
func (f *Framework) CreateTCPService(cluster ClusterIndex, selectorName string, port int) *v1.Service {

tcpService := v1.Service{
ObjectMeta: metav1.ObjectMeta{
Expand Down

0 comments on commit af73357

Please sign in to comment.