Skip to content

Commit

Permalink
Combine FlowExporter resources initialization.
Browse files Browse the repository at this point in the history
Signed-off-by: heanlan <hanlan@vmware.com>
  • Loading branch information
heanlan committed Nov 3, 2021
1 parent a48f4db commit 4443094
Show file tree
Hide file tree
Showing 15 changed files with 158 additions and 130 deletions.
75 changes: 30 additions & 45 deletions cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,8 @@ import (
"antrea.io/antrea/pkg/agent/controller/networkpolicy"
"antrea.io/antrea/pkg/agent/controller/noderoute"
"antrea.io/antrea/pkg/agent/controller/traceflow"
"antrea.io/antrea/pkg/agent/flowexporter/connections"
"antrea.io/antrea/pkg/agent/flowexporter"
"antrea.io/antrea/pkg/agent/flowexporter/exporter"
"antrea.io/antrea/pkg/agent/flowexporter/priorityqueue"
"antrea.io/antrea/pkg/agent/interfacestore"
"antrea.io/antrea/pkg/agent/metrics"
npl "antrea.io/antrea/pkg/agent/nodeportlocal"
Expand Down Expand Up @@ -258,14 +257,6 @@ func run(o *Options) error {
statusManagerEnabled := antreaPolicyEnabled
loggingEnabled := antreaPolicyEnabled

var denyConnStore *connections.DenyConnectionStore
var denyPriorityQueue *priorityqueue.ExpirePriorityQueue
if features.DefaultFeatureGate.Enabled(features.FlowExporter) {
denyPriorityQueue = priorityqueue.NewExpirePriorityQueue(o.activeFlowTimeout, o.idleFlowTimeout)
denyConnStore = connections.NewDenyConnectionStore(ifaceStore, proxier, denyPriorityQueue, o.staleConnectionTimeout)
go denyConnStore.RunPeriodicDeletion(stopCh)
}

networkPolicyController, err := networkpolicy.NewNetworkPolicyController(
antreaClientProvider,
ofClient,
Expand All @@ -278,7 +269,6 @@ func run(o *Options) error {
antreaProxyEnabled,
statusManagerEnabled,
loggingEnabled,
denyConnStore,
asyncRuleDeleteInterval,
o.config.DNSServerOverride)
if err != nil {
Expand Down Expand Up @@ -364,6 +354,34 @@ func run(o *Options) error {
return err
}

var flowExporter *exporter.FlowExporter
if features.DefaultFeatureGate.Enabled(features.FlowExporter) {
flowExporterOptions := &flowexporter.FlowExporterOptions{
FlowCollectorAddr: o.flowCollectorAddr,
FlowCollectorProto: o.flowCollectorProto,
ActiveFlowTimeout: o.activeFlowTimeout,
IdleFlowTimeout: o.idleFlowTimeout,
StaleConnectionTimeout: o.staleConnectionTimeout,
PollInterval: o.pollInterval}
flowExporter, err = exporter.NewFlowExporter(
ifaceStore,
proxier,
k8sClient,
nodeRouteController,
networkConfig.TrafficEncapMode,
nodeConfig,
serviceCIDRNet,
serviceCIDRNetv6,
&ovsDatapathType,
features.DefaultFeatureGate.Enabled(features.AntreaProxy),
networkPolicyController,
flowExporterOptions)
if err != nil {
return fmt.Errorf("error when creating IPFIX flow exporter: %v", err)
}
networkPolicyController.SetDenyConnStore(flowExporter.GetDenyConnStore())
}

// Start the NPL agent.
if features.DefaultFeatureGate.Enabled(features.NodePortLocal) && o.config.NodePortLocal.Enable {
nplController, err := npl.InitializeNPLAgent(
Expand Down Expand Up @@ -479,41 +497,8 @@ func run(o *Options) error {
go ofClient.StartPacketInHandler(packetInReasons, stopCh)
}

// Initialize flow exporter to start go routines to poll conntrack flows and export IPFIX flow records
var conntrackPriorityQueue *priorityqueue.ExpirePriorityQueue
// Start the goroutine to periodically export IPFIX flow records.
if features.DefaultFeatureGate.Enabled(features.FlowExporter) {
v4Enabled := config.IsIPv4Enabled(nodeConfig, networkConfig.TrafficEncapMode)
v6Enabled := config.IsIPv6Enabled(nodeConfig, networkConfig.TrafficEncapMode)
isNetworkPolicyOnly := networkConfig.TrafficEncapMode.IsNetworkPolicyOnly()

conntrackPriorityQueue = priorityqueue.NewExpirePriorityQueue(o.activeFlowTimeout, o.idleFlowTimeout)
conntrackConnStore := connections.NewConntrackConnectionStore(
connections.InitializeConnTrackDumper(nodeConfig, serviceCIDRNet, serviceCIDRNetv6, ovsDatapathType, features.DefaultFeatureGate.Enabled(features.AntreaProxy)),
ifaceStore,
v4Enabled,
v6Enabled,
proxier,
networkPolicyController,
o.pollInterval,
conntrackPriorityQueue,
o.staleConnectionTimeout)
go conntrackConnStore.Run(stopCh)

flowExporter, err := exporter.NewFlowExporter(
conntrackConnStore,
denyConnStore,
o.flowCollectorAddr,
o.flowCollectorProto,
v4Enabled,
v6Enabled,
k8sClient,
nodeRouteController,
isNetworkPolicyOnly,
conntrackPriorityQueue,
denyPriorityQueue)
if err != nil {
return fmt.Errorf("error when creating IPFIX flow exporter: %v", err)
}
go flowExporter.Run(stopCh)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,6 @@ func NewNetworkPolicyController(antreaClientGetter agent.AntreaClientProvider,
antreaProxyEnabled bool,
statusManagerEnabled bool,
loggingEnabled bool,
denyConnStore *connections.DenyConnectionStore,
asyncRuleDeleteInterval time.Duration,
dnsServerOverride string) (*Controller, error) {
idAllocator := newIDAllocator(asyncRuleDeleteInterval, dnsInterceptRuleID)
Expand All @@ -131,7 +130,6 @@ func NewNetworkPolicyController(antreaClientGetter agent.AntreaClientProvider,
antreaProxyEnabled: antreaProxyEnabled,
statusManagerEnabled: statusManagerEnabled,
loggingEnabled: loggingEnabled,
denyConnStore: denyConnStore,
}
if antreaPolicyEnabled {
var err error
Expand Down Expand Up @@ -408,6 +406,10 @@ func (c *Controller) GetControllerConnectionStatus() bool {
return c.addressGroupWatcher.isConnected() && c.appliedToGroupWatcher.isConnected() && c.networkPolicyWatcher.isConnected()
}

func (c *Controller) SetDenyConnStore(denyConnStore *connections.DenyConnectionStore) {
c.denyConnStore = denyConnStore
}

// Run begins watching and processing Antrea AddressGroups, AppliedToGroups
// and NetworkPolicies, and spawns workers that reconciles NetworkPolicy rules.
// Run will not return until stopCh is closed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func newTestController() (*Controller, *fake.Clientset, *mockReconciler) {
ch2 := make(chan string, 100)
groupCounters := []proxytypes.GroupCounter{proxytypes.NewGroupCounter(false, ch2)}
controller, _ := NewNetworkPolicyController(&antreaClientGetter{clientset}, nil, nil, "node1", ch, groupCounters, ch2,
true, true, true, true, nil, testAsyncDeleteInterval, "8.8.8.8:53")
true, true, true, true, testAsyncDeleteInterval, "8.8.8.8:53")
reconciler := newMockReconciler()
controller.reconciler = reconciler
controller.antreaPolicyLogger = nil
Expand Down
8 changes: 3 additions & 5 deletions pkg/agent/flowexporter/connections/connections.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,13 @@ type connectionStore struct {
func NewConnectionStore(
ifaceStore interfacestore.InterfaceStore,
proxier proxy.Proxier,
expirePriorityQueue *priorityqueue.ExpirePriorityQueue,
staleConnectionTimeout time.Duration,
) connectionStore {
o *flowexporter.FlowExporterOptions) connectionStore {
return connectionStore{
connections: make(map[flowexporter.ConnectionKey]*flowexporter.Connection),
ifaceStore: ifaceStore,
antreaProxier: proxier,
expirePriorityQueue: expirePriorityQueue,
staleConnectionTimeout: staleConnectionTimeout,
expirePriorityQueue: priorityqueue.NewExpirePriorityQueue(o.ActiveFlowTimeout, o.IdleFlowTimeout),
staleConnectionTimeout: o.StaleConnectionTimeout,
}
}

Expand Down
17 changes: 14 additions & 3 deletions pkg/agent/flowexporter/connections/connections_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,21 @@ import (
)

const (
testActiveFlowTimeout = 3 * time.Second
testIdleFlowTimeout = 1 * time.Second
testPollInterval = 0 // Not used in these tests, hence 0.
testStaleConnectionTimeout = 5 * time.Minute
)

var testFlowExporterOptions = &flowexporter.FlowExporterOptions{
FlowCollectorAddr: "",
FlowCollectorProto: "",
ActiveFlowTimeout: testActiveFlowTimeout,
IdleFlowTimeout: testIdleFlowTimeout,
StaleConnectionTimeout: testStaleConnectionTimeout,
PollInterval: testPollInterval,
}

func TestConnectionStore_ForAllConnectionsDo(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
Expand Down Expand Up @@ -70,7 +81,7 @@ func TestConnectionStore_ForAllConnectionsDo(t *testing.T) {
}
// Create connectionStore
mockIfaceStore := interfacestoretest.NewMockInterfaceStore(ctrl)
connStore := NewConnectionStore(mockIfaceStore, nil, nil, testStaleConnectionTimeout)
connStore := NewConnectionStore(mockIfaceStore, nil, testFlowExporterOptions)
// Add flows to the Connection store
for i, flow := range testFlows {
connStore.connections[*testFlowKeys[i]] = flow
Expand All @@ -97,7 +108,7 @@ func TestConnectionStore_DeleteConnWithoutLock(t *testing.T) {
metrics.InitializeConnectionMetrics()
// test on deny connection store
mockIfaceStore := interfacestoretest.NewMockInterfaceStore(ctrl)
denyConnStore := NewDenyConnectionStore(mockIfaceStore, nil, nil, testStaleConnectionTimeout)
denyConnStore := NewDenyConnectionStore(mockIfaceStore, nil, testFlowExporterOptions)
tuple := flowexporter.Tuple{SourceAddress: net.IP{1, 2, 3, 4}, DestinationAddress: net.IP{4, 3, 2, 1}, Protocol: 6, SourcePort: 65280, DestinationPort: 255}
conn := &flowexporter.Connection{
FlowKey: tuple,
Expand All @@ -114,7 +125,7 @@ func TestConnectionStore_DeleteConnWithoutLock(t *testing.T) {

// test on conntrack connection store
mockConnDumper := connectionstest.NewMockConnTrackDumper(ctrl)
conntrackConnStore := NewConntrackConnectionStore(mockConnDumper, mockIfaceStore, true, false, nil, nil, testPollInterval, nil, testStaleConnectionTimeout)
conntrackConnStore := NewConntrackConnectionStore(mockConnDumper, true, false, nil, mockIfaceStore, nil, testFlowExporterOptions)
conntrackConnStore.connections[connKey] = conn

metrics.TotalAntreaConnectionsInConnTrackTable.Set(1)
Expand Down
16 changes: 9 additions & 7 deletions pkg/agent/flowexporter/connections/conntrack_connections.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,22 +49,20 @@ type ConntrackConnectionStore struct {

func NewConntrackConnectionStore(
connTrackDumper ConnTrackDumper,
ifaceStore interfacestore.InterfaceStore,
v4Enabled bool,
v6Enabled bool,
proxier proxy.Proxier,
npQuerier querier.AgentNetworkPolicyInfoQuerier,
pollInterval time.Duration,
expirePriorityQueue *priorityqueue.ExpirePriorityQueue,
staleConnectionTimeout time.Duration,
ifaceStore interfacestore.InterfaceStore,
proxier proxy.Proxier,
o *flowexporter.FlowExporterOptions,
) *ConntrackConnectionStore {
return &ConntrackConnectionStore{
connDumper: connTrackDumper,
v4Enabled: v4Enabled,
v6Enabled: v6Enabled,
networkPolicyQuerier: npQuerier,
pollInterval: pollInterval,
connectionStore: NewConnectionStore(ifaceStore, proxier, expirePriorityQueue, staleConnectionTimeout),
pollInterval: o.PollInterval,
connectionStore: NewConnectionStore(ifaceStore, proxier, o),
}
}

Expand Down Expand Up @@ -290,3 +288,7 @@ func (cs *ConntrackConnectionStore) deleteConnWithoutLock(connKey flowexporter.C
metrics.TotalAntreaConnectionsInConnTrackTable.Dec()
return nil
}

func (cs *ConntrackConnectionStore) GetPriorityQueue() *priorityqueue.ExpirePriorityQueue {
return cs.connectionStore.expirePriorityQueue
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import (

"antrea.io/antrea/pkg/agent/flowexporter"
connectionstest "antrea.io/antrea/pkg/agent/flowexporter/connections/testing"
"antrea.io/antrea/pkg/agent/flowexporter/priorityqueue"
"antrea.io/antrea/pkg/agent/interfacestore"
interfacestoretest "antrea.io/antrea/pkg/agent/interfacestore/testing"
"antrea.io/antrea/pkg/agent/openflow"
Expand Down Expand Up @@ -105,9 +104,7 @@ func setupConntrackConnStore(b *testing.B) (*ConntrackConnectionStore, *connecti
mockProxier.EXPECT().GetServiceByIP(serviceStr).Return(servicePortName, true).AnyTimes()

npQuerier := queriertest.NewMockAgentNetworkPolicyInfoQuerier(ctrl)
pq := priorityqueue.NewExpirePriorityQueue(testActiveFlowTimeout, testIdleFlowTimeout)
return NewConntrackConnectionStore(mockConnDumper, mockIfaceStore, true, false, mockProxier,
npQuerier, testPollInterval, pq, testStaleConnectionTimeout), mockConnDumper
return NewConntrackConnectionStore(mockConnDumper, true, false, npQuerier, mockIfaceStore, nil, testFlowExporterOptions), mockConnDumper
}

func generateConns() []*flowexporter.Connection {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (

"antrea.io/antrea/pkg/agent/flowexporter"
connectionstest "antrea.io/antrea/pkg/agent/flowexporter/connections/testing"
"antrea.io/antrea/pkg/agent/flowexporter/priorityqueue"
"antrea.io/antrea/pkg/agent/interfacestore"
interfacestoretest "antrea.io/antrea/pkg/agent/interfacestore/testing"
"antrea.io/antrea/pkg/agent/metrics"
Expand Down Expand Up @@ -211,9 +210,7 @@ func TestConntrackConnectionStore_AddOrUpdateConn(t *testing.T) {
mockProxier := proxytest.NewMockProxier(ctrl)
mockConnDumper := connectionstest.NewMockConnTrackDumper(ctrl)
npQuerier := queriertest.NewMockAgentNetworkPolicyInfoQuerier(ctrl)
pq := priorityqueue.NewExpirePriorityQueue(testActiveFlowTimeout, testIdleFlowTimeout)
conntrackConnStore := NewConntrackConnectionStore(mockConnDumper, mockIfaceStore, true, false,
mockProxier, npQuerier, testPollInterval, pq, testStaleConnectionTimeout)
conntrackConnStore := NewConntrackConnectionStore(mockConnDumper, true, false, npQuerier, mockIfaceStore, mockProxier, testFlowExporterOptions)

for _, c := range tc {
t.Run(c.name, func(t *testing.T) {
Expand Down Expand Up @@ -296,7 +293,7 @@ func TestConnectionStore_DeleteConnectionByKey(t *testing.T) {
metrics.TotalAntreaConnectionsInConnTrackTable.Set(float64(len(testFlows)))
// Create connectionStore
mockIfaceStore := interfacestoretest.NewMockInterfaceStore(ctrl)
connStore := NewConntrackConnectionStore(nil, mockIfaceStore, true, false, nil, nil, testPollInterval, nil, testStaleConnectionTimeout)
connStore := NewConntrackConnectionStore(nil, true, false, nil, mockIfaceStore, nil, testFlowExporterOptions)
// Add flows to the connection store.
for i, flow := range testFlows {
connStore.connections[*testFlowKeys[i]] = flow
Expand All @@ -320,7 +317,7 @@ func TestConnectionStore_MetricSettingInPoll(t *testing.T) {
// Create connectionStore
mockIfaceStore := interfacestoretest.NewMockInterfaceStore(ctrl)
mockConnDumper := connectionstest.NewMockConnTrackDumper(ctrl)
conntrackConnStore := NewConntrackConnectionStore(mockConnDumper, mockIfaceStore, true, false, nil, nil, testPollInterval, nil, testStaleConnectionTimeout)
conntrackConnStore := NewConntrackConnectionStore(mockConnDumper, true, false, nil, mockIfaceStore, nil, testFlowExporterOptions)
// Hard-coded conntrack occupancy metrics for test
TotalConnections := 0
MaxConnections := 300000
Expand Down
9 changes: 6 additions & 3 deletions pkg/agent/flowexporter/connections/deny_connections.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,9 @@ type DenyConnectionStore struct {
connectionStore
}

func NewDenyConnectionStore(ifaceStore interfacestore.InterfaceStore,
proxier proxy.Proxier, expirePriorityQueue *priorityqueue.ExpirePriorityQueue, staleConnectionTimeout time.Duration) *DenyConnectionStore {
func NewDenyConnectionStore(ifaceStore interfacestore.InterfaceStore, proxier proxy.Proxier, o *flowexporter.FlowExporterOptions) *DenyConnectionStore {
return &DenyConnectionStore{
connectionStore: NewConnectionStore(ifaceStore, proxier, expirePriorityQueue, staleConnectionTimeout),
connectionStore: NewConnectionStore(ifaceStore, proxier, o),
}
}

Expand Down Expand Up @@ -137,3 +136,7 @@ func (ds *DenyConnectionStore) deleteConnWithoutLock(connKey flowexporter.Connec
metrics.TotalDenyConnections.Dec()
return nil
}

func (ds *DenyConnectionStore) GetPriorityQueue() *priorityqueue.ExpirePriorityQueue {
return ds.connectionStore.expirePriorityQueue
}
13 changes: 3 additions & 10 deletions pkg/agent/flowexporter/connections/deny_connections_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,18 +29,12 @@ import (
"k8s.io/component-base/metrics/legacyregistry"

"antrea.io/antrea/pkg/agent/flowexporter"
"antrea.io/antrea/pkg/agent/flowexporter/priorityqueue"
interfacestoretest "antrea.io/antrea/pkg/agent/interfacestore/testing"
"antrea.io/antrea/pkg/agent/metrics"
proxytest "antrea.io/antrea/pkg/agent/proxy/testing"
k8sproxy "antrea.io/antrea/third_party/proxy"
)

const (
testActiveFlowTimeout = 3 * time.Second
testIdleFlowTimeout = 1 * time.Second
)

func TestDenyConnectionStore_AddOrUpdateConn(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
Expand Down Expand Up @@ -75,16 +69,15 @@ func TestDenyConnectionStore_AddOrUpdateConn(t *testing.T) {
mockIfaceStore.EXPECT().GetInterfaceByIP(tuple.SourceAddress.String()).Return(nil, false)
mockIfaceStore.EXPECT().GetInterfaceByIP(tuple.DestinationAddress.String()).Return(nil, false)

pq := priorityqueue.NewExpirePriorityQueue(testActiveFlowTimeout, testIdleFlowTimeout)
denyConnStore := NewDenyConnectionStore(mockIfaceStore, mockProxier, pq, testStaleConnectionTimeout)
denyConnStore := NewDenyConnectionStore(mockIfaceStore, mockProxier, testFlowExporterOptions)

denyConnStore.AddOrUpdateConn(&testFlow, refTime.Add(-(time.Second * 20)), uint64(60))
expConn := testFlow
expConn.DestinationServicePortName = servicePortName.String()
actualConn, ok := denyConnStore.GetConnByKey(flowexporter.NewConnectionKey(&testFlow))
assert.Equal(t, ok, true, "deny connection should be there in deny connection store")
assert.Equal(t, expConn, *actualConn, "deny connections should be equal")
assert.Equal(t, 1, pq.Len(), "Length of the expire priority queue should be 1")
assert.Equal(t, 1, denyConnStore.connectionStore.expirePriorityQueue.Len(), "Length of the expire priority queue should be 1")
checkDenyConnectionMetrics(t, len(denyConnStore.connections))

denyConnStore.AddOrUpdateConn(&testFlow, refTime.Add(-(time.Second * 10)), uint64(60))
Expand All @@ -95,7 +88,7 @@ func TestDenyConnectionStore_AddOrUpdateConn(t *testing.T) {
assert.Equal(t, ok, true, "deny connection should be there in deny connection store")
assert.Equal(t, expConn, *actualConn, "deny connections should be equal")
assert.True(t, actualConn.IsActive)
assert.Equal(t, 1, pq.Len())
assert.Equal(t, 1, denyConnStore.connectionStore.expirePriorityQueue.Len())
checkDenyConnectionMetrics(t, len(denyConnStore.connections))
}

Expand Down
Loading

0 comments on commit 4443094

Please sign in to comment.