From c9d365d3ff7db80d2dc71644aa59f32685de959e Mon Sep 17 00:00:00 2001 From: Srikar Tati Date: Mon, 10 Aug 2020 17:08:52 -0700 Subject: [PATCH] Addreseed the comments --- cmd/antrea-agent/agent.go | 2 +- .../flowexporter/connections/conntrack.go | 3 +- .../flowexporter/connections/conntrack_ovs.go | 6 +- pkg/agent/flowexporter/exporter/exporter.go | 63 ++++++++++++------- test/e2e/flowexporter_test.go | 2 +- test/e2e/framework.go | 17 +---- 6 files changed, 47 insertions(+), 46 deletions(-) diff --git a/cmd/antrea-agent/agent.go b/cmd/antrea-agent/agent.go index 911d49cf46e..451a05d9776 100644 --- a/cmd/antrea-agent/agent.go +++ b/cmd/antrea-agent/agent.go @@ -252,7 +252,7 @@ func run(o *Options) error { flowExporter := exporter.NewFlowExporter( flowrecords.NewFlowRecords(connStore), o.config.FlowExportFrequency) - go wait.Until(func() { flowExporter.CheckAndDoExport(o.flowCollector, pollDone) }, o.pollInterval, stopCh) + go wait.Until(func() { flowExporter.Export(o.flowCollector, stopCh, pollDone) }, 0, stopCh) } <-stopCh diff --git a/pkg/agent/flowexporter/connections/conntrack.go b/pkg/agent/flowexporter/connections/conntrack.go index 593836c33ed..270e56e3817 100644 --- a/pkg/agent/flowexporter/connections/conntrack.go +++ b/pkg/agent/flowexporter/connections/conntrack.go @@ -25,7 +25,7 @@ import ( "github.com/vmware-tanzu/antrea/pkg/ovs/ovsctl" ) -// InitializeConnTrackDumper initialize the ConnTrackDumper interface for different OS and datapath types. +// InitializeConnTrackDumper initializes the ConnTrackDumper interface for different OS and datapath types. func InitializeConnTrackDumper(nodeConfig *config.NodeConfig, serviceCIDR *net.IPNet, ovsctlClient ovsctl.OVSCtlClient, ovsDatapathType string) ConnTrackDumper { var connTrackDumper ConnTrackDumper if ovsDatapathType == ovsconfig.OVSDatapathSystem { @@ -60,6 +60,7 @@ func filterAntreaConns(conns []*flowexporter.Connection, nodeConfig *config.Node // Conntrack flows will be different for Pod-to-Service flows w/ Antrea-proxy. This implementation will be simpler, when the // Antrea proxy is supported. if serviceCIDR.Contains(srcIP) || serviceCIDR.Contains(dstIP) { + klog.V(4).Infof("Detected a flow with Cluster IP :%v", conn) continue } filteredConns = append(filteredConns, conn) diff --git a/pkg/agent/flowexporter/connections/conntrack_ovs.go b/pkg/agent/flowexporter/connections/conntrack_ovs.go index 38cd6be2c0d..deb770f09e6 100644 --- a/pkg/agent/flowexporter/connections/conntrack_ovs.go +++ b/pkg/agent/flowexporter/connections/conntrack_ovs.go @@ -65,7 +65,7 @@ func (ct *connTrackOvsCtl) DumpFlows(zoneFilter uint16) ([]*flowexporter.Connect } filteredConns := filterAntreaConns(conns, ct.nodeConfig, ct.serviceCIDR, zoneFilter) - klog.V(2).Infof("Flow exporter considered flows: %d", len(filteredConns)) + klog.V(2).Infof("FlowExporter considered flows: %d", len(filteredConns)) return filteredConns, nil } @@ -83,14 +83,14 @@ func (ct *connTrackOvsCtl) ovsAppctlDumpConnections(zoneFilter uint16) ([]*flowe for _, flow := range outputFlow { conn, err := flowStringToAntreaConnection(flow, zoneFilter) if err != nil { - klog.Warningf("Ignoring the flow from conntrack dump due to the error: %v", err) + klog.V(4).Infof("Ignoring the flow from conntrack dump due to parsing error: %v", err) continue } if conn != nil { antreaConns = append(antreaConns, conn) } } - klog.V(2).Infof("Finished dumping -- total no. of flows in conntrack: %d", len(antreaConns)) + klog.V(2).Infof("FlowExporter considered flows in conntrack: %d", len(antreaConns)) return antreaConns, nil } diff --git a/pkg/agent/flowexporter/exporter/exporter.go b/pkg/agent/flowexporter/exporter/exporter.go index 612189bdf62..d986f24321f 100644 --- a/pkg/agent/flowexporter/exporter/exporter.go +++ b/pkg/agent/flowexporter/exporter/exporter.go @@ -91,34 +91,49 @@ func NewFlowExporter(records *flowrecords.FlowRecords, exportFrequency uint) *fl } } -// CheckAndDoExport enables us to export flow records periodically at a given flow export frequency. -func (exp *flowExporter) CheckAndDoExport(collector net.Addr, pollDone chan struct{}) { - // Number of pollDone signals received or poll cycles should be equal to export frequency before starting the export cycle. - // This is necessary because IPFIX collector computes throughput based on flow records received interval. - <-pollDone - exp.pollCycle++ - if exp.pollCycle%exp.exportFrequency == 0 { - if exp.process == nil { - err := exp.initFlowExporter(collector) - if err != nil { - klog.Errorf("Error when initializing flow exporter: %v", err) - return +// DoExport enables us to export flow records periodically at a given flow export frequency. +func (exp *flowExporter) Export(collector net.Addr, stopCh <-chan struct{}, pollDone <-chan struct{}) { + for { + select { + case <-stopCh: + return + case <-pollDone: + // Number of pollDone signals received or poll cycles should be equal to export frequency before starting + // the export cycle. This is necessary because IPFIX collector computes throughput based on flow records received interval. + exp.pollCycle++ + if exp.pollCycle%exp.exportFrequency == 0 { + // Retry to connect to IPFIX collector if the exporting process gets reset + if exp.process == nil { + err := exp.initFlowExporter(collector) + if err != nil { + klog.Errorf("Error when initializing flow exporter: %v", err) + // There could be other errors while initializing flow exporter other than connecting to IPFIX collector, + // therefore closing the connection and resetting the process. + if exp.process != nil { + exp.process.CloseConnToCollector() + exp.process = nil + } + return + } + } + // Build and send flow records to IPFIX collector. + exp.flowRecords.BuildFlowRecords() + err := exp.sendFlowRecords() + if err != nil { + klog.Errorf("Error when sending flow records: %v", err) + // If there is an error when sending flow records because of intermittent connectivity, we reset the connection + // to IPFIX collector and retry in the next export cycle to reinitialize the connection and send flow records. + exp.process.CloseConnToCollector() + exp.process = nil + return + } + + exp.pollCycle = 0 + klog.V(2).Infof("Successfully exported IPFIX flow records") } } - exp.flowRecords.BuildFlowRecords() - err := exp.sendFlowRecords() - if err != nil { - klog.Errorf("Error when sending flow records: %v", err) - // If there is an error when sending flow records because of intermittent connectivity, we reset the connection - // to IPFIX collector and retry in the next export cycle to reinitialize the connection and send flow records. - exp.process.CloseConnToCollector() - exp.process = nil - } - exp.pollCycle = 0 - klog.V(2).Infof("Successfully exported IPFIX flow records") } - return } func (exp *flowExporter) initFlowExporter(collector net.Addr) error { diff --git a/test/e2e/flowexporter_test.go b/test/e2e/flowexporter_test.go index b195afa8276..96fbd822cc9 100644 --- a/test/e2e/flowexporter_test.go +++ b/test/e2e/flowexporter_test.go @@ -62,7 +62,7 @@ func TestFlowExporter(t *testing.T) { rc, collectorOutput, _, err := provider.RunCommandOnNode(masterNodeName(), fmt.Sprintf("kubectl logs ipfix-collector -n antrea-test")) if err != nil || rc != 0 { - t.Fatalf("error when getting logs %v, rc: %v", err, rc) + t.Fatalf("Error when getting logs %v, rc: %v", err, rc) } /* Parse through IPFIX collector output. Sample output (with truncated fields) is given below: diff --git a/test/e2e/framework.go b/test/e2e/framework.go index 83f945138a3..c344eddab76 100644 --- a/test/e2e/framework.go +++ b/test/e2e/framework.go @@ -311,12 +311,11 @@ func (data *TestData) deployAntreaFlowExporter(ipfixCollector string) error { } antreaAgentConf, _ := configMap.Data["antrea-agent.conf"] - antreaAgentConf = strings.Replace(antreaAgentConf, "# FlowExporter: false", " FlowExporter: true", 1) + antreaAgentConf = strings.Replace(antreaAgentConf, "# FlowExporter: false", " FlowExporter: true", 1) antreaAgentConf = strings.Replace(antreaAgentConf, "#flowCollectorAddr: \"\"", fmt.Sprintf("flowCollectorAddr: \"%s\"", ipfixCollector), 1) antreaAgentConf = strings.Replace(antreaAgentConf, "#flowPollInterval: \"5s\"", "flowPollInterval: \"1s\"", 1) antreaAgentConf = strings.Replace(antreaAgentConf, "#flowExportFrequency: 12", "flowExportFrequency: 5", 1) configMap.Data["antrea-agent.conf"] = antreaAgentConf - if _, err := data.clientset.CoreV1().ConfigMaps(antreaNamespace).Update(context.TODO(), configMap, metav1.UpdateOptions{}); err != nil { return fmt.Errorf("failed to update ConfigMap %s: %v", configMap.Name, err) } @@ -328,20 +327,6 @@ func (data *TestData) deployAntreaFlowExporter(ipfixCollector string) error { return fmt.Errorf("error when restarting antrea-agent Pod: %v", err) } - // Just to be safe disabling the FlowExporter feature for subsequent tests. - configMap, err = data.GetAntreaConfigMap(antreaNamespace) - if err != nil { - return fmt.Errorf("failed to get ConfigMap: %v", err) - } - - antreaAgentConf, _ = configMap.Data["antrea-agent.conf"] - antreaAgentConf = strings.Replace(antreaAgentConf, " FlowExporter: true", " FlowExporter: false", 1) - configMap.Data["antrea-agent.conf"] = antreaAgentConf - - if _, err := data.clientset.CoreV1().ConfigMaps(antreaNamespace).Update(context.TODO(), configMap, metav1.UpdateOptions{}); err != nil { - return fmt.Errorf("failed to update ConfigMap %s: %v", configMap.Name, err) - } - return nil }