Skip to content

Commit

Permalink
Refactor for egress package (#2824)
Browse files Browse the repository at this point in the history
Decouple ExternalIPPool controller and MemberList controller from Egress controller.

Signed-off-by: Xu Liu <xliu2@vmware.com>
  • Loading branch information
xliuxu authored Dec 7, 2021
1 parent 3fed840 commit b87b76c
Show file tree
Hide file tree
Showing 24 changed files with 1,623 additions and 519 deletions.
23 changes: 22 additions & 1 deletion cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ import (
"antrea.io/antrea/pkg/agent/flowexporter"
"antrea.io/antrea/pkg/agent/flowexporter/exporter"
"antrea.io/antrea/pkg/agent/interfacestore"
"antrea.io/antrea/pkg/agent/ipassigner"
"antrea.io/antrea/pkg/agent/memberlist"
"antrea.io/antrea/pkg/agent/metrics"
npl "antrea.io/antrea/pkg/agent/nodeportlocal"
"antrea.io/antrea/pkg/agent/openflow"
Expand All @@ -45,6 +47,7 @@ import (
"antrea.io/antrea/pkg/agent/stats"
"antrea.io/antrea/pkg/agent/types"
crdinformers "antrea.io/antrea/pkg/client/informers/externalversions"
"antrea.io/antrea/pkg/controller/externalippool"
"antrea.io/antrea/pkg/features"
"antrea.io/antrea/pkg/log"
"antrea.io/antrea/pkg/monitor"
Expand Down Expand Up @@ -291,10 +294,25 @@ func run(o *Options) error {
} else {
return fmt.Errorf("invalid Node Transport IPAddr in Node config: %v", nodeConfig)
}

var externalIPPoolController *externalippool.ExternalIPPoolController
var memberlistCluster *memberlist.Cluster
var localIPDetector ipassigner.LocalIPDetector

if features.DefaultFeatureGate.Enabled(features.Egress) {
externalIPPoolController = externalippool.NewExternalIPPoolController(
crdClient, externalIPPoolInformer,
)
localIPDetector = ipassigner.NewLocalIPDetector()
memberlistCluster, err = memberlist.NewCluster(o.config.ClusterMembershipPort,
nodeConfig.Name, nodeInformer, externalIPPoolInformer,
)
if err != nil {
return fmt.Errorf("error creating new MemberList cluster: %v", err)
}
egressController, err = egress.NewEgressController(
ofClient, antreaClientProvider, crdClient, ifaceStore, routeClient, nodeConfig.Name, nodeTransportIP,
o.config.ClusterMembershipPort, egressInformer, nodeInformer, externalIPPoolInformer,
memberlistCluster, egressInformer, nodeInformer, localIPDetector,
)
if err != nil {
return fmt.Errorf("error creating new Egress controller: %v", err)
Expand Down Expand Up @@ -426,6 +444,9 @@ func run(o *Options) error {
go networkPolicyController.Run(stopCh)

if features.DefaultFeatureGate.Enabled(features.Egress) {
go externalIPPoolController.Run(stopCh)
go localIPDetector.Run(stopCh)
go memberlistCluster.Run(stopCh)
go egressController.Run(stopCh)
}

Expand Down
9 changes: 8 additions & 1 deletion cmd/antrea-controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"antrea.io/antrea/pkg/controller/crdmirroring/crdhandler"
"antrea.io/antrea/pkg/controller/egress"
egressstore "antrea.io/antrea/pkg/controller/egress/store"
"antrea.io/antrea/pkg/controller/externalippool"
"antrea.io/antrea/pkg/controller/grouping"
"antrea.io/antrea/pkg/controller/metrics"
"antrea.io/antrea/pkg/controller/networkpolicy"
Expand Down Expand Up @@ -232,8 +233,12 @@ func run(o *Options) error {
controllerMonitor := monitor.NewControllerMonitor(crdClient, legacyCRDClient, nodeInformer, controllerQuerier)

var egressController *egress.EgressController
var externalIPPoolController *externalippool.ExternalIPPoolController
if features.DefaultFeatureGate.Enabled(features.Egress) {
egressController = egress.NewEgressController(crdClient, groupEntityIndex, egressInformer, externalIPPoolInformer, egressGroupStore)
externalIPPoolController = externalippool.NewExternalIPPoolController(
crdClient, externalIPPoolInformer,
)
egressController = egress.NewEgressController(crdClient, groupEntityIndex, egressInformer, externalIPPoolController, egressGroupStore)
}

var traceflowController *traceflow.Controller
Expand Down Expand Up @@ -368,7 +373,9 @@ func run(o *Options) error {
go eeMirroringController.Run(stopCh)
}
}

if features.DefaultFeatureGate.Enabled(features.Egress) {
go externalIPPoolController.Run(stopCh)
go egressController.Run(stopCh)
}

Expand Down
2 changes: 1 addition & 1 deletion hack/update-codegen-dockerized.sh
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ MOCKGEN_TARGETS=(
"pkg/agent/proxy Proxier testing"
"pkg/agent/querier AgentQuerier testing"
"pkg/agent/route Interface testing"
"pkg/agent/controller/egress/ipassigner IPAssigner testing"
"pkg/agent/ipassigner IPAssigner testing"
"pkg/antctl AntctlClient ."
"pkg/controller/networkpolicy EndpointQuerier testing"
"pkg/controller/querier ControllerQuerier testing"
Expand Down
22 changes: 7 additions & 15 deletions pkg/agent/controller/egress/egress_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ import (
"k8s.io/klog/v2"

"antrea.io/antrea/pkg/agent"
"antrea.io/antrea/pkg/agent/controller/egress/ipassigner"
"antrea.io/antrea/pkg/agent/interfacestore"
"antrea.io/antrea/pkg/agent/ipassigner"
"antrea.io/antrea/pkg/agent/memberlist"
"antrea.io/antrea/pkg/agent/openflow"
"antrea.io/antrea/pkg/agent/route"
Expand Down Expand Up @@ -121,7 +121,7 @@ type EgressController struct {
queue workqueue.RateLimitingInterface

// Use an interface for IP detector to enable testing.
localIPDetector LocalIPDetector
localIPDetector ipassigner.LocalIPDetector
ifaceStore interfacestore.InterfaceStore
nodeName string
idAllocator *idAllocator
Expand Down Expand Up @@ -152,12 +152,11 @@ func NewEgressController(
routeClient route.Interface,
nodeName string,
nodeTransportIP net.IP,
clusterPort int,
cluster *memberlist.Cluster,
egressInformer crdinformers.EgressInformer,
nodeInformer coreinformers.NodeInformer,
externalIPPoolInformer crdinformers.ExternalIPPoolInformer,
localIPDetector ipassigner.LocalIPDetector,
) (*EgressController, error) {
localIPDetector := NewLocalIPDetector()
c := &EgressController{
ofClient: ofClient,
routeClient: routeClient,
Expand All @@ -175,19 +174,14 @@ func NewEgressController(
egressBindings: map[string]*egressBinding{},
localIPDetector: localIPDetector,
idAllocator: newIDAllocator(minEgressMark, maxEgressMark),
cluster: cluster,
}
ipAssigner, err := ipassigner.NewIPAssigner(nodeTransportIP, egressDummyDevice)
if err != nil {
return nil, fmt.Errorf("initializing egressIP assigner failed: %v", err)
}
c.ipAssigner = ipAssigner

cluster, err := memberlist.NewCluster(clusterPort, nodeName, nodeInformer, externalIPPoolInformer)
if err != nil {
return nil, fmt.Errorf("initializing memberlist cluster failed: %v", err)
}
c.cluster = cluster

c.egressInformer.AddIndexers(cache.Indexers{egressIPIndex: func(obj interface{}) ([]string, error) {
egress, ok := obj.(*crdv1a2.Egress)
if !ok {
Expand All @@ -214,7 +208,7 @@ func NewEgressController(
},
resyncPeriod,
)
localIPDetector.AddEventHandler(c.onLocalIPUpdate)
c.localIPDetector.AddEventHandler(c.onLocalIPUpdate)
c.cluster.AddClusterEventHandler(c.enqueueEgressesByExternalIPPool)
return c, nil
}
Expand Down Expand Up @@ -304,8 +298,6 @@ func (c *EgressController) Run(stopCh <-chan struct{}) {

c.removeStaleEgressIPs()

go c.cluster.Run(stopCh)

go wait.NonSlidingUntil(c.watchEgressGroup, 5*time.Second, stopCh)

for i := 0; i < defaultWorkers; i++ {
Expand Down Expand Up @@ -618,7 +610,7 @@ func (c *EgressController) syncEgress(egressName string) error {
eState = c.newEgressState(egressName, egress.Spec.EgressIP)
}

localNodeSelected, err := c.cluster.ShouldSelectEgress(egress)
localNodeSelected, err := c.cluster.ShouldSelectIP(egress.Spec.EgressIP, egress.Spec.ExternalIPPool)
if err != nil {
return err
}
Expand Down
7 changes: 4 additions & 3 deletions pkg/agent/controller/egress/egress_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@ import (
k8stesting "k8s.io/client-go/testing"
"k8s.io/client-go/util/workqueue"

ipassignertest "antrea.io/antrea/pkg/agent/controller/egress/ipassigner/testing"
"antrea.io/antrea/pkg/agent/interfacestore"
"antrea.io/antrea/pkg/agent/ipassigner"
ipassignertest "antrea.io/antrea/pkg/agent/ipassigner/testing"
openflowtest "antrea.io/antrea/pkg/agent/openflow/testing"
routetest "antrea.io/antrea/pkg/agent/route/testing"
"antrea.io/antrea/pkg/agent/util"
Expand Down Expand Up @@ -65,15 +66,15 @@ func (d *fakeLocalIPDetector) Run(stopCh <-chan struct{}) {
<-stopCh
}

func (d *fakeLocalIPDetector) AddEventHandler(handler eventHandler) {
func (d *fakeLocalIPDetector) AddEventHandler(handler ipassigner.LocalIPEventHandler) {
return
}

func (d *fakeLocalIPDetector) HasSynced() bool {
return true
}

var _ LocalIPDetector = &fakeLocalIPDetector{}
var _ ipassigner.LocalIPDetector = &fakeLocalIPDetector{}

type antreaClientGetter struct {
clientset versioned.Interface
Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package egress
package ipassigner

type eventHandler func(ip string, added bool)
type LocalIPEventHandler func(ip string, added bool)

type LocalIPDetector interface {
IsLocalIP(ip string) bool
Expand All @@ -24,7 +24,7 @@ type LocalIPDetector interface {

// AddEventHandler registers an eventHandler of IP address update. It's not thread-safe and should be called before
// starting the detector.
AddEventHandler(handler eventHandler)
AddEventHandler(handler LocalIPEventHandler)

// HasSynced returns true if the cache has been initialized with the full lists of IP addresses.
HasSynced() bool
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package egress
package ipassigner

import (
"sync"
Expand All @@ -31,7 +31,7 @@ type localIPDetector struct {
mutex sync.RWMutex
localIPs sets.String
cacheSynced bool
eventHandlers []eventHandler
eventHandlers []LocalIPEventHandler
}

func NewLocalIPDetector() *localIPDetector {
Expand All @@ -51,7 +51,7 @@ func (d *localIPDetector) HasSynced() bool {
return d.cacheSynced
}

func (d *localIPDetector) AddEventHandler(handler eventHandler) {
func (d *localIPDetector) AddEventHandler(handler LocalIPEventHandler) {
d.eventHandlers = append(d.eventHandlers, handler)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package egress
package ipassigner

// Not implemented yet. The feature gate verification will protect this from being run.
type localIPDetector struct{}
Expand All @@ -25,7 +25,7 @@ func (d *localIPDetector) Run(stopCh <-chan struct{}) {
return
}

func (d *localIPDetector) AddEventHandler(handler eventHandler) {
func (d *localIPDetector) AddEventHandler(handler LocalIPEventHandler) {
return
}

Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

23 changes: 13 additions & 10 deletions pkg/agent/memberlist/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -458,22 +458,25 @@ func (c *Cluster) aliveNodes() sets.String {
return nodes
}

// ShouldSelectEgress returns true if the local Node selected as the owner Node of the Egress,
// the local Node in the cluster holds the same consistent hash ring for each ExternalIPPool,
// consistentHash.Get gets the closest item (Node name) in the hash to the provided key(egressIP),
// if the name of the local Node is equal to the name of the selected Node, returns true.
func (c *Cluster) ShouldSelectEgress(egress *v1alpha2.Egress) (bool, error) {
eipName := egress.Spec.ExternalIPPool
if eipName == "" || egress.Spec.EgressIP == "" {
// ShouldSelectIP returns true if the local Node selected as the owner Node of the IP in the specific
// ExternalIPPool. The local Node in the cluster holds the same consistent hash ring for each ExternalIPPool,
// consistentHash.Get gets the closest item (Node name) in the hash to the provided key (IP), if the name of
// the local Node is equal to the name of the selected Node, returns true.
func (c *Cluster) ShouldSelectIP(ip, externalIPPool string) (bool, error) {
if externalIPPool == "" || ip == "" {
return false, nil
}
c.consistentHashRWMutex.RLock()
defer c.consistentHashRWMutex.RUnlock()
consistentHash, ok := c.consistentHashMap[eipName]
consistentHash, ok := c.consistentHashMap[externalIPPool]
if !ok {
return false, fmt.Errorf("local Node consistentHashMap has not synced, ExternalIPPool %s", eipName)
return false, fmt.Errorf("local Node consistentHashMap has not synced, ExternalIPPool %s", externalIPPool)
}
return consistentHash.Get(egress.Spec.EgressIP) == c.nodeName, nil
node := consistentHash.Get(ip)
if node == "" {
klog.Warningf("No valid Node chosen for IP %s in externalIPPool %s", ip, externalIPPool)
}
return node == c.nodeName, nil
}

func (c *Cluster) notify(objName string) {
Expand Down
15 changes: 7 additions & 8 deletions pkg/agent/memberlist/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ func newFakeCluster(nodeConfig *config.NodeConfig, stopCh <-chan struct{}, i int
informerFactory := informers.NewSharedInformerFactory(clientset, 0)

nodeInformer := informerFactory.Core().V1().Nodes()

crdClient := fakeversioned.NewSimpleClientset([]runtime.Object{}...)
crdInformerFactory := crdinformers.NewSharedInformerFactory(crdClient, 0)
ipPoolInformer := crdInformerFactory.Crd().V1alpha2().ExternalIPPools()
Expand Down Expand Up @@ -159,7 +158,7 @@ func TestCluster_Run(t *testing.T) {
}))

tCase.egress.Spec.ExternalIPPool = eip.Name
res, err := fakeCluster.cluster.ShouldSelectEgress(tCase.egress)
res, err := fakeCluster.cluster.ShouldSelectIP(tCase.egress.Spec.EgressIP, eip.Name)
// Cluster should hold the same consistent hash ring for each ExternalIPPool.
assert.NoError(t, err)
allMembers, err := fakeCluster.cluster.allClusterMembers()
Expand Down Expand Up @@ -259,7 +258,7 @@ func TestCluster_RunClusterEvents(t *testing.T) {
t.Run(tCase.name, func(t *testing.T) {
localNode.Labels = tCase.newNodeLabels
updateNode(localNode)
res, err := fakeCluster.cluster.ShouldSelectEgress(tCase.egress)
res, err := fakeCluster.cluster.ShouldSelectIP(tCase.egress.Spec.EgressIP, tCase.egress.Spec.ExternalIPPool)
assert.NoError(t, err)
assert.Equal(t, tCase.expectEgressSelectResult, res, "select Node for Egress result not match")
})
Expand Down Expand Up @@ -305,7 +304,7 @@ func TestCluster_RunClusterEvents(t *testing.T) {
newEIP, _ := fakeCluster.cluster.externalIPPoolLister.Get(fakeEIP1.Name)
return reflect.DeepEqual(fakeEIP1, newEIP), nil
}))
res, err := fakeCluster.cluster.ShouldSelectEgress(fakeEgress1)
res, err := fakeCluster.cluster.ShouldSelectIP(fakeEgress1.Spec.EgressIP, fakeEgress1.Spec.ExternalIPPool)
assert.NoError(t, err)
assert.Equal(t, tCase.expectEgressSelectResult, res, "select Node for Egress result not match")
})
Expand All @@ -332,7 +331,7 @@ func TestCluster_RunClusterEvents(t *testing.T) {
return reflect.DeepEqual(newEIP, fakeEIP2), nil
}))
assertEgressSelectResult := func(egress *crdv1a2.Egress, expectedRes bool, hasSyncedErr bool) {
res, err := fakeCluster.cluster.ShouldSelectEgress(egress)
res, err := fakeCluster.cluster.ShouldSelectIP(egress.Spec.EgressIP, egress.Spec.ExternalIPPool)
if !hasSyncedErr {
assert.NoError(t, err)
}
Expand Down Expand Up @@ -466,7 +465,7 @@ func TestCluster_ConsistentHashDistribute(t *testing.T) {
ObjectMeta: metav1.ObjectMeta{Name: "fakeEgress"},
Spec: crdv1a2.EgressSpec{ExternalIPPool: fakeEIPName, EgressIP: fmt.Sprintf("10.1.1.%d", i)},
}
selected, err := fakeCluster.ShouldSelectEgress(fakeEgress)
selected, err := fakeCluster.ShouldSelectIP(fakeEgress.Spec.EgressIP, fakeEgress.Spec.ExternalIPPool)
assert.NoError(t, err)
if selected {
selectedNodes = append(selectedNodes, i)
Expand Down Expand Up @@ -535,7 +534,7 @@ func TestCluster_ShouldSelectEgress(t *testing.T) {
for i := 0; i < tCase.nodeNum; i++ {
node := fmt.Sprintf("node-%d", i)
fakeCluster.nodeName = node
selected, err := fakeCluster.ShouldSelectEgress(fakeEgress)
selected, err := fakeCluster.ShouldSelectIP(fakeEgress.Spec.EgressIP, fakeEgress.Spec.ExternalIPPool)
assert.NoError(t, err)
assert.Equal(t, node == tCase.expectedNode, selected, "Selected Node for Egress not match")
}
Expand Down Expand Up @@ -574,7 +573,7 @@ func BenchmarkCluster_ShouldSelect(b *testing.B) {
b.Run(fmt.Sprintf("%s-nodeSelectedForEgress", bc.name), func(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
fakeCluster.ShouldSelectEgress(fakeEgress)
fakeCluster.ShouldSelectIP(fakeEgress.Spec.EgressIP, fakeEgress.Spec.ExternalIPPool)
}
})
}
Expand Down
Loading

0 comments on commit b87b76c

Please sign in to comment.