Skip to content

Commit

Permalink
Connection store patch:
Browse files Browse the repository at this point in the history
Addressed comments. Optimized the check of sysctl settings check.
Addressed merge conflicts.
  • Loading branch information
srikartati committed Jul 2, 2020
1 parent 3e86bc6 commit d0edf77
Show file tree
Hide file tree
Showing 7 changed files with 72 additions and 89 deletions.
7 changes: 2 additions & 5 deletions cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,8 @@ import (
"github.com/vmware-tanzu/antrea/pkg/agent/config"
"github.com/vmware-tanzu/antrea/pkg/agent/controller/networkpolicy"
"github.com/vmware-tanzu/antrea/pkg/agent/controller/noderoute"
<<<<<<< HEAD
"github.com/vmware-tanzu/antrea/pkg/agent/controller/traceflow"
=======
"github.com/vmware-tanzu/antrea/pkg/agent/flowexporter/connections"
>>>>>>> Create and maintain connection store for Antrea flows in conntrack
"github.com/vmware-tanzu/antrea/pkg/agent/interfacestore"
"github.com/vmware-tanzu/antrea/pkg/agent/metrics"
"github.com/vmware-tanzu/antrea/pkg/agent/openflow"
Expand Down Expand Up @@ -240,8 +237,8 @@ func run(o *Options) error {
}
// Create connection store that polls conntrack flows with a given polling interval.
if features.DefaultFeatureGate.Enabled(features.FlowExporter) {
connTrack := connections.NewConnTrackDumper(nodeConfig, serviceCIDRNet, connections.NewConnTrackInterfacer())
connStore := connections.NewConnectionStore(connTrack, ifaceStore)
ctDumper := connections.NewConnTrackDumper(nodeConfig, serviceCIDRNet, connections.NewConnTrackInterfacer())
connStore := connections.NewConnectionStore(ctDumper, ifaceStore)
go connStore.Run(stopCh)
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/agent/flowexporter/connections/connections.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,9 @@ func (cs *connectionStore) Run(stopCh <-chan struct{}) {
case <-ticker.C:
_, err := cs.poll()
if err != nil {
// Not failing here as errors can be transient and could be resolved in future poll cycles.
// TODO: Come up with a backoff/retry mechanism by increasing poll interval and adding retry timeout
klog.Errorf("Error during conntrack poll cycle: %v", err)
break
}
}
}
Expand Down
91 changes: 39 additions & 52 deletions pkg/agent/flowexporter/connections/connections_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,77 +28,64 @@ import (
interfacestoretest "github.com/vmware-tanzu/antrea/pkg/agent/interfacestore/testing"
)

var (
refTime = time.Now()
tuple1 = flowexporter.Tuple{
SourceAddress: net.IP{1, 2, 3, 4},
DestinationAddress: net.IP{4, 3, 2, 1},
Protocol: 6,
SourcePort: 65280,
DestinationPort: 255,
func makeTuple(srcIP *net.IP, dstIP *net.IP, protoID uint8, srcPort uint16, dstPort uint16) (*flowexporter.Tuple, *flowexporter.Tuple) {
tuple := &flowexporter.Tuple{
SourceAddress: *srcIP,
DestinationAddress: *dstIP,
Protocol: protoID,
SourcePort: srcPort,
DestinationPort: dstPort,
}
revTuple1 = flowexporter.Tuple{
SourceAddress: net.IP{4, 3, 2, 1},
DestinationAddress: net.IP{1, 2, 3, 4},
Protocol: 6,
SourcePort: 255,
DestinationPort: 65280,
revTuple := &flowexporter.Tuple{
SourceAddress: *dstIP,
DestinationAddress: *srcIP,
Protocol: protoID,
SourcePort: dstPort,
DestinationPort: srcPort,
}
flow1 = flowexporter.Connection{
return tuple, revTuple
}

func TestConnectionStore_addAndUpdateConn(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
// Create two flows; one is already in connectionStore and other one is new
refTime := time.Now()
// Flow-1, which is already in connectionStore
tuple1, revTuple1 := makeTuple(&net.IP{1, 2, 3, 4}, &net.IP{4, 3, 2, 1}, 6, 65280, 255)
testFlow1 := flowexporter.Connection{
StartTime: refTime.Add(-(time.Second * 50)),
StopTime: refTime,
OriginalPackets: 0xffff,
OriginalBytes: 0xbaaaaa0000000000,
ReversePackets: 0xff,
ReverseBytes: 0xbaaa,
TupleOrig: tuple1,
TupleReply: revTuple1,
}

tuple2 = flowexporter.Tuple{
SourceAddress: net.IP{5, 6, 7, 8},
DestinationAddress: net.IP{8, 7, 6, 5},
Protocol: 6,
SourcePort: 60001,
DestinationPort: 200,
TupleOrig: *tuple1,
TupleReply: *revTuple1,
}
revTuple2 = flowexporter.Tuple{
SourceAddress: net.IP{8, 7, 6, 5},
DestinationAddress: net.IP{5, 6, 7, 8},
Protocol: 6,
SourcePort: 200,
DestinationPort: 60001,
}
flow2 = flowexporter.Connection{
// Flow-2, which is not in connectionStore
tuple2, revTuple2 := makeTuple(&net.IP{5, 6, 7, 8}, &net.IP{8, 7, 6, 5}, 6, 60001, 200)
testFlow2 := flowexporter.Connection{
StartTime: refTime.Add(-(time.Second * 20)),
StopTime: refTime,
OriginalPackets: 0xbb,
OriginalBytes: 0xcbbb,
ReversePackets: 0xbbbb,
ReverseBytes: 0xcbbbb0000000000,
TupleOrig: tuple2,
TupleReply: revTuple2,
TupleOrig: *tuple2,
TupleReply: *revTuple2,
}
)

func TestConnectionStore_addAndUpdateConn(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
// Two flows; one is already in connectionStore and other one is new
testFlow1 := flow1
testFlow2 := flow2

// Create old conntrack flow for testing purposes.
// This flow is already in connection map.
// Create copy of old conntrack flow for testing purposes.
// This flow is already in connection store.
oldTestFlow1 := flowexporter.Connection{
StartTime: flow1.StartTime,
StopTime: flow1.StopTime.Add(-(time.Second * 30)),
StartTime: testFlow1.StartTime,
StopTime: testFlow1.StopTime.Add(-(time.Second * 30)),
OriginalPackets: 0xfff,
OriginalBytes: 0xbaaaaa00000000,
ReversePackets: 0xf,
ReverseBytes: 0xba,
TupleOrig: tuple1,
TupleReply: revTuple1,
TupleOrig: *tuple1,
TupleReply: *revTuple1,
SourcePodNamespace: "ns1",
SourcePodName: "pod1",
DestinationPodNamespace: "",
Expand Down Expand Up @@ -137,11 +124,11 @@ func TestConnectionStore_addAndUpdateConn(t *testing.T) {
flowTuple := flowexporter.NewConnectionKey(&test.flow)
var expConn flowexporter.Connection
if i == 0 {
expConn = flow1
expConn = test.flow
expConn.SourcePodNamespace = "ns1"
expConn.SourcePodName = "pod1"
} else {
expConn = flow2
expConn = test.flow
expConn.DestinationPodNamespace = "ns2"
expConn.DestinationPodName = "pod2"
iStore.EXPECT().GetInterfaceByIP(test.flow.TupleOrig.SourceAddress.String()).Return(nil, false)
Expand Down
46 changes: 21 additions & 25 deletions pkg/agent/flowexporter/connections/conntrack_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,14 @@ package connections

import (
"net"
"os"

"github.com/ti-mo/conntrack"
"k8s.io/klog"

"github.com/vmware-tanzu/antrea/pkg/agent/config"
"github.com/vmware-tanzu/antrea/pkg/agent/flowexporter"
"github.com/vmware-tanzu/antrea/pkg/agent/openflow"
"github.com/vmware-tanzu/antrea/pkg/util/sysctl"
"github.com/vmware-tanzu/antrea/pkg/agent/util/sysctl"
)

var _ ConnTrackDumper = new(connTrackDumper)
Expand Down Expand Up @@ -55,29 +54,6 @@ func (ctdump *connTrackDumper) DumpFlows(zoneFilter uint16) ([]*flowexporter.Con
return nil, err
}

// Check value of setting net.netfilter.nf_conntrack_acct. Set to value 1, if it is not set.
connTrackAcct, err := sysctl.GetSysctlNet("netfilter/nf_conntrack_acct")
if err != nil {
if !os.IsPermission(err) {
klog.Errorf("Error when getting net.netfilter.nf_conntrack_acct")
return nil, err
} else {
klog.Errorf("Permission denied to access net.netfilter.nf_conntrack_acct: Counters in flow records may not update")
}
} else {
if connTrackAcct == 0 {
err = sysctl.SetSysctlNet("netfilter/nf_conntrack_acct", 1)
if err != nil {
if !os.IsPermission(err) {
klog.Errorf("Error when setting net.netfilter.nf_conntrack_acct")
return nil, err
} else {
klog.Errorf("Permission denied to access net.netfilter.nf_conntrack_acct: Counters in flow records may not update")
}
}
}
}

// ZoneID filter is not supported currently in tl-mo/conntrack library.
// Link to issue: https://github.com/ti-mo/conntrack/issues/23
// Dump all flows in the conntrack table for now.
Expand Down Expand Up @@ -133,6 +109,26 @@ type connTrackSystem struct {
}

func NewConnTrackInterfacer() *connTrackSystem {
// Check value of setting net.netfilter.nf_conntrack_acct. Set to value 1, if it is not set.
connTrackAcct, err := sysctl.GetSysctlNet("netfilter/nf_conntrack_acct")
if err != nil {
// Continue with creation of interfacer object as we can dump flows with no stats and that information can still be useful.
// If permission error, please provide access to net.netfilter.nf_conntrack_acct. This will enable flow exporter to export stats and timestamps of connections.
klog.Errorf("Error when getting net.netfilter.nf_conntrack_acct: %v", err)
} else {
if connTrackAcct == 0 {
err = sysctl.SetSysctlNet("netfilter/nf_conntrack_acct", 1)
if err != nil {
// If permission error, please provide access to net.netfilter.nf_conntrack_acct.
klog.Errorf("Error when setting net.netfilter.nf_conntrack_acct: %v", err)
}
// Set the conntrack timestamp setting to get timestamps of connections
err = sysctl.SetSysctlNet("netfilter/nf_conntrack_timestamp", 1)
if err != nil {
klog.Errorf("Error when setting net.netfilter.nf_conntrack_timestamp: %v", err)
}
}
}
return &connTrackSystem{}
}

Expand Down
8 changes: 5 additions & 3 deletions pkg/agent/flowexporter/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,11 @@ type Tuple struct {

type Connection struct {
// Fields from conntrack flows
ID uint32
Timeout uint32
StartTime time.Time
ID uint32
Timeout uint32
StartTime time.Time
// For invalid and closed connections: StopTime is the time when connection was updated last.
// For established connections: StopTime is latest time when it was polled.
StopTime time.Time
Zone uint16
StatusFlag uint32
Expand Down
6 changes: 3 additions & 3 deletions pkg/agent/openflow/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,7 @@ func (c *client) connectionTrackFlows(category cookie.Category) []binding.Flow {
Done(),
// Enable NAT.
connectionTrackTable.BuildFlow(priorityNormal).MatchProtocol(binding.ProtocolIP).
Action().CT(false, connectionTrackTable.GetNext(), ctZone).NAT().CTDone().
Action().CT(false, connectionTrackTable.GetNext(), CtZone).NAT().CTDone().
Cookie(c.cookieAllocator.Request(category).Raw()).
Done(),
connectionTrackCommitTable.BuildFlow(priorityLow).MatchProtocol(binding.ProtocolIP).
Expand All @@ -398,7 +398,7 @@ func (c *client) connectionTrackFlows(category cookie.Category) []binding.Flow {
} else {
flows = append(flows,
connectionTrackTable.BuildFlow(priorityNormal).MatchProtocol(binding.ProtocolIP).
Action().CT(false, connectionTrackTable.GetNext(), ctZone).CTDone().
Action().CT(false, connectionTrackTable.GetNext(), CtZone).CTDone().
Cookie(c.cookieAllocator.Request(category).Raw()).
Done(),
)
Expand Down Expand Up @@ -1140,7 +1140,7 @@ func (c *client) endpointDNATFlow(endpointIP net.IP, endpointPort uint16, protoc
MatchProtocol(protocol).
MatchReg(int(endpointIPReg), ipVal).
MatchRegRange(int(endpointPortReg), unionVal, binding.Range{0, 18}).
Action().CT(true, EgressRuleTable, ctZone).
Action().CT(true, EgressRuleTable, CtZone).
DNAT(
&binding.IPRange{StartIP: endpointIP, EndIP: endpointIP},
&binding.PortRange{StartPort: endpointPort, EndPort: endpointPort},
Expand Down
File renamed without changes.

0 comments on commit d0edf77

Please sign in to comment.