diff --git a/cmd/antrea-agent/agent.go b/cmd/antrea-agent/agent.go index 521b0a09599..d5be639a785 100644 --- a/cmd/antrea-agent/agent.go +++ b/cmd/antrea-agent/agent.go @@ -29,6 +29,7 @@ import ( "github.com/vmware-tanzu/antrea/pkg/agent/config" "github.com/vmware-tanzu/antrea/pkg/agent/controller/networkpolicy" "github.com/vmware-tanzu/antrea/pkg/agent/controller/noderoute" + "github.com/vmware-tanzu/antrea/pkg/agent/flowexporter/connections" "github.com/vmware-tanzu/antrea/pkg/agent/interfacestore" "github.com/vmware-tanzu/antrea/pkg/agent/metrics" "github.com/vmware-tanzu/antrea/pkg/agent/openflow" @@ -192,6 +193,13 @@ func run(o *Options) error { } go apiServer.Run(stopCh) + // Create connection store that polls conntrack flows with a given polling interval. + if o.config.EnableFlowExporter { + connTrack := connections.NewConnTrackPoller(nodeConfig, connections.NewConnTrack()) + connStore := connections.NewConnectionStore(connTrack, ifaceStore) + go connStore.Run(stopCh) + } + <-stopCh klog.Info("Stopping Antrea agent") return nil diff --git a/cmd/antrea-agent/config.go b/cmd/antrea-agent/config.go index e0dc1b4d1d1..1b2552a0ce6 100644 --- a/cmd/antrea-agent/config.go +++ b/cmd/antrea-agent/config.go @@ -83,4 +83,7 @@ type AgentConfig struct { // Enable metrics exposure via Prometheus. Initializes Prometheus metrics listener // Defaults to false. EnablePrometheusMetrics bool `yaml:"enablePrometheusMetrics,omitempty"` + // Enable flow exporter that exports IPFIX flow records of conntrack flows on OVS bridge + // Defaults to false. + EnableFlowExporter bool `yaml:"enableFlowExporter,omitempty"` } diff --git a/cmd/antrea-agent/options.go b/cmd/antrea-agent/options.go index ecde33a29f1..dd987ac3f0c 100644 --- a/cmd/antrea-agent/options.go +++ b/cmd/antrea-agent/options.go @@ -101,6 +101,9 @@ func (o *Options) validate(args []string) error { if encapMode.SupportsNoEncap() && o.config.EnableIPSecTunnel { return fmt.Errorf("IPSec tunnel may only be enabled on %s mode", config.TrafficEncapModeEncap) } + if o.config.OVSDatapathType == ovsconfig.OVSDatapathNetdev && o.config.EnableFlowExporter { + return fmt.Errorf("Flow exporter is not supported for OVS datapath type %s", o.config.OVSDatapathType) + } return nil } diff --git a/go.mod b/go.mod index 12c7970c799..86d67ed12cc 100644 --- a/go.mod +++ b/go.mod @@ -37,13 +37,13 @@ require ( github.com/spf13/pflag v1.0.5 github.com/streamrail/concurrent-map v0.0.0-20160823150647-8bf1e9bacbf6 // indirect github.com/stretchr/testify v1.4.0 + github.com/ti-mo/conntrack v0.3.0 github.com/vishvananda/netlink v1.1.0 github.com/vmware-tanzu/octant v0.10.2 golang.org/x/crypto v0.0.0-20200220183623-bac4c82f6975 golang.org/x/exp v0.0.0-20190312203227-4b39c73a6495 - golang.org/x/net v0.0.0-20191126235420-ef20fe5d7933 // indirect golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e - golang.org/x/sys v0.0.0-20200122134326-e047566fdf82 + golang.org/x/sys v0.0.0-20200331124033-c3d80250170d golang.org/x/time v0.0.0-20190308202827-9d24e82272b4 google.golang.org/grpc v1.23.1 gopkg.in/yaml.v2 v2.2.8 diff --git a/go.sum b/go.sum index 6ef5ae17ad3..f9d3b0b5ae8 100644 --- a/go.sum +++ b/go.sum @@ -207,8 +207,10 @@ github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Z github.com/google/btree v1.0.0 h1:0udJVsspx3VBr5FwtLhQQtuAsVc79tTq0ocGIPAU6qo= github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= -github.com/google/go-cmp v0.3.0 h1:crn/baboCvb5fXaQ0IJ1SGTsTVrWpDsCWC8EGETZijY= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4= +github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/gofuzz v0.0.0-20161122191042-44d81051d367/go.mod h1:HP5RmnzzSNb993RKQDq4+1A4ia9nllfqcQFTQJedwGI= github.com/google/gofuzz v1.0.0 h1:A8PeW59pxE9IoFRqBp37U+mSNaQoZ46F1f0f863XSXw= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= @@ -269,6 +271,9 @@ github.com/j-keck/arping v1.0.0/go.mod h1:ymszkNOg6tORTn+6F6j+Jc8TOr5osrynvN6ivF github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= github.com/jonboulle/clockwork v0.1.0 h1:VKV+ZcuP6l3yW9doeqz6ziZGgcynBVQO+obU0+0hcPo= github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo= +github.com/jsimonetti/rtnetlink v0.0.0-20190606172950-9527aa82566a/go.mod h1:Oz+70psSo5OFh8DBl0Zv2ACw7Esh6pPUphlvZG9x7uw= +github.com/jsimonetti/rtnetlink v0.0.0-20200117123717-f846d4f6c1f4 h1:nwOc1YaOrYJ37sEBrtWZrdqzK22hiJs3GpDmP3sR2Yw= +github.com/jsimonetti/rtnetlink v0.0.0-20200117123717-f846d4f6c1f4/go.mod h1:WGuG/smIU4J/54PblvSbh+xvCZmpJnFgr3ds6Z55XMQ= github.com/json-iterator/go v0.0.0-20180612202835-f2b4162afba3/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= github.com/json-iterator/go v1.1.7/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= @@ -317,6 +322,10 @@ github.com/mattn/go-runewidth v0.0.7/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m github.com/mattn/go-shellwords v1.0.3/go.mod h1:3xCvwCdWdlDJUrvuMn7Wuy9eWs4pE8vqg+NOMyg4B2o= github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= +github.com/mdlayher/netlink v0.0.0-20190409211403-11939a169225/go.mod h1:eQB3mZE4aiYnlUsyGGCOpPETfdQq4Jhsgf1fk3cwQaA= +github.com/mdlayher/netlink v1.0.0/go.mod h1:KxeJAFOFLG6AjpyDkQ/iIhxygIUKD+vcwqcnu43w/+M= +github.com/mdlayher/netlink v1.1.0 h1:mpdLgm+brq10nI9zM1BpX1kpDbh3NLl3RSnVq6ZSkfg= +github.com/mdlayher/netlink v1.1.0/go.mod h1:H4WCitaheIsdF9yOYu8CFmCgQthAPIWZmcKp9uZHgmY= github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= github.com/mitchellh/go-testing-interface v0.0.0-20171004221916-a61a99592b77/go.mod h1:kRemZodwjscx+RGhAo8eIhFbs2+BFgRtFPeD/KE+zxI= github.com/mitchellh/go-testing-interface v1.0.0 h1:fzU/JVNcaqHQEcVFAKeR41fkiLdIPrefOvVG1VZ96U0= @@ -365,8 +374,9 @@ github.com/peterbourgon/diskv v2.0.1+incompatible h1:UBdAOUP5p4RWqPBg048CAvpKN+v github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR/tNboyR3/BZd58JJSHlUSCU= github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= -github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v0.0.0-20151028094244-d8ed2627bdf0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= @@ -441,6 +451,10 @@ github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXf github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +github.com/ti-mo/conntrack v0.3.0 h1:572/72R9la2FVvO6CbsLiCmR48U3pgCvIlLKoUrExDU= +github.com/ti-mo/conntrack v0.3.0/go.mod h1:tPSYNx21TnjxGz99pLD/lAN4fuEViaJZz+pliMqnovk= +github.com/ti-mo/netfilter v0.3.1 h1:+ZTmeTx+64Jw2N/1gmqm42kruDWjQ90SMjWEB1e6VDs= +github.com/ti-mo/netfilter v0.3.1/go.mod h1:t/5HvCCHA1LAYj/AZF2fWcJ23BQTA7lzTPCuwwi7xQY= github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5 h1:LnC5Kc/wtumK+WB441p7ynQJzVuNRJiqddSIE3IlSEQ= github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= @@ -520,8 +534,10 @@ golang.org/x/net v0.0.0-20190812203447-cdfb69ac37fc/go.mod h1:mL1N/T3taQHkDXs73r golang.org/x/net v0.0.0-20190813141303-74dc4d7220e7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20190827160401-ba9fcec4b297/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20191004110552-13f9640d40b9/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= -golang.org/x/net v0.0.0-20191126235420-ef20fe5d7933 h1:e6HwijUxhDe+hPNjZQQn9bA5PW3vNmnN64U2ZW759Lk= -golang.org/x/net v0.0.0-20191126235420-ef20fe5d7933/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20191007182048-72f939374954/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200324143707-d3edc9973b7e h1:3G+cUijn7XD+S4eJFddp53Pv7+slrESplyjG25HgL+k= +golang.org/x/net v0.0.0-20200324143707-d3edc9973b7e/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45 h1:SVwTIAaPC2U/AvvLNZ2a7OVsmBpC8L5BlwK1whH3hm0= @@ -546,6 +562,7 @@ golang.org/x/sys v0.0.0-20190209173611-3b5209105503/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190411185658-b44545bcd369/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190502145724-3ef323f4f1fd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -558,8 +575,10 @@ golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20191008105621-543471e840be/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191128015809-6d18c012aee9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200113162924-86b910548bc1/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20200122134326-e047566fdf82 h1:ywK/j/KkyTHcdyYSZNXGjMwgmDSfjglYZ3vStQ/gSCU= -golang.org/x/sys v0.0.0-20200122134326-e047566fdf82/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200202164722-d101bd2416d5/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200331124033-c3d80250170d h1:nc5K6ox/4lTFbMVSL9WRR81ixkcwXThoiF6yf+R9scA= +golang.org/x/sys v0.0.0-20200331124033-c3d80250170d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.0.0-20160726164857-2910a502d2bf/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -587,6 +606,8 @@ golang.org/x/tools v0.0.0-20190614205625-5aca471b1d59/go.mod h1:/rFqwRUd4F7ZHNgw golang.org/x/tools v0.0.0-20190621195816-6e04913cbbac/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc= golang.org/x/tools v0.0.0-20190920225731-5eefd052ad72/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= gonum.org/v1/gonum v0.0.0-20190331200053-3d26580ed485/go.mod h1:2ltnJ7xHfj0zHS40VVPYEAAMTa3ZGguvHGBSJeRWqE0= gonum.org/v1/netlib v0.0.0-20190313105609-8cb42192e0e0/go.mod h1:wa6Ws7BG/ESfp6dHfk7C6KdzKA7wR7u/rKwOGE66zvw= gonum.org/v1/netlib v0.0.0-20190331212654-76723241ea4e/go.mod h1:kS+toOQn6AQKjmKJ7gzohV1XkqsFehRA2FbsbkopSuQ= diff --git a/hack/update-codegen-dockerized.sh b/hack/update-codegen-dockerized.sh index fe645e8cf7a..efa41c30766 100755 --- a/hack/update-codegen-dockerized.sh +++ b/hack/update-codegen-dockerized.sh @@ -88,6 +88,7 @@ MOCKGEN_TARGETS=( "pkg/agent/querier AgentQuerier" "pkg/controller/querier ControllerQuerier" "pkg/querier AgentNetworkPolicyInfoQuerier" + "pkg/agent/flowexporter/connections ConnTrackPoller,ConnTrack" ) # Command mockgen does not automatically replace variable YEAR with current year diff --git a/pkg/agent/flowexporter/connections/connections.go b/pkg/agent/flowexporter/connections/connections.go new file mode 100644 index 00000000000..afaa1566461 --- /dev/null +++ b/pkg/agent/flowexporter/connections/connections.go @@ -0,0 +1,116 @@ +package connections + +import ( + "sync" + "time" + + "k8s.io/klog" + + "github.com/vmware-tanzu/antrea/pkg/agent/flowexporter" + "github.com/vmware-tanzu/antrea/pkg/agent/interfacestore" + "github.com/vmware-tanzu/antrea/pkg/agent/openflow" +) + +var _ ConnectionStore = new(connectionStore) + +type ConnectionStore interface { + Run(stopCh <-chan struct{}) +} + +type connectionStore struct { + connections map[flowexporter.ConnectionKey]flowexporter.Connection // Add 5-tuple as string array + connTrackPoller ConnTrackPoller + ifaceStore interfacestore.InterfaceStore + mutex sync.Mutex +} + +func NewConnectionStore(connTrack ConnTrackPoller, ifaceStore interfacestore.InterfaceStore) *connectionStore { + return &connectionStore{ + connections: make(map[flowexporter.ConnectionKey]flowexporter.Connection), + connTrackPoller: connTrack, + ifaceStore: ifaceStore, + } +} + +// Run polls the conntrack module periodically to get connections. These connections are used +// to build connection store. +func (cs *connectionStore) Run(stopCh <-chan struct{}) { + klog.Infof("Starting conntrack polling") + for { + select { + case <-stopCh: + break + case <-time.After(flowexporter.PollInterval): + _, err := cs.poll() + if err != nil { + klog.Errorf("Error during conntrack poll cycle: %v", err) + } + } + } +} + +// addOrUpdateConn updates the connection if it is already present, i.e., update timestamp, counters etc., +// or add a new Connection by 5-tuple of the flow along with local Pod and PodNameSpace. +func (cs *connectionStore) addOrUpdateConn(conn *flowexporter.Connection) { + connKey := flowexporter.NewConnectionKey(conn) + + existingConn, exists := cs.getConnByKey(connKey) + + cs.mutex.Lock() + defer cs.mutex.Unlock() + if exists { + // Update the necessary fields that are used in generating flow records. + // Can same 5-tuple flow get deleted and added to conntrack table? If so use ID. + existingConn.StopTime = conn.StopTime + existingConn.OriginalBytes = conn.OriginalBytes + existingConn.OriginalPackets = conn.OriginalPackets + existingConn.ReverseBytes = conn.ReverseBytes + existingConn.ReversePackets = conn.ReversePackets + // Reassign the flow to update the map + cs.connections[connKey] = *existingConn + klog.V(4).Infof("Antrea flow updated: %v", existingConn) + } else { + var srcFound, dstFound bool + sIface, srcFound := cs.ifaceStore.GetInterfaceByIP(conn.TupleOrig.SourceAddress.String()) + dIface, dstFound := cs.ifaceStore.GetInterfaceByIP(conn.TupleReply.SourceAddress.String()) + if !srcFound && !dstFound { + klog.Warningf("Cannot map any of the IP %s or %s to a local Pod", conn.TupleOrig.SourceAddress.String(), conn.TupleReply.SourceAddress.String()) + } + if srcFound && sIface.Type == interfacestore.ContainerInterface { + conn.SourcePodName = sIface.ContainerInterfaceConfig.PodName + conn.SourcePodNamespace = sIface.ContainerInterfaceConfig.PodNamespace + } + if dstFound && dIface.Type == interfacestore.ContainerInterface { + conn.DestinationPodName = dIface.ContainerInterfaceConfig.PodName + conn.DestinationPodNamespace = dIface.ContainerInterfaceConfig.PodNamespace + } + klog.V(2).Infof("New Antrea flow added: %v", conn) + // Add new antrea connection to connection store + cs.connections[connKey] = *conn + } +} + +func (cs *connectionStore) getConnByKey(flowTuple flowexporter.ConnectionKey) (*flowexporter.Connection, bool) { + cs.mutex.Lock() + defer cs.mutex.Unlock() + conn, found := cs.connections[flowTuple] + return &conn, found +} + +// poll returns number of filtered connections after poll cycle +func (cs *connectionStore) poll() (int, error) { + klog.V(2).Infof("Polling conntrack") + + filteredConns, err := cs.connTrackPoller.DumpFlows(openflow.CtZone) + if err != nil { + klog.Errorf("Error when dumping flows from conntrack: %v", err) + return 0, err + } + // Update only the Connection store. IPFIX records are generated based on Connection store. + for _, conn := range filteredConns { + cs.addOrUpdateConn(conn) + } + klog.V(2).Infof("Conntrack polling successful") + + return len(filteredConns), nil +} diff --git a/pkg/agent/flowexporter/connections/connections_test.go b/pkg/agent/flowexporter/connections/connections_test.go new file mode 100644 index 00000000000..4ee859d726a --- /dev/null +++ b/pkg/agent/flowexporter/connections/connections_test.go @@ -0,0 +1,140 @@ +package connections + +import ( + "net" + "testing" + "time" + + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" + + "github.com/vmware-tanzu/antrea/pkg/agent/flowexporter" + connectionstest "github.com/vmware-tanzu/antrea/pkg/agent/flowexporter/connections/testing" + "github.com/vmware-tanzu/antrea/pkg/agent/interfacestore" + interfacestoretest "github.com/vmware-tanzu/antrea/pkg/agent/interfacestore/testing" +) + +var ( + refTime = time.Now() + tuple1 = flowexporter.Tuple{ + SourceAddress: net.IP{1, 2, 3, 4}, + DestinationAddress: net.IP{4, 3, 2, 1}, + Protocol: 6, + SourcePort: 65280, + DestinationPort: 255, + } + revTuple1 = flowexporter.Tuple{ + SourceAddress: net.IP{4, 3, 2, 1}, + DestinationAddress: net.IP{1, 2, 3, 4}, + Protocol: 6, + SourcePort: 255, + DestinationPort: 65280, + } + flow1 = flowexporter.Connection{ + StartTime: refTime.Add(-(time.Second * 50)), + StopTime: refTime, + OriginalPackets: 0xffff, + OriginalBytes: 0xbaaaaa0000000000, + ReversePackets: 0xff, + ReverseBytes: 0xbaaa, + TupleOrig: tuple1, + TupleReply: revTuple1, + } + + tuple2 = flowexporter.Tuple{ + SourceAddress: net.IP{5, 6, 7, 8}, + DestinationAddress: net.IP{8, 7, 6, 5}, + Protocol: 6, + SourcePort: 60001, + DestinationPort: 200, + } + revTuple2 = flowexporter.Tuple{ + SourceAddress: net.IP{8, 7, 6, 5}, + DestinationAddress: net.IP{5, 6, 7, 8}, + Protocol: 6, + SourcePort: 200, + DestinationPort: 60001, + } + flow2 = flowexporter.Connection{ + StartTime: refTime.Add(-(time.Second * 20)), + StopTime: refTime, + OriginalPackets: 0xbb, + OriginalBytes: 0xcbbb, + ReversePackets: 0xbbbb, + ReverseBytes: 0xcbbbb0000000000, + TupleOrig: tuple2, + TupleReply: revTuple2, + } +) + +func TestConnectionStore_addAndUpdateConn(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + // Two flows; one is already in connectionStore and other one is new + testFlow1 := flow1 + testFlow2 := flow2 + + // Create old conntrack flow for testing purposes. + // This flow is already in connection map. + oldTestFlow1 := flowexporter.Connection{ + StartTime: flow1.StartTime, + StopTime: flow1.StopTime.Add(-(time.Second * 30)), + OriginalPackets: 0xfff, + OriginalBytes: 0xbaaaaa00000000, + ReversePackets: 0xf, + ReverseBytes: 0xba, + TupleOrig: tuple1, + TupleReply: revTuple1, + SourcePodNamespace: "ns1", + SourcePodName: "pod1", + DestinationPodNamespace: "", + DestinationPodName: "", + } + podConfigFlow2 := &interfacestore.ContainerInterfaceConfig{ + ContainerID: "2", + PodName: "pod2", + PodNamespace: "ns2", + } + interfaceFlow2 := &interfacestore.InterfaceConfig{ + InterfaceName: "interface2", + IP: net.IP{8, 7, 6, 5}, + ContainerInterfaceConfig: podConfigFlow2, + } + // Mock interface store with one of the couple of IPs correspond to Pods + iStore := interfacestoretest.NewMockInterfaceStore(ctrl) + mockCTPoller := connectionstest.NewMockConnTrackPoller(ctrl) + // Create connectionStore + connStore := &connectionStore{ + connections: make(map[flowexporter.ConnectionKey]flowexporter.Connection), + connTrackPoller: mockCTPoller, + ifaceStore: iStore, + } + // Add flow1conn to the Connection map + testFlow1Tuple := flowexporter.NewConnectionKey(&testFlow1) + connStore.connections[testFlow1Tuple] = oldTestFlow1 + + updateConnTests := []struct { + flow flowexporter.Connection + }{ + {testFlow1}, // To test update part of function + {testFlow2}, // To test add part of function + } + for i, test := range updateConnTests { + flowTuple := flowexporter.NewConnectionKey(&test.flow) + var expConn flowexporter.Connection + if i == 0 { + expConn = flow1 + expConn.SourcePodNamespace = "ns1" + expConn.SourcePodName = "pod1" + } else { + expConn = flow2 + expConn.DestinationPodNamespace = "ns2" + expConn.DestinationPodName = "pod2" + iStore.EXPECT().GetInterfaceByIP(test.flow.TupleOrig.SourceAddress.String()).Return(nil, false) + iStore.EXPECT().GetInterfaceByIP(test.flow.TupleReply.SourceAddress.String()).Return(interfaceFlow2, true) + } + connStore.addOrUpdateConn(&test.flow) + actualConn, _ := connStore.getConnByKey(flowTuple) + assert.Equal(t, expConn, *actualConn, "Connections should be equal") + } +} diff --git a/pkg/agent/flowexporter/connections/conntrack_linux.go b/pkg/agent/flowexporter/connections/conntrack_linux.go new file mode 100644 index 00000000000..5f7c99316b5 --- /dev/null +++ b/pkg/agent/flowexporter/connections/conntrack_linux.go @@ -0,0 +1,145 @@ +// +build linux + +package connections + +import ( + "k8s.io/klog" + + "github.com/ti-mo/conntrack" + + "github.com/vmware-tanzu/antrea/pkg/agent/config" + "github.com/vmware-tanzu/antrea/pkg/agent/flowexporter" + "github.com/vmware-tanzu/antrea/pkg/agent/openflow" +) + +var _ ConnTrackPoller = new(connTrackPoller) + +type connTrackPoller struct { + nodeConfig *config.NodeConfig + conntrack ConnTrack +} + +func NewConnTrackPoller(nodeConfig *config.NodeConfig, conntrack ConnTrack) *connTrackPoller { + return &connTrackPoller{ + nodeConfig, + conntrack, + } +} + +// DumpFlows opens netlink connection and dumps all the flows in Antrea ZoneID +// of conntrack table, i.e., corresponding to Antrea OVS bridge. +func (cp *connTrackPoller) DumpFlows(zoneFilter uint16) ([]*flowexporter.Connection, error) { + // Get netlink Connection to netfilter + err := cp.conntrack.Dial() + if err != nil { + klog.Errorf("Error when getting netlink conn: %v", err) + return nil, err + } + + // ZoneID filter is not supported currently in tl-mo/conntrack library. + // Link to issue: https://github.com/ti-mo/conntrack/issues/23 + // Dump all flows in the conntrack table for now. + conns, err := cp.conntrack.DumpFilter(conntrack.Filter{}) + if err != nil { + klog.Errorf("Error when dumping flows from conntrack: %v", err) + return nil, err + } + + filteredConns := make([]*flowexporter.Connection, 0, len(conns)) + for _, conn := range conns { + if conn.Zone != openflow.CtZone { + continue + } + srcIP := conn.TupleOrig.IP.SourceAddress + dstIP := conn.TupleReply.IP.SourceAddress + // Only get Pod-to-Pod flows. Pod-to-externalService flows are ignored for now. + // Note that Pod-to-Pod flows include From/To flows to k8s service with clusterIP. + if srcIP.Equal(cp.nodeConfig.GatewayConfig.IP) { + continue + } + if dstIP.Equal(cp.nodeConfig.GatewayConfig.IP) { + continue + } + filteredConns = append(filteredConns, createAntreaConn(&conn)) + } + + klog.V(2).Infof("Finished poll cycle -- total flows: %d flows in Antrea zoneID: %d", len(conns), len(filteredConns)) + + return filteredConns, nil +} + +// ConnTrack is an interface created to consume the required functions from the third party +// conntrack library. This is helpful in writing unit tests. +var _ ConnTrack = new(connTrack) + +type ConnTrack interface { + Dial() error + DumpFilter(filter conntrack.Filter) ([]conntrack.Flow, error) +} + +type connTrack struct { + netlinkConn *conntrack.Conn +} + +func NewConnTrack() *connTrack { + return &connTrack{} +} + +func (c *connTrack) Dial() error { + // Get conntrack in current namespace + conn, err := conntrack.Dial(nil) + if err != nil { + klog.Errorf("Error when dialing conntrack: %v", err) + return err + } + c.netlinkConn = conn + return nil +} + +func (c *connTrack) DumpFilter(filter conntrack.Filter) ([]conntrack.Flow, error) { + conns, err := c.netlinkConn.DumpFilter(filter) + if err != nil { + klog.Errorf("Error when dumping flows from conntrack: %v", err) + return nil, err + } + return conns, nil +} + +func createAntreaConn(conn *conntrack.Flow) *flowexporter.Connection { + tupleOrig := flowexporter.Tuple{ + SourceAddress: conn.TupleOrig.IP.SourceAddress, + DestinationAddress: conn.TupleOrig.IP.DestinationAddress, + Protocol: conn.TupleOrig.Proto.Protocol, + SourcePort: conn.TupleOrig.Proto.SourcePort, + DestinationPort: conn.TupleOrig.Proto.DestinationPort, + } + + tupleReply := flowexporter.Tuple{ + SourceAddress: conn.TupleReply.IP.SourceAddress, + DestinationAddress: conn.TupleReply.IP.DestinationAddress, + Protocol: conn.TupleReply.Proto.Protocol, + SourcePort: conn.TupleReply.Proto.SourcePort, + DestinationPort: conn.TupleReply.Proto.DestinationPort, + } + // Assign all the applicable fields + newConn := flowexporter.Connection{ + conn.ID, + conn.Timeout, + conn.Timestamp.Start, + conn.Timestamp.Stop, + conn.Zone, + uint32(conn.Status.Value), + tupleOrig, + tupleReply, + conn.CountersOrig.Packets, + conn.CountersOrig.Bytes, + conn.CountersReply.Packets, + conn.CountersReply.Bytes, + "", + "", + "", + "", + } + + return &newConn +} diff --git a/pkg/agent/flowexporter/connections/conntrack_test.go b/pkg/agent/flowexporter/connections/conntrack_test.go new file mode 100644 index 00000000000..83008ce4ab3 --- /dev/null +++ b/pkg/agent/flowexporter/connections/conntrack_test.go @@ -0,0 +1,110 @@ +// +build linux + +package connections + +import ( + "net" + "testing" + + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" + "github.com/ti-mo/conntrack" + + "github.com/vmware-tanzu/antrea/pkg/agent/config" + connectionstest "github.com/vmware-tanzu/antrea/pkg/agent/flowexporter/connections/testing" + "github.com/vmware-tanzu/antrea/pkg/agent/openflow" +) + +var ( + tuple3 = conntrack.Tuple{ + IP: conntrack.IPTuple{ + SourceAddress: net.IP{1, 2, 3, 4}, + DestinationAddress: net.IP{4, 3, 2, 1}, + }, + Proto: conntrack.ProtoTuple{ + Protocol: 6, + SourcePort: 65280, + DestinationPort: 255, + }, + } + revTuple3 = conntrack.Tuple{ + IP: conntrack.IPTuple{ + SourceAddress: net.IP{4, 3, 2, 1}, + DestinationAddress: net.IP{1, 2, 3, 4}, + }, + Proto: conntrack.ProtoTuple{ + Protocol: 6, + SourcePort: 255, + DestinationPort: 65280, + }, + } + tuple4 = conntrack.Tuple{ + IP: conntrack.IPTuple{ + SourceAddress: net.IP{5, 6, 7, 8}, + DestinationAddress: net.IP{8, 7, 6, 5}, + }, + Proto: conntrack.ProtoTuple{ + Protocol: 6, + SourcePort: 60001, + DestinationPort: 200, + }, + } + revTuple4 = conntrack.Tuple{ + IP: conntrack.IPTuple{ + SourceAddress: net.IP{8, 7, 6, 5}, + DestinationAddress: net.IP{5, 6, 7, 8}, + }, + Proto: conntrack.ProtoTuple{ + Protocol: 6, + SourcePort: 200, + DestinationPort: 60001, + }, + } +) + +func TestConnTrack_DumpFilter(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + // Create flows to test + antreaFlow := conntrack.Flow{ + TupleOrig: tuple3, + TupleReply: revTuple3, + Zone: openflow.CtZone, + } + + antreaGWFlow := conntrack.Flow{ + TupleOrig: tuple4, + TupleReply: revTuple4, + Zone: openflow.CtZone, + } + nonAntreaFlow := conntrack.Flow{ + TupleOrig: tuple4, + TupleReply: revTuple4, + Zone: 100, + } + + testFlows := []conntrack.Flow{antreaFlow, antreaGWFlow, nonAntreaFlow} + + // Create mock ConnTrack interface + mockCT := connectionstest.NewMockConnTrack(ctrl) + + // Create nodeConfig and gateWayConfig + // Set antreaGWFlow.TupleOrig.IP.DestinationAddress as gateway IP + gwConfig := &config.GatewayConfig{ + IP: net.IP{8, 7, 6, 5}, + } + nodeConfig := &config.NodeConfig{ + GatewayConfig: gwConfig, + } + // set expects for mocks + mockCT.EXPECT().Dial().Return(nil) + mockCT.EXPECT().DumpFilter(conntrack.Filter{}).Return(testFlows, nil) + + connTrackPoller := NewConnTrackPoller(nodeConfig, mockCT) + + conns, err := connTrackPoller.DumpFlows(openflow.CtZone) + if err != nil { + t.Errorf("Dump filter function returned error: %v", err) + } + assert.Equal(t, 1, len(conns), "number of filtered connections should be equal") +} diff --git a/pkg/agent/flowexporter/connections/conntrack_windows.go b/pkg/agent/flowexporter/connections/conntrack_windows.go new file mode 100644 index 00000000000..2714783a1c4 --- /dev/null +++ b/pkg/agent/flowexporter/connections/conntrack_windows.go @@ -0,0 +1,37 @@ +// +build windows + +package connections + +import ( + "github.com/vmware-tanzu/antrea/pkg/agent/config" + "github.com/vmware-tanzu/antrea/pkg/agent/flowexporter" +) + +var _ ConnTrackPoller = new(connTrackPoller) + +type connTrackPoller struct { + nodeConfig *config.NodeConfig + conntrack ConnTrack +} + +func NewConnTrackPoller(nodeConfig *config.NodeConfig, conntrack ConnTrack) *connTrackPoller { + return &connTrackPoller{ + nodeConfig, + conntrack, + } +} + +// TODO: These will be defined when polling from ovs-dpctl dump conntrack is supported +var _ ConnTrack = new(connTrack) + +type ConnTrack interface{} + +type connTrack struct{} + +func NewConnTrack() *connTrack { + return &connTrack{} +} + +func (c *connTrackPoller) DumpFlows(zoneFilter uint16) ([]*flowexporter.Connection, error) { + return nil, nil +} diff --git a/pkg/agent/flowexporter/connections/interface.go b/pkg/agent/flowexporter/connections/interface.go new file mode 100644 index 00000000000..472d3ffd459 --- /dev/null +++ b/pkg/agent/flowexporter/connections/interface.go @@ -0,0 +1,11 @@ +package connections + +import ( + "github.com/vmware-tanzu/antrea/pkg/agent/flowexporter" +) + +// ConnTrackPoller is an interface that is used to poll and dump connections from +// conntrack module. +type ConnTrackPoller interface { + DumpFlows(zoneFilter uint16) ([]*flowexporter.Connection, error) +} diff --git a/pkg/agent/flowexporter/connections/testing/mock_connections.go b/pkg/agent/flowexporter/connections/testing/mock_connections.go new file mode 100644 index 00000000000..5a01b771f41 --- /dev/null +++ b/pkg/agent/flowexporter/connections/testing/mock_connections.go @@ -0,0 +1,117 @@ +// Copyright 2020 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/vmware-tanzu/antrea/pkg/agent/flowexporter/connections (interfaces: ConnTrackPoller,ConnTrack) + +// Package testing is a generated GoMock package. +package testing + +import ( + gomock "github.com/golang/mock/gomock" + conntrack "github.com/ti-mo/conntrack" + flowexporter "github.com/vmware-tanzu/antrea/pkg/agent/flowexporter" + reflect "reflect" +) + +// MockConnTrackPoller is a mock of ConnTrackPoller interface +type MockConnTrackPoller struct { + ctrl *gomock.Controller + recorder *MockConnTrackPollerMockRecorder +} + +// MockConnTrackPollerMockRecorder is the mock recorder for MockConnTrackPoller +type MockConnTrackPollerMockRecorder struct { + mock *MockConnTrackPoller +} + +// NewMockConnTrackPoller creates a new mock instance +func NewMockConnTrackPoller(ctrl *gomock.Controller) *MockConnTrackPoller { + mock := &MockConnTrackPoller{ctrl: ctrl} + mock.recorder = &MockConnTrackPollerMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use +func (m *MockConnTrackPoller) EXPECT() *MockConnTrackPollerMockRecorder { + return m.recorder +} + +// DumpFlows mocks base method +func (m *MockConnTrackPoller) DumpFlows(arg0 uint16) ([]*flowexporter.Connection, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "DumpFlows", arg0) + ret0, _ := ret[0].([]*flowexporter.Connection) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// DumpFlows indicates an expected call of DumpFlows +func (mr *MockConnTrackPollerMockRecorder) DumpFlows(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DumpFlows", reflect.TypeOf((*MockConnTrackPoller)(nil).DumpFlows), arg0) +} + +// MockConnTrack is a mock of ConnTrack interface +type MockConnTrack struct { + ctrl *gomock.Controller + recorder *MockConnTrackMockRecorder +} + +// MockConnTrackMockRecorder is the mock recorder for MockConnTrack +type MockConnTrackMockRecorder struct { + mock *MockConnTrack +} + +// NewMockConnTrack creates a new mock instance +func NewMockConnTrack(ctrl *gomock.Controller) *MockConnTrack { + mock := &MockConnTrack{ctrl: ctrl} + mock.recorder = &MockConnTrackMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use +func (m *MockConnTrack) EXPECT() *MockConnTrackMockRecorder { + return m.recorder +} + +// Dial mocks base method +func (m *MockConnTrack) Dial() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Dial") + ret0, _ := ret[0].(error) + return ret0 +} + +// Dial indicates an expected call of Dial +func (mr *MockConnTrackMockRecorder) Dial() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Dial", reflect.TypeOf((*MockConnTrack)(nil).Dial)) +} + +// DumpFilter mocks base method +func (m *MockConnTrack) DumpFilter(arg0 conntrack.Filter) ([]conntrack.Flow, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "DumpFilter", arg0) + ret0, _ := ret[0].([]conntrack.Flow) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// DumpFilter indicates an expected call of DumpFilter +func (mr *MockConnTrackMockRecorder) DumpFilter(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DumpFilter", reflect.TypeOf((*MockConnTrack)(nil).DumpFilter), arg0) +} diff --git a/pkg/agent/flowexporter/types.go b/pkg/agent/flowexporter/types.go new file mode 100644 index 00000000000..1f84b7c4315 --- /dev/null +++ b/pkg/agent/flowexporter/types.go @@ -0,0 +1,38 @@ +package flowexporter + +import ( + "net" + "time" +) + +const ( + PollInterval = 5 * time.Second +) + +type ConnectionKey [5]string + +type Tuple struct { + SourceAddress net.IP + DestinationAddress net.IP + Protocol uint8 + SourcePort uint16 + DestinationPort uint16 +} + +type Connection struct { + // Fields from conntrack flows + ID uint32 + Timeout uint32 + StartTime time.Time + StopTime time.Time + Zone uint16 + StatusFlag uint32 + TupleOrig, TupleReply Tuple + OriginalPackets, OriginalBytes uint64 + ReversePackets, ReverseBytes uint64 + // Fields specific to Antrea + SourcePodNamespace string + SourcePodName string + DestinationPodNamespace string + DestinationPodName string +} diff --git a/pkg/agent/flowexporter/utils.go b/pkg/agent/flowexporter/utils.go new file mode 100644 index 00000000000..7cf99715c8e --- /dev/null +++ b/pkg/agent/flowexporter/utils.go @@ -0,0 +1,13 @@ +package flowexporter + +import "strconv" + +// NewConnectionKey creates 5-tuple of flow as connection key +func NewConnectionKey(conn *Connection) ConnectionKey { + return ConnectionKey{conn.TupleOrig.SourceAddress.String(), + strconv.FormatUint(uint64(conn.TupleOrig.SourcePort), 10), + conn.TupleReply.SourceAddress.String(), + strconv.FormatUint(uint64(conn.TupleReply.SourcePort), 10), + strconv.FormatUint(uint64(conn.TupleOrig.Protocol), 10), + } +} diff --git a/pkg/agent/interfacestore/interface_cache.go b/pkg/agent/interfacestore/interface_cache.go index 64d399ff1a2..4be6f2dd6ee 100644 --- a/pkg/agent/interfacestore/interface_cache.go +++ b/pkg/agent/interfacestore/interface_cache.go @@ -43,6 +43,8 @@ import ( type interfaceCache struct { sync.RWMutex cache map[string]*InterfaceConfig + // Interfaces without IP are not stored in ipCache + ipCache map[string]*InterfaceConfig } func (c *interfaceCache) Initialize(interfaces []*InterfaceConfig) { @@ -52,6 +54,9 @@ func (c *interfaceCache) Initialize(interfaces []*InterfaceConfig) { if intf.Type == ContainerInterface { metrics.PodCount.Inc() } + if intf.IP != nil { + c.ipCache[intf.IP.String()] = intf + } } } @@ -76,9 +81,14 @@ func (c *interfaceCache) AddInterface(interfaceConfig *InterfaceConfig) { c.Lock() defer c.Unlock() c.cache[key] = interfaceConfig + if interfaceConfig.Type == ContainerInterface { metrics.PodCount.Inc() } + + if interfaceConfig.IP != nil { + c.ipCache[interfaceConfig.IP.String()] = interfaceConfig + } } // DeleteInterface deletes interface from local cache. @@ -86,10 +96,14 @@ func (c *interfaceCache) DeleteInterface(interfaceConfig *InterfaceConfig) { key := getInterfaceKey(interfaceConfig) c.Lock() defer c.Unlock() + delete(c.cache, key) + if interfaceConfig.Type == ContainerInterface { metrics.PodCount.Dec() } + + delete(c.ipCache, interfaceConfig.IP.String()) } // GetInterface retrieves interface from local cache given the interface key. @@ -113,6 +127,14 @@ func (c *interfaceCache) GetInterfaceByName(interfaceName string) (*InterfaceCon return nil, false } +// GetInterfaceByIP retrieves interface from local cache given the interface IP. +func (c *interfaceCache) GetInterfaceByIP(interfaceIP string) (*InterfaceConfig, bool) { + c.RLock() + defer c.RUnlock() + iface, found := c.ipCache[interfaceIP] + return iface, found +} + func (c *interfaceCache) GetContainerInterfaceNum() int { num := 0 c.RLock() @@ -185,5 +207,8 @@ func (c *interfaceCache) GetNodeTunnelInterface(nodeName string) (*InterfaceConf } func NewInterfaceStore() InterfaceStore { - return &interfaceCache{cache: map[string]*InterfaceConfig{}} + return &interfaceCache{ + cache: map[string]*InterfaceConfig{}, + ipCache: map[string]*InterfaceConfig{}, + } } diff --git a/pkg/agent/interfacestore/testing/mock_interfacestore.go b/pkg/agent/interfacestore/testing/mock_interfacestore.go index 73f6211f320..64da276c3cb 100644 --- a/pkg/agent/interfacestore/testing/mock_interfacestore.go +++ b/pkg/agent/interfacestore/testing/mock_interfacestore.go @@ -116,6 +116,21 @@ func (mr *MockInterfaceStoreMockRecorder) GetInterface(arg0 interface{}) *gomock return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetInterface", reflect.TypeOf((*MockInterfaceStore)(nil).GetInterface), arg0) } +// GetInterfaceByIP mocks base method +func (m *MockInterfaceStore) GetInterfaceByIP(arg0 string) (*interfacestore.InterfaceConfig, bool) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetInterfaceByIP", arg0) + ret0, _ := ret[0].(*interfacestore.InterfaceConfig) + ret1, _ := ret[1].(bool) + return ret0, ret1 +} + +// GetInterfaceByIP indicates an expected call of GetInterfaceByIP +func (mr *MockInterfaceStoreMockRecorder) GetInterfaceByIP(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetInterfaceByIP", reflect.TypeOf((*MockInterfaceStore)(nil).GetInterfaceByIP), arg0) +} + // GetInterfaceByName mocks base method func (m *MockInterfaceStore) GetInterfaceByName(arg0 string) (*interfacestore.InterfaceConfig, bool) { m.ctrl.T.Helper() diff --git a/pkg/agent/interfacestore/types.go b/pkg/agent/interfacestore/types.go index 8ec67148c8b..e098e4717f4 100644 --- a/pkg/agent/interfacestore/types.go +++ b/pkg/agent/interfacestore/types.go @@ -74,6 +74,7 @@ type InterfaceStore interface { DeleteInterface(interfaceConfig *InterfaceConfig) GetInterface(interfaceKey string) (*InterfaceConfig, bool) GetInterfaceByName(interfaceName string) (*InterfaceConfig, bool) + GetInterfaceByIP(interfaceIP string) (*InterfaceConfig, bool) GetContainerInterface(podName string, podNamespace string) (*InterfaceConfig, bool) GetNodeTunnelInterface(nodeName string) (*InterfaceConfig, bool) GetContainerInterfaceNum() int diff --git a/pkg/agent/openflow/pipeline.go b/pkg/agent/openflow/pipeline.go index f8ce64016f0..d57bd904759 100644 --- a/pkg/agent/openflow/pipeline.go +++ b/pkg/agent/openflow/pipeline.go @@ -124,7 +124,7 @@ const ( portCacheReg regType = 1 swapReg regType = 2 - ctZone = 0xfff0 + CtZone = 0xfff0 portFoundMark = 0x1 snatRequiredMark = 0x1 @@ -303,7 +303,7 @@ func (c *client) connectionTrackFlows(category cookie.Category) (flows []binding connectionTrackCommitTable := c.pipeline[conntrackCommitTable] flows = []binding.Flow{ connectionTrackTable.BuildFlow(priorityNormal).MatchProtocol(binding.ProtocolIP). - Action().CT(false, connectionTrackTable.GetNext(), ctZone).CTDone(). + Action().CT(false, connectionTrackTable.GetNext(), CtZone).CTDone(). Cookie(c.cookieAllocator.Request(category).Raw()). Done(), connectionTrackStateTable.BuildFlow(priorityHigh).MatchProtocol(binding.ProtocolIP). @@ -321,12 +321,12 @@ func (c *client) connectionTrackFlows(category cookie.Category) (flows []binding connectionTrackCommitTable.BuildFlow(priorityNormal).MatchProtocol(binding.ProtocolIP). MatchRegRange(int(marksReg), markTrafficFromGateway, binding.Range{0, 15}). MatchCTStateNew(true).MatchCTStateTrk(true). - Action().CT(true, connectionTrackCommitTable.GetNext(), ctZone).LoadToMark(gatewayCTMark).CTDone(). + Action().CT(true, connectionTrackCommitTable.GetNext(), CtZone).LoadToMark(gatewayCTMark).CTDone(). Cookie(c.cookieAllocator.Request(category).Raw()). Done(), connectionTrackCommitTable.BuildFlow(priorityLow).MatchProtocol(binding.ProtocolIP). MatchCTStateNew(true).MatchCTStateTrk(true). - Action().CT(true, connectionTrackCommitTable.GetNext(), ctZone).CTDone(). + Action().CT(true, connectionTrackCommitTable.GetNext(), CtZone).CTDone(). Cookie(c.cookieAllocator.Request(category).Raw()). Done(), } @@ -732,7 +732,7 @@ func (c *client) bridgeAndUplinkFlows(uplinkOfport uint32, bridgeLocalPort uint3 // Enforce IP packet into the conntrack zone with SNAT. If the connection is SNATed, the reply packet should use // Pod IP as the destination, and then is forwarded to conntrackStateTable. c.pipeline[conntrackTable].BuildFlow(priorityNormal).MatchProtocol(binding.ProtocolIP). - Action().CT(false, conntrackStateTable, ctZone).NAT().CTDone(). + Action().CT(false, conntrackStateTable, CtZone).NAT().CTDone(). Cookie(c.cookieAllocator.Request(category).Raw()). Done(), // Rewrite dMAC with the global vMAC if the packet is a reply to the Pod from the external address. @@ -767,7 +767,7 @@ func (c *client) bridgeAndUplinkFlows(uplinkOfport uint32, bridgeLocalPort uint3 MatchProtocol(binding.ProtocolIP). MatchCTStateNew(true).MatchCTStateTrk(true). MatchRegRange(int(marksReg), snatRequiredMark, snatMarkRange). - Action().CT(true, l2ForwardingOutTable, ctZone). + Action().CT(true, l2ForwardingOutTable, CtZone). SNAT(snatIPRange, nil). LoadToMark(snatCTMark).CTDone(). Cookie(c.cookieAllocator.Request(category).Raw()).