Skip to content

Commit

Permalink
UT
Browse files Browse the repository at this point in the history
Signed-off-by: graysonwu <wgrayson@vmware.com>
  • Loading branch information
GraysonWu committed Sep 12, 2023
1 parent 79a988d commit 36eb7ea
Show file tree
Hide file tree
Showing 15 changed files with 515 additions and 72 deletions.
13 changes: 13 additions & 0 deletions docs/egress.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,9 @@ spec:
role: web
egressIP: 10.10.0.8 # can be populated by Antrea after assigning an IP from the pool below
externalIPPool: prod-external-ip-pool
rateLimit:
peak: 300
burst: 500
status:
egressNode: node01
```
Expand Down Expand Up @@ -127,6 +130,16 @@ The `externalIPPool` field specifies the name of the `ExternalIPPool` that the
be assigned to. It can be empty, which means users should assign the `egressIP`
to one Node manually.

### RateLimit

The `rateLimit` field specifies north-south egress traffic of the Egress. `peak`
specifies the maximum mbps. `burst` specifies maximum burst mbps for throttle.
All backend workloads selected by a rate-limited Egress share the same bandwidth
while sending egress traffic via this Egress.

**Note**: An Egress with `rateLimit` specified cannot share EgressIP with any
other Egresses.

## The ExternalIPPool resource

ExternalIPPool defines one or multiple IP ranges that can be used in the
Expand Down
75 changes: 59 additions & 16 deletions pkg/agent/controller/egress/egress_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package egress
import (
"context"
"fmt"
"k8s.io/klog/v2"
"net"
"reflect"
"strings"
Expand All @@ -35,6 +34,7 @@ import (
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/retry"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"

"antrea.io/antrea/pkg/agent"
"antrea.io/antrea/pkg/agent/interfacestore"
Expand Down Expand Up @@ -89,6 +89,8 @@ type egressState struct {
ofPorts sets.Set[int32]
// The actual Pods of the Egress. Used to identify stale Pods when updating or deleting an Egress.
pods sets.Set[string]
// Whether the Egress has rate-limit.
hasRateLimit bool
}

// egressIPState keeps the actual state of an Egress IP. It's maintained separately from egressState because
Expand Down Expand Up @@ -147,6 +149,9 @@ type EgressController struct {
ipAssigner ipassigner.IPAssigner

egressIPScheduler *egressIPScheduler

meterTracker map[uint32]*crdv1b1.RateLimit
meterTrackerMutex sync.RWMutex
}

func NewEgressController(
Expand Down Expand Up @@ -181,6 +186,7 @@ func NewEgressController(
localIPDetector: ipassigner.NewLocalIPDetector(),
idAllocator: newIDAllocator(minEgressMark, maxEgressMark),
cluster: cluster,
meterTracker: map[uint32]*crdv1b1.RateLimit{},
}
ipAssigner, err := newIPAssigner(nodeTransportInterface, egressDummyDevice)
if err != nil {
Expand Down Expand Up @@ -388,7 +394,7 @@ func (c *EgressController) processNextWorkItem() bool {
// and iptables rule for this IP and the mark.
// If the Egress IP is changed from local to non local, it uninstalls flows and iptables rule and releases the mark.
// The method returns the mark on success. Non local Egresses use 0 as the mark.
func (c *EgressController) realizeEgressIP(egressName, egressIP string, rateLimit *crdv1b1.RateLimit) (uint32, error) {
func (c *EgressController) realizeEgressIP(egressName, egressIP string, rateLimit *crdv1b1.RateLimit, rateLimitChanged bool) (uint32, error) {
isLocalIP := c.localIPDetector.IsLocalIP(egressIP)

c.egressIPStatesMutex.Lock()
Expand Down Expand Up @@ -416,13 +422,12 @@ func (c *EgressController) realizeEgressIP(egressName, egressIP string, rateLimi
}
}
if rateLimit != nil {
klog.Infof("installing egress qos meter: %s", ipState.mark)
if err := c.ofClient.InstallEgressQoSMeter(ipState.mark, rateLimit.Peak, rateLimit.Burst); err != nil {
if err := c.installEgressQoSMeter(ipState.mark, rateLimit); err != nil {
return 0, err
}
}
// Ensure datapath is installed properly.
if !ipState.flowsInstalled {
if rateLimitChanged || !ipState.flowsInstalled {
if err := c.ofClient.InstallSNATMarkFlows(ipState.egressIP, ipState.mark); err != nil {
return 0, fmt.Errorf("error installing SNAT mark flows for IP %s: %v", ipState.egressIP, err)
}
Expand Down Expand Up @@ -496,6 +501,37 @@ func (c *EgressController) unrealizeEgressIP(egressName, egressIP string) error
return nil
}

func (c *EgressController) uninstallEgressQoSMeter(meterID uint32) error {
if meterID == 0 {
return nil
}
c.meterTrackerMutex.Lock()
defer c.meterTrackerMutex.Unlock()
if _, exist := c.meterTracker[meterID]; exist {
if err := c.ofClient.UninstallEgressQoSMeter(meterID); err != nil {
return err
}
delete(c.meterTracker, meterID)
}
return nil
}

func (c *EgressController) installEgressQoSMeter(meterID uint32, rateLimit *crdv1b1.RateLimit) error {
if rateLimit == nil {
return nil
}
c.meterTrackerMutex.Lock()
defer c.meterTrackerMutex.Unlock()
meter, exist := c.meterTracker[meterID]
if !exist || meter.Peak != rateLimit.Peak || meter.Burst != rateLimit.Burst {
if err := c.ofClient.InstallEgressQoSMeter(meterID, rateLimit.Peak, rateLimit.Burst); err != nil {
return err
}
c.meterTracker[meterID] = rateLimit
}
return nil
}

func (c *EgressController) getEgressState(egressName string) (*egressState, bool) {
c.egressStatesMutex.RLock()
defer c.egressStatesMutex.RUnlock()
Expand Down Expand Up @@ -677,25 +713,23 @@ func (c *EgressController) syncEgress(egressName string) error {
}
}

// rateLimitChanged will be true if this Egress update its rateLimit or this is a new Egress.
rateLimitChanged := eState.hasRateLimit != (egress.Spec.RateLimit != nil)
// Realize the latest EgressIP and get the desired mark.
mark, err := c.realizeEgressIP(egressName, desiredEgressIP, egress.Spec.RateLimit)
mark, err := c.realizeEgressIP(egressName, desiredEgressIP, egress.Spec.RateLimit, rateLimitChanged)
if err != nil {
return err
}

// If the mark changes, uninstall all of the Egress's Pod flows first, then installs them with new mark.
// It could happen when the Egress IP is added to or removed from the Node.
// It could happen when the Egress IP is added to or removed from the Node. And record the old mark for
// meter deletion later.
oldMark := eState.mark
if eState.mark != mark {
// Uninstall all of its Pod flows.
if err := c.uninstallPodFlows(egressName, eState, eState.ofPorts, eState.pods); err != nil {
return err
}
if egress.Spec.RateLimit != nil {
// Uninstall its meter and QoS flow.
if err := c.ofClient.UninstallEgressQoSMeter(eState.mark); err != nil {
return err
}
}
eState.mark = mark
}

Expand Down Expand Up @@ -741,7 +775,9 @@ func (c *EgressController) syncEgress(egressName string) error {
ofPort := ifaces[0].OFPort
if eState.ofPorts.Has(ofPort) {
staleOFPorts.Delete(ofPort)
continue
if !rateLimitChanged {
continue
}
}
if err := c.ofClient.InstallPodSNATFlows(uint32(ofPort), egressIP, mark); err != nil {
return err
Expand All @@ -753,6 +789,13 @@ func (c *EgressController) syncEgress(egressName string) error {
if err := c.uninstallPodFlows(egressName, eState, staleOFPorts, stalePods); err != nil {
return err
}

if egress.Spec.RateLimit == nil || !c.localIPDetector.IsLocalIP(desiredEgressIP) {
if err := c.uninstallEgressQoSMeter(oldMark); err != nil {
return err
}
}
eState.hasRateLimit = egress.Spec.RateLimit != nil
return nil
}

Expand All @@ -765,8 +808,8 @@ func (c *EgressController) uninstallEgress(egressName string, eState *egressStat
if err := c.unrealizeEgressIP(egressName, eState.egressIP); err != nil {
return err
}
// Uninstall Egress QoS OF Meter.
if err := c.ofClient.UninstallEgressQoSMeter(eState.mark); err != nil {
// Uninstall its meter.
if err := c.uninstallEgressQoSMeter(eState.mark); err != nil {
return err
}
// Unassign the Egress IP from the local Node if it was assigned by the agent.
Expand Down
Loading

0 comments on commit 36eb7ea

Please sign in to comment.