diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index daae6aaf647..e6a2ba0ede7 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -5,12 +5,10 @@ on: branches: - master - release-* - - feature/flow-aggregator push: branches: - master - release-* - - feature/flow-aggregator jobs: check-changes: @@ -109,7 +107,7 @@ jobs: run: make flow-aggregator-ubuntu - name: Push flow-aggregator Docker image to registry # Will remove the feature/flow-aggregator branch later - if: ${{ github.repository == 'vmware-tanzu/antrea' && github.event_name == 'push' && (github.ref == 'refs/heads/master' || github.ref == 'refs/heads/feature/flow-aggregator')}} + if: ${{ github.repository == 'vmware-tanzu/antrea' && github.event_name == 'push' && github.ref == 'refs/heads/master' }} env: DOCKER_USERNAME: ${{ secrets.DOCKER_USERNAME }} DOCKER_PASSWORD: ${{ secrets.DOCKER_PASSWORD }} diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml index eedf81d2d28..e379de7d15c 100644 --- a/.github/workflows/go.yml +++ b/.github/workflows/go.yml @@ -4,12 +4,10 @@ on: branches: - master - release-* - - feature/flow-aggregator push: branches: - master - release-* - - feature/flow-aggregator jobs: diff --git a/.github/workflows/kind.yml b/.github/workflows/kind.yml index 4bc98281b43..fba99257e0d 100755 --- a/.github/workflows/kind.yml +++ b/.github/workflows/kind.yml @@ -4,12 +4,10 @@ on: branches: - master - release-* - - feature/flow-aggregator push: branches: - master - release-* - - feature/flow-aggregator env: KIND_VERSION: v0.9.0 diff --git a/build/yamls/antrea-aks.yml b/build/yamls/antrea-aks.yml index dd69638d435..1693dd2b5d5 100644 --- a/build/yamls/antrea-aks.yml +++ b/build/yamls/antrea-aks.yml @@ -1248,7 +1248,7 @@ data: # If PORT is empty, we default to 4739, the standard IPFIX port. # If no PROTO is given, we consider "tcp" as default. We support "tcp" and "udp" # L4 transport protocols. - #flowCollectorAddr: "flow-aggregator.flow-aggregator.svc:tcp" + #flowCollectorAddr: "flow-aggregator.flow-aggregator.svc:4739:tcp" # Provide flow poll interval as a duration string. This determines how often the flow exporter dumps connections from the conntrack module. # Flow poll interval should be greater than or equal to 1s (one second). @@ -1315,7 +1315,7 @@ metadata: annotations: {} labels: app: antrea - name: antrea-config-tkk482c56m + name: antrea-config-gm7dcbm584 namespace: kube-system --- apiVersion: v1 @@ -1422,7 +1422,7 @@ spec: key: node-role.kubernetes.io/master volumes: - configMap: - name: antrea-config-tkk482c56m + name: antrea-config-gm7dcbm584 name: antrea-config - name: antrea-controller-tls secret: @@ -1687,7 +1687,7 @@ spec: operator: Exists volumes: - configMap: - name: antrea-config-tkk482c56m + name: antrea-config-gm7dcbm584 name: antrea-config - hostPath: path: /etc/cni/net.d diff --git a/build/yamls/antrea-eks.yml b/build/yamls/antrea-eks.yml index 4e486d55faa..3fe8c3d1bc1 100644 --- a/build/yamls/antrea-eks.yml +++ b/build/yamls/antrea-eks.yml @@ -1248,7 +1248,7 @@ data: # If PORT is empty, we default to 4739, the standard IPFIX port. # If no PROTO is given, we consider "tcp" as default. We support "tcp" and "udp" # L4 transport protocols. - #flowCollectorAddr: "flow-aggregator.flow-aggregator.svc:tcp" + #flowCollectorAddr: "flow-aggregator.flow-aggregator.svc:4739:tcp" # Provide flow poll interval as a duration string. This determines how often the flow exporter dumps connections from the conntrack module. # Flow poll interval should be greater than or equal to 1s (one second). @@ -1315,7 +1315,7 @@ metadata: annotations: {} labels: app: antrea - name: antrea-config-tkk482c56m + name: antrea-config-gm7dcbm584 namespace: kube-system --- apiVersion: v1 @@ -1422,7 +1422,7 @@ spec: key: node-role.kubernetes.io/master volumes: - configMap: - name: antrea-config-tkk482c56m + name: antrea-config-gm7dcbm584 name: antrea-config - name: antrea-controller-tls secret: @@ -1689,7 +1689,7 @@ spec: operator: Exists volumes: - configMap: - name: antrea-config-tkk482c56m + name: antrea-config-gm7dcbm584 name: antrea-config - hostPath: path: /etc/cni/net.d diff --git a/build/yamls/antrea-gke.yml b/build/yamls/antrea-gke.yml index 7a9754f466d..7157bcbb2b2 100644 --- a/build/yamls/antrea-gke.yml +++ b/build/yamls/antrea-gke.yml @@ -1248,7 +1248,7 @@ data: # If PORT is empty, we default to 4739, the standard IPFIX port. # If no PROTO is given, we consider "tcp" as default. We support "tcp" and "udp" # L4 transport protocols. - #flowCollectorAddr: "flow-aggregator.flow-aggregator.svc:tcp" + #flowCollectorAddr: "flow-aggregator.flow-aggregator.svc:4739:tcp" # Provide flow poll interval as a duration string. This determines how often the flow exporter dumps connections from the conntrack module. # Flow poll interval should be greater than or equal to 1s (one second). @@ -1315,7 +1315,7 @@ metadata: annotations: {} labels: app: antrea - name: antrea-config-ccm9d925m4 + name: antrea-config-h7t8ffthht namespace: kube-system --- apiVersion: v1 @@ -1422,7 +1422,7 @@ spec: key: node-role.kubernetes.io/master volumes: - configMap: - name: antrea-config-ccm9d925m4 + name: antrea-config-h7t8ffthht name: antrea-config - name: antrea-controller-tls secret: @@ -1687,7 +1687,7 @@ spec: operator: Exists volumes: - configMap: - name: antrea-config-ccm9d925m4 + name: antrea-config-h7t8ffthht name: antrea-config - hostPath: path: /etc/cni/net.d diff --git a/build/yamls/antrea-ipsec.yml b/build/yamls/antrea-ipsec.yml index ca4ebcf3d01..d59ff226773 100644 --- a/build/yamls/antrea-ipsec.yml +++ b/build/yamls/antrea-ipsec.yml @@ -1253,7 +1253,7 @@ data: # If PORT is empty, we default to 4739, the standard IPFIX port. # If no PROTO is given, we consider "tcp" as default. We support "tcp" and "udp" # L4 transport protocols. - #flowCollectorAddr: "flow-aggregator.flow-aggregator.svc:tcp" + #flowCollectorAddr: "flow-aggregator.flow-aggregator.svc:4739:tcp" # Provide flow poll interval as a duration string. This determines how often the flow exporter dumps connections from the conntrack module. # Flow poll interval should be greater than or equal to 1s (one second). @@ -1320,7 +1320,7 @@ metadata: annotations: {} labels: app: antrea - name: antrea-config-mcfc2b62gk + name: antrea-config-mh52t2hmmd namespace: kube-system --- apiVersion: v1 @@ -1436,7 +1436,7 @@ spec: key: node-role.kubernetes.io/master volumes: - configMap: - name: antrea-config-mcfc2b62gk + name: antrea-config-mh52t2hmmd name: antrea-config - name: antrea-controller-tls secret: @@ -1736,7 +1736,7 @@ spec: operator: Exists volumes: - configMap: - name: antrea-config-mcfc2b62gk + name: antrea-config-mh52t2hmmd name: antrea-config - hostPath: path: /etc/cni/net.d diff --git a/build/yamls/antrea.yml b/build/yamls/antrea.yml index e035c6bcc5f..4b490e6519a 100644 --- a/build/yamls/antrea.yml +++ b/build/yamls/antrea.yml @@ -1253,7 +1253,7 @@ data: # If PORT is empty, we default to 4739, the standard IPFIX port. # If no PROTO is given, we consider "tcp" as default. We support "tcp" and "udp" # L4 transport protocols. - #flowCollectorAddr: "flow-aggregator.flow-aggregator.svc:tcp" + #flowCollectorAddr: "flow-aggregator.flow-aggregator.svc:4739:tcp" # Provide flow poll interval as a duration string. This determines how often the flow exporter dumps connections from the conntrack module. # Flow poll interval should be greater than or equal to 1s (one second). @@ -1320,7 +1320,7 @@ metadata: annotations: {} labels: app: antrea - name: antrea-config-5c77mt4c28 + name: antrea-config-mfd9dcdh6d namespace: kube-system --- apiVersion: v1 @@ -1427,7 +1427,7 @@ spec: key: node-role.kubernetes.io/master volumes: - configMap: - name: antrea-config-5c77mt4c28 + name: antrea-config-mfd9dcdh6d name: antrea-config - name: antrea-controller-tls secret: @@ -1692,7 +1692,7 @@ spec: operator: Exists volumes: - configMap: - name: antrea-config-5c77mt4c28 + name: antrea-config-mfd9dcdh6d name: antrea-config - hostPath: path: /etc/cni/net.d diff --git a/build/yamls/base/conf/antrea-agent.conf b/build/yamls/base/conf/antrea-agent.conf index 14b487feb78..b0639eeb002 100644 --- a/build/yamls/base/conf/antrea-agent.conf +++ b/build/yamls/base/conf/antrea-agent.conf @@ -97,7 +97,7 @@ featureGates: # If PORT is empty, we default to 4739, the standard IPFIX port. # If no PROTO is given, we consider "tcp" as default. We support "tcp" and "udp" # L4 transport protocols. -#flowCollectorAddr: "flow-aggregator.flow-aggregator.svc:tcp" +#flowCollectorAddr: "flow-aggregator.flow-aggregator.svc:4739:tcp" # Provide flow poll interval as a duration string. This determines how often the flow exporter dumps connections from the conntrack module. # Flow poll interval should be greater than or equal to 1s (one second). diff --git a/build/yamls/flow-aggregator.yml b/build/yamls/flow-aggregator.yml index 3caf2b55fdd..88bd37fb9b4 100644 --- a/build/yamls/flow-aggregator.yml +++ b/build/yamls/flow-aggregator.yml @@ -22,6 +22,7 @@ data: #aggregatorTransportProtocol: "tcp" kind: ConfigMap metadata: + annotations: {} labels: app: flow-aggregator name: flow-aggregator-configmap-kggb5829gb diff --git a/test/e2e/fixtures.go b/test/e2e/fixtures.go index bf51b6f647f..dec223ed4a8 100644 --- a/test/e2e/fixtures.go +++ b/test/e2e/fixtures.go @@ -146,18 +146,21 @@ func setupTestWithIPFIXCollector(tb testing.TB) (*TestData, error, bool) { ipfixCollectorIP, err := testData.podWaitForIPs(defaultTimeout, "ipfix-collector", testNamespace) if err != nil || len(ipfixCollectorIP.ipStrings) == 0 { tb.Errorf("Error when waiting to get ipfix collector Pod IP: %v", err) + return nil, err, isIPv6 } ipStr := ipfixCollectorIP.ipv4.String() - tb.Logf("Applying flow aggregator YAML with ipfix collector address: %s", ipStr) - if err := testData.deployFlowAggregator(fmt.Sprintf("%s:%s:tcp", ipStr, ipfixCollectorPort)); err != nil { + + tb.Logf("Applying flow aggregator YAML with ipfix collector address: %s:%s:tcp", ipStr, ipfixCollectorPort) + faClusterIP, err := testData.deployFlowAggregator(fmt.Sprintf("%s:%s:tcp", ipStr, ipfixCollectorPort)) + if err != nil { return testData, err, isIPv6 } - tb.Logf("Deploying flow exporter") - if err := testData.deployAntreaFlowExporter(""); err != nil { + tb.Logf("Deploying flow exporter with collector address: %s:%s:tcp", faClusterIP, ipfixCollectorPort) + if err = testData.deployAntreaFlowExporter(fmt.Sprintf("%s:%s:tcp", faClusterIP, ipfixCollectorPort)); err != nil { return testData, err, isIPv6 } tb.Logf("Checking CoreDNS deployment") - if err := testData.checkCoreDNSPods(defaultTimeout); err != nil { + if err = testData.checkCoreDNSPods(defaultTimeout); err != nil { return testData, err, isIPv6 } return testData, nil, isIPv6 diff --git a/test/e2e/flowaggregator_test.go b/test/e2e/flowaggregator_test.go index 88b0b1052d5..ee4ed6d0c02 100644 --- a/test/e2e/flowaggregator_test.go +++ b/test/e2e/flowaggregator_test.go @@ -23,10 +23,12 @@ import ( "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" corev1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1" networkingv1 "k8s.io/api/networking/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/wait" ) /* Sample output from the collector: @@ -90,10 +92,9 @@ AntreaProxy enabled (Inter-Node): Flow record from destination Node is ignored, */ const ( - ingressNetworkPolicyName = "test-flow-aggregator-networkpolicy-ingress" - egressNetworkPolicyName = "test-flow-aggregator-networkpolicy-egress" - collectorCheckDelay = 5 * time.Second - expectedNumTemplateRecords = 1 + ingressNetworkPolicyName = "test-flow-aggregator-networkpolicy-ingress" + egressNetworkPolicyName = "test-flow-aggregator-networkpolicy-egress" + collectorCheckTimeout = 10 * time.Second // Single iperf run results in two connections with separate ports (control connection and actual data connection). // As 5s is export interval and iperf traffic runs for 10s, we expect about 4 records exporting to the flow aggregator. // Since flow aggregator will aggregate records based on 5-tuple connection key, we expect 2 records. @@ -104,6 +105,7 @@ func TestFlowAggregator(t *testing.T) { // TODO: remove this limitation after flow aggregator supports IPv6 skipIfIPv6Cluster(t) skipIfNotIPv4Cluster(t) + skipIfProviderIs(t, "remote", "This test is not yet supported in jenkins e2e runs.") data, err, isIPv6 := setupTestWithIPFIXCollector(t) if err != nil { t.Fatalf("Error when setting up test: %v", err) @@ -138,12 +140,13 @@ func TestFlowAggregator(t *testing.T) { } }) - // InterNodeFlows tests the case, where Pods are deployed on different Nodes and their flow information is exported as IPFIX flow records. + // InterNodeFlows tests the case, where Pods are deployed on different Nodes + // and their flow information is exported as IPFIX flow records. t.Run("InterNodeFlows", func(t *testing.T) { if !isIPv6 { - checkRecordsForFlows(t, data, podAIP.ipv4.String(), podCIP.ipv4.String(), isIPv6, false, false, false) + checkRecordsForFlows(t, data, podAIP.ipv4.String(), podCIP.ipv4.String(), isIPv6, false, false, true) } else { - checkRecordsForFlows(t, data, podAIP.ipv6.String(), podCIP.ipv6.String(), isIPv6, false, false, false) + checkRecordsForFlows(t, data, podAIP.ipv6.String(), podCIP.ipv6.String(), isIPv6, false, false, true) } }) @@ -169,6 +172,7 @@ func TestFlowAggregator(t *testing.T) { } func checkRecordsForFlows(t *testing.T, data *TestData, srcIP string, dstIP string, isIPv6 bool, isIntraNode bool, checkService bool, checkNetworkPolicy bool) { + timeStart := time.Now() var cmdStr string if !isIPv6 { cmdStr = fmt.Sprintf("iperf3 -c %s|grep sender|awk '{print $7,$8}'", dstIP) @@ -181,64 +185,66 @@ func checkRecordsForFlows(t *testing.T, data *TestData, srcIP string, dstIP stri } bandwidth := strings.TrimSpace(stdout) - // Adding some delay to make sure all the data records corresponding to iperf flow are received. - time.Sleep(collectorCheckDelay) + // Polling to make sure all the data records corresponding to iperf flow are received. + err = wait.Poll(250*time.Millisecond, collectorCheckTimeout, func() (bool, error) { + rc, collectorOutput, _, err := provider.RunCommandOnNode(masterNodeName(), fmt.Sprintf("kubectl logs --since=%v ipfix-collector -n antrea-test", time.Since(timeStart).String())) + if err != nil || rc != 0 { + return false, err + } + return strings.Contains(collectorOutput, srcIP) && strings.Contains(collectorOutput, dstIP), nil + }) + require.NoError(t, err, "IPFIX collector did not receive expected number of data records and timed out.") - rc, collectorOutput, _, err := provider.RunCommandOnNode(masterNodeName(), fmt.Sprintf("kubectl logs ipfix-collector -n antrea-test")) + rc, collectorOutput, _, err := provider.RunCommandOnNode(masterNodeName(), fmt.Sprintf("kubectl logs --since=%v ipfix-collector -n antrea-test", time.Since(timeStart).String())) if err != nil || rc != 0 { t.Errorf("Error when getting logs %v, rc: %v", err, rc) } recordSlices := getRecordsFromOutput(collectorOutput) // Iterate over recordSlices and build some results to test with expected results - templateRecords := 0 dataRecordsCount := 0 for _, record := range recordSlices { - if strings.Contains(record, "TEMPLATE RECORD") { - templateRecords = templateRecords + 1 - } - if strings.Contains(record, srcIP) && strings.Contains(record, dstIP) { dataRecordsCount = dataRecordsCount + 1 - // Check if record has both Pod name of source and destination pod. - if !strings.Contains(record, "perftest-a") { - t.Errorf("Record with srcIP does not have Pod name") - } - if !strings.Contains(record, "perftest-b") && isIntraNode { - t.Errorf("Record with dstIP does not have Pod name") - } - if !strings.Contains(record, "perftest-c") && !isIntraNode { - t.Errorf("Record with dstIP does not have Pod name") - } - - if checkService { - if !strings.Contains(record, "antrea-test/perftest-b") && isIntraNode { - t.Errorf("Record with ServiceIP does not have Service name") - } - if !strings.Contains(record, "antrea-test/perftest-c") && !isIntraNode { - t.Errorf("Record with ServiceIP does not have Service name") - } - } - if !strings.Contains(record, testNamespace) { - t.Errorf("Record does not have Pod Namespace") - } // In Kind clusters, there are two flow records for the iperf flow. // One of them has no bytes and we ignore that flow record. - if checkNetworkPolicy && !strings.Contains(record, "octetDeltaCount: 0") { - // Check if records have both ingress and egress network policies. - if !strings.Contains(record, ingressNetworkPolicyName) { - t.Errorf("Record does not have NetworkPolicy name with ingress rule") + if !strings.Contains(record, "octetTotalCount: 0") { + // Check if record has both Pod name of source and destination pod. + if isIntraNode { + checkPodAndNodeData(t, record, "perftest-a", masterNodeName(), "perftest-b", masterNodeName()) + } else { + checkPodAndNodeData(t, record, "perftest-a", masterNodeName(), "perftest-c", workerNodeName(1)) } - if !strings.Contains(record, egressNetworkPolicyName) { - t.Errorf("Record does not have NetworkPolicy name with egress rule") + + if checkService { + if isIntraNode { + if !strings.Contains(record, "antrea-test/perftest-b") { + t.Errorf("Record with ServiceIP does not have Service name") + } + } else { + if !strings.Contains(record, "antrea-test/perftest-c") { + t.Errorf("Record with ServiceIP does not have Service name") + } + } } - } - // Check the bandwidth using octetDeltaCount in data records sent in second ipfix interval - if strings.Contains(record, "seqno=2") || strings.Contains(record, "seqno=3") { - // In Kind clusters, there are two flow records for the iperf flow. - // One of them has no bytes and we ignore that flow record. + if checkNetworkPolicy && isIntraNode { + // Check if records have both ingress and egress network policies. + if !strings.Contains(record, ingressNetworkPolicyName) { + t.Errorf("Record does not have NetworkPolicy name with ingress rule") + } + if !strings.Contains(record, fmt.Sprintf("%s: %s", "ingressNetworkPolicyNamespace", testNamespace)) { + t.Errorf("Record does not have correct ingressNetworkPolicyNamespace") + } + if !strings.Contains(record, egressNetworkPolicyName) { + t.Errorf("Record does not have NetworkPolicy name with egress rule") + } + if !strings.Contains(record, fmt.Sprintf("%s: %s", "egressNetworkPolicyNamespace", testNamespace)) { + t.Errorf("Record does not have correct egressNetworkPolicyNamespace") + } + } + // Check the bandwidth using octetDeltaCount in data records if !strings.Contains(record, "octetDeltaCount: 0") { - //split the record in lines to compute bandwidth + // Split the record in lines to compute bandwidth splitLines := strings.Split(record, "\n") for _, line := range splitLines { if strings.Contains(line, "octetDeltaCount") { @@ -263,47 +269,32 @@ func checkRecordsForFlows(t *testing.T, data *TestData, srcIP string, dstIP stri } } } - // Check the aggregation results in infra tests - if !strings.Contains(record, fmt.Sprintf("%s: %s", "destinationPodNamespace", testNamespace)) { - t.Errorf("Record does not have correct destinationPodNamespace") - } - if isIntraNode { - if !strings.Contains(record, fmt.Sprintf("%s: %s", "destinationPodName", "perftest-b")) { - t.Errorf("Record does not have correct destinationPodName") - } - if !strings.Contains(record, fmt.Sprintf("%s: %s", "destinationNodeName", masterNodeName())) { - t.Errorf("Record does not have correct destinationNodeName") - } - if checkService { - if !strings.Contains(record, fmt.Sprintf("%s: %s", "destinationServicePortName", "antrea-test/perftest-b")) { - t.Errorf("Record does not have correct destinationServicePortName") - } - } - } else { - if !strings.Contains(record, fmt.Sprintf("%s: %s", "destinationPodName", "perftest-c")) { - t.Errorf("Record does not have correct destinationPodName") - } - if !strings.Contains(record, fmt.Sprintf("%s: %s", "destinationNodeName", workerNodeName(1))) { - t.Errorf("Record does not have correct destinationNodeName") - } - if checkService { - if !strings.Contains(record, fmt.Sprintf("%s: %s", "destinationServicePortName", "antrea-test/perftest-c")) { - t.Errorf("Record does not have correct destinationServicePortName") - } - } - } - if checkNetworkPolicy && !strings.Contains(record, "octetDeltaCount: 0") { - if !strings.Contains(record, fmt.Sprintf("%s: %s", "ingressNetworkPolicyName", ingressNetworkPolicyName)) { - t.Errorf("Record does not have correct ingressNetworkPolicyName") - } - if !strings.Contains(record, fmt.Sprintf("%s: %s", "ingressNetworkPolicyNamespace", testNamespace)) { - t.Errorf("Record does not have correct ingressNetworkPolicyNamespace") - } - } } } - assert.Equal(t, expectedNumTemplateRecords, templateRecords, "Flow aggregator should send out 1 template record") - assert.GreaterOrEqual(t, dataRecordsCount, expectedNumDataRecords, "IPFIX collector should receive expected number of flow records") + // Checking only data records as data records cannot be decoded without template + // record. + assert.GreaterOrEqualf(t, dataRecordsCount, expectedNumDataRecords, "IPFIX collector should receive expected number of flow records. Considered records: ", len(recordSlices)) +} + +func checkPodAndNodeData(t *testing.T, record, srcPod, srcNode, dstPod, dstNode string) { + if !strings.Contains(record, srcPod) { + t.Errorf("Record with srcIP does not have Pod name") + } + if !strings.Contains(record, fmt.Sprintf("%s: %s", "sourcePodNamespace", testNamespace)) { + t.Errorf("Record does not have correct sourcePodNamespace") + } + if !strings.Contains(record, fmt.Sprintf("%s: %s", "sourceNodeName", srcNode)) { + t.Errorf("Record does not have correct sourceNodeName") + } + if !strings.Contains(record, dstPod) { + t.Errorf("Record with dstIP does not have Pod name") + } + if !strings.Contains(record, fmt.Sprintf("%s: %s", "destinationPodNamespace", testNamespace)) { + t.Errorf("Record does not have correct destinationPodNamespace") + } + if !strings.Contains(record, fmt.Sprintf("%s: %s", "destinationNodeName", dstNode)) { + t.Errorf("Record does not have correct destinationNodeName") + } } func getRecordsFromOutput(output string) []string { diff --git a/test/e2e/framework.go b/test/e2e/framework.go index 13c24b4864a..41feb008b0a 100644 --- a/test/e2e/framework.go +++ b/test/e2e/framework.go @@ -417,28 +417,32 @@ func (data *TestData) deployAntreaFlowExporter(ipfixCollector string) error { return data.mutateAntreaConfigMap(func(data map[string]string) { antreaAgentConf, _ := data["antrea-agent.conf"] antreaAgentConf = strings.Replace(antreaAgentConf, "# FlowExporter: false", " FlowExporter: true", 1) - antreaAgentConf = strings.Replace(antreaAgentConf, "#flowCollectorAddr: \"flow-aggregator.flow-aggregator.svc:tcp\"", fmt.Sprintf("flowCollectorAddr: \"%s\"", ipfixCollector), 1) + if ipfixCollector != "" { + antreaAgentConf = strings.Replace(antreaAgentConf, "#flowCollectorAddr: \"flow-aggregator.flow-aggregator.svc:4739:tcp\"", fmt.Sprintf("flowCollectorAddr: \"%s\"", ipfixCollector), 1) + } antreaAgentConf = strings.Replace(antreaAgentConf, "#flowPollInterval: \"5s\"", "flowPollInterval: \"1s\"", 1) - antreaAgentConf = strings.Replace(antreaAgentConf, "#flowExportFrequency: 12", "flowExportFrequency: 5", 1) + antreaAgentConf = strings.Replace(antreaAgentConf, "#flowExportFrequency: 12", "flowExportFrequency: 2", 1) data["antrea-agent.conf"] = antreaAgentConf }, false, true) } // deployFlowAggregator deploys flow aggregator with ipfix collector address. -func (data *TestData) deployFlowAggregator(ipfixCollector string) error { +func (data *TestData) deployFlowAggregator(ipfixCollector string) (string, error) { rc, _, _, err := provider.RunCommandOnNode(masterNodeName(), fmt.Sprintf("kubectl apply -f %s", flowaggregatorYML)) if err != nil || rc != 0 { - return fmt.Errorf("error when deploying flow aggregator; is %s available on the master Node?", flowaggregatorYML) + return "", fmt.Errorf("error when deploying flow aggregator; %s not available on the master Node", flowaggregatorYML) } - err = data.mutateFlowAggregatorConfigMap(ipfixCollector) - if err != nil { - return err + if err = data.mutateFlowAggregatorConfigMap(ipfixCollector); err != nil { + return "", err } - rc, _, _, err = provider.RunCommandOnNode(masterNodeName(), fmt.Sprintf("kubectl -n %s rollout status deployment/%s --timeout=%v", flowAggregatorNamespace, flowAggregatorDeployment, defaultTimeout)) - if err != nil || rc != 0 { - return fmt.Errorf("error when waiting for flow aggregator rollout to complete") + if rc, _, _, err = provider.RunCommandOnNode(masterNodeName(), fmt.Sprintf("kubectl -n %s rollout status deployment/%s --timeout=%v", flowAggregatorNamespace, flowAggregatorDeployment, 2*defaultTimeout)); err != nil || rc != 0 { + return "", fmt.Errorf("error when waiting for flow aggregator rollout to complete") } - return nil + svc, err := data.clientset.CoreV1().Services(flowAggregatorNamespace).Get(context.TODO(), flowAggregatorDeployment, metav1.GetOptions{}) + if err != nil { + return "", fmt.Errorf("unable to get service %v: %v", flowAggregatorDeployment, err) + } + return svc.Spec.ClusterIP, nil } func (data *TestData) mutateFlowAggregatorConfigMap(ipfixCollector string) error { diff --git a/test/e2e/infra/vagrant/push_antrea.sh b/test/e2e/infra/vagrant/push_antrea.sh index 6707217ba49..39fdf109c52 100755 --- a/test/e2e/infra/vagrant/push_antrea.sh +++ b/test/e2e/infra/vagrant/push_antrea.sh @@ -6,7 +6,7 @@ function usage() { --prometheus Deploy Prometheus service to scrape metrics from Antrea Agents and Controllers --flow-collector Provide the IPFIX flow collector address to collect the flows from the Flow Aggregator service It should be given in the format IP:port:proto. Example: 192.168.1.100:4739:udp - Please note that with this option we deploy the Flow Aggregator service along with the Antrea daemonset." + Please note that with this option we deploy the Flow Aggregator Service along with Antrea." } # Process execution flags