Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Flow-Aggregator] Add correlate fields and fix e2e tests with stats and network policy #1682

Merged
merged 2 commits into from
Dec 22, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions pkg/agent/flowexporter/exporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,9 @@ func (exp *flowExporter) sendFlowRecords() error {
return err
}
}
if err := exp.flowRecords.ValidateAndUpdateStats(key, record); err != nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh.. this was there before right? That is a good amount of logic missing :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea.. I missed this part when changing the code.

return err
}
return nil
}
err := exp.flowRecords.ForAllFlowRecordsDo(addAndSendFlowRecord)
Expand Down
3 changes: 3 additions & 0 deletions pkg/flowaggregator/flowaggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,12 @@ var (
"destinationPodNamespace",
"destinationNodeName",
"destinationClusterIPv4",
"destinationServicePort",
"destinationServicePortName",
"ingressNetworkPolicyName",
"ingressNetworkPolicyNamespace",
"egressNetworkPolicyName",
"egressNetworkPolicyNamespace",
}
)

Expand Down
28 changes: 22 additions & 6 deletions test/e2e/flowaggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func TestFlowAggregator(t *testing.T) {

// IntraNodeFlows tests the case, where Pods are deployed on same Node and their flow information is exported as IPFIX flow records.
t.Run("IntraNodeFlows", func(t *testing.T) {
np1, np2 := deployNetworkPolicies(t, data)
np1, np2 := deployNetworkPolicies(t, data, "perftest-a", "perftest-b")
defer func() {
if np1 != nil {
if err = data.deleteNetworkpolicy(np1); err != nil {
Expand All @@ -143,6 +143,19 @@ func TestFlowAggregator(t *testing.T) {
// InterNodeFlows tests the case, where Pods are deployed on different Nodes
// and their flow information is exported as IPFIX flow records.
t.Run("InterNodeFlows", func(t *testing.T) {
np1, np2 := deployNetworkPolicies(t, data, "perftest-a", "perftest-c")
defer func() {
if np1 != nil {
if err = data.deleteNetworkpolicy(np1); err != nil {
t.Errorf("Error when deleting network policy: %v", err)
}
}
if np2 != nil {
if err = data.deleteNetworkpolicy(np2); err != nil {
t.Errorf("Error when deleting network policy: %v", err)
}
}
}()
if !isIPv6 {
checkRecordsForFlows(t, data, podAIP.ipv4.String(), podCIP.ipv4.String(), isIPv6, false, false, true)
} else {
Expand Down Expand Up @@ -227,7 +240,7 @@ func checkRecordsForFlows(t *testing.T, data *TestData, srcIP string, dstIP stri
}
}
}
if checkNetworkPolicy && isIntraNode {
if checkNetworkPolicy {
// Check if records have both ingress and egress network policies.
if !strings.Contains(record, ingressNetworkPolicyName) {
t.Errorf("Record does not have NetworkPolicy name with ingress rule")
Expand All @@ -247,7 +260,7 @@ func checkRecordsForFlows(t *testing.T, data *TestData, srcIP string, dstIP stri
// Split the record in lines to compute bandwidth
splitLines := strings.Split(record, "\n")
for _, line := range splitLines {
if strings.Contains(line, "octetDeltaCount") {
if strings.Contains(line, "octetDeltaCount:") {
lineSlice := strings.Split(line, ":")
deltaBytes, err := strconv.ParseFloat(strings.TrimSpace(lineSlice[1]), 64)
if err != nil {
Expand All @@ -261,6 +274,9 @@ func checkRecordsForFlows(t *testing.T, data *TestData, srcIP string, dstIP stri
if err != nil {
t.Errorf("Error in converting iperf bandwidth to float64 type")
}
if strings.Contains(bwSlice[1], "Mbits") {
iperfBandwidth = iperfBandwidth / float64(1000)
}
t.Logf("Iperf bandwidth: %v", iperfBandwidth)
t.Logf("IPFIX record bandwidth: %v", recBandwidth)
assert.InDeltaf(t, recBandwidth, iperfBandwidth, 5, "Difference between Iperf bandwidth and IPFIX record bandwidth should be less than 5Gb/s")
Expand Down Expand Up @@ -309,7 +325,7 @@ func getRecordsFromOutput(output string) []string {
return recordSlices
}

func deployNetworkPolicies(t *testing.T, data *TestData) (np1 *networkingv1.NetworkPolicy, np2 *networkingv1.NetworkPolicy) {
func deployNetworkPolicies(t *testing.T, data *TestData, srcPod, dstPod string) (np1 *networkingv1.NetworkPolicy, np2 *networkingv1.NetworkPolicy) {
// Add NetworkPolicy between two iperf Pods.
var err error
np1, err = data.createNetworkPolicy(ingressNetworkPolicyName, &networkingv1.NetworkPolicySpec{
Expand All @@ -319,7 +335,7 @@ func deployNetworkPolicies(t *testing.T, data *TestData) (np1 *networkingv1.Netw
From: []networkingv1.NetworkPolicyPeer{{
PodSelector: &metav1.LabelSelector{
MatchLabels: map[string]string{
"antrea-e2e": "perftest-a",
"antrea-e2e": srcPod,
},
}},
},
Expand All @@ -335,7 +351,7 @@ func deployNetworkPolicies(t *testing.T, data *TestData) (np1 *networkingv1.Netw
To: []networkingv1.NetworkPolicyPeer{{
PodSelector: &metav1.LabelSelector{
MatchLabels: map[string]string{
"antrea-e2e": "perftest-b",
"antrea-e2e": dstPod,
},
}},
},
Expand Down