Skip to content

Commit

Permalink
Add e2e tests for flow aggregator (#1628)
Browse files Browse the repository at this point in the history
In this commit, we add e2e tests for the flow aggregator. For these
tests, we setup an environment where the flow exporters in antrea agents
export flow records to the flow aggregator. The flow aggregator will run
the collecting, aggregating jobs and will export records to the external
IPFIX collector.

The existing "flow exporter" tests (where the agents export to the
external collector directly) are replaced with "flow aggregator" tests,
as we assume that the aggregator is always deployed.

Also, we removed the configuration in the agents causing flow records to
only be exported from the source node since we now have the aggregator
to correlate and de-duplicate flows. Related commit and PR are b44fd78,
PR#1268.
  • Loading branch information
Yongming Ding authored and srikartati committed Dec 19, 2020
1 parent d2d50a1 commit 9daeca9
Show file tree
Hide file tree
Showing 6 changed files with 257 additions and 128 deletions.
2 changes: 2 additions & 0 deletions ci/kind/test-e2e-kind.sh
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ function print_usage {

TESTBED_CMD=$(dirname $0)"/kind-setup.sh"
YML_CMD=$(dirname $0)"/../../hack/generate-manifest.sh"
FLOWAGGREGATOR_YML_CMD=$(dirname $0)"/../../hack/generate-manifest-flow-aggregator.sh"

function quit {
if [[ $? != 0 ]]; then
Expand Down Expand Up @@ -115,6 +116,7 @@ function run_test {
else
$YML_CMD --kind --encap-mode $current_mode $manifest_args | docker exec -i kind-control-plane dd of=/root/antrea.yml
fi
$FLOWAGGREGATOR_YML_CMD | docker exec -i kind-control-plane dd of=/root/flow-aggregator.yml
sleep 1
if $coverage; then
go test -v -timeout=35m github.com/vmware-tanzu/antrea/test/e2e -provider=kind --logs-export-dir=$ANTREA_LOG_DIR --coverage --coverage-dir $ANTREA_COV_DIR
Expand Down
13 changes: 0 additions & 13 deletions pkg/agent/flowexporter/connections/connections.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,19 +134,6 @@ func (cs *ConnectionStore) addOrUpdateConn(conn *flowexporter.Connection) {
conn.DestinationPodNamespace = dIface.ContainerInterfaceConfig.PodNamespace
}

// Do not export the flow records of connections whose destination is local
// Pod and source is remote Pod. We export flow records only from source node,
// where the connection originates from. This is to avoid duplicate copies
// of flow records at flow collector. This restriction will be removed when
// flow aggregator is implemented. We miss some key information such as
// destination Pod info, ingress NetworkPolicy info, stats from destination
// node etc.
// TODO: Remove this when flow aggregator that correlates the flow records
// is implemented.
if !srcFound && dstFound {
conn.DoExport = false
}

// Process Pod-to-Service flows when Antrea Proxy is enabled.
if cs.antreaProxier != nil {
if conn.Mark == openflow.ServiceCTMark {
Expand Down
44 changes: 23 additions & 21 deletions test/e2e/fixtures.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package e2e

import (
"fmt"
"net"
"os"
"path/filepath"
"sync"
Expand Down Expand Up @@ -130,42 +129,38 @@ func setupTest(tb testing.TB) (*TestData, error) {
}

func setupTestWithIPFIXCollector(tb testing.TB) (*TestData, error, bool) {
data := &TestData{}
// TODO: remove hardcoding to IPv4 after flow aggregator supports IPv6
isIPv6 := false
if err := data.setupLogDirectoryForTest(tb.Name()); err != nil {
tb.Errorf("Error creating logs directory '%s': %v", data.logsDirForTestCase, err)
return nil, err, isIPv6
}
tb.Logf("Creating K8s clientset")
if err := data.createClient(); err != nil {
if err := testData.setupLogDirectoryForTest(tb.Name()); err != nil {
tb.Errorf("Error creating logs directory '%s': %v", testData.logsDirForTestCase, err)
return nil, err, isIPv6
}
tb.Logf("Creating '%s' K8s Namespace", testNamespace)
if err := data.createTestNamespace(); err != nil {
if err := testData.createTestNamespace(); err != nil {
return nil, err, isIPv6
}
// Create pod using ipfix collector image
if err := data.createPodOnNode("ipfix-collector", "", ipfixCollectorImage, nil, nil, nil, nil, true, nil); err != nil {
if err := testData.createPodOnNode("ipfix-collector", "", ipfixCollectorImage, nil, nil, nil, nil, true, nil); err != nil {
tb.Errorf("Error when creating the ipfix collector Pod: %v", err)
}
ipfixCollectorIP, err := data.podWaitForIPs(defaultTimeout, "ipfix-collector", testNamespace)
ipfixCollectorIP, err := testData.podWaitForIPs(defaultTimeout, "ipfix-collector", testNamespace)
if err != nil || len(ipfixCollectorIP.ipStrings) == 0 {
tb.Errorf("Error when waiting to get ipfix collector Pod IP: %v", err)
}
tb.Logf("Applying Antrea YAML with ipfix collector address")
ipStr := ipfixCollectorIP.ipStrings[0]
if net.ParseIP(ipStr).To4() == nil {
ipStr = fmt.Sprintf("[%s]", ipStr)
isIPv6 = true
ipStr := ipfixCollectorIP.ipv4.String()
tb.Logf("Applying flow aggregator YAML with ipfix collector address: %s", ipStr)
if err := testData.deployFlowAggregator(fmt.Sprintf("%s:%s:tcp", ipStr, ipfixCollectorPort)); err != nil {
return testData, err, isIPv6
}
if err := data.deployAntreaFlowExporter(fmt.Sprintf("%s:%s:tcp", ipStr, ipfixCollectorPort)); err != nil {
return data, err, isIPv6
tb.Logf("Deploying flow exporter")
if err := testData.deployAntreaFlowExporter(""); err != nil {
return testData, err, isIPv6
}
tb.Logf("Checking CoreDNS deployment")
if err := data.checkCoreDNSPods(defaultTimeout); err != nil {
return data, err, isIPv6
if err := testData.checkCoreDNSPods(defaultTimeout); err != nil {
return testData, err, isIPv6
}
return data, nil, isIPv6
return testData, nil, isIPv6
}

func exportLogs(tb testing.TB, data *TestData, logsSubDir string, writeNodeLogs bool) {
Expand Down Expand Up @@ -287,6 +282,13 @@ func exportLogs(tb testing.TB, data *TestData, logsSubDir string, writeNodeLogs
}
}

func teardownFlowAggregator(tb testing.TB, data *TestData) {
tb.Logf("Deleting '%s' K8s Namespace", flowAggregatorNamespace)
if err := data.deleteNamespace(flowAggregatorNamespace, defaultTimeout); err != nil {
tb.Logf("Error when tearing down flow aggregator: %v", err)
}
}

func teardownTest(tb testing.TB, data *TestData) {
exportLogs(tb, data, "beforeTeardown", true)
if empty, _ := IsDirEmpty(data.logsDirForTestCase); empty {
Expand Down
Loading

0 comments on commit 9daeca9

Please sign in to comment.