Fix flakiness in Kind e2e flow aggregator tests (#2308)
- Flows are not getting expired as expected because some connections have a
zero start time, and the last export time is initialized from the start time.
This commit fixes that bug by setting the start time to the current time if it
is zero when a new connection is added (see the first sketch after this list).

- The idle flow export timeout is not configured properly in the e2e test:
the test sets the obsolete inactiveFlowExportTimeout option instead of
idleFlowExportTimeout (see the framework.go change below). This commit fixes
that bug.

- The expected number of records is not correct in Kind clusters, as packet
counters are not supported in conntrack entries for the OVS userspace
datapath. Without these statistics, the flow exporter cannot consider flows
to be active, so it cannot send records after the active timeout expires.
Therefore, we expect only 2 records for any given flow in Kind clusters and
3 flow records in other clusters (a back-of-the-envelope sketch follows this
list). We expect this limitation to be resolved soon, once the OVS userspace
datapath supports statistics in conntrack entries.

- Additionally, we now consider the source port of the iperf data flow so as
to ignore similar flow records from the iperf control flows (see the matching
sketch after this list). Previously, control-flow records were also counted,
which is why the tests passed in Kind clusters even though only two data flow
records are generated there.

- The kubectl logs command with the --since option does not output the logs of
the IPFIX collector as expected, which causes intermittent failures. This
commit removes the --since option, as the source port is now the gating
parameter when processing flow records.

Signed-off-by: Srikar Tati <stati@vmware.com>
srikartati authored Jul 7, 2021
1 parent 136ecb7 commit a8c7970
Showing 6 changed files with 71 additions and 28 deletions.
2 changes: 0 additions & 2 deletions pkg/agent/flowexporter/connections/conntrack.go
@@ -46,11 +46,9 @@ func filterAntreaConns(conns []*flowexporter.Connection, nodeConfig *config.Node
 
         // Consider Pod-to-Pod, Pod-To-Service and Pod-To-External flows.
         if srcIP.Equal(nodeConfig.GatewayConfig.IPv4) || dstIP.Equal(nodeConfig.GatewayConfig.IPv4) {
-            klog.V(4).Infof("Detected flow for which one of the endpoint is host gateway %s :%+v", nodeConfig.GatewayConfig.IPv4.String(), conn)
             continue
         }
         if srcIP.Equal(nodeConfig.GatewayConfig.IPv6) || dstIP.Equal(nodeConfig.GatewayConfig.IPv6) {
-            klog.V(4).Infof("Detected flow for which one of the endpoint is host gateway %s :%+v", nodeConfig.GatewayConfig.IPv6.String(), conn)
             continue
         }
 
4 changes: 3 additions & 1 deletion pkg/agent/flowexporter/connections/conntrack_connections.go
@@ -226,7 +226,9 @@ func (cs *ConntrackConnectionStore) AddOrUpdateConn(conn *flowexporter.Connectio
         }
     }
     cs.addNetworkPolicyMetadata(conn)
-
+    if conn.StartTime.IsZero() {
+        conn.StartTime = time.Now()
+    }
     metrics.TotalAntreaConnectionsInConnTrackTable.Inc()
     klog.V(4).Infof("New Antrea flow added: %v", conn)
     // Add new antrea connection to connection store
2 changes: 2 additions & 0 deletions pkg/agent/flowexporter/connections/conntrack_connections_test.go
@@ -137,6 +137,8 @@ func TestConntrackConnectionStore_AddOrUpdateConn(t *testing.T) {
     // To test service name mapping.
     tuple4 := flowexporter.Tuple{SourceAddress: net.IP{10, 10, 10, 10}, DestinationAddress: net.IP{20, 20, 20, 20}, Protocol: 6, SourcePort: 5000, DestinationPort: 80}
     testFlow4 := flowexporter.Connection{
+        StartTime: refTime.Add(-(time.Second * 50)),
+        StopTime:  refTime,
         FlowKey:   tuple4,
         Mark:      openflow.ServiceCTMark,
         IsPresent: true,
3 changes: 3 additions & 0 deletions pkg/agent/flowexporter/exporter/exporter.go
@@ -313,6 +313,7 @@ func (exp *flowExporter) sendFlowRecords() error {
         } else {
             exp.flowRecords.ValidateAndUpdateStats(key, record)
         }
+        klog.V(4).InfoS("Record sent successfully", "flowKey", key, "record", record)
     }
     return nil
 }
@@ -330,6 +331,7 @@ func (exp *flowExporter) sendFlowRecords() error {
                 return err
             }
             exp.numDataSetsSent = exp.numDataSetsSent + 1
+            klog.V(4).InfoS("Record for deny connection sent successfully", "flowKey", connKey, "connection", conn)
             exp.denyConnStore.ResetConnStatsWithoutLock(connKey)
         }
         if time.Since(conn.LastExportTime) >= exp.idleFlowTimeout {
@@ -340,6 +342,7 @@ func (exp *flowExporter) sendFlowRecords() error {
                 return err
             }
             exp.numDataSetsSent = exp.numDataSetsSent + 1
+            klog.V(4).InfoS("Record for deny connection sent successfully", "flowKey", connKey, "connection", conn)
             exp.denyConnStore.DeleteConnWithoutLock(connKey)
         }
         return nil
84 changes: 61 additions & 23 deletions test/e2e/flowaggregator_test.go
@@ -119,6 +119,9 @@ const (
     testIngressRuleName = "test-ingress-rule-name"
     testEgressRuleName  = "test-egress-rule-name"
     iperfTimeSec        = 12
+)
+
+var (
     // Single iperf run results in two connections with separate ports (control connection and actual data connection).
     // As 2s is the export active timeout of flow exporter and iperf traffic runs for 12s, we expect totally 12 records
     // exporting to the flow aggregator at time 2s, 4s, 6s, 8s, 10s, and 12s after iperf traffic begins.
@@ -144,6 +147,21 @@ func TestFlowAggregator(t *testing.T) {
     defer teardownTest(t, data)
     defer teardownFlowAggregator(t, data)
 
+    if testOptions.providerName == "kind" {
+        // Currently, in Kind clusters, OVS userspace datapath does not support
+        // packet statistics in the conntrack entries. Because of that Flow Exporter
+        // at Antrea agent cannot consider flows to be active and keep sending active
+        // records. Currently, Flow Exporter sends two records for a iperf flow
+        // in kind cluster with a duration of 12s: 1. A new iperf connection gets
+        // idled out after exporter idle timeout, which is after 1s in the test.
+        // In this case, flow aggregator sends the record after 4.5s 2. When the
+        // connection dies and TCP state becomes TIME_WAIT, which is
+        // at 12s in the test. Here, Flow Aggregator sends the record at 15.5s.
+        // We will remove this workaround once OVS userspace datapath supports packet
+        // statistics in conntrack entries.
+        expectedNumDataRecords = 2
+    }
+
     k8sUtils, err = NewKubernetesUtils(data)
     if err != nil {
         t.Fatalf("Error when creating Kubernetes utils client: %v", err)
@@ -487,16 +505,15 @@ func checkRecordsForFlows(t *testing.T, data *TestData, srcIP string, dstIP stri
     timeStartSec := timeStart.Unix()
     var cmdStr string
     if !isIPv6 {
-        cmdStr = fmt.Sprintf("iperf3 -c %s -t %d|grep sender|awk '{print $7,$8}'", dstIP, iperfTimeSec)
+        cmdStr = fmt.Sprintf("iperf3 -c %s -t %d", dstIP, iperfTimeSec)
     } else {
-        cmdStr = fmt.Sprintf("iperf3 -6 -c %s -t %d|grep sender|awk '{print $7,$8}'", dstIP, iperfTimeSec)
+        cmdStr = fmt.Sprintf("iperf3 -6 -c %s -t %d", dstIP, iperfTimeSec)
     }
     stdout, _, err := data.runCommandFromPod(testNamespace, "perftest-a", "perftool", []string{"bash", "-c", cmdStr})
     if err != nil {
         t.Errorf("Error when running iperf3 client: %v", err)
     }
-    bandwidth := strings.TrimSpace(stdout)
-    bwSlice := strings.Split(bandwidth, " ")
+    bwSlice, srcPort := getBandwidthAndSourcePort(stdout)
     // bandwidth from iperf output
     bandwidthInFloat, err := strconv.ParseFloat(bwSlice[0], 64)
     require.NoErrorf(t, err, "Error when converting iperf bandwidth %s to float64 type", bwSlice[0])
@@ -509,13 +526,15 @@ func checkRecordsForFlows(t *testing.T, data *TestData, srcIP string, dstIP stri
         t.Fatalf("Unit of the traffic bandwidth reported by iperf should either be Mbits or Gbits, failing the test.")
     }
 
-    collectorOutput := getCollectorOutput(t, srcIP, dstIP, timeStart, true)
+    collectorOutput, recordSlices := getCollectorOutput(t, srcIP, dstIP, srcPort, timeStart, true)
     // Iterate over recordSlices and build some results to test with expected results
-    recordSlices := getRecordsFromOutput(collectorOutput)
     dataRecordsCount := 0
     var octetTotalCount uint64
     for _, record := range recordSlices {
-        if strings.Contains(record, srcIP) && strings.Contains(record, dstIP) {
+        // Check the source port along with source and destination IPs as there
+        // are flow records for control flows during the iperf with same IPs
+        // and destination port.
+        if strings.Contains(record, srcIP) && strings.Contains(record, dstIP) && strings.Contains(record, srcPort) {
             dataRecordsCount = dataRecordsCount + 1
             // Check if record has both Pod name of source and destination Pod.
             if isIntraNode {
@@ -581,7 +600,7 @@ func checkRecordsForFlows(t *testing.T, data *TestData, srcIP string, dstIP stri
     }
     // Checking only data records as data records cannot be decoded without template
     // record.
-    assert.GreaterOrEqualf(t, dataRecordsCount, expectedNumDataRecords, "IPFIX collector should receive expected number of flow records. Considered records: ", len(recordSlices))
+    assert.GreaterOrEqualf(t, dataRecordsCount, expectedNumDataRecords, "IPFIX collector should receive expected number of flow records. Considered records: %s \n Collector output: %s", recordSlices, collectorOutput)
 }
 
 func checkRecordsForToExternalFlows(t *testing.T, data *TestData, srcNodeName string, srcPodName string, srcIP string, dstIP string, dstPort int32, isIPv6 bool) {
@@ -595,8 +614,7 @@ func checkRecordsForToExternalFlows(t *testing.T, data *TestData, srcNodeName st
     stdout, stderr, err := data.runCommandFromPod(testNamespace, srcPodName, busyboxContainerName, strings.Fields(cmd))
     require.NoErrorf(t, err, "Error when running wget command, stdout: %s, stderr: %s", stdout, stderr)
 
-    collectorOutput := getCollectorOutput(t, srcIP, dstIP, timeStart, false)
-    recordSlices := getRecordsFromOutput(collectorOutput)
+    _, recordSlices := getCollectorOutput(t, srcIP, dstIP, "", timeStart, false)
     for _, record := range recordSlices {
         if strings.Contains(record, srcIP) && strings.Contains(record, dstIP) {
             checkPodAndNodeData(t, record, srcPodName, srcNodeName, "", "")
@@ -625,9 +643,8 @@ func checkRecordsForDenyFlows(t *testing.T, data *TestData, testFlow1, testFlow2
     _, _, err = data.runCommandFromPod(testNamespace, testFlow2.srcPodName, "", []string{"timeout", "2", "bash", "-c", cmdStr2})
     assert.Error(t, err)
 
-    collectorOutput := getCollectorOutput(t, testFlow1.srcIP, testFlow2.srcIP, timeStart, false)
+    _, recordSlices := getCollectorOutput(t, testFlow1.srcIP, testFlow2.srcIP, "", timeStart, false)
     // Iterate over recordSlices and build some results to test with expected results
-    recordSlices := getRecordsFromOutput(collectorOutput)
     for _, record := range recordSlices {
         var srcPodName, dstPodName string
         if strings.Contains(record, testFlow1.srcIP) && strings.Contains(record, testFlow1.dstIP) {
@@ -730,45 +747,46 @@ func getUnit64FieldFromRecord(t *testing.T, record string, field string) uint64
     return 0
 }
 
-func getCollectorOutput(t *testing.T, srcIP string, dstIP string, timeStart time.Time, checkAllRecords bool) string {
+// getCollectorOutput polls the output of go-ipfix collector and checks if we have
+// received all the expected records for a given flow with source IP, destination IP
+// and source port. We send source port to ignore the control flows during the
+// iperf test.
+func getCollectorOutput(t *testing.T, srcIP, dstIP, srcPort string, timeStart time.Time, checkAllRecords bool) (string, []string) {
     var collectorOutput string
+    var recordSlices []string
     err := wait.PollImmediate(500*time.Millisecond, aggregatorInactiveFlowRecordTimeout, func() (bool, error) {
         var rc int
         var err error
         // `pod-running-timeout` option is added to cover scenarios where ipfix flow-collector has crashed after being deployed
-        rc, collectorOutput, _, err = provider.RunCommandOnNode(controlPlaneNodeName(), fmt.Sprintf("kubectl logs --since=%v --pod-running-timeout=%v ipfix-collector -n antrea-test", time.Since(timeStart).String(), aggregatorInactiveFlowRecordTimeout.String()))
+        rc, collectorOutput, _, err = provider.RunCommandOnNode(controlPlaneNodeName(), fmt.Sprintf("kubectl logs --pod-running-timeout=%v ipfix-collector -n antrea-test", aggregatorInactiveFlowRecordTimeout.String()))
         if err != nil || rc != 0 {
             return false, err
         }
         // Checking that all the data records which correspond to the iperf flow are received
+        recordSlices = getRecordsFromOutput(collectorOutput)
         if checkAllRecords {
-            recordSlices := getRecordsFromOutput(collectorOutput)
             for _, record := range recordSlices {
                 exportTime := int64(getUnit64FieldFromRecord(t, record, "flowEndSeconds"))
-                if strings.Contains(record, srcIP) && strings.Contains(record, dstIP) {
+                if strings.Contains(record, srcIP) && strings.Contains(record, dstIP) && strings.Contains(record, srcPort) {
                     if exportTime >= timeStart.Unix()+iperfTimeSec {
                         return true, nil
                     }
                 }
             }
             return false, nil
         } else {
-            return strings.Contains(collectorOutput, srcIP) && strings.Contains(collectorOutput, dstIP), nil
+            return strings.Contains(collectorOutput, srcIP) && strings.Contains(collectorOutput, dstIP) && strings.Contains(collectorOutput, srcPort), nil
         }
     })
-    require.NoErrorf(t, err, "IPFIX collector did not receive the expected records and timed out with error")
-    return collectorOutput
+    require.NoErrorf(t, err, "IPFIX collector did not receive the expected records in collector output: %v time start: %s iperf source port: %s", collectorOutput, timeStart.String(), srcPort)
+    return collectorOutput, recordSlices
 }
 
 func getRecordsFromOutput(output string) []string {
     re := regexp.MustCompile("(?m)^.*" + "#" + ".*$[\r\n]+")
     output = re.ReplaceAllString(output, "")
     output = strings.TrimSpace(output)
     recordSlices := strings.Split(output, "IPFIX-HDR:")
     // Delete the first element from recordSlices
     recordSlices[0] = recordSlices[len(recordSlices)-1]
     recordSlices[len(recordSlices)-1] = ""
     recordSlices = recordSlices[:len(recordSlices)-1]
     return recordSlices
 }

@@ -1001,3 +1019,23 @@ func deletePerftestServices(t *testing.T, data *TestData) {
         }
     }
 }
+
+// getBandwidthAndSourcePort parses iperf commands output and returns bandwidth
+// and source port. Bandwidth is returned as a slice containing two strings (bandwidth
+// value and bandwidth unit).
+func getBandwidthAndSourcePort(iperfStdout string) ([]string, string) {
+    var bandwidth []string
+    var srcPort string
+    outputLines := strings.Split(iperfStdout, "\n")
+    for _, line := range outputLines {
+        if strings.Contains(line, "sender") {
+            fields := strings.Fields(line)
+            bandwidth = fields[6:8]
+        }
+        if strings.Contains(line, "connected") {
+            fields := strings.Fields(line)
+            srcPort = fields[5]
+        }
+    }
+    return bandwidth, srcPort
+}
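
For reference, a hedged sketch of the iperf3 output lines that getBandwidthAndSourcePort above assumes. The sample lines are illustrative of standard iperf3 client output (exact spacing varies by version); the whitespace-field positions are what the indexing relies on.

```go
package main

import (
	"fmt"
	"strings"
)

func main() {
	// Illustrative iperf3 client output lines (assumed format).
	connected := "[  5] local 10.10.0.5 port 43512 connected to 10.10.1.7 port 5201"
	sender := "[  5]   0.00-12.00  sec  1.10 GBytes   786 Mbits/sec    0   sender"

	// The parser takes fields[5] of the "connected" line as the client
	// (source) port of the data connection...
	fmt.Println(strings.Fields(connected)[5]) // 43512
	// ...and fields[6:8] of the "sender" line as the bandwidth value and
	// unit, which checkRecordsForFlows then compares against the records.
	fmt.Println(strings.Fields(sender)[6:8]) // [786 Mbits/sec]
}
```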
4 changes: 2 additions & 2 deletions test/e2e/framework.go
@@ -107,7 +107,7 @@ const (
     nginxLBService = "nginx-loadbalancer"
 
     exporterActiveFlowExportTimeout     = 2 * time.Second
-    exporterInactiveFlowExportTimeout   = 1 * time.Second
+    exporterIdleFlowExportTimeout       = 1 * time.Second
     aggregatorActiveFlowRecordTimeout   = 3500 * time.Millisecond
     aggregatorInactiveFlowRecordTimeout = 6 * time.Second
 )
@@ -571,7 +571,7 @@ func (data *TestData) deployAntreaFlowExporter(ipfixCollector string) error {
         {"FlowExporter", "true", true},
         {"flowPollInterval", "\"1s\"", false},
         {"activeFlowExportTimeout", fmt.Sprintf("\"%v\"", exporterActiveFlowExportTimeout), false},
-        {"inactiveFlowExportTimeout", fmt.Sprintf("\"%v\"", exporterInactiveFlowExportTimeout), false},
+        {"idleFlowExportTimeout", fmt.Sprintf("\"%v\"", exporterIdleFlowExportTimeout), false},
     }
     if ipfixCollector != "" {
         ac = append(ac, configChange{"flowCollectorAddr", fmt.Sprintf("\"%s\"", ipfixCollector), false})