Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

E2E scheduling of pods to GW/NonGW nodes #148

Merged
merged 1 commit into from
Sep 19, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 73 additions & 42 deletions test/e2e/dataplane/tcp_pod_to_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"strings"

"github.com/submariner-io/submariner/test/e2e/framework"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/uuid"

Expand All @@ -14,59 +13,91 @@ import (

var _ = Describe("[dataplane] Basic Pod to Service tests across clusters without discovery", func() {

var (
listenerPod, connectorPod *v1.Pod
listenerUUID, connectorUUID string
exitStatusC, exitStatusL int32
exitMessageC, exitMessageL string
)

f := framework.NewDefaultFramework("dataplane-p2s-nd")

BeforeEach(func() {
It("Should be able to perform a Pod to Service TCP connection and exchange data between different clusters NonGW to NonGW", func() {
testPod2ServiceTCP(f, framework.NonGatewayNode, framework.NonGatewayNode)
})

listenerUUID = string(uuid.NewUUID())
connectorUUID = string(uuid.NewUUID())
It("Should be able to perform a Pod to Service TCP connection and exchange data between different clusters GW to GW", func() {
testPod2ServiceTCP(f, framework.GatewayNode, framework.GatewayNode)
})

By("Creating a listener pod in cluster B, which will wait for a handshake over TCP")
listenerPod = f.CreateTCPCheckListenerPod(framework.ClusterB, listenerUUID)
It("Should preserve the source IP (GW to GW node)", func() {
testPod2ServiceTCPIPPreservation(f, framework.GatewayNode, framework.GatewayNode)
})

By("Pointing a service ClusterIP to the listener pod in cluster B")
service := f.CreateTCPService(framework.ClusterB, listenerPod.Labels[framework.TestAppLabel], framework.TestPort)
framework.Logf("Service for listener pod has ClusterIP: %v", service.Spec.ClusterIP)
It("Should preserve the source IP (NonGW to NonGW node)", func() {
testPod2ServiceTCPIPPreservation(f, framework.NonGatewayNode, framework.NonGatewayNode)
})
})

By("Creating a connector pod in cluster A, which will attempt the specific UUID handshake over TCP")
connectorPod = f.CreateTCPCheckConnectorPod(framework.ClusterA, listenerPod, service.Spec.ClusterIP, connectorUUID)
func testPod2ServiceTCP(f *framework.Framework, leftScheduling framework.TestPodScheduling, rightScheduling framework.TestPodScheduling) {

By("Waiting for the listener pod to exit with code 0, returning what listener sent")
exitStatusL, exitMessageL = f.WaitForPodFinishStatus(listenerPod, framework.ClusterB)
framework.Logf("Listener output:\n%s", keepLines(exitMessageL, 3))
Expect(exitStatusL).To(Equal(int32(0)))
listenerUUID := string(uuid.NewUUID())
connectorUUID := string(uuid.NewUUID())

By("Waiting for the connector pod to exit with code 0, returning what connector sent")
exitStatusC, exitMessageC = f.WaitForPodFinishStatus(connectorPod, framework.ClusterA)
framework.Logf("Connector output\n%s", keepLines(exitMessageC, 2))
Expect(exitStatusC).To(Equal(int32(0)))
})
By("Creating a listener pod in cluster B, which will wait for a handshake over TCP")
listenerPod := f.CreateTCPCheckListenerPod(framework.ClusterB, rightScheduling, listenerUUID)

It("Should be able to perform a Pod to Service TCP connection and exchange data between different clusters", func() {
By("Verifying what the pods sent to each other contain the right UUIDs")
Expect(exitMessageL).To(ContainSubstring(connectorUUID))
Expect(exitMessageC).To(ContainSubstring(listenerUUID))
})
By("Pointing a service ClusterIP to the listener pod in cluster B")
service := f.CreateTCPService(framework.ClusterB, listenerPod.Labels[framework.TestAppLabel], framework.TestPort)
framework.Logf("Service for listener pod has ClusterIP: %v", service.Spec.ClusterIP)

It("Should preserve the source IP", func() {
By("Retrieving updated connector pod information, including PodIP")
pc := f.ClusterClients[framework.ClusterA].CoreV1().Pods(f.Namespace)
connectorPod, err := pc.Get(connectorPod.Name, metav1.GetOptions{})
Expect(err).ShouldNot(HaveOccurred())
By("Creating a connector pod in cluster A, which will attempt the specific UUID handshake over TCP")
connectorPod := f.CreateTCPCheckConnectorPod(framework.ClusterA, leftScheduling, service.Spec.ClusterIP, connectorUUID)

framework.Logf("Connector pod has IP: %s", connectorPod.Status.PodIP)
By("Verifying the output of listener pod which must contain the source IP")
Expect(exitMessageL).To(ContainSubstring(connectorPod.Status.PodIP))
})
By("Waiting for the listener pod to exit with code 0, returning what listener sent")
exitStatusL, exitMessageL := f.WaitForPodFinishStatus(listenerPod, framework.ClusterB)
framework.Logf("Listener output:\n%s", keepLines(exitMessageL, 3))
Expect(exitStatusL).To(Equal(int32(0)))

})
By("Waiting for the connector pod to exit with code 0, returning what connector sent")
exitStatusC, exitMessageC := f.WaitForPodFinishStatus(connectorPod, framework.ClusterA)
framework.Logf("Connector output\n%s", keepLines(exitMessageC, 2))
Expect(exitStatusC).To(Equal(int32(0)))

By("Verifying what the pods sent to each other contain the right UUIDs")
Expect(exitMessageL).Should(ContainSubstring(connectorUUID))
Expect(exitMessageC).Should(ContainSubstring(listenerUUID))
}

func testPod2ServiceTCPIPPreservation(f *framework.Framework, leftScheduling framework.TestPodScheduling, rightScheduling framework.TestPodScheduling) {

// TODO(mangelajo): remove the repetition of this function, work being
// already done in PR: https://github.com/submariner-io/submariner/pull/149
listenerUUID := string(uuid.NewUUID())
connectorUUID := string(uuid.NewUUID())

By("Creating a listener pod in cluster B, which will wait for a handshake over TCP")
listenerPod := f.CreateTCPCheckListenerPod(framework.ClusterB, rightScheduling, listenerUUID)

By("Pointing a service ClusterIP to the listener pod in cluster B")
service := f.CreateTCPService(framework.ClusterB, listenerPod.Labels[framework.TestAppLabel], framework.TestPort)
framework.Logf("Service for listener pod has ClusterIP: %v", service.Spec.ClusterIP)

By("Creating a connector pod in cluster A, which will attempt the specific UUID handshake over TCP")
connectorPod := f.CreateTCPCheckConnectorPod(framework.ClusterA, leftScheduling, service.Spec.ClusterIP, connectorUUID)

By("Waiting for the listener pod to exit with code 0, returning what listener sent")
exitStatusL, exitMessageL := f.WaitForPodFinishStatus(listenerPod, framework.ClusterB)
framework.Logf("Listener output:\n%s", keepLines(exitMessageL, 3))
Expect(exitStatusL).To(Equal(int32(0)))

By("Waiting for the connector pod to exit with code 0, returning what connector sent")
exitStatusC, exitMessageC := f.WaitForPodFinishStatus(connectorPod, framework.ClusterA)
framework.Logf("Connector output\n%s", keepLines(exitMessageC, 2))
Expect(exitStatusC).To(Equal(int32(0)))

By("Retrieving updated connector pod information, including PodIP")
pc := f.ClusterClients[framework.ClusterA].CoreV1().Pods(f.Namespace)
connectorPod, err := pc.Get(connectorPod.Name, metav1.GetOptions{})
Expect(err).ShouldNot(HaveOccurred())

framework.Logf("Connector pod has IP: %s", connectorPod.Status.PodIP)
By("Verifying the output of listener pod which must contain the source IP")
Expect(exitMessageL).To(ContainSubstring(connectorPod.Status.PodIP))
}

func keepLines(output string, n int) string {
lines := strings.Split(output, "\n")
Expand Down
10 changes: 6 additions & 4 deletions test/e2e/framework/framework.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,12 @@ const (
PollInterval = 100 * time.Millisecond
)

type ClusterIndex int

const (
ClusterA = 0
ClusterB = 1
ClusterC = 2
ClusterA ClusterIndex = iota
ClusterB
ClusterC
)

// Framework supports common operations used by e2e tests; it will keep a client & a namespace for you.
Expand Down Expand Up @@ -107,7 +109,7 @@ func (f *Framework) BeforeEach() {
}

for idx, clientSet := range f.ClusterClients {
switch idx {
switch ClusterIndex(idx) {
case ClusterA: // On the first cluster we let k8s generate a name for the namespace
namespace := generateNamespace(clientSet, f.BaseName, namespaceLabels)
f.Namespace = namespace.GetName()
Expand Down
52 changes: 50 additions & 2 deletions test/e2e/framework/network_pods.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ const (
// create a test pod inside the current test namespace on the specified cluster.
// The pod will listen on TestPort over TCP, send sendString over the connection,
// and write the network response in the pod termination log, then exit with 0 status
func (f *Framework) CreateTCPCheckListenerPod(cluster int, sendString string) *v1.Pod {
func (f *Framework) CreateTCPCheckListenerPod(cluster ClusterIndex, scheduling TestPodScheduling, sendString string) *v1.Pod {

tcpCheckListenerPod := v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Expand All @@ -26,6 +26,7 @@ func (f *Framework) CreateTCPCheckListenerPod(cluster int, sendString string) *v
},
},
Spec: v1.PodSpec{
Affinity: nodeAffinity(scheduling),
RestartPolicy: v1.RestartPolicyNever,
Containers: []v1.Container{
{
Expand Down Expand Up @@ -53,7 +54,7 @@ func (f *Framework) CreateTCPCheckListenerPod(cluster int, sendString string) *v
// The pod will connect to remoteIP:TestPort over TCP, send sendString over the
// connection, and write the network response in the pod termination log, then
// exit with 0 status
func (f *Framework) CreateTCPCheckConnectorPod(cluster int, remoteCheckPod *v1.Pod, remoteIP string, sendString string) *v1.Pod {
func (f *Framework) CreateTCPCheckConnectorPod(cluster ClusterIndex, scheduling TestPodScheduling, remoteIP string, sendString string) *v1.Pod {

tcpCheckConnectorPod := v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Expand All @@ -63,6 +64,7 @@ func (f *Framework) CreateTCPCheckConnectorPod(cluster int, remoteCheckPod *v1.P
},
},
Spec: v1.PodSpec{
Affinity: nodeAffinity(scheduling),
RestartPolicy: v1.RestartPolicyNever,
Containers: []v1.Container{
{
Expand All @@ -86,3 +88,49 @@ func (f *Framework) CreateTCPCheckConnectorPod(cluster int, remoteCheckPod *v1.P
Expect(err).NotTo(HaveOccurred())
return tcpCheckPod
}

type TestPodScheduling int

const (
InvalidScheduling TestPodScheduling = iota
GatewayNode
NonGatewayNode
)

func nodeAffinity(scheduling TestPodScheduling) *v1.Affinity {

const gatewayLabel = "submariner.io/gateway"

var nodeSelReqs []v1.NodeSelectorRequirement

switch scheduling {
case GatewayNode:
nodeSelReqs = addNodeSelectorRequirement(nodeSelReqs, gatewayLabel, v1.NodeSelectorOpIn, []string{"true"})

case NonGatewayNode:
nodeSelReqs = addNodeSelectorRequirement(nodeSelReqs, gatewayLabel, v1.NodeSelectorOpDoesNotExist, nil)
nodeSelReqs = addNodeSelectorRequirement(nodeSelReqs, gatewayLabel, v1.NodeSelectorOpNotIn, []string{"true"})
}

affinity := v1.Affinity{
NodeAffinity: &v1.NodeAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
NodeSelectorTerms: []v1.NodeSelectorTerm{
{
MatchExpressions: nodeSelReqs,
},
},
},
},
}
return &affinity
}

func addNodeSelectorRequirement(nodeSelReqs []v1.NodeSelectorRequirement, label string,
op v1.NodeSelectorOperator, values []string) []v1.NodeSelectorRequirement {
return append(nodeSelReqs, v1.NodeSelectorRequirement{
Key: label,
Operator: v1.NodeSelectorOpNotIn,
Values: []string{"true"},
})
}
4 changes: 2 additions & 2 deletions test/e2e/framework/pods.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
. "github.com/onsi/gomega"
)

func (f *Framework) WaitForPodToBeReady(waitedPod *v1.Pod, cluster int) *v1.Pod {
func (f *Framework) WaitForPodToBeReady(waitedPod *v1.Pod, cluster ClusterIndex) *v1.Pod {
var finalPod *v1.Pod
pc := f.ClusterClients[cluster].CoreV1().Pods(f.Namespace)
err := wait.PollImmediate(5*time.Second, 2*time.Minute, func() (bool, error) {
Expand All @@ -36,7 +36,7 @@ func (f *Framework) WaitForPodToBeReady(waitedPod *v1.Pod, cluster int) *v1.Pod
return finalPod
}

func (f *Framework) WaitForPodFinishStatus(waitedPod *v1.Pod, cluster int) (terminationCode int32, terminationMessage string) {
func (f *Framework) WaitForPodFinishStatus(waitedPod *v1.Pod, cluster ClusterIndex) (terminationCode int32, terminationMessage string) {
pc := f.ClusterClients[cluster].CoreV1().Pods(f.Namespace)
err := wait.PollImmediate(5*time.Second, 2*time.Minute, func() (bool, error) {
pod, err := pc.Get(waitedPod.Name, metav1.GetOptions{})
Expand Down
2 changes: 1 addition & 1 deletion test/e2e/framework/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ const (
TestAppLabel = "test-app"
)

func (f *Framework) CreateTCPService(cluster int, selectorName string, port int) *v1.Service {
func (f *Framework) CreateTCPService(cluster ClusterIndex, selectorName string, port int) *v1.Service {

tcpService := v1.Service{
ObjectMeta: metav1.ObjectMeta{
Expand Down