Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Combine FlowExporter resources initialization in agent.go #2854

Merged
merged 1 commit into from
Nov 4, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 30 additions & 45 deletions cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,8 @@ import (
"antrea.io/antrea/pkg/agent/controller/networkpolicy"
"antrea.io/antrea/pkg/agent/controller/noderoute"
"antrea.io/antrea/pkg/agent/controller/traceflow"
"antrea.io/antrea/pkg/agent/flowexporter/connections"
"antrea.io/antrea/pkg/agent/flowexporter"
"antrea.io/antrea/pkg/agent/flowexporter/exporter"
"antrea.io/antrea/pkg/agent/flowexporter/priorityqueue"
"antrea.io/antrea/pkg/agent/interfacestore"
"antrea.io/antrea/pkg/agent/metrics"
npl "antrea.io/antrea/pkg/agent/nodeportlocal"
Expand Down Expand Up @@ -258,14 +257,6 @@ func run(o *Options) error {
statusManagerEnabled := antreaPolicyEnabled
loggingEnabled := antreaPolicyEnabled

var denyConnStore *connections.DenyConnectionStore
var denyPriorityQueue *priorityqueue.ExpirePriorityQueue
if features.DefaultFeatureGate.Enabled(features.FlowExporter) {
denyPriorityQueue = priorityqueue.NewExpirePriorityQueue(o.activeFlowTimeout, o.idleFlowTimeout)
denyConnStore = connections.NewDenyConnectionStore(ifaceStore, proxier, denyPriorityQueue, o.staleConnectionTimeout)
go denyConnStore.RunPeriodicDeletion(stopCh)
}

networkPolicyController, err := networkpolicy.NewNetworkPolicyController(
antreaClientProvider,
ofClient,
Expand All @@ -278,7 +269,6 @@ func run(o *Options) error {
antreaProxyEnabled,
statusManagerEnabled,
loggingEnabled,
denyConnStore,
asyncRuleDeleteInterval,
o.config.DNSServerOverride)
if err != nil {
Expand Down Expand Up @@ -364,6 +354,34 @@ func run(o *Options) error {
return err
}

var flowExporter *exporter.FlowExporter
if features.DefaultFeatureGate.Enabled(features.FlowExporter) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I still think this whole block can be moved to the flow exporter package, but this is a good start.
When you think about it, is there a reason for denyConnStore and conntrackConnStore to exist inside this main function? Or could they be initialized as part of NewFlowExporter?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see any concern if we combine connection stores' initialization with flowExporter's. cc @srikartati for opinions.

And I have an initial implementation of the combination in the new commit. Please take a look to see whether we want to keep the changes. Thanks

flowExporterOptions := &flowexporter.FlowExporterOptions{
FlowCollectorAddr: o.flowCollectorAddr,
FlowCollectorProto: o.flowCollectorProto,
ActiveFlowTimeout: o.activeFlowTimeout,
IdleFlowTimeout: o.idleFlowTimeout,
StaleConnectionTimeout: o.staleConnectionTimeout,
PollInterval: o.pollInterval}
flowExporter, err = exporter.NewFlowExporter(
ifaceStore,
proxier,
k8sClient,
nodeRouteController,
networkConfig.TrafficEncapMode,
nodeConfig,
serviceCIDRNet,
serviceCIDRNetv6,
&ovsDatapathType,
features.DefaultFeatureGate.Enabled(features.AntreaProxy),
networkPolicyController,
flowExporterOptions)
if err != nil {
return fmt.Errorf("error when creating IPFIX flow exporter: %v", err)
}
networkPolicyController.SetDenyConnStore(flowExporter.GetDenyConnStore())
}

// Start the NPL agent.
if features.DefaultFeatureGate.Enabled(features.NodePortLocal) && o.config.NodePortLocal.Enable {
nplController, err := npl.InitializeNPLAgent(
Expand Down Expand Up @@ -479,41 +497,8 @@ func run(o *Options) error {
go ofClient.StartPacketInHandler(packetInReasons, stopCh)
}

// Initialize flow exporter to start go routines to poll conntrack flows and export IPFIX flow records
var conntrackPriorityQueue *priorityqueue.ExpirePriorityQueue
// Start the goroutine to periodically export IPFIX flow records.
if features.DefaultFeatureGate.Enabled(features.FlowExporter) {
v4Enabled := config.IsIPv4Enabled(nodeConfig, networkConfig.TrafficEncapMode)
v6Enabled := config.IsIPv6Enabled(nodeConfig, networkConfig.TrafficEncapMode)
isNetworkPolicyOnly := networkConfig.TrafficEncapMode.IsNetworkPolicyOnly()

conntrackPriorityQueue = priorityqueue.NewExpirePriorityQueue(o.activeFlowTimeout, o.idleFlowTimeout)
conntrackConnStore := connections.NewConntrackConnectionStore(
connections.InitializeConnTrackDumper(nodeConfig, serviceCIDRNet, serviceCIDRNetv6, ovsDatapathType, features.DefaultFeatureGate.Enabled(features.AntreaProxy)),
ifaceStore,
v4Enabled,
v6Enabled,
proxier,
networkPolicyController,
o.pollInterval,
conntrackPriorityQueue,
o.staleConnectionTimeout)
go conntrackConnStore.Run(stopCh)

flowExporter, err := exporter.NewFlowExporter(
conntrackConnStore,
denyConnStore,
o.flowCollectorAddr,
o.flowCollectorProto,
v4Enabled,
v6Enabled,
k8sClient,
nodeRouteController,
isNetworkPolicyOnly,
conntrackPriorityQueue,
denyPriorityQueue)
if err != nil {
return fmt.Errorf("error when creating IPFIX flow exporter: %v", err)
}
go flowExporter.Run(stopCh)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,6 @@ func NewNetworkPolicyController(antreaClientGetter agent.AntreaClientProvider,
antreaProxyEnabled bool,
statusManagerEnabled bool,
loggingEnabled bool,
denyConnStore *connections.DenyConnectionStore,
asyncRuleDeleteInterval time.Duration,
dnsServerOverride string) (*Controller, error) {
idAllocator := newIDAllocator(asyncRuleDeleteInterval, dnsInterceptRuleID)
Expand All @@ -131,7 +130,6 @@ func NewNetworkPolicyController(antreaClientGetter agent.AntreaClientProvider,
antreaProxyEnabled: antreaProxyEnabled,
statusManagerEnabled: statusManagerEnabled,
loggingEnabled: loggingEnabled,
denyConnStore: denyConnStore,
}
if antreaPolicyEnabled {
var err error
Expand Down Expand Up @@ -408,6 +406,10 @@ func (c *Controller) GetControllerConnectionStatus() bool {
return c.addressGroupWatcher.isConnected() && c.appliedToGroupWatcher.isConnected() && c.networkPolicyWatcher.isConnected()
}

func (c *Controller) SetDenyConnStore(denyConnStore *connections.DenyConnectionStore) {
c.denyConnStore = denyConnStore
}

// Run begins watching and processing Antrea AddressGroups, AppliedToGroups
// and NetworkPolicies, and spawns workers that reconciles NetworkPolicy rules.
// Run will not return until stopCh is closed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func newTestController() (*Controller, *fake.Clientset, *mockReconciler) {
ch2 := make(chan string, 100)
groupCounters := []proxytypes.GroupCounter{proxytypes.NewGroupCounter(false, ch2)}
controller, _ := NewNetworkPolicyController(&antreaClientGetter{clientset}, nil, nil, "node1", ch, groupCounters, ch2,
true, true, true, true, nil, testAsyncDeleteInterval, "8.8.8.8:53")
true, true, true, true, testAsyncDeleteInterval, "8.8.8.8:53")
reconciler := newMockReconciler()
controller.reconciler = reconciler
controller.antreaPolicyLogger = nil
Expand Down
8 changes: 3 additions & 5 deletions pkg/agent/flowexporter/connections/connections.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,13 @@ type connectionStore struct {
func NewConnectionStore(
ifaceStore interfacestore.InterfaceStore,
proxier proxy.Proxier,
expirePriorityQueue *priorityqueue.ExpirePriorityQueue,
staleConnectionTimeout time.Duration,
) connectionStore {
o *flowexporter.FlowExporterOptions) connectionStore {
return connectionStore{
connections: make(map[flowexporter.ConnectionKey]*flowexporter.Connection),
ifaceStore: ifaceStore,
antreaProxier: proxier,
expirePriorityQueue: expirePriorityQueue,
staleConnectionTimeout: staleConnectionTimeout,
expirePriorityQueue: priorityqueue.NewExpirePriorityQueue(o.ActiveFlowTimeout, o.IdleFlowTimeout),
staleConnectionTimeout: o.StaleConnectionTimeout,
}
}

Expand Down
17 changes: 14 additions & 3 deletions pkg/agent/flowexporter/connections/connections_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,21 @@ import (
)

const (
testActiveFlowTimeout = 3 * time.Second
testIdleFlowTimeout = 1 * time.Second
testPollInterval = 0 // Not used in these tests, hence 0.
testStaleConnectionTimeout = 5 * time.Minute
)

var testFlowExporterOptions = &flowexporter.FlowExporterOptions{
FlowCollectorAddr: "",
FlowCollectorProto: "",
ActiveFlowTimeout: testActiveFlowTimeout,
IdleFlowTimeout: testIdleFlowTimeout,
StaleConnectionTimeout: testStaleConnectionTimeout,
PollInterval: testPollInterval,
}

func TestConnectionStore_ForAllConnectionsDo(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
Expand Down Expand Up @@ -70,7 +81,7 @@ func TestConnectionStore_ForAllConnectionsDo(t *testing.T) {
}
// Create connectionStore
mockIfaceStore := interfacestoretest.NewMockInterfaceStore(ctrl)
connStore := NewConnectionStore(mockIfaceStore, nil, nil, testStaleConnectionTimeout)
connStore := NewConnectionStore(mockIfaceStore, nil, testFlowExporterOptions)
// Add flows to the Connection store
for i, flow := range testFlows {
connStore.connections[*testFlowKeys[i]] = flow
Expand All @@ -97,7 +108,7 @@ func TestConnectionStore_DeleteConnWithoutLock(t *testing.T) {
metrics.InitializeConnectionMetrics()
// test on deny connection store
mockIfaceStore := interfacestoretest.NewMockInterfaceStore(ctrl)
denyConnStore := NewDenyConnectionStore(mockIfaceStore, nil, nil, testStaleConnectionTimeout)
denyConnStore := NewDenyConnectionStore(mockIfaceStore, nil, testFlowExporterOptions)
tuple := flowexporter.Tuple{SourceAddress: net.IP{1, 2, 3, 4}, DestinationAddress: net.IP{4, 3, 2, 1}, Protocol: 6, SourcePort: 65280, DestinationPort: 255}
conn := &flowexporter.Connection{
FlowKey: tuple,
Expand All @@ -114,7 +125,7 @@ func TestConnectionStore_DeleteConnWithoutLock(t *testing.T) {

// test on conntrack connection store
mockConnDumper := connectionstest.NewMockConnTrackDumper(ctrl)
conntrackConnStore := NewConntrackConnectionStore(mockConnDumper, mockIfaceStore, true, false, nil, nil, testPollInterval, nil, testStaleConnectionTimeout)
conntrackConnStore := NewConntrackConnectionStore(mockConnDumper, true, false, nil, mockIfaceStore, nil, testFlowExporterOptions)
conntrackConnStore.connections[connKey] = conn

metrics.TotalAntreaConnectionsInConnTrackTable.Set(1)
Expand Down
16 changes: 9 additions & 7 deletions pkg/agent/flowexporter/connections/conntrack_connections.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,22 +49,20 @@ type ConntrackConnectionStore struct {

func NewConntrackConnectionStore(
connTrackDumper ConnTrackDumper,
ifaceStore interfacestore.InterfaceStore,
v4Enabled bool,
v6Enabled bool,
proxier proxy.Proxier,
npQuerier querier.AgentNetworkPolicyInfoQuerier,
pollInterval time.Duration,
expirePriorityQueue *priorityqueue.ExpirePriorityQueue,
staleConnectionTimeout time.Duration,
ifaceStore interfacestore.InterfaceStore,
proxier proxy.Proxier,
o *flowexporter.FlowExporterOptions,
) *ConntrackConnectionStore {
return &ConntrackConnectionStore{
connDumper: connTrackDumper,
v4Enabled: v4Enabled,
v6Enabled: v6Enabled,
networkPolicyQuerier: npQuerier,
pollInterval: pollInterval,
connectionStore: NewConnectionStore(ifaceStore, proxier, expirePriorityQueue, staleConnectionTimeout),
pollInterval: o.PollInterval,
connectionStore: NewConnectionStore(ifaceStore, proxier, o),
}
}

Expand Down Expand Up @@ -290,3 +288,7 @@ func (cs *ConntrackConnectionStore) deleteConnWithoutLock(connKey flowexporter.C
metrics.TotalAntreaConnectionsInConnTrackTable.Dec()
return nil
}

func (cs *ConntrackConnectionStore) GetPriorityQueue() *priorityqueue.ExpirePriorityQueue {
return cs.connectionStore.expirePriorityQueue
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import (

"antrea.io/antrea/pkg/agent/flowexporter"
connectionstest "antrea.io/antrea/pkg/agent/flowexporter/connections/testing"
"antrea.io/antrea/pkg/agent/flowexporter/priorityqueue"
"antrea.io/antrea/pkg/agent/interfacestore"
interfacestoretest "antrea.io/antrea/pkg/agent/interfacestore/testing"
"antrea.io/antrea/pkg/agent/openflow"
Expand Down Expand Up @@ -105,9 +104,7 @@ func setupConntrackConnStore(b *testing.B) (*ConntrackConnectionStore, *connecti
mockProxier.EXPECT().GetServiceByIP(serviceStr).Return(servicePortName, true).AnyTimes()

npQuerier := queriertest.NewMockAgentNetworkPolicyInfoQuerier(ctrl)
pq := priorityqueue.NewExpirePriorityQueue(testActiveFlowTimeout, testIdleFlowTimeout)
return NewConntrackConnectionStore(mockConnDumper, mockIfaceStore, true, false, mockProxier,
npQuerier, testPollInterval, pq, testStaleConnectionTimeout), mockConnDumper
return NewConntrackConnectionStore(mockConnDumper, true, false, npQuerier, mockIfaceStore, nil, testFlowExporterOptions), mockConnDumper
}

func generateConns() []*flowexporter.Connection {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (

"antrea.io/antrea/pkg/agent/flowexporter"
connectionstest "antrea.io/antrea/pkg/agent/flowexporter/connections/testing"
"antrea.io/antrea/pkg/agent/flowexporter/priorityqueue"
"antrea.io/antrea/pkg/agent/interfacestore"
interfacestoretest "antrea.io/antrea/pkg/agent/interfacestore/testing"
"antrea.io/antrea/pkg/agent/metrics"
Expand Down Expand Up @@ -211,9 +210,7 @@ func TestConntrackConnectionStore_AddOrUpdateConn(t *testing.T) {
mockProxier := proxytest.NewMockProxier(ctrl)
mockConnDumper := connectionstest.NewMockConnTrackDumper(ctrl)
npQuerier := queriertest.NewMockAgentNetworkPolicyInfoQuerier(ctrl)
pq := priorityqueue.NewExpirePriorityQueue(testActiveFlowTimeout, testIdleFlowTimeout)
conntrackConnStore := NewConntrackConnectionStore(mockConnDumper, mockIfaceStore, true, false,
mockProxier, npQuerier, testPollInterval, pq, testStaleConnectionTimeout)
conntrackConnStore := NewConntrackConnectionStore(mockConnDumper, true, false, npQuerier, mockIfaceStore, mockProxier, testFlowExporterOptions)

for _, c := range tc {
t.Run(c.name, func(t *testing.T) {
Expand Down Expand Up @@ -296,7 +293,7 @@ func TestConnectionStore_DeleteConnectionByKey(t *testing.T) {
metrics.TotalAntreaConnectionsInConnTrackTable.Set(float64(len(testFlows)))
// Create connectionStore
mockIfaceStore := interfacestoretest.NewMockInterfaceStore(ctrl)
connStore := NewConntrackConnectionStore(nil, mockIfaceStore, true, false, nil, nil, testPollInterval, nil, testStaleConnectionTimeout)
connStore := NewConntrackConnectionStore(nil, true, false, nil, mockIfaceStore, nil, testFlowExporterOptions)
// Add flows to the connection store.
for i, flow := range testFlows {
connStore.connections[*testFlowKeys[i]] = flow
Expand All @@ -320,7 +317,7 @@ func TestConnectionStore_MetricSettingInPoll(t *testing.T) {
// Create connectionStore
mockIfaceStore := interfacestoretest.NewMockInterfaceStore(ctrl)
mockConnDumper := connectionstest.NewMockConnTrackDumper(ctrl)
conntrackConnStore := NewConntrackConnectionStore(mockConnDumper, mockIfaceStore, true, false, nil, nil, testPollInterval, nil, testStaleConnectionTimeout)
conntrackConnStore := NewConntrackConnectionStore(mockConnDumper, true, false, nil, mockIfaceStore, nil, testFlowExporterOptions)
// Hard-coded conntrack occupancy metrics for test
TotalConnections := 0
MaxConnections := 300000
Expand Down
9 changes: 6 additions & 3 deletions pkg/agent/flowexporter/connections/deny_connections.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,9 @@ type DenyConnectionStore struct {
connectionStore
}

func NewDenyConnectionStore(ifaceStore interfacestore.InterfaceStore,
proxier proxy.Proxier, expirePriorityQueue *priorityqueue.ExpirePriorityQueue, staleConnectionTimeout time.Duration) *DenyConnectionStore {
func NewDenyConnectionStore(ifaceStore interfacestore.InterfaceStore, proxier proxy.Proxier, o *flowexporter.FlowExporterOptions) *DenyConnectionStore {
return &DenyConnectionStore{
connectionStore: NewConnectionStore(ifaceStore, proxier, expirePriorityQueue, staleConnectionTimeout),
connectionStore: NewConnectionStore(ifaceStore, proxier, o),
}
}

Expand Down Expand Up @@ -137,3 +136,7 @@ func (ds *DenyConnectionStore) deleteConnWithoutLock(connKey flowexporter.Connec
metrics.TotalDenyConnections.Dec()
return nil
}

func (ds *DenyConnectionStore) GetPriorityQueue() *priorityqueue.ExpirePriorityQueue {
return ds.connectionStore.expirePriorityQueue
}
13 changes: 3 additions & 10 deletions pkg/agent/flowexporter/connections/deny_connections_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,18 +29,12 @@ import (
"k8s.io/component-base/metrics/legacyregistry"

"antrea.io/antrea/pkg/agent/flowexporter"
"antrea.io/antrea/pkg/agent/flowexporter/priorityqueue"
interfacestoretest "antrea.io/antrea/pkg/agent/interfacestore/testing"
"antrea.io/antrea/pkg/agent/metrics"
proxytest "antrea.io/antrea/pkg/agent/proxy/testing"
k8sproxy "antrea.io/antrea/third_party/proxy"
)

const (
testActiveFlowTimeout = 3 * time.Second
testIdleFlowTimeout = 1 * time.Second
)

func TestDenyConnectionStore_AddOrUpdateConn(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
Expand Down Expand Up @@ -75,16 +69,15 @@ func TestDenyConnectionStore_AddOrUpdateConn(t *testing.T) {
mockIfaceStore.EXPECT().GetInterfaceByIP(tuple.SourceAddress.String()).Return(nil, false)
mockIfaceStore.EXPECT().GetInterfaceByIP(tuple.DestinationAddress.String()).Return(nil, false)

pq := priorityqueue.NewExpirePriorityQueue(testActiveFlowTimeout, testIdleFlowTimeout)
denyConnStore := NewDenyConnectionStore(mockIfaceStore, mockProxier, pq, testStaleConnectionTimeout)
denyConnStore := NewDenyConnectionStore(mockIfaceStore, mockProxier, testFlowExporterOptions)

denyConnStore.AddOrUpdateConn(&testFlow, refTime.Add(-(time.Second * 20)), uint64(60))
expConn := testFlow
expConn.DestinationServicePortName = servicePortName.String()
actualConn, ok := denyConnStore.GetConnByKey(flowexporter.NewConnectionKey(&testFlow))
assert.Equal(t, ok, true, "deny connection should be there in deny connection store")
assert.Equal(t, expConn, *actualConn, "deny connections should be equal")
assert.Equal(t, 1, pq.Len(), "Length of the expire priority queue should be 1")
assert.Equal(t, 1, denyConnStore.connectionStore.expirePriorityQueue.Len(), "Length of the expire priority queue should be 1")
checkDenyConnectionMetrics(t, len(denyConnStore.connections))

denyConnStore.AddOrUpdateConn(&testFlow, refTime.Add(-(time.Second * 10)), uint64(60))
Expand All @@ -95,7 +88,7 @@ func TestDenyConnectionStore_AddOrUpdateConn(t *testing.T) {
assert.Equal(t, ok, true, "deny connection should be there in deny connection store")
assert.Equal(t, expConn, *actualConn, "deny connections should be equal")
assert.True(t, actualConn.IsActive)
assert.Equal(t, 1, pq.Len())
assert.Equal(t, 1, denyConnStore.connectionStore.expirePriorityQueue.Len())
checkDenyConnectionMetrics(t, len(denyConnStore.connections))
}

Expand Down
Loading