Skip to content

Commit

Permalink
Add an initialization function of FlowExporter for testing purpose
Browse files Browse the repository at this point in the history
Signed-off-by: heanlan <hanlan@vmware.com>
  • Loading branch information
heanlan committed Nov 8, 2021
1 parent 5db792d commit c34dacb
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 10 deletions.
8 changes: 5 additions & 3 deletions cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,10 +219,10 @@ func run(o *Options) error {
v4GroupCounter := proxytypes.NewGroupCounter(false, groupIDUpdates)
v6GroupCounter := proxytypes.NewGroupCounter(true, groupIDUpdates)

v4Enabled := config.IsIPv4Enabled(nodeConfig, networkConfig.TrafficEncapMode)
v6Enabled := config.IsIPv6Enabled(nodeConfig, networkConfig.TrafficEncapMode)
var proxier proxy.Proxier
if features.DefaultFeatureGate.Enabled(features.AntreaProxy) {
v4Enabled := config.IsIPv4Enabled(nodeConfig, networkConfig.TrafficEncapMode)
v6Enabled := config.IsIPv6Enabled(nodeConfig, networkConfig.TrafficEncapMode)
proxyAll := o.config.AntreaProxy.ProxyAll
skipServices := o.config.AntreaProxy.SkipServices

Expand Down Expand Up @@ -368,8 +368,10 @@ func run(o *Options) error {
proxier,
k8sClient,
nodeRouteController,
networkConfig.TrafficEncapMode,
nodeConfig,
v4Enabled,
v6Enabled,
networkConfig.TrafficEncapMode,
serviceCIDRNet,
serviceCIDRNetv6,
&ovsDatapathType,
Expand Down
7 changes: 2 additions & 5 deletions pkg/agent/flowexporter/exporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func prepareExporterInputArgs(collectorAddr, collectorProto, nodeName string) ex
}

func NewFlowExporter(ifaceStore interfacestore.InterfaceStore, proxier proxy.Proxier, k8sClient kubernetes.Interface, nodeRouteController *noderoute.Controller,
trafficEncapMode config.TrafficEncapModeType, nodeConfig *config.NodeConfig, serviceCIDRNet, serviceCIDRNetv6 *net.IPNet, ovsDatapathType *ovsconfig.OVSDatapathType,
nodeConfig *config.NodeConfig, v4Enabled, v6Enabled bool, trafficEncapModeType config.TrafficEncapModeType, serviceCIDRNet, serviceCIDRNetv6 *net.IPNet, ovsDatapathType *ovsconfig.OVSDatapathType,
proxyEnabled bool, npQuerier querier.AgentNetworkPolicyInfoQuerier, o *flowexporter.FlowExporterOptions) (*FlowExporter, error) {
// Initialize IPFIX registry
registry := ipfix.NewIPFIXRegistry()
Expand All @@ -162,9 +162,6 @@ func NewFlowExporter(ifaceStore interfacestore.InterfaceStore, proxier proxy.Pro
}
expInput := prepareExporterInputArgs(o.FlowCollectorAddr, o.FlowCollectorProto, nodeName)

v4Enabled := config.IsIPv4Enabled(nodeConfig, trafficEncapMode)
v6Enabled := config.IsIPv6Enabled(nodeConfig, trafficEncapMode)

connTrackDumper := connections.InitializeConnTrackDumper(nodeConfig, serviceCIDRNet, serviceCIDRNetv6, *ovsDatapathType, proxyEnabled)
denyConnStore := connections.NewDenyConnectionStore(ifaceStore, proxier, o)
conntrackConnStore := connections.NewConntrackConnectionStore(connTrackDumper, v4Enabled, v6Enabled, npQuerier, ifaceStore, proxier, o)
Expand All @@ -179,7 +176,7 @@ func NewFlowExporter(ifaceStore interfacestore.InterfaceStore, proxier proxy.Pro
ipfixSet: ipfixentities.NewSet(false),
k8sClient: k8sClient,
nodeRouteController: nodeRouteController,
isNetworkPolicyOnly: trafficEncapMode.IsNetworkPolicyOnly(),
isNetworkPolicyOnly: trafficEncapModeType.IsNetworkPolicyOnly(),
nodeName: nodeName,
conntrackPriorityQueue: conntrackConnStore.GetPriorityQueue(),
denyPriorityQueue: denyConnStore.GetPriorityQueue(),
Expand Down
38 changes: 36 additions & 2 deletions pkg/agent/flowexporter/exporter/exporter_perf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,14 @@ import (
"time"

"github.com/golang/mock/gomock"
ipfixentities "github.com/vmware/go-ipfix/pkg/entities"
"github.com/vmware/go-ipfix/pkg/registry"
"k8s.io/klog/v2"

"antrea.io/antrea/pkg/agent/config"
"antrea.io/antrea/pkg/agent/flowexporter"
"antrea.io/antrea/pkg/agent/flowexporter/connections"
"antrea.io/antrea/pkg/agent/flowexporter/priorityqueue"
"antrea.io/antrea/pkg/ipfix"
)

const (
Expand Down Expand Up @@ -145,6 +146,39 @@ func BenchmarkExportDenyConns(b *testing.B) {

}

func NewFlowExporterForTest(o *flowexporter.FlowExporterOptions) (*FlowExporter, error) {
// Initialize IPFIX registry
registry := ipfix.NewIPFIXRegistry()
registry.LoadRegistry()

// Prepare input args for IPFIX exporting process.
nodeName := "test-node"
expInput := prepareExporterInputArgs(o.FlowCollectorAddr, o.FlowCollectorProto, nodeName)

v4Enabled := true
v6Enabled := false

denyConnStore := connections.NewDenyConnectionStore(nil, nil, o)
conntrackConnStore := connections.NewConntrackConnectionStore(nil, v4Enabled, v6Enabled, nil, nil, nil, o)

return &FlowExporter{
conntrackConnStore: conntrackConnStore,
denyConnStore: denyConnStore,
registry: registry,
v4Enabled: v4Enabled,
v6Enabled: v6Enabled,
exporterInput: expInput,
ipfixSet: ipfixentities.NewSet(false),
k8sClient: nil,
nodeRouteController: nil,
isNetworkPolicyOnly: false,
nodeName: nodeName,
conntrackPriorityQueue: conntrackConnStore.GetPriorityQueue(),
denyPriorityQueue: denyConnStore.GetPriorityQueue(),
expiredConns: make([]flowexporter.Connection, 0, maxConnsToExport*2),
}, nil
}

func setupExporter(isConntrackConn bool) (*FlowExporter, error) {
var err error
collectorAddr, err := startLocalServer()
Expand All @@ -160,7 +194,7 @@ func setupExporter(isConntrackConn bool) (*FlowExporter, error) {
IdleFlowTimeout: testIdleFlowTimeout,
StaleConnectionTimeout: 1,
PollInterval: 1}
exp, _ := NewFlowExporter(nil, nil, nil, nil, config.TrafficEncapModeEncap, nil, nil, nil, nil, false, nil, o)
exp, _ := NewFlowExporterForTest(o)
if isConntrackConn {
addConns(exp.conntrackConnStore, exp.conntrackConnStore.GetPriorityQueue())
} else {
Expand Down

0 comments on commit c34dacb

Please sign in to comment.