Skip to content

Commit

Permalink
Flow exporter feature
Browse files Browse the repository at this point in the history
Added e2e test that can run on vagrant cluster. Tried to get this
working on kind cluster. There are issues with ovs-appctl access on
kind cluster nodes, so skipping the test for kind provider.

Addressed review comments.
  • Loading branch information
srikartati committed Jun 24, 2020
1 parent ca41a08 commit fb947d1
Show file tree
Hide file tree
Showing 28 changed files with 775 additions and 213 deletions.
19 changes: 19 additions & 0 deletions build/images/ipfixcollector/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
FROM ubuntu:18.04

LABEL maintainer="Antrea <projectantrea-dev@googlegroups.com>"
LABEL description="A Docker image based on Ubuntu 18.04 which contains simple IPFIX collector to run flow exporter tests"

WORKDIR /ipfix

COPY libipfix-impd4e_110224.tgz /ipfix

RUN apt-get update && \
apt-get install -y --no-install-recommends gcc libc6-dev build-essential libpcap0.8-dev && \
tar -xvf libipfix-* && rm libipfix-* && \
cd libipfix_* && ./configure && make && make install && ldconfig && \
cp collector/ipfix_collector /usr/local/bin && \
apt-get remove -y gcc build-essential && \
rm -rf /var/cache/apt/* /var/lib/apt/lists/*


ENTRYPOINT "ipfix_collector"
19 changes: 19 additions & 0 deletions build/images/ipfixcollector/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# images/ipfixcollector

This Docker image is a very lightweight image based on Ubuntu 18.04 which
includes ipfix collector based on libipfix, a C library.
In this image, IPFIX collector listening on tcp:4739 port.

libipfix-impd4e_110224.tgz is downloaded from http://sourceforge.net/projects/libipfix/
If required, please get the latest tar and build the image again.

New version of the image can be built and pushed to Dockerhub using following instructions:

```bash
cd build/images/ethtool
docker build -t antrea/ipfixcollector:latest .
docker push antrea/ipfixcollector:latest
```

The `docker push` command will fail if you do not have permission to push to the
`antrea` Dockerhub repository.
Binary file not shown.
13 changes: 10 additions & 3 deletions build/yamls/antrea-eks.yml
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,13 @@ data:
# flow records of conntrack flows on OVS bridge. If no L4 transport proto is given, we consider tcp as default.
# Defaults to "".
#flowCollectorAddr: ""
# Provide flow exporter poll and export interval in format "0s:0s". This determines how often flow exporter polls connections
# in conntrack module and exports IPFIX flow records that are built from connection store.
# Any value in range [1s, ExportInterval(s)) for poll interval is acceptable.
# Any value in range (PollInterval(s), 600s] for export interval is acceptable.
# Defaults to "5s:60s". Follow the time units of duration.
#pollAndExportInterval: ""
antrea-cni.conflist: |
{
"cniVersion":"0.3.0",
Expand Down Expand Up @@ -443,7 +450,7 @@ metadata:
annotations: {}
labels:
app: antrea
name: antrea-config-b2h6hh8cbb
name: antrea-config-67ktmhfcmm
namespace: kube-system
---
apiVersion: v1
Expand Down Expand Up @@ -548,7 +555,7 @@ spec:
key: node-role.kubernetes.io/master
volumes:
- configMap:
name: antrea-config-b2h6hh8cbb
name: antrea-config-67ktmhfcmm
name: antrea-config
- name: antrea-controller-tls
secret:
Expand Down Expand Up @@ -762,7 +769,7 @@ spec:
operator: Exists
volumes:
- configMap:
name: antrea-config-b2h6hh8cbb
name: antrea-config-67ktmhfcmm
name: antrea-config
- hostPath:
path: /etc/cni/net.d
Expand Down
13 changes: 10 additions & 3 deletions build/yamls/antrea-gke.yml
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,13 @@ data:
# flow records of conntrack flows on OVS bridge. If no L4 transport proto is given, we consider tcp as default.
# Defaults to "".
#flowCollectorAddr: ""
# Provide flow exporter poll and export interval in format "0s:0s". This determines how often flow exporter polls connections
# in conntrack module and exports IPFIX flow records that are built from connection store.
# Any value in range [1s, ExportInterval(s)) for poll interval is acceptable.
# Any value in range (PollInterval(s), 600s] for export interval is acceptable.
# Defaults to "5s:60s". Follow the time units of duration.
#pollAndExportInterval: ""
antrea-cni.conflist: |
{
"cniVersion":"0.3.0",
Expand Down Expand Up @@ -443,7 +450,7 @@ metadata:
annotations: {}
labels:
app: antrea
name: antrea-config-6fch497g6d
name: antrea-config-mt828fc4db
namespace: kube-system
---
apiVersion: v1
Expand Down Expand Up @@ -548,7 +555,7 @@ spec:
key: node-role.kubernetes.io/master
volumes:
- configMap:
name: antrea-config-6fch497g6d
name: antrea-config-mt828fc4db
name: antrea-config
- name: antrea-controller-tls
secret:
Expand Down Expand Up @@ -760,7 +767,7 @@ spec:
operator: Exists
volumes:
- configMap:
name: antrea-config-6fch497g6d
name: antrea-config-mt828fc4db
name: antrea-config
- hostPath:
path: /etc/cni/net.d
Expand Down
13 changes: 10 additions & 3 deletions build/yamls/antrea-ipsec.yml
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,13 @@ data:
# flow records of conntrack flows on OVS bridge. If no L4 transport proto is given, we consider tcp as default.
# Defaults to "".
#flowCollectorAddr: ""
# Provide flow exporter poll and export interval in format "0s:0s". This determines how often flow exporter polls connections
# in conntrack module and exports IPFIX flow records that are built from connection store.
# Any value in range [1s, ExportInterval(s)) for poll interval is acceptable.
# Any value in range (PollInterval(s), 600s] for export interval is acceptable.
# Defaults to "5s:60s". Follow the time units of duration.
#pollAndExportInterval: ""
antrea-cni.conflist: |
{
"cniVersion":"0.3.0",
Expand Down Expand Up @@ -443,7 +450,7 @@ metadata:
annotations: {}
labels:
app: antrea
name: antrea-config-5972td7g8f
name: antrea-config-thb9ff9g6t
namespace: kube-system
---
apiVersion: v1
Expand Down Expand Up @@ -557,7 +564,7 @@ spec:
key: node-role.kubernetes.io/master
volumes:
- configMap:
name: antrea-config-th9cdgt9c6
name: antrea-config-thb9ff9g6t
name: antrea-config
- name: antrea-controller-tls
secret:
Expand Down Expand Up @@ -804,7 +811,7 @@ spec:
operator: Exists
volumes:
- configMap:
name: antrea-config-5972td7g8f
name: antrea-config-thb9ff9g6t
name: antrea-config
- hostPath:
path: /etc/cni/net.d
Expand Down
13 changes: 10 additions & 3 deletions build/yamls/antrea.yml
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,13 @@ data:
# flow records of conntrack flows on OVS bridge. If no L4 transport proto is given, we consider tcp as default.
# Defaults to "".
#flowCollectorAddr: ""
# Provide flow exporter poll and export interval in format "0s:0s". This determines how often flow exporter polls connections
# in conntrack module and exports IPFIX flow records that are built from connection store.
# Any value in range [1s, ExportInterval(s)) for poll interval is acceptable.
# Any value in range (PollInterval(s), 600s] for export interval is acceptable.
# Defaults to "5s:60s". Follow the time units of duration.
#pollAndExportInterval: ""
antrea-cni.conflist: |
{
"cniVersion":"0.3.0",
Expand Down Expand Up @@ -443,7 +450,7 @@ metadata:
annotations: {}
labels:
app: antrea
name: antrea-config-chk67fb6cb
name: antrea-config-7884cbfgfh
namespace: kube-system
---
apiVersion: v1
Expand Down Expand Up @@ -548,7 +555,7 @@ spec:
key: node-role.kubernetes.io/master
volumes:
- configMap:
name: antrea-config-chk67fb6cb
name: antrea-config-7884cbfgfh
name: antrea-config
- name: antrea-controller-tls
secret:
Expand Down Expand Up @@ -760,7 +767,7 @@ spec:
operator: Exists
volumes:
- configMap:
name: antrea-config-chk67fb6cb
name: antrea-config-7884cbfgfh
name: antrea-config
- hostPath:
path: /etc/cni/net.d
Expand Down
7 changes: 7 additions & 0 deletions build/yamls/base/conf/antrea-agent.conf
Original file line number Diff line number Diff line change
Expand Up @@ -57,3 +57,10 @@
# flow records of conntrack flows on OVS bridge. If no L4 transport proto is given, we consider tcp as default.
# Defaults to "".
#flowCollectorAddr: ""

# Provide flow exporter poll and export interval in format "0s:0s". This determines how often flow exporter polls connections
# in conntrack module and exports IPFIX flow records that are built from connection store.
# Any value in range [1s, ExportInterval(s)) for poll interval is acceptable.
# Any value in range (PollInterval(s), 600s] for export interval is acceptable.
# Defaults to "5s:60s". Follow the time units of duration.
#pollAndExportInterval: ""
15 changes: 10 additions & 5 deletions cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,13 +197,18 @@ func run(o *Options) error {

// Initialize flow exporter and start functions to poll conntrack flows and export IPFIX flow records
if o.flowCollector != nil {
connTrack := connections.NewConnTrack(nodeConfig, serviceCIDRNet, connections.NewConnTrackPoller())
connStore := connections.NewConnectionStore(connTrack, ifaceStore)
var connTrack connections.ConnTrack
if o.config.OVSDatapathType == ovsconfig.OVSDatapathSystem {
connTrack = connections.NewConnTrack(connections.NewCTPollerSystem(), nodeConfig, serviceCIDRNet, o.config.OVSDatapathType)
} else if o.config.OVSDatapathType == ovsconfig.OVSDatapathNetdev {
connTrack = connections.NewConnTrack(connections.NewCTPollerNetdev(), nodeConfig, serviceCIDRNet, o.config.OVSDatapathType)
}

connStore := connections.NewConnectionStore(connTrack, ifaceStore, o.pollingInterval)
flowRecords := flowrecords.NewFlowRecords(connStore)
flowExporter, err := exporter.InitFlowExporter(o.flowCollector, flowRecords)
flowExporter, err := exporter.InitFlowExporter(o.flowCollector, flowRecords, o.exportInterval)
if err != nil {
// Antrea agent do not exit, if flow exporter cannot be initialized.
// Currently, only logging the error.
// Antrea agent does not exit, if flow exporter cannot be initialized; only error is logged.
klog.Errorf("error when initializing flow exporter: %v", err)
} else {
go connStore.Run(stopCh)
Expand Down
6 changes: 6 additions & 0 deletions cmd/antrea-agent/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,4 +90,10 @@ type AgentConfig struct {
// flow records of conntrack flows on OVS bridge. If no L4 transport proto is given, we consider tcp as default.
// Defaults to "".
FlowCollectorAddr string `yaml:"flowCollectorAddr,omitempty"`
// Provide flow exporter poll and export interval in format "0s:0s". This determines how often flow exporter polls connections
// in conntrack module and exports IPFIX flow records that are built from connection store.
// Any value in range [1s, ExportInterval(s)) for poll interval is acceptable.
// Any value in range (PollInterval(s), 600s] for export interval is acceptable.
// Defaults to "5s:60s". Follow the time units of duration.
PollAndExportInterval string `yaml:"pollAndExportInterval,omitempty"`
}
86 changes: 55 additions & 31 deletions cmd/antrea-agent/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"io/ioutil"
"net"
"strings"
"time"

"github.com/spf13/pflag"
"gopkg.in/yaml.v2"
Expand Down Expand Up @@ -52,6 +53,10 @@ type Options struct {
config *AgentConfig
// IPFIX flow collector
flowCollector net.Addr
// Flow exporter polling interval
pollingInterval time.Duration
// Flow exporter export interval
exportInterval time.Duration
}

func newOptions() *Options {
Expand Down Expand Up @@ -106,41 +111,55 @@ func (o *Options) validate(args []string) error {
return fmt.Errorf("IPSec tunnel may only be enabled on %s mode", config.TrafficEncapModeEncap)
}
if o.config.FlowCollectorAddr != "" {
if o.config.OVSDatapathType == ovsconfig.OVSDatapathNetdev {
return fmt.Errorf("exporting flows is not supported for OVS datapath type %s", o.config.OVSDatapathType)
} else {
// Check if it is TCP or UDP
strSlice := strings.Split(o.config.FlowCollectorAddr, ":")
var proto string
if len(strSlice) == 2 {
// No separator "." and proto is given
proto = "tcp"
} else if len(strSlice) > 2 {
if strSlice[2] == "udp" {
proto = "udp"
} else {
// All other cases default proto is tcp
proto = "tcp"
}
// Check if it is TCP or UDP
strSlice := strings.Split(o.config.FlowCollectorAddr, ":")
var proto string
if len(strSlice) == 2 {
// No separator "." and proto is given
proto = "tcp"
} else if len(strSlice) > 2 {
if strSlice[2] == "udp" {
proto = "udp"
} else {
return fmt.Errorf("IPFIX flow collector is given in invalid format: %v", err)
// All other cases default proto is tcp
proto = "tcp"
}
// Convert the string input in net.Addr format
hostPortAddr := strSlice[0]+":"+strSlice[1]
_, _, err := net.SplitHostPort(hostPortAddr)
} else {
return fmt.Errorf("IPFIX flow collector is given in invalid format: %v", err)
}
// Convert the string input in net.Addr format
hostPortAddr := strSlice[0] + ":" + strSlice[1]
_, _, err := net.SplitHostPort(hostPortAddr)
if err != nil {
return fmt.Errorf("IPFIX flow collector is given in invalid format: %v", err)
}
if proto == "udp" {
o.flowCollector, err = net.ResolveUDPAddr("udp", hostPortAddr)
if err != nil {
return fmt.Errorf("IPFIX flow collector is given in invalid format: %v", err)
return fmt.Errorf("IPFIX flow collector over UDP proto is not resolved: %v", err)
}
if proto == "udp" {
o.flowCollector, err = net.ResolveUDPAddr("udp", hostPortAddr)
if err != nil {
return fmt.Errorf("IPFIX flow collector over UDP proto is not resolved. Error: %v", err)
}
} else {
o.flowCollector, err = net.ResolveTCPAddr("tcp", hostPortAddr)
if err != nil {
return fmt.Errorf("IPFIX flow collector server TCP proto is not resolved. Error: %v", err)
}
} else {
o.flowCollector, err = net.ResolveTCPAddr("tcp", hostPortAddr)
if err != nil {
return fmt.Errorf("IPFIX flow collector server TCP proto is not resolved: %v", err)
}
}

if o.config.PollAndExportInterval != "" {
intervalSlice := strings.Split(o.config.PollAndExportInterval, ":")
if len(intervalSlice) != 2 {
return fmt.Errorf("flow exporter intervals %s is not in acceptable format \"OOs:OOs\"", o.config.PollAndExportInterval)
}
o.pollingInterval, err = time.ParseDuration(intervalSlice[0])
if err != nil {
return fmt.Errorf("poll interval is not provided in right format: %v", err)
}
o.exportInterval, err = time.ParseDuration(intervalSlice[1])
if err != nil {
return fmt.Errorf("export interval is not provided in right format: %v", err)
}
if o.pollingInterval > o.exportInterval {
return fmt.Errorf("poll interval should be less than or equal to export interval")
}
}
}
Expand Down Expand Up @@ -212,4 +231,9 @@ func (o *Options) setDefaults() {
if o.config.APIPort == 0 {
o.config.APIPort = apis.AntreaAgentAPIPort
}

if o.config.FlowCollectorAddr != "" && o.config.PollAndExportInterval == "" {
o.pollingInterval = 5 * time.Second
o.exportInterval = 60 * time.Second
}
}
Loading

0 comments on commit fb947d1

Please sign in to comment.