From 36eb7ea925799c42350ee4ead0009529fac369b9 Mon Sep 17 00:00:00 2001 From: graysonwu Date: Tue, 5 Sep 2023 23:47:41 -0700 Subject: [PATCH] UT Signed-off-by: graysonwu --- docs/egress.md | 13 ++ .../controller/egress/egress_controller.go | 75 +++++-- .../egress/egress_controller_test.go | 169 ++++++++++++++-- pkg/agent/openflow/client.go | 8 +- pkg/agent/openflow/network_policy_test.go | 20 +- pkg/agent/openflow/pipeline.go | 8 +- pkg/apis/crd/v1beta1/types.go | 2 +- pkg/controller/egress/controller_test.go | 3 +- pkg/controller/egress/validate.go | 8 +- pkg/controller/egress/validate_test.go | 80 +++++++- test/e2e/egress_test.go | 185 +++++++++++++++++- test/e2e/flowaggregator_test.go | 4 +- test/e2e/framework.go | 8 + test/e2e/proxy_test.go | 2 +- test/e2e/traceflow_test.go | 2 +- 15 files changed, 515 insertions(+), 72 deletions(-) diff --git a/docs/egress.md b/docs/egress.md index 447c9c42ba7..9d961d46cdb 100644 --- a/docs/egress.md +++ b/docs/egress.md @@ -79,6 +79,9 @@ spec: role: web egressIP: 10.10.0.8 # can be populated by Antrea after assigning an IP from the pool below externalIPPool: prod-external-ip-pool + rateLimit: + peak: 300 + burst: 500 status: egressNode: node01 ``` @@ -127,6 +130,16 @@ The `externalIPPool` field specifies the name of the `ExternalIPPool` that the be assigned to. It can be empty, which means users should assign the `egressIP` to one Node manually. +### RateLimit + +The `rateLimit` field specifies north-south egress traffic of the Egress. `peak` +specifies the maximum mbps. `burst` specifies maximum burst mbps for throttle. +All backend workloads selected by a rate-limited Egress share the same bandwidth +while sending egress traffic via this Egress. + +**Note**: An Egress with `rateLimit` specified cannot share EgressIP with any +other Egresses. + ## The ExternalIPPool resource ExternalIPPool defines one or multiple IP ranges that can be used in the diff --git a/pkg/agent/controller/egress/egress_controller.go b/pkg/agent/controller/egress/egress_controller.go index 5c54a7589af..a32129406e8 100644 --- a/pkg/agent/controller/egress/egress_controller.go +++ b/pkg/agent/controller/egress/egress_controller.go @@ -17,7 +17,6 @@ package egress import ( "context" "fmt" - "k8s.io/klog/v2" "net" "reflect" "strings" @@ -35,6 +34,7 @@ import ( "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/retry" "k8s.io/client-go/util/workqueue" + "k8s.io/klog/v2" "antrea.io/antrea/pkg/agent" "antrea.io/antrea/pkg/agent/interfacestore" @@ -89,6 +89,8 @@ type egressState struct { ofPorts sets.Set[int32] // The actual Pods of the Egress. Used to identify stale Pods when updating or deleting an Egress. pods sets.Set[string] + // Whether the Egress has rate-limit. + hasRateLimit bool } // egressIPState keeps the actual state of an Egress IP. It's maintained separately from egressState because @@ -147,6 +149,9 @@ type EgressController struct { ipAssigner ipassigner.IPAssigner egressIPScheduler *egressIPScheduler + + meterTracker map[uint32]*crdv1b1.RateLimit + meterTrackerMutex sync.RWMutex } func NewEgressController( @@ -181,6 +186,7 @@ func NewEgressController( localIPDetector: ipassigner.NewLocalIPDetector(), idAllocator: newIDAllocator(minEgressMark, maxEgressMark), cluster: cluster, + meterTracker: map[uint32]*crdv1b1.RateLimit{}, } ipAssigner, err := newIPAssigner(nodeTransportInterface, egressDummyDevice) if err != nil { @@ -388,7 +394,7 @@ func (c *EgressController) processNextWorkItem() bool { // and iptables rule for this IP and the mark. // If the Egress IP is changed from local to non local, it uninstalls flows and iptables rule and releases the mark. // The method returns the mark on success. Non local Egresses use 0 as the mark. -func (c *EgressController) realizeEgressIP(egressName, egressIP string, rateLimit *crdv1b1.RateLimit) (uint32, error) { +func (c *EgressController) realizeEgressIP(egressName, egressIP string, rateLimit *crdv1b1.RateLimit, rateLimitChanged bool) (uint32, error) { isLocalIP := c.localIPDetector.IsLocalIP(egressIP) c.egressIPStatesMutex.Lock() @@ -416,13 +422,12 @@ func (c *EgressController) realizeEgressIP(egressName, egressIP string, rateLimi } } if rateLimit != nil { - klog.Infof("installing egress qos meter: %s", ipState.mark) - if err := c.ofClient.InstallEgressQoSMeter(ipState.mark, rateLimit.Peak, rateLimit.Burst); err != nil { + if err := c.installEgressQoSMeter(ipState.mark, rateLimit); err != nil { return 0, err } } // Ensure datapath is installed properly. - if !ipState.flowsInstalled { + if rateLimitChanged || !ipState.flowsInstalled { if err := c.ofClient.InstallSNATMarkFlows(ipState.egressIP, ipState.mark); err != nil { return 0, fmt.Errorf("error installing SNAT mark flows for IP %s: %v", ipState.egressIP, err) } @@ -496,6 +501,37 @@ func (c *EgressController) unrealizeEgressIP(egressName, egressIP string) error return nil } +func (c *EgressController) uninstallEgressQoSMeter(meterID uint32) error { + if meterID == 0 { + return nil + } + c.meterTrackerMutex.Lock() + defer c.meterTrackerMutex.Unlock() + if _, exist := c.meterTracker[meterID]; exist { + if err := c.ofClient.UninstallEgressQoSMeter(meterID); err != nil { + return err + } + delete(c.meterTracker, meterID) + } + return nil +} + +func (c *EgressController) installEgressQoSMeter(meterID uint32, rateLimit *crdv1b1.RateLimit) error { + if rateLimit == nil { + return nil + } + c.meterTrackerMutex.Lock() + defer c.meterTrackerMutex.Unlock() + meter, exist := c.meterTracker[meterID] + if !exist || meter.Peak != rateLimit.Peak || meter.Burst != rateLimit.Burst { + if err := c.ofClient.InstallEgressQoSMeter(meterID, rateLimit.Peak, rateLimit.Burst); err != nil { + return err + } + c.meterTracker[meterID] = rateLimit + } + return nil +} + func (c *EgressController) getEgressState(egressName string) (*egressState, bool) { c.egressStatesMutex.RLock() defer c.egressStatesMutex.RUnlock() @@ -677,25 +713,23 @@ func (c *EgressController) syncEgress(egressName string) error { } } + // rateLimitChanged will be true if this Egress update its rateLimit or this is a new Egress. + rateLimitChanged := eState.hasRateLimit != (egress.Spec.RateLimit != nil) // Realize the latest EgressIP and get the desired mark. - mark, err := c.realizeEgressIP(egressName, desiredEgressIP, egress.Spec.RateLimit) + mark, err := c.realizeEgressIP(egressName, desiredEgressIP, egress.Spec.RateLimit, rateLimitChanged) if err != nil { return err } // If the mark changes, uninstall all of the Egress's Pod flows first, then installs them with new mark. - // It could happen when the Egress IP is added to or removed from the Node. + // It could happen when the Egress IP is added to or removed from the Node. And record the old mark for + // meter deletion later. + oldMark := eState.mark if eState.mark != mark { // Uninstall all of its Pod flows. if err := c.uninstallPodFlows(egressName, eState, eState.ofPorts, eState.pods); err != nil { return err } - if egress.Spec.RateLimit != nil { - // Uninstall its meter and QoS flow. - if err := c.ofClient.UninstallEgressQoSMeter(eState.mark); err != nil { - return err - } - } eState.mark = mark } @@ -741,7 +775,9 @@ func (c *EgressController) syncEgress(egressName string) error { ofPort := ifaces[0].OFPort if eState.ofPorts.Has(ofPort) { staleOFPorts.Delete(ofPort) - continue + if !rateLimitChanged { + continue + } } if err := c.ofClient.InstallPodSNATFlows(uint32(ofPort), egressIP, mark); err != nil { return err @@ -753,6 +789,13 @@ func (c *EgressController) syncEgress(egressName string) error { if err := c.uninstallPodFlows(egressName, eState, staleOFPorts, stalePods); err != nil { return err } + + if egress.Spec.RateLimit == nil || !c.localIPDetector.IsLocalIP(desiredEgressIP) { + if err := c.uninstallEgressQoSMeter(oldMark); err != nil { + return err + } + } + eState.hasRateLimit = egress.Spec.RateLimit != nil return nil } @@ -765,8 +808,8 @@ func (c *EgressController) uninstallEgress(egressName string, eState *egressStat if err := c.unrealizeEgressIP(egressName, eState.egressIP); err != nil { return err } - // Uninstall Egress QoS OF Meter. - if err := c.ofClient.UninstallEgressQoSMeter(eState.mark); err != nil { + // Uninstall its meter. + if err := c.uninstallEgressQoSMeter(eState.mark); err != nil { return err } // Unassign the Egress IP from the local Node if it was assigned by the agent. diff --git a/pkg/agent/controller/egress/egress_controller_test.go b/pkg/agent/controller/egress/egress_controller_test.go index f956f280070..26e2712b15c 100644 --- a/pkg/agent/controller/egress/egress_controller_test.go +++ b/pkg/agent/controller/egress/egress_controller_test.go @@ -59,6 +59,17 @@ const ( fakeNode = "node1" ) +var ( + fakeRateLimit = crdv1b1.RateLimit{ + Peak: 500, + Burst: 1000, + } + newFakeRateLimit = crdv1b1.RateLimit{ + Peak: 200, + Burst: 400, + } +) + type fakeLocalIPDetector struct { localIPs sets.Set[string] } @@ -207,11 +218,11 @@ func TestSyncEgress(t *testing.T) { name: "Local IP becomes non local", existingEgress: &crdv1b1.Egress{ ObjectMeta: metav1.ObjectMeta{Name: "egressA", UID: "uidA"}, - Spec: crdv1b1.EgressSpec{EgressIP: fakeLocalEgressIP1}, + Spec: crdv1b1.EgressSpec{EgressIP: fakeLocalEgressIP1, RateLimit: &fakeRateLimit}, }, newEgress: &crdv1b1.Egress{ ObjectMeta: metav1.ObjectMeta{Name: "egressA", UID: "uidA"}, - Spec: crdv1b1.EgressSpec{EgressIP: fakeLocalEgressIP1}, + Spec: crdv1b1.EgressSpec{EgressIP: fakeLocalEgressIP1, RateLimit: &fakeRateLimit}, }, existingEgressGroup: &cpv1b2.EgressGroup{ ObjectMeta: metav1.ObjectMeta{Name: "egressA", UID: "uidA"}, @@ -231,10 +242,11 @@ func TestSyncEgress(t *testing.T) { expectedEgresses: []*crdv1b1.Egress{ { ObjectMeta: metav1.ObjectMeta{Name: "egressA", UID: "uidA"}, - Spec: crdv1b1.EgressSpec{EgressIP: fakeLocalEgressIP1}, + Spec: crdv1b1.EgressSpec{EgressIP: fakeLocalEgressIP1, RateLimit: &fakeRateLimit}, }, }, expectedCalls: func(mockOFClient *openflowtest.MockClient, mockRouteClient *routetest.MockInterface, mockIPAssigner *ipassignertest.MockIPAssigner) { + mockOFClient.EXPECT().InstallEgressQoSMeter(uint32(1), fakeRateLimit.Peak, fakeRateLimit.Burst) mockOFClient.EXPECT().InstallSNATMarkFlows(net.ParseIP(fakeLocalEgressIP1), uint32(1)) mockOFClient.EXPECT().InstallPodSNATFlows(uint32(1), net.ParseIP(fakeLocalEgressIP1), uint32(1)) mockOFClient.EXPECT().InstallPodSNATFlows(uint32(2), net.ParseIP(fakeLocalEgressIP1), uint32(1)) @@ -250,17 +262,18 @@ func TestSyncEgress(t *testing.T) { mockOFClient.EXPECT().InstallPodSNATFlows(uint32(1), net.ParseIP(fakeLocalEgressIP1), uint32(0)) mockOFClient.EXPECT().InstallPodSNATFlows(uint32(3), net.ParseIP(fakeLocalEgressIP1), uint32(0)) mockIPAssigner.EXPECT().UnassignIP(fakeLocalEgressIP1) + mockOFClient.EXPECT().UninstallEgressQoSMeter(uint32(1)) }, }, { name: "Non local IP becomes local", existingEgress: &crdv1b1.Egress{ ObjectMeta: metav1.ObjectMeta{Name: "egressA", UID: "uidA"}, - Spec: crdv1b1.EgressSpec{EgressIP: fakeRemoteEgressIP1}, + Spec: crdv1b1.EgressSpec{EgressIP: fakeRemoteEgressIP1, RateLimit: &fakeRateLimit}, }, newEgress: &crdv1b1.Egress{ ObjectMeta: metav1.ObjectMeta{Name: "egressA", UID: "uidA"}, - Spec: crdv1b1.EgressSpec{EgressIP: fakeRemoteEgressIP1}, + Spec: crdv1b1.EgressSpec{EgressIP: fakeRemoteEgressIP1, RateLimit: &fakeRateLimit}, }, existingEgressGroup: &cpv1b2.EgressGroup{ ObjectMeta: metav1.ObjectMeta{Name: "egressA", UID: "uidA"}, @@ -280,7 +293,7 @@ func TestSyncEgress(t *testing.T) { expectedEgresses: []*crdv1b1.Egress{ { ObjectMeta: metav1.ObjectMeta{Name: "egressA", UID: "uidA"}, - Spec: crdv1b1.EgressSpec{EgressIP: fakeRemoteEgressIP1}, + Spec: crdv1b1.EgressSpec{EgressIP: fakeRemoteEgressIP1, RateLimit: &fakeRateLimit}, Status: crdv1b1.EgressStatus{EgressIP: fakeRemoteEgressIP1, EgressNode: fakeNode}, }, }, @@ -293,6 +306,7 @@ func TestSyncEgress(t *testing.T) { mockOFClient.EXPECT().UninstallPodSNATFlows(uint32(2)) mockIPAssigner.EXPECT().UnassignIP(fakeRemoteEgressIP1) + mockOFClient.EXPECT().InstallEgressQoSMeter(uint32(1), fakeRateLimit.Peak, fakeRateLimit.Burst) mockOFClient.EXPECT().InstallSNATMarkFlows(net.ParseIP(fakeRemoteEgressIP1), uint32(1)) mockOFClient.EXPECT().InstallPodSNATFlows(uint32(1), net.ParseIP(fakeRemoteEgressIP1), uint32(1)) mockOFClient.EXPECT().InstallPodSNATFlows(uint32(3), net.ParseIP(fakeRemoteEgressIP1), uint32(1)) @@ -304,11 +318,11 @@ func TestSyncEgress(t *testing.T) { name: "Change from local Egress IP to another local one", existingEgress: &crdv1b1.Egress{ ObjectMeta: metav1.ObjectMeta{Name: "egressA", UID: "uidA"}, - Spec: crdv1b1.EgressSpec{EgressIP: fakeLocalEgressIP1}, + Spec: crdv1b1.EgressSpec{EgressIP: fakeLocalEgressIP1, RateLimit: &fakeRateLimit}, }, newEgress: &crdv1b1.Egress{ ObjectMeta: metav1.ObjectMeta{Name: "egressA", UID: "uidA"}, - Spec: crdv1b1.EgressSpec{EgressIP: fakeLocalEgressIP2}, + Spec: crdv1b1.EgressSpec{EgressIP: fakeLocalEgressIP2, RateLimit: &fakeRateLimit}, }, existingEgressGroup: &cpv1b2.EgressGroup{ ObjectMeta: metav1.ObjectMeta{Name: "egressA", UID: "uidA"}, @@ -327,11 +341,12 @@ func TestSyncEgress(t *testing.T) { expectedEgresses: []*crdv1b1.Egress{ { ObjectMeta: metav1.ObjectMeta{Name: "egressA", UID: "uidA"}, - Spec: crdv1b1.EgressSpec{EgressIP: fakeLocalEgressIP2}, + Spec: crdv1b1.EgressSpec{EgressIP: fakeLocalEgressIP2, RateLimit: &fakeRateLimit}, Status: crdv1b1.EgressStatus{EgressIP: fakeLocalEgressIP2, EgressNode: fakeNode}, }, }, expectedCalls: func(mockOFClient *openflowtest.MockClient, mockRouteClient *routetest.MockInterface, mockIPAssigner *ipassignertest.MockIPAssigner) { + mockOFClient.EXPECT().InstallEgressQoSMeter(uint32(1), fakeRateLimit.Peak, fakeRateLimit.Burst) mockOFClient.EXPECT().InstallSNATMarkFlows(net.ParseIP(fakeLocalEgressIP1), uint32(1)) mockOFClient.EXPECT().InstallPodSNATFlows(uint32(1), net.ParseIP(fakeLocalEgressIP1), uint32(1)) mockOFClient.EXPECT().InstallPodSNATFlows(uint32(2), net.ParseIP(fakeLocalEgressIP1), uint32(1)) @@ -345,22 +360,24 @@ func TestSyncEgress(t *testing.T) { mockIPAssigner.EXPECT().UnassignIP(fakeLocalEgressIP1) mockIPAssigner.EXPECT().UnassignIP(fakeLocalEgressIP2) + mockOFClient.EXPECT().InstallEgressQoSMeter(uint32(1), fakeRateLimit.Peak, fakeRateLimit.Burst) mockOFClient.EXPECT().InstallSNATMarkFlows(net.ParseIP(fakeLocalEgressIP2), uint32(1)) mockOFClient.EXPECT().InstallPodSNATFlows(uint32(1), net.ParseIP(fakeLocalEgressIP2), uint32(1)) mockOFClient.EXPECT().InstallPodSNATFlows(uint32(3), net.ParseIP(fakeLocalEgressIP2), uint32(1)) mockRouteClient.EXPECT().AddSNATRule(net.ParseIP(fakeLocalEgressIP2), uint32(1)) mockIPAssigner.EXPECT().UnassignIP(fakeLocalEgressIP2) + mockOFClient.EXPECT().UninstallEgressQoSMeter(uint32(1)) }, }, { name: "Change from local Egress IP to a remote one", existingEgress: &crdv1b1.Egress{ ObjectMeta: metav1.ObjectMeta{Name: "egressA", UID: "uidA"}, - Spec: crdv1b1.EgressSpec{EgressIP: fakeLocalEgressIP1}, + Spec: crdv1b1.EgressSpec{EgressIP: fakeLocalEgressIP1, RateLimit: &fakeRateLimit}, }, newEgress: &crdv1b1.Egress{ ObjectMeta: metav1.ObjectMeta{Name: "egressA", UID: "uidA"}, - Spec: crdv1b1.EgressSpec{EgressIP: fakeRemoteEgressIP1}, + Spec: crdv1b1.EgressSpec{EgressIP: fakeRemoteEgressIP1, RateLimit: &fakeRateLimit}, }, existingEgressGroup: &cpv1b2.EgressGroup{ ObjectMeta: metav1.ObjectMeta{Name: "egressA", UID: "uidA"}, @@ -379,10 +396,11 @@ func TestSyncEgress(t *testing.T) { expectedEgresses: []*crdv1b1.Egress{ { ObjectMeta: metav1.ObjectMeta{Name: "egressA", UID: "uidA"}, - Spec: crdv1b1.EgressSpec{EgressIP: fakeRemoteEgressIP1}, + Spec: crdv1b1.EgressSpec{EgressIP: fakeRemoteEgressIP1, RateLimit: &fakeRateLimit}, }, }, expectedCalls: func(mockOFClient *openflowtest.MockClient, mockRouteClient *routetest.MockInterface, mockIPAssigner *ipassignertest.MockIPAssigner) { + mockOFClient.EXPECT().InstallEgressQoSMeter(uint32(1), fakeRateLimit.Peak, fakeRateLimit.Burst) mockOFClient.EXPECT().InstallSNATMarkFlows(net.ParseIP(fakeLocalEgressIP1), uint32(1)) mockOFClient.EXPECT().InstallPodSNATFlows(uint32(1), net.ParseIP(fakeLocalEgressIP1), uint32(1)) mockOFClient.EXPECT().InstallPodSNATFlows(uint32(2), net.ParseIP(fakeLocalEgressIP1), uint32(1)) @@ -399,17 +417,18 @@ func TestSyncEgress(t *testing.T) { mockOFClient.EXPECT().InstallPodSNATFlows(uint32(1), net.ParseIP(fakeRemoteEgressIP1), uint32(0)) mockOFClient.EXPECT().InstallPodSNATFlows(uint32(3), net.ParseIP(fakeRemoteEgressIP1), uint32(0)) mockIPAssigner.EXPECT().UnassignIP(fakeRemoteEgressIP1) + mockOFClient.EXPECT().UninstallEgressQoSMeter(uint32(1)) }, }, { name: "Change from remote Egress IP to a local one", existingEgress: &crdv1b1.Egress{ ObjectMeta: metav1.ObjectMeta{Name: "egressA", UID: "uidA"}, - Spec: crdv1b1.EgressSpec{EgressIP: fakeRemoteEgressIP1}, + Spec: crdv1b1.EgressSpec{EgressIP: fakeRemoteEgressIP1, RateLimit: &fakeRateLimit}, }, newEgress: &crdv1b1.Egress{ ObjectMeta: metav1.ObjectMeta{Name: "egressA", UID: "uidA"}, - Spec: crdv1b1.EgressSpec{EgressIP: fakeLocalEgressIP1}, + Spec: crdv1b1.EgressSpec{EgressIP: fakeLocalEgressIP1, RateLimit: &fakeRateLimit}, }, existingEgressGroup: &cpv1b2.EgressGroup{ ObjectMeta: metav1.ObjectMeta{Name: "egressA", UID: "uidA"}, @@ -428,7 +447,7 @@ func TestSyncEgress(t *testing.T) { expectedEgresses: []*crdv1b1.Egress{ { ObjectMeta: metav1.ObjectMeta{Name: "egressA", UID: "uidA"}, - Spec: crdv1b1.EgressSpec{EgressIP: fakeLocalEgressIP1}, + Spec: crdv1b1.EgressSpec{EgressIP: fakeLocalEgressIP1, RateLimit: &fakeRateLimit}, Status: crdv1b1.EgressStatus{EgressIP: fakeLocalEgressIP1, EgressNode: fakeNode}, }, }, @@ -442,6 +461,7 @@ func TestSyncEgress(t *testing.T) { mockIPAssigner.EXPECT().UnassignIP(fakeRemoteEgressIP1) mockIPAssigner.EXPECT().UnassignIP(fakeLocalEgressIP1) + mockOFClient.EXPECT().InstallEgressQoSMeter(uint32(1), fakeRateLimit.Peak, fakeRateLimit.Burst) mockOFClient.EXPECT().InstallSNATMarkFlows(net.ParseIP(fakeLocalEgressIP1), uint32(1)) mockOFClient.EXPECT().InstallPodSNATFlows(uint32(1), net.ParseIP(fakeLocalEgressIP1), uint32(1)) mockOFClient.EXPECT().InstallPodSNATFlows(uint32(3), net.ParseIP(fakeLocalEgressIP1), uint32(1)) @@ -688,6 +708,125 @@ func TestSyncEgress(t *testing.T) { mockRouteClient.EXPECT().DeleteSNATRule(uint32(1)) }, }, + { + name: "Update Egress from non-rate-limited to rate-limited", + existingEgress: &crdv1b1.Egress{ + ObjectMeta: metav1.ObjectMeta{Name: "egressA", UID: "uidA"}, + Spec: crdv1b1.EgressSpec{EgressIP: fakeLocalEgressIP1}, + }, + newEgress: &crdv1b1.Egress{ + ObjectMeta: metav1.ObjectMeta{Name: "egressA", UID: "uidA"}, + Spec: crdv1b1.EgressSpec{EgressIP: fakeLocalEgressIP1, RateLimit: &fakeRateLimit}, + }, + existingEgressGroup: &cpv1b2.EgressGroup{ + ObjectMeta: metav1.ObjectMeta{Name: "egressA", UID: "uidA"}, + GroupMembers: []cpv1b2.GroupMember{ + {Pod: &cpv1b2.PodReference{Name: "pod1", Namespace: "ns1"}}, + }, + }, + newEgressGroup: &cpv1b2.EgressGroup{ + ObjectMeta: metav1.ObjectMeta{Name: "egressA", UID: "uidA"}, + GroupMembers: []cpv1b2.GroupMember{ + {Pod: &cpv1b2.PodReference{Name: "pod1", Namespace: "ns1"}}, + }, + }, + expectedEgresses: []*crdv1b1.Egress{ + { + ObjectMeta: metav1.ObjectMeta{Name: "egressA", UID: "uidA"}, + Spec: crdv1b1.EgressSpec{EgressIP: fakeLocalEgressIP1, RateLimit: &fakeRateLimit}, + Status: crdv1b1.EgressStatus{EgressIP: fakeLocalEgressIP1, EgressNode: fakeNode}, + }, + }, + expectedCalls: func(mockOFClient *openflowtest.MockClient, mockRouteClient *routetest.MockInterface, mockIPAssigner *ipassignertest.MockIPAssigner) { + mockOFClient.EXPECT().InstallSNATMarkFlows(net.ParseIP(fakeLocalEgressIP1), uint32(1)) + mockOFClient.EXPECT().InstallPodSNATFlows(uint32(1), net.ParseIP(fakeLocalEgressIP1), uint32(1)) + mockRouteClient.EXPECT().AddSNATRule(net.ParseIP(fakeLocalEgressIP1), uint32(1)) + mockIPAssigner.EXPECT().UnassignIP(fakeLocalEgressIP1).Times(3) + + mockOFClient.EXPECT().InstallEgressQoSMeter(uint32(1), fakeRateLimit.Peak, fakeRateLimit.Burst) + mockOFClient.EXPECT().InstallSNATMarkFlows(net.ParseIP(fakeLocalEgressIP1), uint32(1)) + mockOFClient.EXPECT().InstallPodSNATFlows(uint32(1), net.ParseIP(fakeLocalEgressIP1), uint32(1)) + }, + }, + { + name: "Update Egress from rate-limited to non-rate-limited", + existingEgress: &crdv1b1.Egress{ + ObjectMeta: metav1.ObjectMeta{Name: "egressA", UID: "uidA"}, + Spec: crdv1b1.EgressSpec{EgressIP: fakeLocalEgressIP1, RateLimit: &fakeRateLimit}, + }, + newEgress: &crdv1b1.Egress{ + ObjectMeta: metav1.ObjectMeta{Name: "egressA", UID: "uidA"}, + Spec: crdv1b1.EgressSpec{EgressIP: fakeLocalEgressIP1}, + }, + existingEgressGroup: &cpv1b2.EgressGroup{ + ObjectMeta: metav1.ObjectMeta{Name: "egressA", UID: "uidA"}, + GroupMembers: []cpv1b2.GroupMember{ + {Pod: &cpv1b2.PodReference{Name: "pod1", Namespace: "ns1"}}, + }, + }, + newEgressGroup: &cpv1b2.EgressGroup{ + ObjectMeta: metav1.ObjectMeta{Name: "egressA", UID: "uidA"}, + GroupMembers: []cpv1b2.GroupMember{ + {Pod: &cpv1b2.PodReference{Name: "pod1", Namespace: "ns1"}}, + }, + }, + expectedEgresses: []*crdv1b1.Egress{ + { + ObjectMeta: metav1.ObjectMeta{Name: "egressA", UID: "uidA"}, + Spec: crdv1b1.EgressSpec{EgressIP: fakeLocalEgressIP1}, + Status: crdv1b1.EgressStatus{EgressIP: fakeLocalEgressIP1, EgressNode: fakeNode}, + }, + }, + expectedCalls: func(mockOFClient *openflowtest.MockClient, mockRouteClient *routetest.MockInterface, mockIPAssigner *ipassignertest.MockIPAssigner) { + mockOFClient.EXPECT().InstallEgressQoSMeter(uint32(1), fakeRateLimit.Peak, fakeRateLimit.Burst) + mockOFClient.EXPECT().InstallSNATMarkFlows(net.ParseIP(fakeLocalEgressIP1), uint32(1)) + mockOFClient.EXPECT().InstallPodSNATFlows(uint32(1), net.ParseIP(fakeLocalEgressIP1), uint32(1)) + mockRouteClient.EXPECT().AddSNATRule(net.ParseIP(fakeLocalEgressIP1), uint32(1)) + mockIPAssigner.EXPECT().UnassignIP(fakeLocalEgressIP1).Times(3) + + mockOFClient.EXPECT().InstallSNATMarkFlows(net.ParseIP(fakeLocalEgressIP1), uint32(1)) + mockOFClient.EXPECT().InstallPodSNATFlows(uint32(1), net.ParseIP(fakeLocalEgressIP1), uint32(1)) + mockOFClient.EXPECT().UninstallEgressQoSMeter(uint32(1)) + }, + }, + { + name: "Update Egress rate-limited config", + existingEgress: &crdv1b1.Egress{ + ObjectMeta: metav1.ObjectMeta{Name: "egressA", UID: "uidA"}, + Spec: crdv1b1.EgressSpec{EgressIP: fakeLocalEgressIP1, RateLimit: &fakeRateLimit}, + }, + newEgress: &crdv1b1.Egress{ + ObjectMeta: metav1.ObjectMeta{Name: "egressA", UID: "uidA"}, + Spec: crdv1b1.EgressSpec{EgressIP: fakeLocalEgressIP1, RateLimit: &newFakeRateLimit}, + }, + existingEgressGroup: &cpv1b2.EgressGroup{ + ObjectMeta: metav1.ObjectMeta{Name: "egressA", UID: "uidA"}, + GroupMembers: []cpv1b2.GroupMember{ + {Pod: &cpv1b2.PodReference{Name: "pod1", Namespace: "ns1"}}, + }, + }, + newEgressGroup: &cpv1b2.EgressGroup{ + ObjectMeta: metav1.ObjectMeta{Name: "egressA", UID: "uidA"}, + GroupMembers: []cpv1b2.GroupMember{ + {Pod: &cpv1b2.PodReference{Name: "pod1", Namespace: "ns1"}}, + }, + }, + expectedEgresses: []*crdv1b1.Egress{ + { + ObjectMeta: metav1.ObjectMeta{Name: "egressA", UID: "uidA"}, + Spec: crdv1b1.EgressSpec{EgressIP: fakeLocalEgressIP1, RateLimit: &newFakeRateLimit}, + Status: crdv1b1.EgressStatus{EgressIP: fakeLocalEgressIP1, EgressNode: fakeNode}, + }, + }, + expectedCalls: func(mockOFClient *openflowtest.MockClient, mockRouteClient *routetest.MockInterface, mockIPAssigner *ipassignertest.MockIPAssigner) { + mockOFClient.EXPECT().InstallEgressQoSMeter(uint32(1), fakeRateLimit.Peak, fakeRateLimit.Burst) + mockOFClient.EXPECT().InstallSNATMarkFlows(net.ParseIP(fakeLocalEgressIP1), uint32(1)) + mockOFClient.EXPECT().InstallPodSNATFlows(uint32(1), net.ParseIP(fakeLocalEgressIP1), uint32(1)) + mockRouteClient.EXPECT().AddSNATRule(net.ParseIP(fakeLocalEgressIP1), uint32(1)) + mockIPAssigner.EXPECT().UnassignIP(fakeLocalEgressIP1).Times(3) + mockOFClient.EXPECT().InstallEgressQoSMeter(uint32(1), newFakeRateLimit.Peak, newFakeRateLimit.Burst) + }, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { diff --git a/pkg/agent/openflow/client.go b/pkg/agent/openflow/client.go index a54eebf7901..8f62bafe3f1 100644 --- a/pkg/agent/openflow/client.go +++ b/pkg/agent/openflow/client.go @@ -805,7 +805,7 @@ func (c *client) initialize() error { if err := c.genOFMeter(PacketInMeterIDTF, ofctrl.MeterBurst|ofctrl.MeterPktps, PacketInMeterRateTF, 2*PacketInMeterRateTF).Add(); err != nil { return fmt.Errorf("failed to install OpenFlow meter entry (meterID:%d, rate:%d) for TraceFlow packet-in rate limiting: %v", PacketInMeterIDTF, PacketInMeterRateTF, err) } - if err := c.genPacketInMeter(PacketInMeterIDDNS, PacketInMeterRateDNS).Add(); err != nil { + if err := c.genOFMeter(PacketInMeterIDDNS, ofctrl.MeterBurst|ofctrl.MeterPktps, PacketInMeterRateDNS, 2*PacketInMeterRateDNS).Add(); err != nil { return fmt.Errorf("failed to install OpenFlow meter entry (meterID:%d, rate:%d) for DNS interception packet-in rate limiting: %v", PacketInMeterIDDNS, PacketInMeterRateDNS, err) } } @@ -1002,7 +1002,7 @@ func (c *client) InstallSNATMarkFlows(snatIP net.IP, mark uint32) error { cacheKey := fmt.Sprintf("s%x", mark) c.replayMutex.RLock() defer c.replayMutex.RUnlock() - return c.addFlows(c.featureEgress.cachedFlows, cacheKey, []binding.Flow{flow}) + return c.modifyFlows(c.featureEgress.cachedFlows, cacheKey, []binding.Flow{flow}) } func (c *client) UninstallSNATMarkFlows(mark uint32) error { @@ -1017,7 +1017,7 @@ func (c *client) InstallPodSNATFlows(ofPort uint32, snatIP net.IP, snatMark uint cacheKey := fmt.Sprintf("p%x", ofPort) c.replayMutex.RLock() defer c.replayMutex.RUnlock() - return c.addFlows(c.featureEgress.cachedFlows, cacheKey, flows) + return c.modifyFlows(c.featureEgress.cachedFlows, cacheKey, flows) } func (c *client) UninstallPodSNATFlows(ofPort uint32) error { @@ -1059,12 +1059,10 @@ func (c *client) UninstallEgressQoSMeter(meterID uint32) error { mCache, ok := c.featureEgress.cachedMeter.Load(meterID) if ok { meter := mCache.(binding.Meter) - klog.Infof("deleting meter %s ...", meterID) if err := meter.Delete(); err != nil { return fmt.Errorf("error when deleting Egress QoS OF Meter %d: %w", meterID, err) } c.featureEgress.cachedMeter.Delete(meterID) - klog.Infof("deleted meter") } return nil } diff --git a/pkg/agent/openflow/network_policy_test.go b/pkg/agent/openflow/network_policy_test.go index 833cd0edb9c..e5e15375e92 100644 --- a/pkg/agent/openflow/network_policy_test.go +++ b/pkg/agent/openflow/network_policy_test.go @@ -1373,13 +1373,13 @@ func networkPolicyInitFlows(ovsMeterSupported, externalNodeEnabled, l7NetworkPol } if ovsMeterSupported { loggingFlows = []string{ - "cookie=0x1020000000000, table=Output, priority=200,reg0=0x2400000/0xfe600000 actions=meter:1,controller(id=32776,reason=no_match,userdata=01.01,max_len=128)", - "cookie=0x1020000000000, table=Output, priority=200,reg0=0x4400000/0xfe600000 actions=meter:1,controller(id=32776,reason=no_match,userdata=01.02,max_len=128)", - "cookie=0x1020000000000, table=Output, priority=200,reg0=0x6400000/0xfe600000 actions=meter:1,controller(id=32776,reason=no_match,userdata=01.03,max_len=128)", - "cookie=0x1020000000000, table=Output, priority=200,reg0=0x8400000/0xfe600000 actions=meter:1,controller(id=32776,reason=no_match,userdata=01.04,max_len=128)", - "cookie=0x1020000000000, table=Output, priority=200,reg0=0xa400000/0xfe600000 actions=meter:1,controller(id=32776,reason=no_match,userdata=01.05,max_len=128)", - "cookie=0x1020000000000, table=Output, priority=200,reg0=0xc400000/0xfe600000 actions=meter:1,controller(id=32776,reason=no_match,userdata=01.06,max_len=128)", - "cookie=0x1020000000000, table=Output, priority=200,reg0=0xe400000/0xfe600000 actions=meter:1,controller(id=32776,reason=no_match,userdata=01.07,max_len=128)", + "cookie=0x1020000000000, table=Output, priority=200,reg0=0x2400000/0xfe600000 actions=meter:256,controller(id=32776,reason=no_match,userdata=01.01,max_len=128)", + "cookie=0x1020000000000, table=Output, priority=200,reg0=0x4400000/0xfe600000 actions=meter:256,controller(id=32776,reason=no_match,userdata=01.02,max_len=128)", + "cookie=0x1020000000000, table=Output, priority=200,reg0=0x6400000/0xfe600000 actions=meter:256,controller(id=32776,reason=no_match,userdata=01.03,max_len=128)", + "cookie=0x1020000000000, table=Output, priority=200,reg0=0x8400000/0xfe600000 actions=meter:256,controller(id=32776,reason=no_match,userdata=01.04,max_len=128)", + "cookie=0x1020000000000, table=Output, priority=200,reg0=0xa400000/0xfe600000 actions=meter:256,controller(id=32776,reason=no_match,userdata=01.05,max_len=128)", + "cookie=0x1020000000000, table=Output, priority=200,reg0=0xc400000/0xfe600000 actions=meter:256,controller(id=32776,reason=no_match,userdata=01.06,max_len=128)", + "cookie=0x1020000000000, table=Output, priority=200,reg0=0xe400000/0xfe600000 actions=meter:256,controller(id=32776,reason=no_match,userdata=01.07,max_len=128)", } } if externalNodeEnabled { @@ -1455,7 +1455,7 @@ func Test_NewDNSPacketInConjunction(t *testing.T) { } if ovsMetersSupported { ipv4ExpFlows = []string{ - "cookie=0x1020000000000, table=AntreaPolicyIngressRule, priority=64991,conj_id=1 actions=meter:3,controller(id=32776,reason=no_match,userdata=02,max_len=128),goto_table:IngressMetric", + "cookie=0x1020000000000, table=AntreaPolicyIngressRule, priority=64991,conj_id=1 actions=meter:258,controller(id=32776,reason=no_match,userdata=02,max_len=128),goto_table:IngressMetric", "cookie=0x1020000000000, table=AntreaPolicyIngressRule, priority=64991,ct_state=+rpl+trk,udp,tp_src=53 actions=conjunction(1,1/2)", "cookie=0x1020000000000, table=AntreaPolicyIngressRule, priority=64991,ct_state=+rpl+trk,tcp,tp_src=53 actions=conjunction(1,1/2)", } @@ -1467,7 +1467,7 @@ func Test_NewDNSPacketInConjunction(t *testing.T) { } if ovsMetersSupported { ipv6ExpFlows = []string{ - "cookie=0x1020000000000, table=AntreaPolicyIngressRule, priority=64991,conj_id=1 actions=meter:3,controller(id=32776,reason=no_match,userdata=02,max_len=128),goto_table:IngressMetric", + "cookie=0x1020000000000, table=AntreaPolicyIngressRule, priority=64991,conj_id=1 actions=meter:258,controller(id=32776,reason=no_match,userdata=02,max_len=128),goto_table:IngressMetric", "cookie=0x1020000000000, table=AntreaPolicyIngressRule, priority=64991,ct_state=+rpl+trk,udp6,tp_src=53 actions=conjunction(1,1/2)", "cookie=0x1020000000000, table=AntreaPolicyIngressRule, priority=64991,ct_state=+rpl+trk,tcp6,tp_src=53 actions=conjunction(1,1/2)", } @@ -1481,7 +1481,7 @@ func Test_NewDNSPacketInConjunction(t *testing.T) { } if ovsMetersSupported { dsExpFlows = []string{ - "cookie=0x1020000000000, table=AntreaPolicyIngressRule, priority=64991,conj_id=1 actions=meter:3,controller(id=32776,reason=no_match,userdata=02,max_len=128),goto_table:IngressMetric", + "cookie=0x1020000000000, table=AntreaPolicyIngressRule, priority=64991,conj_id=1 actions=meter:258,controller(id=32776,reason=no_match,userdata=02,max_len=128),goto_table:IngressMetric", "cookie=0x1020000000000, table=AntreaPolicyIngressRule, priority=64991,ct_state=+rpl+trk,udp,tp_src=53 actions=conjunction(1,1/2)", "cookie=0x1020000000000, table=AntreaPolicyIngressRule, priority=64991,ct_state=+rpl+trk,tcp,tp_src=53 actions=conjunction(1,1/2)", "cookie=0x1020000000000, table=AntreaPolicyIngressRule, priority=64991,ct_state=+rpl+trk,udp6,tp_src=53 actions=conjunction(1,1/2)", diff --git a/pkg/agent/openflow/pipeline.go b/pkg/agent/openflow/pipeline.go index 829e74005ac..af41e063fca 100644 --- a/pkg/agent/openflow/pipeline.go +++ b/pkg/agent/openflow/pipeline.go @@ -2300,7 +2300,9 @@ func (f *featureEgress) snatIPFromTunnelFlow(snatIP net.IP, mark uint32) binding MatchCTStateTrk(true). MatchTunnelDst(snatIP) if f.ovsMetersAreSupported { - fb = fb.Action().Meter(mark) + if _, ok := f.cachedMeter.Load(mark); ok { + fb = fb.Action().Meter(mark) + } } return fb.Action().LoadPktMarkRange(mark, snatPktMarkRange). Action().LoadRegMark(ToGatewayRegMark). @@ -2323,7 +2325,9 @@ func (f *featureEgress) snatRuleFlow(ofPort uint32, snatIP net.IP, snatMark uint MatchCTStateTrk(true). MatchInPort(ofPort) if f.ovsMetersAreSupported { - fb = fb.Action().Meter(snatMark) + if _, ok := f.cachedMeter.Load(snatMark); ok { + fb = fb.Action().Meter(snatMark) + } } return fb.Action().LoadPktMarkRange(snatMark, snatPktMarkRange). Action().LoadRegMark(ToGatewayRegMark). diff --git a/pkg/apis/crd/v1beta1/types.go b/pkg/apis/crd/v1beta1/types.go index e22bce81892..0b79a54660a 100644 --- a/pkg/apis/crd/v1beta1/types.go +++ b/pkg/apis/crd/v1beta1/types.go @@ -867,7 +867,7 @@ type EgressSpec struct { type RateLimit struct { // Rate specifies the maximum mbps. Peak uint32 `json:"peak"` - // Burst specifies maximum burst for throttle. + // Burst specifies maximum burst mbps for throttle. Burst uint32 `json:"burst"` } diff --git a/pkg/controller/egress/controller_test.go b/pkg/controller/egress/controller_test.go index e0be9bd0edc..b4bdac926f7 100644 --- a/pkg/controller/egress/controller_test.go +++ b/pkg/controller/egress/controller_test.go @@ -67,7 +67,7 @@ var ( eipFoo2 = newExternalIPPool("pool2", "", "2.2.2.10", "2.2.2.20") ) -func newEgress(name, egressIP, externalIPPool string, podSelector, namespaceSelector *metav1.LabelSelector) *v1beta1.Egress { +func newEgress(name, egressIP, externalIPPool string, podSelector, namespaceSelector *metav1.LabelSelector, rateLimit *v1beta1.RateLimit) *v1beta1.Egress { egress := &v1beta1.Egress{ ObjectMeta: metav1.ObjectMeta{Name: name}, Spec: v1beta1.EgressSpec{ @@ -77,6 +77,7 @@ func newEgress(name, egressIP, externalIPPool string, podSelector, namespaceSele }, EgressIP: egressIP, ExternalIPPool: externalIPPool, + RateLimit: rateLimit, }, } return egress diff --git a/pkg/controller/egress/validate.go b/pkg/controller/egress/validate.go index 080bf8293dd..d73a085c826 100644 --- a/pkg/controller/egress/validate.go +++ b/pkg/controller/egress/validate.go @@ -57,12 +57,14 @@ func (c *EgressController) ValidateEgress(review *admv1.AdmissionReview) *admv1. if newEgress.Spec.EgressIP != "" { objs, _ := c.egressInformer.Informer().GetIndexer().ByIndex(egressIPIndex, newEgress.Spec.EgressIP) if newEgress.Spec.RateLimit != nil && len(objs) > 0 { - return false, fmt.Sprintf("Egress with rate-limit can't share EgressIP %s with other Egress", newEgress.Spec.EgressIP) + if len(objs) > 1 || objs[0].(*crdv1beta1.Egress).Name != newEgress.Name { + return false, fmt.Sprintf("Egress with rate-limit can't share EgressIP %s with other Egresses", newEgress.Spec.EgressIP) + } } for _, obj := range objs { egress := obj.(*crdv1beta1.Egress) - if egress.Spec.RateLimit != nil { - return false, fmt.Sprintf("Egress with rate-limit can't share EgressIP %s with other Egress", newEgress.Spec.EgressIP) + if egress.Spec.RateLimit != nil && egress.Name != newEgress.Name { + return false, fmt.Sprintf("EgressIP %s used by other Egress with rate-limit can't be shared", newEgress.Spec.EgressIP) } } } diff --git a/pkg/controller/egress/validate_test.go b/pkg/controller/egress/validate_test.go index 29105707337..a4e14d9c372 100644 --- a/pkg/controller/egress/validate_test.go +++ b/pkg/controller/egress/validate_test.go @@ -34,9 +34,16 @@ func marshal(object runtime.Object) []byte { } func TestEgressControllerValidateEgress(t *testing.T) { + var ( + rateLimit = crdv1beta1.RateLimit{ + Peak: 500, + Burst: 1000, + } + ) tests := []struct { name string existingExternalIPPool *crdv1beta1.ExternalIPPool + existingEgress *crdv1beta1.Egress request *admv1.AdmissionRequest expectedResponse *admv1.AdmissionResponse }{ @@ -46,7 +53,7 @@ func TestEgressControllerValidateEgress(t *testing.T) { request: &admv1.AdmissionRequest{ Name: "foo", Operation: "CREATE", - Object: runtime.RawExtension{Raw: marshal(newEgress("foo", "10.10.10.1", "nonExistingPool", nil, nil))}, + Object: runtime.RawExtension{Raw: marshal(newEgress("foo", "10.10.10.1", "nonExistingPool", nil, nil, nil))}, }, expectedResponse: &admv1.AdmissionResponse{ Allowed: false, @@ -61,7 +68,7 @@ func TestEgressControllerValidateEgress(t *testing.T) { request: &admv1.AdmissionRequest{ Name: "foo", Operation: "CREATE", - Object: runtime.RawExtension{Raw: marshal(newEgress("foo", "10.10.11.1", "bar", nil, nil))}, + Object: runtime.RawExtension{Raw: marshal(newEgress("foo", "10.10.11.1", "bar", nil, nil, nil))}, }, expectedResponse: &admv1.AdmissionResponse{ Allowed: false, @@ -76,7 +83,7 @@ func TestEgressControllerValidateEgress(t *testing.T) { request: &admv1.AdmissionRequest{ Name: "foo", Operation: "CREATE", - Object: runtime.RawExtension{Raw: marshal(newEgress("foo", "10.10.10.1", "bar", nil, nil))}, + Object: runtime.RawExtension{Raw: marshal(newEgress("foo", "10.10.10.1", "bar", nil, nil, nil))}, }, expectedResponse: &admv1.AdmissionResponse{Allowed: true}, }, @@ -86,8 +93,8 @@ func TestEgressControllerValidateEgress(t *testing.T) { request: &admv1.AdmissionRequest{ Name: "foo", Operation: "UPDATE", - OldObject: runtime.RawExtension{Raw: marshal(newEgress("foo", "10.10.10.1", "bar", nil, nil))}, - Object: runtime.RawExtension{Raw: marshal(newEgress("foo", "10.10.11.1", "bar", nil, nil))}, + OldObject: runtime.RawExtension{Raw: marshal(newEgress("foo", "10.10.10.1", "bar", nil, nil, nil))}, + Object: runtime.RawExtension{Raw: marshal(newEgress("foo", "10.10.11.1", "bar", nil, nil, nil))}, }, expectedResponse: &admv1.AdmissionResponse{ Allowed: false, @@ -102,8 +109,8 @@ func TestEgressControllerValidateEgress(t *testing.T) { request: &admv1.AdmissionRequest{ Name: "foo", Operation: "UPDATE", - OldObject: runtime.RawExtension{Raw: marshal(newEgress("foo", "10.10.10.1", "bar", nil, nil))}, - Object: runtime.RawExtension{Raw: marshal(newEgress("foo", "10.10.10.2", "bar", nil, nil))}, + OldObject: runtime.RawExtension{Raw: marshal(newEgress("foo", "10.10.10.1", "bar", nil, nil, nil))}, + Object: runtime.RawExtension{Raw: marshal(newEgress("foo", "10.10.10.2", "bar", nil, nil, nil))}, }, expectedResponse: &admv1.AdmissionResponse{Allowed: true}, }, @@ -113,10 +120,10 @@ func TestEgressControllerValidateEgress(t *testing.T) { request: &admv1.AdmissionRequest{ Name: "foo", Operation: "UPDATE", - OldObject: runtime.RawExtension{Raw: marshal(newEgress("foo", "10.10.10.1", "bar", nil, nil))}, + OldObject: runtime.RawExtension{Raw: marshal(newEgress("foo", "10.10.10.1", "bar", nil, nil, nil))}, Object: runtime.RawExtension{Raw: marshal(newEgress("foo", "10.10.10.2", "bar", &metav1.LabelSelector{ MatchLabels: map[string]string{"foo": "bar"}, - }, nil))}, + }, nil, nil))}, }, expectedResponse: &admv1.AdmissionResponse{Allowed: true}, }, @@ -125,7 +132,57 @@ func TestEgressControllerValidateEgress(t *testing.T) { request: &admv1.AdmissionRequest{ Name: "foo", Operation: "DELETE", - Object: runtime.RawExtension{Raw: marshal(newEgress("foo", "10.10.10.2", "bar", nil, nil))}, + Object: runtime.RawExtension{Raw: marshal(newEgress("foo", "10.10.10.2", "bar", nil, nil, nil))}, + }, + expectedResponse: &admv1.AdmissionResponse{Allowed: true}, + }, + { + name: "Creating an Egress with rate-limiting sharing IP with exist Egresses should not be allowed", + request: &admv1.AdmissionRequest{ + Name: "foo", + Operation: "CREATE", + Object: runtime.RawExtension{Raw: marshal(newEgress("foo", "10.10.10.1", "", nil, nil, &rateLimit))}, + }, + existingEgress: newEgress("bar", "10.10.10.1", "", nil, nil, nil), + expectedResponse: &admv1.AdmissionResponse{ + Allowed: false, + Result: &metav1.Status{ + Message: "Egress with rate-limit can't share EgressIP 10.10.10.1 with other Egresses", + }, + }, + }, + { + name: "Creating an Egress sharing IP with an exist Egress with rate-limit should not be allowed", + request: &admv1.AdmissionRequest{ + Name: "foo", + Operation: "CREATE", + Object: runtime.RawExtension{Raw: marshal(newEgress("foo", "10.10.10.1", "", nil, nil, nil))}, + }, + existingEgress: newEgress("bar", "10.10.10.1", "", nil, nil, &rateLimit), + expectedResponse: &admv1.AdmissionResponse{ + Allowed: false, + Result: &metav1.Status{ + Message: "EgressIP 10.10.10.1 used by other Egress with rate-limit can't be shared", + }, + }, + }, + { + name: "Creating an Egress with rate-limiting not sharing IP with exist Egresses should be allowed", + request: &admv1.AdmissionRequest{ + Name: "foo", + Operation: "CREATE", + Object: runtime.RawExtension{Raw: marshal(newEgress("foo", "10.10.10.1", "", nil, nil, &rateLimit))}, + }, + existingEgress: newEgress("bar", "10.10.10.2", "", nil, nil, nil), + expectedResponse: &admv1.AdmissionResponse{Allowed: true}, + }, + { + name: "Update an Egress rate-limiting config should be allowed", + request: &admv1.AdmissionRequest{ + Name: "foo", + Operation: "UPDATE", + OldObject: runtime.RawExtension{Raw: marshal(newEgress("foo", "10.10.10.1", "", nil, nil, &rateLimit))}, + Object: runtime.RawExtension{Raw: marshal(newEgress("foo", "10.10.10.1", "", nil, nil, nil))}, }, expectedResponse: &admv1.AdmissionResponse{Allowed: true}, }, @@ -138,6 +195,9 @@ func TestEgressControllerValidateEgress(t *testing.T) { if tt.existingExternalIPPool != nil { objs = append(objs, tt.existingExternalIPPool) } + if tt.existingEgress != nil { + objs = append(objs, tt.existingEgress) + } controller := newController(nil, objs) controller.informerFactory.Start(stopCh) controller.crdInformerFactory.Start(stopCh) diff --git a/test/e2e/egress_test.go b/test/e2e/egress_test.go index e9c01691779..d0dd3786abb 100644 --- a/test/e2e/egress_test.go +++ b/test/e2e/egress_test.go @@ -18,6 +18,7 @@ import ( "context" "fmt" "net" + "strconv" "strings" "testing" "time" @@ -63,6 +64,7 @@ func TestEgress(t *testing.T) { t.Run("testEgressUpdateNodeSelector", func(t *testing.T) { testEgressUpdateNodeSelector(t, data) }) t.Run("testEgressNodeFailure", func(t *testing.T) { testEgressNodeFailure(t, data) }) t.Run("testCreateExternalIPPool", func(t *testing.T) { testCreateExternalIPPool(t, data) }) + t.Run("testUpdateRateLimit", func(t *testing.T) { testEgressUpdateRateLimit(t, data) }) } func testCreateExternalIPPool(t *testing.T, data *TestData) { @@ -203,7 +205,7 @@ func testEgressClientIP(t *testing.T, data *TestData) { Operator: metav1.LabelSelectorOpExists, }, } - egress := data.createEgress(t, "egress-", matchExpressions, nil, "", egressNodeIP) + egress := data.createEgress(t, "egress-", matchExpressions, nil, "", egressNodeIP, nil) defer data.crdClient.CrdV1beta1().Egresses().Delete(context.TODO(), egress.Name, metav1.DeleteOptions{}) assertClientIP(localPod, egressNodeIP) assertClientIP(remotePod, egressNodeIP) @@ -355,7 +357,7 @@ func testEgressCRUD(t *testing.T, data *TestData) { pool := data.createExternalIPPool(t, "crud-pool-", tt.ipRange, tt.nodeSelector.MatchExpressions, tt.nodeSelector.MatchLabels) defer data.crdClient.CrdV1beta1().ExternalIPPools().Delete(context.TODO(), pool.Name, metav1.DeleteOptions{}) - egress := data.createEgress(t, "crud-egress-", nil, map[string]string{"foo": "bar"}, pool.Name, "") + egress := data.createEgress(t, "crud-egress-", nil, map[string]string{"foo": "bar"}, pool.Name, "", nil) defer data.crdClient.CrdV1beta1().Egresses().Delete(context.TODO(), egress.Name, metav1.DeleteOptions{}) // Use Poll to wait the interval before the first run to detect the case that the IP is assigned to any Node // when it's not supposed to. @@ -472,7 +474,7 @@ func testEgressUpdateEgressIP(t *testing.T, data *TestData) { newPool := data.createExternalIPPool(t, "newpool-", tt.newIPRange, nil, map[string]string{v1.LabelHostname: tt.newNode}) defer data.crdClient.CrdV1beta1().ExternalIPPools().Delete(context.TODO(), newPool.Name, metav1.DeleteOptions{}) - egress := data.createEgress(t, "egress-", nil, map[string]string{"foo": "bar"}, originalPool.Name, "") + egress := data.createEgress(t, "egress-", nil, map[string]string{"foo": "bar"}, originalPool.Name, "", nil) defer data.crdClient.CrdV1beta1().Egresses().Delete(context.TODO(), egress.Name, metav1.DeleteOptions{}) egress, err := data.checkEgressState(egress.Name, tt.originalEgressIP, tt.originalNode, "", time.Second) require.NoError(t, err) @@ -620,7 +622,7 @@ func testEgressMigration(t *testing.T, data *TestData, triggerFunc, revertFunc f externalIPPoolTwoNodes := data.createExternalIPPool(t, "pool-with-two-nodes-", *ipRange, matchExpressions, nil) defer data.crdClient.CrdV1beta1().ExternalIPPools().Delete(context.TODO(), externalIPPoolTwoNodes.Name, metav1.DeleteOptions{}) - egress := data.createEgress(t, "migration-egress-", nil, map[string]string{"foo": "bar"}, externalIPPoolTwoNodes.Name, "") + egress := data.createEgress(t, "migration-egress-", nil, map[string]string{"foo": "bar"}, externalIPPoolTwoNodes.Name, "", nil) defer data.crdClient.CrdV1beta1().Egresses().Delete(context.TODO(), egress.Name, metav1.DeleteOptions{}) var err error @@ -664,6 +666,178 @@ func testEgressMigration(t *testing.T, data *TestData, triggerFunc, revertFunc f checkIPNeighbor(fromNode) } +func testEgressUpdateRateLimit(t *testing.T, data *TestData) { + var ( + rateLimit1 = &v1beta1.RateLimit{ + Peak: 100, + Burst: 200, + } + rateLimit2 = &v1beta1.RateLimit{ + Peak: 500, + Burst: 1000, + } + ) + + egressNode := controlPlaneNodeName() + var egressNodeIP string + if clusterInfo.podV4NetworkCIDR != "" { + egressNodeIP = controlPlaneNodeIPv4() + } else { + egressNodeIP = controlPlaneNodeIPv6() + } + egressNodeAntreaPodName, err := data.getAntreaPodOnNode(egressNode) + nonEgressNodeAntreaPodName, err := data.getAntreaPodOnNode(workerNodeName(1)) + if err != nil { + t.Fatalf("Failed to get AntreaPod on Node: %v", err) + } + + if err := data.createBusyboxPodOnNode("local-pod", data.testNamespace, egressNode, false); err != nil { + t.Fatalf("Failed to create local Pod: %v", err) + } + defer deletePodWrapper(t, data, data.testNamespace, "local-pod") + if err := data.podWaitForRunning(defaultTimeout, "local-pod", data.testNamespace); err != nil { + t.Fatalf("Error when waiting for Pod 'local-pod' to be in the Running state") + } + if err := data.createBusyboxPodOnNode("remote-pod", data.testNamespace, workerNodeName(1), false); err != nil { + t.Fatalf("Failed to create remote Pod: %v", err) + } + defer deletePodWrapper(t, data, data.testNamespace, "remote-pod") + if err := data.podWaitForRunning(defaultTimeout, "remote-pod", data.testNamespace); err != nil { + t.Fatalf("Error when waiting for Pod 'remote-pod' to be in the Running state") + } + // Match all E2E Pods expression + matchExpressions := []metav1.LabelSelectorRequirement{ + { + Key: "antrea-e2e", + Operator: metav1.LabelSelectorOpExists, + }, + } + + tests := []struct { + name string + originalRateLimit *v1beta1.RateLimit + newRateLimit *v1beta1.RateLimit + }{ + { + name: "add rate-limit", + originalRateLimit: nil, + newRateLimit: rateLimit1, + }, + { + name: "remove rate-limit", + originalRateLimit: rateLimit1, + newRateLimit: nil, + }, + { + name: "change rate-limit", + originalRateLimit: rateLimit1, + newRateLimit: rateLimit2, + }, + } + checkRateLimitInstall := func(rateLimit *v1beta1.RateLimit) { + meterID := data.getEgressMeterID(egressNodeAntreaPodName, rateLimit.Peak, rateLimit.Burst) + if meterID == 0 { + t.Fatalf("Meter installation failed") + } + meterID = data.getEgressMeterID(nonEgressNodeAntreaPodName, rateLimit.Peak, rateLimit.Burst) + if meterID != 0 { + t.Fatalf("Unexpected meters present on non-EgressNode") + } + meterFlowNum := data.getEgressMeterFlowNum(egressNodeAntreaPodName, meterID) + if meterFlowNum != 2 { + t.Fatalf("Flows with meter installation failed") + } + meterFlowNum = data.getEgressMeterFlowNum(nonEgressNodeAntreaPodName, meterID) + if meterFlowNum != 0 { + t.Fatalf("Unexpected flows with meter on non-EgressNode") + } + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + egress := data.createEgress(t, "egress-qos-", matchExpressions, nil, "", egressNodeIP, tt.originalRateLimit) + defer data.crdClient.CrdV1beta1().Egresses().Delete(context.TODO(), egress.Name, metav1.DeleteOptions{}) + _, err := data.waitForEgressRealized(egress) + if err != nil { + t.Fatalf("Error when waiting for Egress to be realized: %v", err) + } + time.Sleep(60 * time.Second) + if tt.originalRateLimit != nil { + checkRateLimitInstall(tt.originalRateLimit) + } + + toUpdate := egress.DeepCopy() + err = retry.RetryOnConflict(retry.DefaultRetry, func() error { + toUpdate.Spec.RateLimit = tt.newRateLimit + _, err := data.crdClient.CrdV1beta1().Egresses().Update(context.TODO(), toUpdate, metav1.UpdateOptions{}) + if err != nil && errors.IsConflict(err) { + toUpdate, _ = data.crdClient.CrdV1beta1().Egresses().Get(context.TODO(), egress.Name, metav1.GetOptions{}) + } + return err + }) + require.NoError(t, err, "Failed to update Egress") + + if tt.originalRateLimit != nil { + meterID := data.getEgressMeterID(egressNodeAntreaPodName, tt.originalRateLimit.Peak, tt.originalRateLimit.Burst) + if meterID != 0 { + t.Fatalf("Meter update failed") + } + + meterFlowNum := data.getEgressMeterFlowNum(egressNodeAntreaPodName, meterID) + if meterFlowNum != 0 { + t.Fatalf("Flows with meter update failed") + } + } + if tt.newRateLimit != nil { + checkRateLimitInstall(tt.newRateLimit) + } + }) + } +} + +func (data *TestData) getEgressMeterFlowNum(antreaPodName string, meterID int) int { + // dump-flows output should look like below: + // cookie=0x7040000000000, duration=17.079s, table=EgressMark, n_packets=0, n_bytes=0, idle_age=17, priority=200,ct_state=+new+trk,ip,tun_dst=172.18.0.3 actions=meter:1,set_field:0x1/0xff->pkt_mark,set_field:0x20/0xf0->reg0,goto_table:L2ForwardingCalc + // cookie=0x7040000000000, duration=17.073s, table=EgressMark, n_packets=0, n_bytes=0, idle_age=17, priority=200,ct_state=+new+trk,ip,in_port="se1-5584-d7ca50" actions=meter:1,set_field:0x1/0xff->pkt_mark,set_field:0x20/0xf0->reg0,goto_table:L2ForwardingCalc + cmd := []string{ + "ovs-ofctl", "dump-flows", defaultBridgeName, "table=EgressMark", "-O openflow15", + "|", fmt.Sprintf("grep \"meter:%d\" -c", meterID), + } + stdout, _, err := data.RunCommandFromPod(antreaNamespace, antreaPodName, ovsContainerName, cmd) + if err != nil { + return -1 + } + flowNum, err := strconv.Atoi(stdout) + if err != nil { + return -1 + } + return flowNum +} + +func (data *TestData) getEgressMeterID(antreaPodName string, peak, burst uint32) int { + // dump-meters output should look like below: + // meter=1 kbps burst bands= + // type=drop rate=100000 burst_size=200000 + // + // meter=256 pktps burst bands= + // type=drop rate=500 burst_size=1000 + cmd := []string{ + "ovs-ofctl", "dump-meters", defaultBridgeName, "-O openflow15", + "|", fmt.Sprintf("grep -B 1 \"%d burst_size=%d\"", peak*1000, burst*1000), + "|", "grep kbps", + } + stdout, _, _ := data.RunCommandFromPod(antreaNamespace, antreaPodName, ovsContainerName, cmd) + if stdout == "" { + return 0 + } + meterStr := strings.Split(stdout, " ")[0] + meterIDStr := strings.Split(meterStr, "=")[1] + meterID, err := strconv.Atoi(meterIDStr) + if err != nil { + return 0 + } + return meterID +} + func (data *TestData) checkEgressState(egressName, expectedIP, expectedNode, otherNode string, timeout time.Duration) (*v1beta1.Egress, error) { var egress *v1beta1.Egress var expectedNodeHasIP, otherNodeHasIP bool @@ -790,7 +964,7 @@ func (data *TestData) createExternalIPPool(t *testing.T, generateName string, ip return pool } -func (data *TestData) createEgress(t *testing.T, generateName string, matchExpressions []metav1.LabelSelectorRequirement, matchLabels map[string]string, externalPoolName string, egressIP string) *v1beta1.Egress { +func (data *TestData) createEgress(t *testing.T, generateName string, matchExpressions []metav1.LabelSelectorRequirement, matchLabels map[string]string, externalPoolName string, egressIP string, rateLimit *v1beta1.RateLimit) *v1beta1.Egress { egress := &v1beta1.Egress{ ObjectMeta: metav1.ObjectMeta{GenerateName: generateName}, Spec: v1beta1.EgressSpec{ @@ -802,6 +976,7 @@ func (data *TestData) createEgress(t *testing.T, generateName string, matchExpre }, ExternalIPPool: externalPoolName, EgressIP: egressIP, + RateLimit: rateLimit, }, } egress, err := data.crdClient.CrdV1beta1().Egresses().Create(context.TODO(), egress, metav1.CreateOptions{}) diff --git a/test/e2e/flowaggregator_test.go b/test/e2e/flowaggregator_test.go index 2ff2d7a60ab..e4da3b2491b 100644 --- a/test/e2e/flowaggregator_test.go +++ b/test/e2e/flowaggregator_test.go @@ -553,7 +553,7 @@ func testHelper(t *testing.T, data *TestData, podAIPs, podBIPs, podCIPs, podDIPs } else { egressNodeIP = nodeIPv6(0) } - egress := data.createEgress(t, "test-egress", nil, map[string]string{"app": "busybox"}, "", egressNodeIP) + egress := data.createEgress(t, "test-egress", nil, map[string]string{"app": "busybox"}, "", egressNodeIP, nil) egress, err := data.waitForEgressRealized(egress) if err != nil { t.Fatalf("Error when waiting for Egress to be realized: %v", err) @@ -593,7 +593,7 @@ func testHelper(t *testing.T, data *TestData, podAIPs, podBIPs, podCIPs, podDIPs } else { egressNodeIP = nodeIPv6(1) } - egress := data.createEgress(t, "test-egress", nil, map[string]string{"app": "busybox"}, "", egressNodeIP) + egress := data.createEgress(t, "test-egress", nil, map[string]string{"app": "busybox"}, "", egressNodeIP, nil) egress, err := data.waitForEgressRealized(egress) if err != nil { t.Fatalf("Error when waiting for Egress to be realized: %v", err) diff --git a/test/e2e/framework.go b/test/e2e/framework.go index 744c46de926..08b4db3c327 100644 --- a/test/e2e/framework.go +++ b/test/e2e/framework.go @@ -1760,6 +1760,14 @@ func (data *TestData) RunCommandFromAntreaPodOnNode(nodeName string, cmd []strin return data.RunCommandFromPod(antreaNamespace, antreaPodName, agentContainerName, cmd) } +func (data *TestData) RunCommandFromAntreaOVSOnNode(nodeName string, cmd []string) (string, string, error) { + antreaPodName, err := data.getAntreaPodOnNode(nodeName) + if err != nil { + return "", "", err + } + return data.RunCommandFromPod(antreaNamespace, antreaPodName, agentContainerName, cmd) +} + // getFlowAggregator retrieves the name of the Flow-Aggregator Pod (flow-aggregator-*) running on a specific Node. func (data *TestData) getFlowAggregator() (*corev1.Pod, error) { listOptions := metav1.ListOptions{ diff --git a/test/e2e/proxy_test.go b/test/e2e/proxy_test.go index b08a98631da..a4b0274ada9 100644 --- a/test/e2e/proxy_test.go +++ b/test/e2e/proxy_test.go @@ -423,7 +423,7 @@ func TestNodePortAndEgressWithTheSameBackendPod(t *testing.T) { // Create an Egress whose external IP is on worker Node. egressNodeIP := workerNodeIPv4(1) - egress := data.createEgress(t, "test-egress", nil, map[string]string{"app": "nginx"}, "", egressNodeIP) + egress := data.createEgress(t, "test-egress", nil, map[string]string{"app": "nginx"}, "", egressNodeIP, nil) defer data.crdClient.CrdV1beta1().Egresses().Delete(context.TODO(), egress.Name, metav1.DeleteOptions{}) // Create the backend Pod on control plane Node. diff --git a/test/e2e/traceflow_test.go b/test/e2e/traceflow_test.go index e95bb458609..0e1d37ebd0e 100644 --- a/test/e2e/traceflow_test.go +++ b/test/e2e/traceflow_test.go @@ -2052,7 +2052,7 @@ func testTraceflowEgress(t *testing.T, data *TestData) { }, } - egress := data.createEgress(t, "egress-", matchExpressions, nil, "", egressIP) + egress := data.createEgress(t, "egress-", matchExpressions, nil, "", egressIP, nil) defer data.crdClient.CrdV1beta1().Egresses().Delete(context.TODO(), egress.Name, metav1.DeleteOptions{}) testcaseLocalEgress := testcase{