Support sending IPFIX flow records for Antrea flow exporter #825
Conversation
Thanks for your PR. The following commands are available:
These commands can only be run by members of the vmware-tanzu organization.
Force-pushed from 51f7ec1 to ac163fd
I tested with an IPFIX collector (C-based ipfixlib) in a local k8s cluster running an iperf service with two TCP clients and one server. The collector is able to see all the data records sent at the IPFIX export interval. Posting the output for reference below. In the process, I fixed some bugs in the previous patch and in the IPFIX library.
1c86e77
to
09a1c32
Compare
Added a few unit tests. Using the go-ipfixlib library at "github.com/srikartati/go-ipfixlib" after making it public.
A general question - have we observed higher CPU usage/contention when flow exporting is enabled?
defer cs.mutex.Unlock()

for k, v := range cs.connections {
	cs.mutex.Unlock()
Is it not unsafe to iterate a map without the lock? What happens if another goroutine adds/deletes keys?
Yes, it is possible when the conntrack poller goroutine is accessing the map. We do this to avoid lock contention on the whole map while building the flow record for a single connection. The disadvantage is that we may send connection data from the last poll cycle (a difference of seconds) for a very few flow records; as this is continuous monitoring, I thought this was acceptable. What is your opinion?
I want to provide some further clarification by considering a scenario w.r.t. the following code. Here as well, if the routine executing the iteration grabs the lock first, then we will build records using old connection data. However, by releasing the lock after every item, we can decrease the contention.

lock
addOrUpdateConnectionMap
unlock

lock
IterateConnectionMap
unlock
Also, I would like to note that range connectionMap will iterate over an old copy of the connection map even if the connection map is updated in another routine. I missed that in my previous comment. Thanks.
I did not know that map iteration in Go is thread-safe. Anyway, could you add some comments to explain why you release the lock there, and what the impact could be?
You are right, Jianjun. Iteration over the map is not thread-safe in the presence of concurrent operations. Releasing the lock in our case may work because of the current deletion logic, and the presumption that sending old connection data is fine. I added a comment in the code; please take a look.
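For reference, writing to a Go map concurrently with a range iteration makes the runtime abort; a minimal repro of standard Go behavior (not code from this PR):

m := map[int]int{1: 1}
go func() {
	for {
		m[2] = 2 // concurrent write from another goroutine
	}
}()
for range m { // can abort with "fatal error: concurrent map iteration and map write"
}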
This sounds tricky. Have you verified it really works?
It worked as expected with respect to the deletion logic. However, because the IPFIX collector requires accurate flow records rather than data from the last poll cycle, I changed the synchronization logic.
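For illustration, one way to keep records accurate while limiting contention is to copy the connections under the lock and build records from the snapshot. This is a hedged sketch with a hypothetical connectionStore type, not necessarily the logic this PR adopted:

type connectionStore struct {
	mutex       sync.Mutex
	connections map[string]*Connection
}

func (cs *connectionStore) forEachConnection(do func(Connection)) {
	cs.mutex.Lock()
	// Copy the connections while holding the lock; the poller is only
	// blocked for the duration of the copy.
	snapshot := make([]Connection, 0, len(cs.connections))
	for _, conn := range cs.connections {
		snapshot = append(snapshot, *conn)
	}
	cs.mutex.Unlock()
	// Build flow records from the snapshot without holding the lock.
	for _, conn := range snapshot {
		do(conn)
	}
}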
@@ -36,3 +41,11 @@ type Connection struct {
	DestinationPodNamespace string
Add a comment somewhere indicating that the Pod name/namespace is filled in only when the Pod is local?
A general question - have we observed higher CPU usage/contention when flow exporting is enabled?
I have not looked closely into CPU utilization stats yet. I will compare the following scenarios w.r.t. CPU utilization and the percentage of used and runnable time for the processes:
- No flow exporter
- Flow exporter with no workload
- Flow exporter with workload.
I will check in the fixes along with the e2e test in the next patch. Please let me know if you have further comments. Thanks.
Force-pushed from bd0f7d8 to fb947d1
Regenerated manifest files too.
Changed the synchronization logic for the poll (ConnectionStore.Run) and export (FlowExporter.Run) goroutines. Fixed e2e tests.
The first record should send 0 delta bytes/packets; otherwise an already established flow will show incorrect throughput (Mb/s or PPS), as illustrated below.
This will be removed when network policy info is added to flow records.
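To illustrate the zero-delta point above with hypothetical numbers: suppose a flow transferred 10 GB before the exporter started and is now idle. If the first record reported the cumulative count as a delta over a 5 s export interval, the computed throughput would be absurd:

cumulativeBytes := int64(10_000_000_000) // hypothetical: 10 GB sent before the exporter started
// Misreporting the cumulative count as a 5 s delta:
// 8e10 bits / 5 s = 1.6e10 bits/s = 16 Gb/s, for a flow that is now idle.
bogusGbps := float64(cumulativeBytes*8) / 5.0 / 1e9
fmt.Printf("apparent throughput: %.0f Gb/s\n", bogusGbps) // 16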
Force-pushed from 9a3212c to e90d1b2
The major change is the handling of errors in the exporter goroutine.
Force-pushed from e90d1b2 to 841469c
func (exp *ipfixExportingProcess) CloseConnToCollector() {
	exp.ExportingProcess.CloseConnToCollector()
	return
The return is not needed (same below, and maybe in other places?).
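That is, the method without the redundant return would simply be:

func (exp *ipfixExportingProcess) CloseConnToCollector() {
	exp.ExportingProcess.CloseConnToCollector()
}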
}

// CheckAndDoExport enables us to export flow records periodically at a given flow export frequency.
func (exp *flowExporter) CheckAndDoExport(collector net.Addr, pollDone chan struct{}) {
I think the Check part of the name is a bit misleading and not needed. I would suggest removing it altogether, or at least mentioning in the comment that the function ensures there is a live connection to the collector.
Changed the name.
	klog.V(2).Infof("Successfully exported IPFIX flow records")
}

return
The return is not needed.
cmd/antrea-agent/agent.go
flowExporter := exporter.NewFlowExporter(
	flowrecords.NewFlowRecords(connStore),
	o.config.FlowExportFrequency)
go wait.Until(func() { flowExporter.CheckAndDoExport(o.flowCollector, pollDone) }, o.pollInterval, stopCh)
That's not exactly what I had in mind here. You should still have a select loop in CheckAndDoExport; the time duration provided here should be one second or a few seconds, and should only be used to establish the connection again in case of error.
In case of error, CheckAndDoExport logs the error, then returns. wait.Until will then take care of calling it again (which will establish the connection, and start a new for / select loop). If the connection never breaks or there is no export error, CheckAndDoExport will not return.
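A minimal sketch of the suggested pattern, assuming hypothetical connect/export helpers on the exporter (wait.Until is from k8s.io/apimachinery/pkg/util/wait); an illustration, not the PR's final code:

go wait.Until(func() {
	// (Re-)establish the connection on each entry: on startup, and again
	// after any error return below.
	if err := exp.connect(collector); err != nil {
		klog.Errorf("Error when connecting to the collector: %v", err)
		return // wait.Until calls us again after the short interval
	}
	ticker := time.NewTicker(exp.exportFrequency)
	defer ticker.Stop()
	for {
		select {
		case <-stopCh:
			return
		case <-ticker.C:
			if err := exp.export(); err != nil {
				klog.Errorf("Error when exporting flow records: %v", err)
				return // reconnect on the next wait.Until invocation
			}
		}
	}
}, time.Second, stopCh)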
I handled the connection creation through if exp.process == nil, only when there is an error.
I will have to change this when we move the connection handling logic to go-ipfix. Keeping this as is for now, as discussed offline.
for _, flow := range outputFlow {
	conn, err := flowStringToAntreaConnection(flow, zoneFilter)
	if err != nil {
		klog.Warningf("Ignoring the flow from conntrack dump due to the error: %v", err)
- klog.Warningf("Ignoring the flow from conntrack dump due to the error: %v", err)
+ klog.Warningf("Ignoring the flow from conntrack dump due to a parsing error: %v", err)
done
/test-all
/test-networkpolicy
Force-pushed from f1744e4 to b8d9ee4
/test-all
for {
	select {
	case <-stopCh:
		break
s/break/return
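For context, break inside a select only exits the select statement, not the enclosing for loop, so once stopCh is closed the goroutine would spin forever re-entering the loop; return terminates it as intended:

for {
	select {
	case <-stopCh:
		// break here would only leave the select and re-enter the loop;
		// return actually terminates the goroutine.
		return
	case <-ticker.C:
		// periodic work
	}
}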
Force-pushed from b8d9ee4 to 747569d
test/e2e/flowexporter_test.go
rc, collectorOutput, _, err := provider.RunCommandOnNode(masterNodeName(), fmt.Sprintf("kubectl logs ipfix-collector -n antrea-test"))
if err != nil || rc != 0 {
	t.Fatalf("error when getting logs %v, rc: %v", err, rc)
nit: first letter of log message should be capitalized
done
	t.Fatalf("Error in converting octetDeltaCount to int type")
}
// compute the bandwidth using 5s as interval
recBandwidth := (deltaBytes * 8.0) / float64((int64(5.0))*time.Second.Nanoseconds())
Does recBandwidth := (deltaBytes * 8.0) / float64(5*time.Second.Nanoseconds()) not work?
Yes, it works. For some reason, I started with 5.0 to make everything a float and worked around that. Changed.
I missed this change. As the tests in this PR have finished, I modified this file in PR #984.
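To sanity-check the simplified expression with hypothetical numbers: with deltaBytes = 1e9 (1 GB transferred in the 5 s interval), the division gives bits per nanosecond, which is numerically Gb/s:

deltaBytes := float64(1_000_000_000) // hypothetical: 1 GB in the 5 s interval
// 8e9 bits / 5e9 ns = 1.6 bits/ns, i.e. 1.6 Gb/s
recBandwidth := (deltaBytes * 8.0) / float64(5*time.Second.Nanoseconds())
fmt.Printf("bandwidth: %.1f Gbps\n", recBandwidth) // 1.6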
Force-pushed from 747569d to 0ef571a
/test-all
LGTM.
A few things we talked about offline to keep in mind for post 0.9:
- handle reconnections to the IPFIX collector in the go-ipfix library
- add unit tests for the reconnection case if possible
- improve OVS appctl connection parsing logic
Thanks for the review. Yes, there is an issue for the first one in go-ipfix. I will keep a note of the last one, to enhance the OVS appctl dump flows function; this is needed for Windows support.
Added support to export IPFIX flow records that are built from the
connection map, using the IPFIX library.
Tested with a local IPFIX collector running in a k8s cluster. Unit tests were added. Testing with the
ElastiFlow collector is in progress.
Fixes #712