From 3495e11af587770da7edda9198965f4b4b59e2ed Mon Sep 17 00:00:00 2001 From: graysonwu Date: Fri, 18 Aug 2023 21:46:43 -0700 Subject: [PATCH] Egress QoS support Add rate-limit config to Egress specifying the rate limit of north-south egress traffic of this Egress. All backend workloads selected by a rate-limited Egress share the same bandwidth while sending egress traffic via this Egress. An Egress with `rateLimit` specified cannot share EgressIP with any other Egresses. Signed-off-by: graysonwu --- build/charts/antrea/crds/egress.yaml | 7 + build/yamls/antrea-aks.yml | 7 + build/yamls/antrea-crds.yml | 7 + build/yamls/antrea-eks.yml | 7 + build/yamls/antrea-gke.yml | 7 + build/yamls/antrea-ipsec.yml | 7 + build/yamls/antrea.yml | 7 + docs/egress.md | 14 ++ .../controller/egress/egress_controller.go | 69 +++++- .../egress/egress_controller_test.go | 169 +++++++++++++-- pkg/agent/openflow/client.go | 66 +++++- pkg/agent/openflow/client_test.go | 2 +- pkg/agent/openflow/egress.go | 37 ++-- .../openflow/externalnode_connectivity.go | 4 +- pkg/agent/openflow/framework.go | 8 +- pkg/agent/openflow/multicast.go | 4 +- pkg/agent/openflow/multicluster.go | 4 +- pkg/agent/openflow/network_policy.go | 12 +- pkg/agent/openflow/network_policy_test.go | 20 +- pkg/agent/openflow/packetin.go | 9 +- pkg/agent/openflow/pipeline.go | 33 +-- pkg/agent/openflow/pod_connectivity.go | 4 +- pkg/agent/openflow/service.go | 4 +- pkg/agent/openflow/testing/mock_openflow.go | 28 +++ pkg/agent/openflow/traceflow.go | 4 +- pkg/apis/crd/v1beta1/types.go | 10 + pkg/apis/crd/v1beta1/zz_generated.deepcopy.go | 21 ++ pkg/apiserver/openapi/zz_generated.openapi.go | 38 +++- pkg/controller/egress/controller.go | 19 ++ pkg/controller/egress/controller_test.go | 3 +- pkg/controller/egress/validate.go | 15 ++ pkg/controller/egress/validate_test.go | 80 ++++++- test/e2e/egress_test.go | 197 +++++++++++++++++- test/e2e/flowaggregator_test.go | 4 +- test/e2e/framework.go | 8 + test/e2e/proxy_test.go | 2 +- test/e2e/traceflow_test.go | 2 +- 37 files changed, 828 insertions(+), 111 deletions(-) diff --git a/build/charts/antrea/crds/egress.yaml b/build/charts/antrea/crds/egress.yaml index d773c23f7f7..e395d4289e6 100644 --- a/build/charts/antrea/crds/egress.yaml +++ b/build/charts/antrea/crds/egress.yaml @@ -224,6 +224,13 @@ spec: type: array items: type: string + rateLimit: + type: object + properties: + peak: + type: integer + burst: + type: integer status: type: object properties: diff --git a/build/yamls/antrea-aks.yml b/build/yamls/antrea-aks.yml index ad6d8f6fb56..aabbeb4c084 100644 --- a/build/yamls/antrea-aks.yml +++ b/build/yamls/antrea-aks.yml @@ -2360,6 +2360,13 @@ spec: type: array items: type: string + rateLimit: + type: object + properties: + peak: + type: integer + burst: + type: integer status: type: object properties: diff --git a/build/yamls/antrea-crds.yml b/build/yamls/antrea-crds.yml index c4270f1d663..346e105eefa 100644 --- a/build/yamls/antrea-crds.yml +++ b/build/yamls/antrea-crds.yml @@ -2351,6 +2351,13 @@ spec: type: array items: type: string + rateLimit: + type: object + properties: + peak: + type: integer + burst: + type: integer status: type: object properties: diff --git a/build/yamls/antrea-eks.yml b/build/yamls/antrea-eks.yml index b1bfdc60b67..10fcbbc2633 100644 --- a/build/yamls/antrea-eks.yml +++ b/build/yamls/antrea-eks.yml @@ -2360,6 +2360,13 @@ spec: type: array items: type: string + rateLimit: + type: object + properties: + peak: + type: integer + burst: + type: integer status: type: object properties: diff --git a/build/yamls/antrea-gke.yml b/build/yamls/antrea-gke.yml index 0efa7a61c61..c7895822f42 100644 --- a/build/yamls/antrea-gke.yml +++ b/build/yamls/antrea-gke.yml @@ -2360,6 +2360,13 @@ spec: type: array items: type: string + rateLimit: + type: object + properties: + peak: + type: integer + burst: + type: integer status: type: object properties: diff --git a/build/yamls/antrea-ipsec.yml b/build/yamls/antrea-ipsec.yml index 8c2670efa95..582f374595b 100644 --- a/build/yamls/antrea-ipsec.yml +++ b/build/yamls/antrea-ipsec.yml @@ -2360,6 +2360,13 @@ spec: type: array items: type: string + rateLimit: + type: object + properties: + peak: + type: integer + burst: + type: integer status: type: object properties: diff --git a/build/yamls/antrea.yml b/build/yamls/antrea.yml index 42611ddb053..5aa97dd7202 100644 --- a/build/yamls/antrea.yml +++ b/build/yamls/antrea.yml @@ -2360,6 +2360,13 @@ spec: type: array items: type: string + rateLimit: + type: object + properties: + peak: + type: integer + burst: + type: integer status: type: object properties: diff --git a/docs/egress.md b/docs/egress.md index 447c9c42ba7..0e7715e3776 100644 --- a/docs/egress.md +++ b/docs/egress.md @@ -9,6 +9,7 @@ - [AppliedTo](#appliedto) - [EgressIP](#egressip) - [ExternalIPPool](#externalippool) + - [RateLimit](#ratelimit) - [The ExternalIPPool resource](#the-externalippool-resource) - [IPRanges](#ipranges) - [NodeSelector](#nodeselector) @@ -79,6 +80,9 @@ spec: role: web egressIP: 10.10.0.8 # can be populated by Antrea after assigning an IP from the pool below externalIPPool: prod-external-ip-pool + rateLimit: + peak: 300 + burst: 500 status: egressNode: node01 ``` @@ -127,6 +131,16 @@ The `externalIPPool` field specifies the name of the `ExternalIPPool` that the be assigned to. It can be empty, which means users should assign the `egressIP` to one Node manually. +### RateLimit + +The `rateLimit` field specifies north-south egress traffic of the Egress. `peak` +specifies the maximum mbps. `burst` specifies maximum burst mbps for throttle. +All backend workloads selected by a rate-limited Egress share the same bandwidth +while sending egress traffic via this Egress. + +**Note**: An Egress with `rateLimit` specified cannot share EgressIP with any +other Egresses. + ## The ExternalIPPool resource ExternalIPPool defines one or multiple IP ranges that can be used in the diff --git a/pkg/agent/controller/egress/egress_controller.go b/pkg/agent/controller/egress/egress_controller.go index d310c757cf3..a32129406e8 100644 --- a/pkg/agent/controller/egress/egress_controller.go +++ b/pkg/agent/controller/egress/egress_controller.go @@ -89,6 +89,8 @@ type egressState struct { ofPorts sets.Set[int32] // The actual Pods of the Egress. Used to identify stale Pods when updating or deleting an Egress. pods sets.Set[string] + // Whether the Egress has rate-limit. + hasRateLimit bool } // egressIPState keeps the actual state of an Egress IP. It's maintained separately from egressState because @@ -147,6 +149,9 @@ type EgressController struct { ipAssigner ipassigner.IPAssigner egressIPScheduler *egressIPScheduler + + meterTracker map[uint32]*crdv1b1.RateLimit + meterTrackerMutex sync.RWMutex } func NewEgressController( @@ -181,6 +186,7 @@ func NewEgressController( localIPDetector: ipassigner.NewLocalIPDetector(), idAllocator: newIDAllocator(minEgressMark, maxEgressMark), cluster: cluster, + meterTracker: map[uint32]*crdv1b1.RateLimit{}, } ipAssigner, err := newIPAssigner(nodeTransportInterface, egressDummyDevice) if err != nil { @@ -388,7 +394,7 @@ func (c *EgressController) processNextWorkItem() bool { // and iptables rule for this IP and the mark. // If the Egress IP is changed from local to non local, it uninstalls flows and iptables rule and releases the mark. // The method returns the mark on success. Non local Egresses use 0 as the mark. -func (c *EgressController) realizeEgressIP(egressName, egressIP string) (uint32, error) { +func (c *EgressController) realizeEgressIP(egressName, egressIP string, rateLimit *crdv1b1.RateLimit, rateLimitChanged bool) (uint32, error) { isLocalIP := c.localIPDetector.IsLocalIP(egressIP) c.egressIPStatesMutex.Lock() @@ -415,8 +421,13 @@ func (c *EgressController) realizeEgressIP(egressName, egressIP string) (uint32, return 0, fmt.Errorf("error allocating mark for IP %s: %v", egressIP, err) } } + if rateLimit != nil { + if err := c.installEgressQoSMeter(ipState.mark, rateLimit); err != nil { + return 0, err + } + } // Ensure datapath is installed properly. - if !ipState.flowsInstalled { + if rateLimitChanged || !ipState.flowsInstalled { if err := c.ofClient.InstallSNATMarkFlows(ipState.egressIP, ipState.mark); err != nil { return 0, fmt.Errorf("error installing SNAT mark flows for IP %s: %v", ipState.egressIP, err) } @@ -490,6 +501,37 @@ func (c *EgressController) unrealizeEgressIP(egressName, egressIP string) error return nil } +func (c *EgressController) uninstallEgressQoSMeter(meterID uint32) error { + if meterID == 0 { + return nil + } + c.meterTrackerMutex.Lock() + defer c.meterTrackerMutex.Unlock() + if _, exist := c.meterTracker[meterID]; exist { + if err := c.ofClient.UninstallEgressQoSMeter(meterID); err != nil { + return err + } + delete(c.meterTracker, meterID) + } + return nil +} + +func (c *EgressController) installEgressQoSMeter(meterID uint32, rateLimit *crdv1b1.RateLimit) error { + if rateLimit == nil { + return nil + } + c.meterTrackerMutex.Lock() + defer c.meterTrackerMutex.Unlock() + meter, exist := c.meterTracker[meterID] + if !exist || meter.Peak != rateLimit.Peak || meter.Burst != rateLimit.Burst { + if err := c.ofClient.InstallEgressQoSMeter(meterID, rateLimit.Peak, rateLimit.Burst); err != nil { + return err + } + c.meterTracker[meterID] = rateLimit + } + return nil +} + func (c *EgressController) getEgressState(egressName string) (*egressState, bool) { c.egressStatesMutex.RLock() defer c.egressStatesMutex.RUnlock() @@ -671,14 +713,18 @@ func (c *EgressController) syncEgress(egressName string) error { } } + // rateLimitChanged will be true if this Egress update its rateLimit or this is a new Egress. + rateLimitChanged := eState.hasRateLimit != (egress.Spec.RateLimit != nil) // Realize the latest EgressIP and get the desired mark. - mark, err := c.realizeEgressIP(egressName, desiredEgressIP) + mark, err := c.realizeEgressIP(egressName, desiredEgressIP, egress.Spec.RateLimit, rateLimitChanged) if err != nil { return err } // If the mark changes, uninstall all of the Egress's Pod flows first, then installs them with new mark. - // It could happen when the Egress IP is added to or removed from the Node. + // It could happen when the Egress IP is added to or removed from the Node. And record the old mark for + // meter deletion later. + oldMark := eState.mark if eState.mark != mark { // Uninstall all of its Pod flows. if err := c.uninstallPodFlows(egressName, eState, eState.ofPorts, eState.pods); err != nil { @@ -729,7 +775,9 @@ func (c *EgressController) syncEgress(egressName string) error { ofPort := ifaces[0].OFPort if eState.ofPorts.Has(ofPort) { staleOFPorts.Delete(ofPort) - continue + if !rateLimitChanged { + continue + } } if err := c.ofClient.InstallPodSNATFlows(uint32(ofPort), egressIP, mark); err != nil { return err @@ -741,6 +789,13 @@ func (c *EgressController) syncEgress(egressName string) error { if err := c.uninstallPodFlows(egressName, eState, staleOFPorts, stalePods); err != nil { return err } + + if egress.Spec.RateLimit == nil || !c.localIPDetector.IsLocalIP(desiredEgressIP) { + if err := c.uninstallEgressQoSMeter(oldMark); err != nil { + return err + } + } + eState.hasRateLimit = egress.Spec.RateLimit != nil return nil } @@ -753,6 +808,10 @@ func (c *EgressController) uninstallEgress(egressName string, eState *egressStat if err := c.unrealizeEgressIP(egressName, eState.egressIP); err != nil { return err } + // Uninstall its meter. + if err := c.uninstallEgressQoSMeter(eState.mark); err != nil { + return err + } // Unassign the Egress IP from the local Node if it was assigned by the agent. if err := c.ipAssigner.UnassignIP(eState.egressIP); err != nil { return err diff --git a/pkg/agent/controller/egress/egress_controller_test.go b/pkg/agent/controller/egress/egress_controller_test.go index f956f280070..26e2712b15c 100644 --- a/pkg/agent/controller/egress/egress_controller_test.go +++ b/pkg/agent/controller/egress/egress_controller_test.go @@ -59,6 +59,17 @@ const ( fakeNode = "node1" ) +var ( + fakeRateLimit = crdv1b1.RateLimit{ + Peak: 500, + Burst: 1000, + } + newFakeRateLimit = crdv1b1.RateLimit{ + Peak: 200, + Burst: 400, + } +) + type fakeLocalIPDetector struct { localIPs sets.Set[string] } @@ -207,11 +218,11 @@ func TestSyncEgress(t *testing.T) { name: "Local IP becomes non local", existingEgress: &crdv1b1.Egress{ ObjectMeta: metav1.ObjectMeta{Name: "egressA", UID: "uidA"}, - Spec: crdv1b1.EgressSpec{EgressIP: fakeLocalEgressIP1}, + Spec: crdv1b1.EgressSpec{EgressIP: fakeLocalEgressIP1, RateLimit: &fakeRateLimit}, }, newEgress: &crdv1b1.Egress{ ObjectMeta: metav1.ObjectMeta{Name: "egressA", UID: "uidA"}, - Spec: crdv1b1.EgressSpec{EgressIP: fakeLocalEgressIP1}, + Spec: crdv1b1.EgressSpec{EgressIP: fakeLocalEgressIP1, RateLimit: &fakeRateLimit}, }, existingEgressGroup: &cpv1b2.EgressGroup{ ObjectMeta: metav1.ObjectMeta{Name: "egressA", UID: "uidA"}, @@ -231,10 +242,11 @@ func TestSyncEgress(t *testing.T) { expectedEgresses: []*crdv1b1.Egress{ { ObjectMeta: metav1.ObjectMeta{Name: "egressA", UID: "uidA"}, - Spec: crdv1b1.EgressSpec{EgressIP: fakeLocalEgressIP1}, + Spec: crdv1b1.EgressSpec{EgressIP: fakeLocalEgressIP1, RateLimit: &fakeRateLimit}, }, }, expectedCalls: func(mockOFClient *openflowtest.MockClient, mockRouteClient *routetest.MockInterface, mockIPAssigner *ipassignertest.MockIPAssigner) { + mockOFClient.EXPECT().InstallEgressQoSMeter(uint32(1), fakeRateLimit.Peak, fakeRateLimit.Burst) mockOFClient.EXPECT().InstallSNATMarkFlows(net.ParseIP(fakeLocalEgressIP1), uint32(1)) mockOFClient.EXPECT().InstallPodSNATFlows(uint32(1), net.ParseIP(fakeLocalEgressIP1), uint32(1)) mockOFClient.EXPECT().InstallPodSNATFlows(uint32(2), net.ParseIP(fakeLocalEgressIP1), uint32(1)) @@ -250,17 +262,18 @@ func TestSyncEgress(t *testing.T) { mockOFClient.EXPECT().InstallPodSNATFlows(uint32(1), net.ParseIP(fakeLocalEgressIP1), uint32(0)) mockOFClient.EXPECT().InstallPodSNATFlows(uint32(3), net.ParseIP(fakeLocalEgressIP1), uint32(0)) mockIPAssigner.EXPECT().UnassignIP(fakeLocalEgressIP1) + mockOFClient.EXPECT().UninstallEgressQoSMeter(uint32(1)) }, }, { name: "Non local IP becomes local", existingEgress: &crdv1b1.Egress{ ObjectMeta: metav1.ObjectMeta{Name: "egressA", UID: "uidA"}, - Spec: crdv1b1.EgressSpec{EgressIP: fakeRemoteEgressIP1}, + Spec: crdv1b1.EgressSpec{EgressIP: fakeRemoteEgressIP1, RateLimit: &fakeRateLimit}, }, newEgress: &crdv1b1.Egress{ ObjectMeta: metav1.ObjectMeta{Name: "egressA", UID: "uidA"}, - Spec: crdv1b1.EgressSpec{EgressIP: fakeRemoteEgressIP1}, + Spec: crdv1b1.EgressSpec{EgressIP: fakeRemoteEgressIP1, RateLimit: &fakeRateLimit}, }, existingEgressGroup: &cpv1b2.EgressGroup{ ObjectMeta: metav1.ObjectMeta{Name: "egressA", UID: "uidA"}, @@ -280,7 +293,7 @@ func TestSyncEgress(t *testing.T) { expectedEgresses: []*crdv1b1.Egress{ { ObjectMeta: metav1.ObjectMeta{Name: "egressA", UID: "uidA"}, - Spec: crdv1b1.EgressSpec{EgressIP: fakeRemoteEgressIP1}, + Spec: crdv1b1.EgressSpec{EgressIP: fakeRemoteEgressIP1, RateLimit: &fakeRateLimit}, Status: crdv1b1.EgressStatus{EgressIP: fakeRemoteEgressIP1, EgressNode: fakeNode}, }, }, @@ -293,6 +306,7 @@ func TestSyncEgress(t *testing.T) { mockOFClient.EXPECT().UninstallPodSNATFlows(uint32(2)) mockIPAssigner.EXPECT().UnassignIP(fakeRemoteEgressIP1) + mockOFClient.EXPECT().InstallEgressQoSMeter(uint32(1), fakeRateLimit.Peak, fakeRateLimit.Burst) mockOFClient.EXPECT().InstallSNATMarkFlows(net.ParseIP(fakeRemoteEgressIP1), uint32(1)) mockOFClient.EXPECT().InstallPodSNATFlows(uint32(1), net.ParseIP(fakeRemoteEgressIP1), uint32(1)) mockOFClient.EXPECT().InstallPodSNATFlows(uint32(3), net.ParseIP(fakeRemoteEgressIP1), uint32(1)) @@ -304,11 +318,11 @@ func TestSyncEgress(t *testing.T) { name: "Change from local Egress IP to another local one", existingEgress: &crdv1b1.Egress{ ObjectMeta: metav1.ObjectMeta{Name: "egressA", UID: "uidA"}, - Spec: crdv1b1.EgressSpec{EgressIP: fakeLocalEgressIP1}, + Spec: crdv1b1.EgressSpec{EgressIP: fakeLocalEgressIP1, RateLimit: &fakeRateLimit}, }, newEgress: &crdv1b1.Egress{ ObjectMeta: metav1.ObjectMeta{Name: "egressA", UID: "uidA"}, - Spec: crdv1b1.EgressSpec{EgressIP: fakeLocalEgressIP2}, + Spec: crdv1b1.EgressSpec{EgressIP: fakeLocalEgressIP2, RateLimit: &fakeRateLimit}, }, existingEgressGroup: &cpv1b2.EgressGroup{ ObjectMeta: metav1.ObjectMeta{Name: "egressA", UID: "uidA"}, @@ -327,11 +341,12 @@ func TestSyncEgress(t *testing.T) { expectedEgresses: []*crdv1b1.Egress{ { ObjectMeta: metav1.ObjectMeta{Name: "egressA", UID: "uidA"}, - Spec: crdv1b1.EgressSpec{EgressIP: fakeLocalEgressIP2}, + Spec: crdv1b1.EgressSpec{EgressIP: fakeLocalEgressIP2, RateLimit: &fakeRateLimit}, Status: crdv1b1.EgressStatus{EgressIP: fakeLocalEgressIP2, EgressNode: fakeNode}, }, }, expectedCalls: func(mockOFClient *openflowtest.MockClient, mockRouteClient *routetest.MockInterface, mockIPAssigner *ipassignertest.MockIPAssigner) { + mockOFClient.EXPECT().InstallEgressQoSMeter(uint32(1), fakeRateLimit.Peak, fakeRateLimit.Burst) mockOFClient.EXPECT().InstallSNATMarkFlows(net.ParseIP(fakeLocalEgressIP1), uint32(1)) mockOFClient.EXPECT().InstallPodSNATFlows(uint32(1), net.ParseIP(fakeLocalEgressIP1), uint32(1)) mockOFClient.EXPECT().InstallPodSNATFlows(uint32(2), net.ParseIP(fakeLocalEgressIP1), uint32(1)) @@ -345,22 +360,24 @@ func TestSyncEgress(t *testing.T) { mockIPAssigner.EXPECT().UnassignIP(fakeLocalEgressIP1) mockIPAssigner.EXPECT().UnassignIP(fakeLocalEgressIP2) + mockOFClient.EXPECT().InstallEgressQoSMeter(uint32(1), fakeRateLimit.Peak, fakeRateLimit.Burst) mockOFClient.EXPECT().InstallSNATMarkFlows(net.ParseIP(fakeLocalEgressIP2), uint32(1)) mockOFClient.EXPECT().InstallPodSNATFlows(uint32(1), net.ParseIP(fakeLocalEgressIP2), uint32(1)) mockOFClient.EXPECT().InstallPodSNATFlows(uint32(3), net.ParseIP(fakeLocalEgressIP2), uint32(1)) mockRouteClient.EXPECT().AddSNATRule(net.ParseIP(fakeLocalEgressIP2), uint32(1)) mockIPAssigner.EXPECT().UnassignIP(fakeLocalEgressIP2) + mockOFClient.EXPECT().UninstallEgressQoSMeter(uint32(1)) }, }, { name: "Change from local Egress IP to a remote one", existingEgress: &crdv1b1.Egress{ ObjectMeta: metav1.ObjectMeta{Name: "egressA", UID: "uidA"}, - Spec: crdv1b1.EgressSpec{EgressIP: fakeLocalEgressIP1}, + Spec: crdv1b1.EgressSpec{EgressIP: fakeLocalEgressIP1, RateLimit: &fakeRateLimit}, }, newEgress: &crdv1b1.Egress{ ObjectMeta: metav1.ObjectMeta{Name: "egressA", UID: "uidA"}, - Spec: crdv1b1.EgressSpec{EgressIP: fakeRemoteEgressIP1}, + Spec: crdv1b1.EgressSpec{EgressIP: fakeRemoteEgressIP1, RateLimit: &fakeRateLimit}, }, existingEgressGroup: &cpv1b2.EgressGroup{ ObjectMeta: metav1.ObjectMeta{Name: "egressA", UID: "uidA"}, @@ -379,10 +396,11 @@ func TestSyncEgress(t *testing.T) { expectedEgresses: []*crdv1b1.Egress{ { ObjectMeta: metav1.ObjectMeta{Name: "egressA", UID: "uidA"}, - Spec: crdv1b1.EgressSpec{EgressIP: fakeRemoteEgressIP1}, + Spec: crdv1b1.EgressSpec{EgressIP: fakeRemoteEgressIP1, RateLimit: &fakeRateLimit}, }, }, expectedCalls: func(mockOFClient *openflowtest.MockClient, mockRouteClient *routetest.MockInterface, mockIPAssigner *ipassignertest.MockIPAssigner) { + mockOFClient.EXPECT().InstallEgressQoSMeter(uint32(1), fakeRateLimit.Peak, fakeRateLimit.Burst) mockOFClient.EXPECT().InstallSNATMarkFlows(net.ParseIP(fakeLocalEgressIP1), uint32(1)) mockOFClient.EXPECT().InstallPodSNATFlows(uint32(1), net.ParseIP(fakeLocalEgressIP1), uint32(1)) mockOFClient.EXPECT().InstallPodSNATFlows(uint32(2), net.ParseIP(fakeLocalEgressIP1), uint32(1)) @@ -399,17 +417,18 @@ func TestSyncEgress(t *testing.T) { mockOFClient.EXPECT().InstallPodSNATFlows(uint32(1), net.ParseIP(fakeRemoteEgressIP1), uint32(0)) mockOFClient.EXPECT().InstallPodSNATFlows(uint32(3), net.ParseIP(fakeRemoteEgressIP1), uint32(0)) mockIPAssigner.EXPECT().UnassignIP(fakeRemoteEgressIP1) + mockOFClient.EXPECT().UninstallEgressQoSMeter(uint32(1)) }, }, { name: "Change from remote Egress IP to a local one", existingEgress: &crdv1b1.Egress{ ObjectMeta: metav1.ObjectMeta{Name: "egressA", UID: "uidA"}, - Spec: crdv1b1.EgressSpec{EgressIP: fakeRemoteEgressIP1}, + Spec: crdv1b1.EgressSpec{EgressIP: fakeRemoteEgressIP1, RateLimit: &fakeRateLimit}, }, newEgress: &crdv1b1.Egress{ ObjectMeta: metav1.ObjectMeta{Name: "egressA", UID: "uidA"}, - Spec: crdv1b1.EgressSpec{EgressIP: fakeLocalEgressIP1}, + Spec: crdv1b1.EgressSpec{EgressIP: fakeLocalEgressIP1, RateLimit: &fakeRateLimit}, }, existingEgressGroup: &cpv1b2.EgressGroup{ ObjectMeta: metav1.ObjectMeta{Name: "egressA", UID: "uidA"}, @@ -428,7 +447,7 @@ func TestSyncEgress(t *testing.T) { expectedEgresses: []*crdv1b1.Egress{ { ObjectMeta: metav1.ObjectMeta{Name: "egressA", UID: "uidA"}, - Spec: crdv1b1.EgressSpec{EgressIP: fakeLocalEgressIP1}, + Spec: crdv1b1.EgressSpec{EgressIP: fakeLocalEgressIP1, RateLimit: &fakeRateLimit}, Status: crdv1b1.EgressStatus{EgressIP: fakeLocalEgressIP1, EgressNode: fakeNode}, }, }, @@ -442,6 +461,7 @@ func TestSyncEgress(t *testing.T) { mockIPAssigner.EXPECT().UnassignIP(fakeRemoteEgressIP1) mockIPAssigner.EXPECT().UnassignIP(fakeLocalEgressIP1) + mockOFClient.EXPECT().InstallEgressQoSMeter(uint32(1), fakeRateLimit.Peak, fakeRateLimit.Burst) mockOFClient.EXPECT().InstallSNATMarkFlows(net.ParseIP(fakeLocalEgressIP1), uint32(1)) mockOFClient.EXPECT().InstallPodSNATFlows(uint32(1), net.ParseIP(fakeLocalEgressIP1), uint32(1)) mockOFClient.EXPECT().InstallPodSNATFlows(uint32(3), net.ParseIP(fakeLocalEgressIP1), uint32(1)) @@ -688,6 +708,125 @@ func TestSyncEgress(t *testing.T) { mockRouteClient.EXPECT().DeleteSNATRule(uint32(1)) }, }, + { + name: "Update Egress from non-rate-limited to rate-limited", + existingEgress: &crdv1b1.Egress{ + ObjectMeta: metav1.ObjectMeta{Name: "egressA", UID: "uidA"}, + Spec: crdv1b1.EgressSpec{EgressIP: fakeLocalEgressIP1}, + }, + newEgress: &crdv1b1.Egress{ + ObjectMeta: metav1.ObjectMeta{Name: "egressA", UID: "uidA"}, + Spec: crdv1b1.EgressSpec{EgressIP: fakeLocalEgressIP1, RateLimit: &fakeRateLimit}, + }, + existingEgressGroup: &cpv1b2.EgressGroup{ + ObjectMeta: metav1.ObjectMeta{Name: "egressA", UID: "uidA"}, + GroupMembers: []cpv1b2.GroupMember{ + {Pod: &cpv1b2.PodReference{Name: "pod1", Namespace: "ns1"}}, + }, + }, + newEgressGroup: &cpv1b2.EgressGroup{ + ObjectMeta: metav1.ObjectMeta{Name: "egressA", UID: "uidA"}, + GroupMembers: []cpv1b2.GroupMember{ + {Pod: &cpv1b2.PodReference{Name: "pod1", Namespace: "ns1"}}, + }, + }, + expectedEgresses: []*crdv1b1.Egress{ + { + ObjectMeta: metav1.ObjectMeta{Name: "egressA", UID: "uidA"}, + Spec: crdv1b1.EgressSpec{EgressIP: fakeLocalEgressIP1, RateLimit: &fakeRateLimit}, + Status: crdv1b1.EgressStatus{EgressIP: fakeLocalEgressIP1, EgressNode: fakeNode}, + }, + }, + expectedCalls: func(mockOFClient *openflowtest.MockClient, mockRouteClient *routetest.MockInterface, mockIPAssigner *ipassignertest.MockIPAssigner) { + mockOFClient.EXPECT().InstallSNATMarkFlows(net.ParseIP(fakeLocalEgressIP1), uint32(1)) + mockOFClient.EXPECT().InstallPodSNATFlows(uint32(1), net.ParseIP(fakeLocalEgressIP1), uint32(1)) + mockRouteClient.EXPECT().AddSNATRule(net.ParseIP(fakeLocalEgressIP1), uint32(1)) + mockIPAssigner.EXPECT().UnassignIP(fakeLocalEgressIP1).Times(3) + + mockOFClient.EXPECT().InstallEgressQoSMeter(uint32(1), fakeRateLimit.Peak, fakeRateLimit.Burst) + mockOFClient.EXPECT().InstallSNATMarkFlows(net.ParseIP(fakeLocalEgressIP1), uint32(1)) + mockOFClient.EXPECT().InstallPodSNATFlows(uint32(1), net.ParseIP(fakeLocalEgressIP1), uint32(1)) + }, + }, + { + name: "Update Egress from rate-limited to non-rate-limited", + existingEgress: &crdv1b1.Egress{ + ObjectMeta: metav1.ObjectMeta{Name: "egressA", UID: "uidA"}, + Spec: crdv1b1.EgressSpec{EgressIP: fakeLocalEgressIP1, RateLimit: &fakeRateLimit}, + }, + newEgress: &crdv1b1.Egress{ + ObjectMeta: metav1.ObjectMeta{Name: "egressA", UID: "uidA"}, + Spec: crdv1b1.EgressSpec{EgressIP: fakeLocalEgressIP1}, + }, + existingEgressGroup: &cpv1b2.EgressGroup{ + ObjectMeta: metav1.ObjectMeta{Name: "egressA", UID: "uidA"}, + GroupMembers: []cpv1b2.GroupMember{ + {Pod: &cpv1b2.PodReference{Name: "pod1", Namespace: "ns1"}}, + }, + }, + newEgressGroup: &cpv1b2.EgressGroup{ + ObjectMeta: metav1.ObjectMeta{Name: "egressA", UID: "uidA"}, + GroupMembers: []cpv1b2.GroupMember{ + {Pod: &cpv1b2.PodReference{Name: "pod1", Namespace: "ns1"}}, + }, + }, + expectedEgresses: []*crdv1b1.Egress{ + { + ObjectMeta: metav1.ObjectMeta{Name: "egressA", UID: "uidA"}, + Spec: crdv1b1.EgressSpec{EgressIP: fakeLocalEgressIP1}, + Status: crdv1b1.EgressStatus{EgressIP: fakeLocalEgressIP1, EgressNode: fakeNode}, + }, + }, + expectedCalls: func(mockOFClient *openflowtest.MockClient, mockRouteClient *routetest.MockInterface, mockIPAssigner *ipassignertest.MockIPAssigner) { + mockOFClient.EXPECT().InstallEgressQoSMeter(uint32(1), fakeRateLimit.Peak, fakeRateLimit.Burst) + mockOFClient.EXPECT().InstallSNATMarkFlows(net.ParseIP(fakeLocalEgressIP1), uint32(1)) + mockOFClient.EXPECT().InstallPodSNATFlows(uint32(1), net.ParseIP(fakeLocalEgressIP1), uint32(1)) + mockRouteClient.EXPECT().AddSNATRule(net.ParseIP(fakeLocalEgressIP1), uint32(1)) + mockIPAssigner.EXPECT().UnassignIP(fakeLocalEgressIP1).Times(3) + + mockOFClient.EXPECT().InstallSNATMarkFlows(net.ParseIP(fakeLocalEgressIP1), uint32(1)) + mockOFClient.EXPECT().InstallPodSNATFlows(uint32(1), net.ParseIP(fakeLocalEgressIP1), uint32(1)) + mockOFClient.EXPECT().UninstallEgressQoSMeter(uint32(1)) + }, + }, + { + name: "Update Egress rate-limited config", + existingEgress: &crdv1b1.Egress{ + ObjectMeta: metav1.ObjectMeta{Name: "egressA", UID: "uidA"}, + Spec: crdv1b1.EgressSpec{EgressIP: fakeLocalEgressIP1, RateLimit: &fakeRateLimit}, + }, + newEgress: &crdv1b1.Egress{ + ObjectMeta: metav1.ObjectMeta{Name: "egressA", UID: "uidA"}, + Spec: crdv1b1.EgressSpec{EgressIP: fakeLocalEgressIP1, RateLimit: &newFakeRateLimit}, + }, + existingEgressGroup: &cpv1b2.EgressGroup{ + ObjectMeta: metav1.ObjectMeta{Name: "egressA", UID: "uidA"}, + GroupMembers: []cpv1b2.GroupMember{ + {Pod: &cpv1b2.PodReference{Name: "pod1", Namespace: "ns1"}}, + }, + }, + newEgressGroup: &cpv1b2.EgressGroup{ + ObjectMeta: metav1.ObjectMeta{Name: "egressA", UID: "uidA"}, + GroupMembers: []cpv1b2.GroupMember{ + {Pod: &cpv1b2.PodReference{Name: "pod1", Namespace: "ns1"}}, + }, + }, + expectedEgresses: []*crdv1b1.Egress{ + { + ObjectMeta: metav1.ObjectMeta{Name: "egressA", UID: "uidA"}, + Spec: crdv1b1.EgressSpec{EgressIP: fakeLocalEgressIP1, RateLimit: &newFakeRateLimit}, + Status: crdv1b1.EgressStatus{EgressIP: fakeLocalEgressIP1, EgressNode: fakeNode}, + }, + }, + expectedCalls: func(mockOFClient *openflowtest.MockClient, mockRouteClient *routetest.MockInterface, mockIPAssigner *ipassignertest.MockIPAssigner) { + mockOFClient.EXPECT().InstallEgressQoSMeter(uint32(1), fakeRateLimit.Peak, fakeRateLimit.Burst) + mockOFClient.EXPECT().InstallSNATMarkFlows(net.ParseIP(fakeLocalEgressIP1), uint32(1)) + mockOFClient.EXPECT().InstallPodSNATFlows(uint32(1), net.ParseIP(fakeLocalEgressIP1), uint32(1)) + mockRouteClient.EXPECT().AddSNATRule(net.ParseIP(fakeLocalEgressIP1), uint32(1)) + mockIPAssigner.EXPECT().UnassignIP(fakeLocalEgressIP1).Times(3) + mockOFClient.EXPECT().InstallEgressQoSMeter(uint32(1), newFakeRateLimit.Peak, newFakeRateLimit.Burst) + }, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { diff --git a/pkg/agent/openflow/client.go b/pkg/agent/openflow/client.go index 0347a5b0112..8f62bafe3f1 100644 --- a/pkg/agent/openflow/client.go +++ b/pkg/agent/openflow/client.go @@ -157,6 +157,14 @@ type Client interface { // UninstallPodSNATFlows removes the SNAT flows for the local Pod. UninstallPodSNATFlows(ofPort uint32) error + // InstallEgressQoSMeter installs a OF meter with specific meterID, rate + // and burst used for QoS of Egress. + // `rate` and `burst` are represented as number of mbps. + InstallEgressQoSMeter(meterID, rate, burst uint32) error + + // UninstallEgressQoSMeter removes the OF meter used by QoS of Egress. + UninstallEgressQoSMeter(meterID uint32) error + // Disconnect disconnects the connection between client and OFSwitch. Disconnect() error @@ -791,19 +799,19 @@ func (c *client) initialize() error { } if c.ovsMetersAreSupported { - if err := c.genPacketInMeter(PacketInMeterIDNP, PacketInMeterRateNP).Add(); err != nil { + if err := c.genOFMeter(PacketInMeterIDNP, ofctrl.MeterBurst|ofctrl.MeterPktps, PacketInMeterRateNP, 2*PacketInMeterRateNP).Add(); err != nil { return fmt.Errorf("failed to install OpenFlow meter entry (meterID:%d, rate:%d) for NetworkPolicy packet-in rate limiting: %v", PacketInMeterIDNP, PacketInMeterRateNP, err) } - if err := c.genPacketInMeter(PacketInMeterIDTF, PacketInMeterRateTF).Add(); err != nil { + if err := c.genOFMeter(PacketInMeterIDTF, ofctrl.MeterBurst|ofctrl.MeterPktps, PacketInMeterRateTF, 2*PacketInMeterRateTF).Add(); err != nil { return fmt.Errorf("failed to install OpenFlow meter entry (meterID:%d, rate:%d) for TraceFlow packet-in rate limiting: %v", PacketInMeterIDTF, PacketInMeterRateTF, err) } - if err := c.genPacketInMeter(PacketInMeterIDDNS, PacketInMeterRateDNS).Add(); err != nil { + if err := c.genOFMeter(PacketInMeterIDDNS, ofctrl.MeterBurst|ofctrl.MeterPktps, PacketInMeterRateDNS, 2*PacketInMeterRateDNS).Add(); err != nil { return fmt.Errorf("failed to install OpenFlow meter entry (meterID:%d, rate:%d) for DNS interception packet-in rate limiting: %v", PacketInMeterIDDNS, PacketInMeterRateDNS, err) } } for _, activeFeature := range c.activatedFeatures { - if err := c.ofEntryOperations.AddOFEntries(activeFeature.initGroups()); err != nil { + if err := c.ofEntryOperations.AddOFEntries(activeFeature.initOFEntries()); err != nil { return fmt.Errorf("failed to install feature %v initial groups: %v", activeFeature.getFeatureName(), err) } if err := c.ofEntryOperations.AddAll(activeFeature.initFlows()); err != nil { @@ -923,7 +931,7 @@ func (c *client) generatePipelines() { c.traceableFeatures = append(c.traceableFeatures, c.featureNetworkPolicy) if c.enableEgress { - c.featureEgress = newFeatureEgress(c.cookieAllocator, c.ipProtocols, c.nodeConfig, c.egressConfig) + c.featureEgress = newFeatureEgress(c.cookieAllocator, c.ipProtocols, c.nodeConfig, c.egressConfig, c.ovsMetersAreSupported) c.activatedFeatures = append(c.activatedFeatures, c.featureEgress) } @@ -994,7 +1002,7 @@ func (c *client) InstallSNATMarkFlows(snatIP net.IP, mark uint32) error { cacheKey := fmt.Sprintf("s%x", mark) c.replayMutex.RLock() defer c.replayMutex.RUnlock() - return c.addFlows(c.featureEgress.cachedFlows, cacheKey, []binding.Flow{flow}) + return c.modifyFlows(c.featureEgress.cachedFlows, cacheKey, []binding.Flow{flow}) } func (c *client) UninstallSNATMarkFlows(mark uint32) error { @@ -1009,7 +1017,7 @@ func (c *client) InstallPodSNATFlows(ofPort uint32, snatIP net.IP, snatMark uint cacheKey := fmt.Sprintf("p%x", ofPort) c.replayMutex.RLock() defer c.replayMutex.RUnlock() - return c.addFlows(c.featureEgress.cachedFlows, cacheKey, flows) + return c.modifyFlows(c.featureEgress.cachedFlows, cacheKey, flows) } func (c *client) UninstallPodSNATFlows(ofPort uint32) error { @@ -1019,6 +1027,46 @@ func (c *client) UninstallPodSNATFlows(ofPort uint32) error { return c.deleteFlows(c.featureEgress.cachedFlows, cacheKey) } +func (c *client) InstallEgressQoSMeter(meterID, rate, burst uint32) error { + if !c.ovsMetersAreSupported { + return nil + } + c.replayMutex.RLock() + defer c.replayMutex.RUnlock() + + // `rate` and `burst` are represented as number of mbps. So times 1000 to match kbps meterFlag. + meter := c.genOFMeter(binding.MeterIDType(meterID), ofctrl.MeterBurst|ofctrl.MeterKbps, 1000*rate, 1000*burst) + _, installed := c.featureEgress.cachedMeter.Load(meterID) + if !installed { + if err := meter.Add(); err != nil { + return fmt.Errorf("error when installing Egress QoS OF Meter %d: %w", meterID, err) + } + } else { + if err := meter.Modify(); err != nil { + return fmt.Errorf("error when modifying Egress QoS OF Meter %d: %w", meterID, err) + } + } + c.featureEgress.cachedMeter.Store(meterID, meter) + return nil +} + +func (c *client) UninstallEgressQoSMeter(meterID uint32) error { + if !c.ovsMetersAreSupported { + return nil + } + c.replayMutex.RLock() + defer c.replayMutex.RUnlock() + mCache, ok := c.featureEgress.cachedMeter.Load(meterID) + if ok { + meter := mCache.(binding.Meter) + if err := meter.Delete(); err != nil { + return fmt.Errorf("error when deleting Egress QoS OF Meter %d: %w", meterID, err) + } + c.featureEgress.cachedMeter.Delete(meterID) + } + return nil +} + func (c *client) ReplayFlows() { c.replayMutex.Lock() defer c.replayMutex.Unlock() @@ -1028,8 +1076,8 @@ func (c *client) ReplayFlows() { } for _, activeFeature := range c.activatedFeatures { - if err := c.ofEntryOperations.AddOFEntries(activeFeature.replayGroups()); err != nil { - klog.ErrorS(err, "Error when replaying feature groups", "feature", activeFeature.getFeatureName()) + if err := c.ofEntryOperations.AddOFEntries(activeFeature.replayOFEntries()); err != nil { + klog.ErrorS(err, "Error when replaying feature OF entries", "feature", activeFeature.getFeatureName()) } if err := c.ofEntryOperations.AddAll(activeFeature.replayFlows()); err != nil { klog.ErrorS(err, "Error when replaying feature flows", "feature", activeFeature.getFeatureName()) diff --git a/pkg/agent/openflow/client_test.go b/pkg/agent/openflow/client_test.go index 0e14c82734d..859ced209cb 100644 --- a/pkg/agent/openflow/client_test.go +++ b/pkg/agent/openflow/client_test.go @@ -2522,7 +2522,7 @@ func Test_client_ReplayFlows(t *testing.T) { "cookie=0x1060000000000, table=L3Forwarding, priority=199,ip,reg0=0x2000/0x2000,nw_dst=192.168.78.101 actions=set_field:0a:00:00:00:00:01->eth_src,set_field:aa:bb:cc:dd:ee:f0->eth_dst,set_field:192.168.78.101->tun_dst,set_field:0x10/0xf0->reg0,goto_table:L3DecTTL", ) // Feature Network Policy replays flows. - fc.featureNetworkPolicy.initGroups() + fc.featureNetworkPolicy.initOFEntries() expectedGroups := []string{ "group_id=1,type=all,bucket=bucket_id:0,actions=resubmit:EgressRule,bucket=bucket_id:1,actions=set_field:0x400000/0x600000->reg0,resubmit:Output", "group_id=2,type=all,bucket=bucket_id:0,actions=resubmit:EgressMetric,bucket=bucket_id:1,actions=set_field:0x400000/0x600000->reg0,resubmit:Output", diff --git a/pkg/agent/openflow/egress.go b/pkg/agent/openflow/egress.go index f5f42d3276e..a83790d0761 100644 --- a/pkg/agent/openflow/egress.go +++ b/pkg/agent/openflow/egress.go @@ -16,6 +16,7 @@ package openflow import ( "net" + "sync" "antrea.io/libOpenflow/openflow15" @@ -29,12 +30,14 @@ type featureEgress struct { ipProtocols []binding.Protocol cachedFlows *flowCategoryCache + cachedMeter sync.Map exceptCIDRs map[binding.Protocol][]net.IPNet nodeIPs map[binding.Protocol]net.IP gatewayMAC net.HardwareAddr - category cookie.Category + category cookie.Category + ovsMetersAreSupported bool } func (f *featureEgress) getFeatureName() string { @@ -44,7 +47,8 @@ func (f *featureEgress) getFeatureName() string { func newFeatureEgress(cookieAllocator cookie.Allocator, ipProtocols []binding.Protocol, nodeConfig *config.NodeConfig, - egressConfig *config.EgressConfig) *featureEgress { + egressConfig *config.EgressConfig, + ovsMetersAreSupported bool) *featureEgress { exceptCIDRs := make(map[binding.Protocol][]net.IPNet) for _, cidr := range egressConfig.ExceptCIDRs { if cidr.IP.To4() == nil { @@ -63,13 +67,15 @@ func newFeatureEgress(cookieAllocator cookie.Allocator, } } return &featureEgress{ - cachedFlows: newFlowCategoryCache(), - cookieAllocator: cookieAllocator, - exceptCIDRs: exceptCIDRs, - ipProtocols: ipProtocols, - nodeIPs: nodeIPs, - gatewayMAC: nodeConfig.GatewayConfig.MAC, - category: cookie.Egress, + cachedFlows: newFlowCategoryCache(), + cachedMeter: sync.Map{}, + cookieAllocator: cookieAllocator, + exceptCIDRs: exceptCIDRs, + ipProtocols: ipProtocols, + nodeIPs: nodeIPs, + gatewayMAC: nodeConfig.GatewayConfig.MAC, + category: cookie.Egress, + ovsMetersAreSupported: ovsMetersAreSupported, } } @@ -83,10 +89,17 @@ func (f *featureEgress) replayFlows() []*openflow15.FlowMod { return getCachedFlowMessages(f.cachedFlows) } -func (f *featureEgress) initGroups() []binding.OFEntry { +func (f *featureEgress) initOFEntries() []binding.OFEntry { return nil } -func (f *featureEgress) replayGroups() []binding.OFEntry { - return nil +func (f *featureEgress) replayOFEntries() []binding.OFEntry { + var meters []binding.OFEntry + f.cachedMeter.Range(func(id, value interface{}) bool { + meter := value.(binding.Meter) + meter.Reset() + meters = append(meters, meter) + return true + }) + return meters } diff --git a/pkg/agent/openflow/externalnode_connectivity.go b/pkg/agent/openflow/externalnode_connectivity.go index 8d1ec850e60..c78e6121c1d 100644 --- a/pkg/agent/openflow/externalnode_connectivity.go +++ b/pkg/agent/openflow/externalnode_connectivity.go @@ -153,11 +153,11 @@ func (f *featureExternalNodeConnectivity) replayFlows() []*openflow15.FlowMod { return flows } -func (f *featureExternalNodeConnectivity) initGroups() []binding.OFEntry { +func (f *featureExternalNodeConnectivity) initOFEntries() []binding.OFEntry { return nil } -func (f *featureExternalNodeConnectivity) replayGroups() []binding.OFEntry { +func (f *featureExternalNodeConnectivity) replayOFEntries() []binding.OFEntry { return nil } diff --git a/pkg/agent/openflow/framework.go b/pkg/agent/openflow/framework.go index f26dcf6c952..b2684168fd0 100644 --- a/pkg/agent/openflow/framework.go +++ b/pkg/agent/openflow/framework.go @@ -57,12 +57,12 @@ type feature interface { getFeatureName() string // getRequiredTables returns a slice of required tables of the feature. getRequiredTables() []*Table - // initGroups returns the OpenFlow groups of the feature needed in the initialization. - initGroups() []binding.OFEntry + // initOFEntries returns the OpenFlow entries, e.g. Groups, Meters, of the feature needed in the initialization. + initOFEntries() []binding.OFEntry // initFlows returns the Openflow messages of initial flows of the feature. initFlows() []*openflow15.FlowMod - // replayGroups returns the fixed and cached Openflow groups that need to be replayed after OVS is reconnected. - replayGroups() []binding.OFEntry + // replayOFEntries returns the fixed and cached Openflow entries, e.g. Groups, Meters, that need to be replayed after OVS is reconnected. + replayOFEntries() []binding.OFEntry // replayFlows returns the Openflow messages of fixed and cached flows that need to be replayed after OVS is reconnected. replayFlows() []*openflow15.FlowMod } diff --git a/pkg/agent/openflow/multicast.go b/pkg/agent/openflow/multicast.go index 5cc90b79e5c..a30395af442 100644 --- a/pkg/agent/openflow/multicast.go +++ b/pkg/agent/openflow/multicast.go @@ -181,7 +181,7 @@ func (f *featureMulticast) multicastPodMetricFlows(podIP net.IP, podOFPort uint3 } } -func (f *featureMulticast) replayGroups() []binding.OFEntry { +func (f *featureMulticast) replayOFEntries() []binding.OFEntry { var groups []binding.OFEntry f.groupCache.Range(func(id, value interface{}) bool { group := value.(binding.Group) @@ -192,7 +192,7 @@ func (f *featureMulticast) replayGroups() []binding.OFEntry { return groups } -func (f *featureMulticast) initGroups() []binding.OFEntry { +func (f *featureMulticast) initOFEntries() []binding.OFEntry { return nil } diff --git a/pkg/agent/openflow/multicluster.go b/pkg/agent/openflow/multicluster.go index a6ee9b2e909..cf8dce9fd6b 100644 --- a/pkg/agent/openflow/multicluster.go +++ b/pkg/agent/openflow/multicluster.go @@ -70,11 +70,11 @@ func (f *featureMulticluster) replayFlows() []*openflow15.FlowMod { return getCachedFlowMessages(f.cachedFlows) } -func (f *featureMulticluster) initGroups() []binding.OFEntry { +func (f *featureMulticluster) initOFEntries() []binding.OFEntry { return nil } -func (f *featureMulticluster) replayGroups() []binding.OFEntry { +func (f *featureMulticluster) replayOFEntries() []binding.OFEntry { return nil } diff --git a/pkg/agent/openflow/network_policy.go b/pkg/agent/openflow/network_policy.go index e2ce2390d70..55f0e4e6469 100644 --- a/pkg/agent/openflow/network_policy.go +++ b/pkg/agent/openflow/network_policy.go @@ -2269,8 +2269,8 @@ func (f *featureNetworkPolicy) initLoggingFlows() []binding.Flow { return flows } -func (f *featureNetworkPolicy) initGroups() []binding.OFEntry { - var groups []binding.OFEntry +func (f *featureNetworkPolicy) initOFEntries() []binding.OFEntry { + var entries []binding.OFEntry candidateTables := []*Table{EgressRuleTable, EgressMetricTable, IngressRuleTable, IngressMetricTable} if f.enableMulticast { candidateTables = append(candidateTables, MulticastEgressMetricTable, MulticastIngressMetricTable) @@ -2279,7 +2279,7 @@ func (f *featureNetworkPolicy) initGroups() []binding.OFEntry { groupKey := fmt.Sprintf("%d", nextTable.GetID()) obj, ok := f.loggingGroupCache.Load(groupKey) if ok { - groups = append(groups, obj.(binding.Group)) + entries = append(entries, obj.(binding.Group)) continue } // Create OpenFlow group to both log Antrea-native policy events and resubmit the packet to nextTable. @@ -2296,9 +2296,9 @@ func (f *featureNetworkPolicy) initGroups() []binding.OFEntry { ResubmitToTable(OutputTable.GetID()). Done() f.loggingGroupCache.Store(groupKey, group) - groups = append(groups, group) + entries = append(entries, group) } - return groups + return entries } func (f *featureNetworkPolicy) getLoggingAndResubmitGroupID(nextTable uint8) binding.GroupIDType { @@ -2307,6 +2307,6 @@ func (f *featureNetworkPolicy) getLoggingAndResubmitGroupID(nextTable uint8) bin return group.(binding.Group).GetID() } -func (f *featureNetworkPolicy) replayGroups() []binding.OFEntry { +func (f *featureNetworkPolicy) replayOFEntries() []binding.OFEntry { return nil } diff --git a/pkg/agent/openflow/network_policy_test.go b/pkg/agent/openflow/network_policy_test.go index 833cd0edb9c..e5e15375e92 100644 --- a/pkg/agent/openflow/network_policy_test.go +++ b/pkg/agent/openflow/network_policy_test.go @@ -1373,13 +1373,13 @@ func networkPolicyInitFlows(ovsMeterSupported, externalNodeEnabled, l7NetworkPol } if ovsMeterSupported { loggingFlows = []string{ - "cookie=0x1020000000000, table=Output, priority=200,reg0=0x2400000/0xfe600000 actions=meter:1,controller(id=32776,reason=no_match,userdata=01.01,max_len=128)", - "cookie=0x1020000000000, table=Output, priority=200,reg0=0x4400000/0xfe600000 actions=meter:1,controller(id=32776,reason=no_match,userdata=01.02,max_len=128)", - "cookie=0x1020000000000, table=Output, priority=200,reg0=0x6400000/0xfe600000 actions=meter:1,controller(id=32776,reason=no_match,userdata=01.03,max_len=128)", - "cookie=0x1020000000000, table=Output, priority=200,reg0=0x8400000/0xfe600000 actions=meter:1,controller(id=32776,reason=no_match,userdata=01.04,max_len=128)", - "cookie=0x1020000000000, table=Output, priority=200,reg0=0xa400000/0xfe600000 actions=meter:1,controller(id=32776,reason=no_match,userdata=01.05,max_len=128)", - "cookie=0x1020000000000, table=Output, priority=200,reg0=0xc400000/0xfe600000 actions=meter:1,controller(id=32776,reason=no_match,userdata=01.06,max_len=128)", - "cookie=0x1020000000000, table=Output, priority=200,reg0=0xe400000/0xfe600000 actions=meter:1,controller(id=32776,reason=no_match,userdata=01.07,max_len=128)", + "cookie=0x1020000000000, table=Output, priority=200,reg0=0x2400000/0xfe600000 actions=meter:256,controller(id=32776,reason=no_match,userdata=01.01,max_len=128)", + "cookie=0x1020000000000, table=Output, priority=200,reg0=0x4400000/0xfe600000 actions=meter:256,controller(id=32776,reason=no_match,userdata=01.02,max_len=128)", + "cookie=0x1020000000000, table=Output, priority=200,reg0=0x6400000/0xfe600000 actions=meter:256,controller(id=32776,reason=no_match,userdata=01.03,max_len=128)", + "cookie=0x1020000000000, table=Output, priority=200,reg0=0x8400000/0xfe600000 actions=meter:256,controller(id=32776,reason=no_match,userdata=01.04,max_len=128)", + "cookie=0x1020000000000, table=Output, priority=200,reg0=0xa400000/0xfe600000 actions=meter:256,controller(id=32776,reason=no_match,userdata=01.05,max_len=128)", + "cookie=0x1020000000000, table=Output, priority=200,reg0=0xc400000/0xfe600000 actions=meter:256,controller(id=32776,reason=no_match,userdata=01.06,max_len=128)", + "cookie=0x1020000000000, table=Output, priority=200,reg0=0xe400000/0xfe600000 actions=meter:256,controller(id=32776,reason=no_match,userdata=01.07,max_len=128)", } } if externalNodeEnabled { @@ -1455,7 +1455,7 @@ func Test_NewDNSPacketInConjunction(t *testing.T) { } if ovsMetersSupported { ipv4ExpFlows = []string{ - "cookie=0x1020000000000, table=AntreaPolicyIngressRule, priority=64991,conj_id=1 actions=meter:3,controller(id=32776,reason=no_match,userdata=02,max_len=128),goto_table:IngressMetric", + "cookie=0x1020000000000, table=AntreaPolicyIngressRule, priority=64991,conj_id=1 actions=meter:258,controller(id=32776,reason=no_match,userdata=02,max_len=128),goto_table:IngressMetric", "cookie=0x1020000000000, table=AntreaPolicyIngressRule, priority=64991,ct_state=+rpl+trk,udp,tp_src=53 actions=conjunction(1,1/2)", "cookie=0x1020000000000, table=AntreaPolicyIngressRule, priority=64991,ct_state=+rpl+trk,tcp,tp_src=53 actions=conjunction(1,1/2)", } @@ -1467,7 +1467,7 @@ func Test_NewDNSPacketInConjunction(t *testing.T) { } if ovsMetersSupported { ipv6ExpFlows = []string{ - "cookie=0x1020000000000, table=AntreaPolicyIngressRule, priority=64991,conj_id=1 actions=meter:3,controller(id=32776,reason=no_match,userdata=02,max_len=128),goto_table:IngressMetric", + "cookie=0x1020000000000, table=AntreaPolicyIngressRule, priority=64991,conj_id=1 actions=meter:258,controller(id=32776,reason=no_match,userdata=02,max_len=128),goto_table:IngressMetric", "cookie=0x1020000000000, table=AntreaPolicyIngressRule, priority=64991,ct_state=+rpl+trk,udp6,tp_src=53 actions=conjunction(1,1/2)", "cookie=0x1020000000000, table=AntreaPolicyIngressRule, priority=64991,ct_state=+rpl+trk,tcp6,tp_src=53 actions=conjunction(1,1/2)", } @@ -1481,7 +1481,7 @@ func Test_NewDNSPacketInConjunction(t *testing.T) { } if ovsMetersSupported { dsExpFlows = []string{ - "cookie=0x1020000000000, table=AntreaPolicyIngressRule, priority=64991,conj_id=1 actions=meter:3,controller(id=32776,reason=no_match,userdata=02,max_len=128),goto_table:IngressMetric", + "cookie=0x1020000000000, table=AntreaPolicyIngressRule, priority=64991,conj_id=1 actions=meter:258,controller(id=32776,reason=no_match,userdata=02,max_len=128),goto_table:IngressMetric", "cookie=0x1020000000000, table=AntreaPolicyIngressRule, priority=64991,ct_state=+rpl+trk,udp,tp_src=53 actions=conjunction(1,1/2)", "cookie=0x1020000000000, table=AntreaPolicyIngressRule, priority=64991,ct_state=+rpl+trk,tcp,tp_src=53 actions=conjunction(1,1/2)", "cookie=0x1020000000000, table=AntreaPolicyIngressRule, priority=64991,ct_state=+rpl+trk,udp6,tp_src=53 actions=conjunction(1,1/2)", diff --git a/pkg/agent/openflow/packetin.go b/pkg/agent/openflow/packetin.go index 1dc02b799d5..cbccf72e7f5 100644 --- a/pkg/agent/openflow/packetin.go +++ b/pkg/agent/openflow/packetin.go @@ -77,10 +77,11 @@ const ( PacketInNPStoreDenyOperation = 0b100 // We use OpenFlow Meter for packetIn rate limiting on OVS side. - // Meter Entry ID. - PacketInMeterIDNP = 1 - PacketInMeterIDTF = 2 - PacketInMeterIDDNS = 3 + // Meter Entry ID. Note: 1-255 are reserved for Egress QoS. + PacketInMeterIDNP = 256 + PacketInMeterIDTF = 257 + PacketInMeterIDDNS = 258 + // Meter Entry Rate. It is represented as number of events per second. // Packets which exceed the rate will be dropped. PacketInMeterRateNP = 500 diff --git a/pkg/agent/openflow/pipeline.go b/pkg/agent/openflow/pipeline.go index 918ebd0e62f..af41e063fca 100644 --- a/pkg/agent/openflow/pipeline.go +++ b/pkg/agent/openflow/pipeline.go @@ -2293,13 +2293,18 @@ func (f *featureEgress) snatSkipNodeFlow(nodeIP net.IP) binding.Flow { // packet's tunnel destination IP. func (f *featureEgress) snatIPFromTunnelFlow(snatIP net.IP, mark uint32) binding.Flow { ipProtocol := getIPProtocol(snatIP) - return EgressMarkTable.ofTable.BuildFlow(priorityNormal). + fb := EgressMarkTable.ofTable.BuildFlow(priorityNormal). Cookie(f.cookieAllocator.Request(f.category).Raw()). MatchProtocol(ipProtocol). MatchCTStateNew(true). MatchCTStateTrk(true). - MatchTunnelDst(snatIP). - Action().LoadPktMarkRange(mark, snatPktMarkRange). + MatchTunnelDst(snatIP) + if f.ovsMetersAreSupported { + if _, ok := f.cachedMeter.Load(mark); ok { + fb = fb.Action().Meter(mark) + } + } + return fb.Action().LoadPktMarkRange(mark, snatPktMarkRange). Action().LoadRegMark(ToGatewayRegMark). Action().GotoStage(stageSwitching). Done() @@ -2313,13 +2318,18 @@ func (f *featureEgress) snatRuleFlow(ofPort uint32, snatIP net.IP, snatMark uint ipProtocol := getIPProtocol(snatIP) if snatMark != 0 { // Local SNAT IP. - return EgressMarkTable.ofTable.BuildFlow(priorityNormal). + fb := EgressMarkTable.ofTable.BuildFlow(priorityNormal). Cookie(cookieID). MatchProtocol(ipProtocol). MatchCTStateNew(true). MatchCTStateTrk(true). - MatchInPort(ofPort). - Action().LoadPktMarkRange(snatMark, snatPktMarkRange). + MatchInPort(ofPort) + if f.ovsMetersAreSupported { + if _, ok := f.cachedMeter.Load(snatMark); ok { + fb = fb.Action().Meter(snatMark) + } + } + return fb.Action().LoadPktMarkRange(snatMark, snatPktMarkRange). Action().LoadRegMark(ToGatewayRegMark). Action().GotoStage(stageSwitching). Done() @@ -2745,15 +2755,14 @@ func priorityIndexFunc(obj interface{}) ([]string, error) { return conj.ActionFlowPriorities(), nil } -// genPacketInMeter generates a meter entry with specific meterID and rate. -// `rate` is represented as number of packets per second. -// Packets which exceed the rate will be dropped. -func (c *client) genPacketInMeter(meterID binding.MeterIDType, rate uint32) binding.Meter { - meter := c.bridge.NewMeter(meterID, ofctrl.MeterBurst|ofctrl.MeterPktps). +// genOFMeter generates a meter entry with specific meterID, meterFlag, rate and +// burst. Packets which exceed the rate will be dropped. +func (c *client) genOFMeter(meterID binding.MeterIDType, meterFlags ofctrl.MeterFlag, rate uint32, burst uint32) binding.Meter { + meter := c.bridge.NewMeter(meterID, meterFlags). MeterBand(). MeterType(ofctrl.MeterDrop). Rate(rate). - Burst(2 * rate). + Burst(burst). Done() return meter } diff --git a/pkg/agent/openflow/pod_connectivity.go b/pkg/agent/openflow/pod_connectivity.go index 98ae8ab82e7..08a17648c51 100644 --- a/pkg/agent/openflow/pod_connectivity.go +++ b/pkg/agent/openflow/pod_connectivity.go @@ -275,10 +275,10 @@ func (f *featurePodConnectivity) trafficControlCommonFlows() []binding.Flow { } } -func (f *featurePodConnectivity) initGroups() []binding.OFEntry { +func (f *featurePodConnectivity) initOFEntries() []binding.OFEntry { return nil } -func (f *featurePodConnectivity) replayGroups() []binding.OFEntry { +func (f *featurePodConnectivity) replayOFEntries() []binding.OFEntry { return nil } diff --git a/pkg/agent/openflow/service.go b/pkg/agent/openflow/service.go index 9c37988dcdd..a22bc565737 100644 --- a/pkg/agent/openflow/service.go +++ b/pkg/agent/openflow/service.go @@ -183,7 +183,7 @@ func (f *featureService) replayFlows() []*openflow15.FlowMod { return getCachedFlowMessages(f.cachedFlows) } -func (f *featureService) replayGroups() []binding.OFEntry { +func (f *featureService) replayOFEntries() []binding.OFEntry { var groups []binding.OFEntry f.groupCache.Range(func(id, value interface{}) bool { group := value.(binding.Group) @@ -194,6 +194,6 @@ func (f *featureService) replayGroups() []binding.OFEntry { return groups } -func (f *featureService) initGroups() []binding.OFEntry { +func (f *featureService) initOFEntries() []binding.OFEntry { return nil } diff --git a/pkg/agent/openflow/testing/mock_openflow.go b/pkg/agent/openflow/testing/mock_openflow.go index 1cd0a2e75ba..9520bd053ce 100644 --- a/pkg/agent/openflow/testing/mock_openflow.go +++ b/pkg/agent/openflow/testing/mock_openflow.go @@ -260,6 +260,20 @@ func (mr *MockClientMockRecorder) Initialize(arg0, arg1, arg2, arg3, arg4, arg5 return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Initialize", reflect.TypeOf((*MockClient)(nil).Initialize), arg0, arg1, arg2, arg3, arg4, arg5) } +// InstallEgressQoSMeter mocks base method +func (m *MockClient) InstallEgressQoSMeter(arg0, arg1, arg2 uint32) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "InstallEgressQoSMeter", arg0, arg1, arg2) + ret0, _ := ret[0].(error) + return ret0 +} + +// InstallEgressQoSMeter indicates an expected call of InstallEgressQoSMeter +func (mr *MockClientMockRecorder) InstallEgressQoSMeter(arg0, arg1, arg2 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InstallEgressQoSMeter", reflect.TypeOf((*MockClient)(nil).InstallEgressQoSMeter), arg0, arg1, arg2) +} + // InstallEndpointFlows mocks base method func (m *MockClient) InstallEndpointFlows(arg0 openflow.Protocol, arg1 []proxy.Endpoint) error { m.ctrl.T.Helper() @@ -814,6 +828,20 @@ func (mr *MockClientMockRecorder) SubscribePacketIn(arg0, arg1 interface{}) *gom return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SubscribePacketIn", reflect.TypeOf((*MockClient)(nil).SubscribePacketIn), arg0, arg1) } +// UninstallEgressQoSMeter mocks base method +func (m *MockClient) UninstallEgressQoSMeter(arg0 uint32) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "UninstallEgressQoSMeter", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// UninstallEgressQoSMeter indicates an expected call of UninstallEgressQoSMeter +func (mr *MockClientMockRecorder) UninstallEgressQoSMeter(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UninstallEgressQoSMeter", reflect.TypeOf((*MockClient)(nil).UninstallEgressQoSMeter), arg0) +} + // UninstallEndpointFlows mocks base method func (m *MockClient) UninstallEndpointFlows(arg0 openflow.Protocol, arg1 []proxy.Endpoint) error { m.ctrl.T.Helper() diff --git a/pkg/agent/openflow/traceflow.go b/pkg/agent/openflow/traceflow.go index 451710d4794..f552f3f857e 100644 --- a/pkg/agent/openflow/traceflow.go +++ b/pkg/agent/openflow/traceflow.go @@ -42,10 +42,10 @@ func (f *featureTraceflow) replayFlows() []*openflow15.FlowMod { return []*openflow15.FlowMod{} } -func (f *featureTraceflow) initGroups() []binding.OFEntry { +func (f *featureTraceflow) initOFEntries() []binding.OFEntry { return nil } -func (f *featureTraceflow) replayGroups() []binding.OFEntry { +func (f *featureTraceflow) replayOFEntries() []binding.OFEntry { return nil } diff --git a/pkg/apis/crd/v1beta1/types.go b/pkg/apis/crd/v1beta1/types.go index ec3bb1778b7..0b79a54660a 100644 --- a/pkg/apis/crd/v1beta1/types.go +++ b/pkg/apis/crd/v1beta1/types.go @@ -859,6 +859,16 @@ type EgressSpec struct { // same index in EgressIPs and ExternalIPPools are correlated. // Cannot be set with ExternalIPPool. ExternalIPPools []string `json:"externalIPPools,omitempty"` + // RateLimit specifies the rate limit of north-south egress traffic of this Egress. + // If RateLimit is set, we assume EgressIP(s) of this Egress is(are) not shared with other Egresses. + RateLimit *RateLimit `json:"rateLimit,omitempty"` +} + +type RateLimit struct { + // Rate specifies the maximum mbps. + Peak uint32 `json:"peak"` + // Burst specifies maximum burst mbps for throttle. + Burst uint32 `json:"burst"` } // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object diff --git a/pkg/apis/crd/v1beta1/zz_generated.deepcopy.go b/pkg/apis/crd/v1beta1/zz_generated.deepcopy.go index 7ba1a16afe0..5390a4affd6 100644 --- a/pkg/apis/crd/v1beta1/zz_generated.deepcopy.go +++ b/pkg/apis/crd/v1beta1/zz_generated.deepcopy.go @@ -501,6 +501,11 @@ func (in *EgressSpec) DeepCopyInto(out *EgressSpec) { *out = make([]string, len(*in)) copy(*out, *in) } + if in.RateLimit != nil { + in, out := &in.RateLimit, &out.RateLimit + *out = new(RateLimit) + **out = **in + } return } @@ -1374,6 +1379,22 @@ func (in *PeerService) DeepCopy() *PeerService { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *RateLimit) DeepCopyInto(out *RateLimit) { + *out = *in + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new RateLimit. +func (in *RateLimit) DeepCopy() *RateLimit { + if in == nil { + return nil + } + out := new(RateLimit) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *Rule) DeepCopyInto(out *Rule) { *out = *in diff --git a/pkg/apiserver/openapi/zz_generated.openapi.go b/pkg/apiserver/openapi/zz_generated.openapi.go index 34f4fce09d1..b52890f1efd 100644 --- a/pkg/apiserver/openapi/zz_generated.openapi.go +++ b/pkg/apiserver/openapi/zz_generated.openapi.go @@ -126,6 +126,7 @@ func GetOpenAPIDefinitions(ref common.ReferenceCallback) map[string]common.OpenA "antrea.io/antrea/pkg/apis/crd/v1beta1.Packet": schema_pkg_apis_crd_v1beta1_Packet(ref), "antrea.io/antrea/pkg/apis/crd/v1beta1.PeerNamespaces": schema_pkg_apis_crd_v1beta1_PeerNamespaces(ref), "antrea.io/antrea/pkg/apis/crd/v1beta1.PeerService": schema_pkg_apis_crd_v1beta1_PeerService(ref), + "antrea.io/antrea/pkg/apis/crd/v1beta1.RateLimit": schema_pkg_apis_crd_v1beta1_RateLimit(ref), "antrea.io/antrea/pkg/apis/crd/v1beta1.Rule": schema_pkg_apis_crd_v1beta1_Rule(ref), "antrea.io/antrea/pkg/apis/crd/v1beta1.Source": schema_pkg_apis_crd_v1beta1_Source(ref), "antrea.io/antrea/pkg/apis/crd/v1beta1.TCPHeader": schema_pkg_apis_crd_v1beta1_TCPHeader(ref), @@ -3506,12 +3507,18 @@ func schema_pkg_apis_crd_v1beta1_EgressSpec(ref common.ReferenceCallback) common }, }, }, + "rateLimit": { + SchemaProps: spec.SchemaProps{ + Description: "RateLimit specifies the rate limit of north-south egress traffic of this Egress. If RateLimit is set, we assume EgressIP(s) of this Egress is(are) not shared with other Egresses.", + Ref: ref("antrea.io/antrea/pkg/apis/crd/v1beta1.RateLimit"), + }, + }, }, Required: []string{"appliedTo"}, }, }, Dependencies: []string{ - "antrea.io/antrea/pkg/apis/crd/v1beta1.AppliedTo"}, + "antrea.io/antrea/pkg/apis/crd/v1beta1.AppliedTo", "antrea.io/antrea/pkg/apis/crd/v1beta1.RateLimit"}, } } @@ -4986,6 +4993,35 @@ func schema_pkg_apis_crd_v1beta1_PeerService(ref common.ReferenceCallback) commo } } +func schema_pkg_apis_crd_v1beta1_RateLimit(ref common.ReferenceCallback) common.OpenAPIDefinition { + return common.OpenAPIDefinition{ + Schema: spec.Schema{ + SchemaProps: spec.SchemaProps{ + Type: []string{"object"}, + Properties: map[string]spec.Schema{ + "peak": { + SchemaProps: spec.SchemaProps{ + Description: "Rate specifies the maximum mbps.", + Default: 0, + Type: []string{"integer"}, + Format: "int64", + }, + }, + "burst": { + SchemaProps: spec.SchemaProps{ + Description: "Burst specifies maximum burst mbps for throttle.", + Default: 0, + Type: []string{"integer"}, + Format: "int64", + }, + }, + }, + Required: []string{"peak", "burst"}, + }, + }, + } +} + func schema_pkg_apis_crd_v1beta1_Rule(ref common.ReferenceCallback) common.OpenAPIDefinition { return common.OpenAPIDefinition{ Schema: spec.Schema{ diff --git a/pkg/controller/egress/controller.go b/pkg/controller/egress/controller.go index cb93f600b7d..c55c6e6f593 100644 --- a/pkg/controller/egress/controller.go +++ b/pkg/controller/egress/controller.go @@ -57,6 +57,8 @@ const ( egressGroupType grouping.GroupType = "egressGroup" externalIPPoolIndex = "externalIPPool" + + egressIPIndex = "egressIP" ) // ipAllocation contains the IP and the IP Pool which allocates it. @@ -137,6 +139,23 @@ func NewEgressController(crdClient clientset.Interface, } return externalIPPools, nil }}) + // externalIPPoolIndex will be used to get all Egresses associated with a given EgressIP. + egressInformer.Informer().AddIndexers(cache.Indexers{egressIPIndex: func(obj interface{}) (strings []string, e error) { + egress, ok := obj.(*egressv1beta1.Egress) + if !ok { + return nil, fmt.Errorf("obj is not Egress: %+v", obj) + } + var egressIPs []string + if egress.Spec.EgressIP != "" { + egressIPs = append(egressIPs, egress.Spec.EgressIP) + } + for _, egressIP := range egress.Spec.EgressIPs { + if egressIP != "" { + egressIPs = append(egressIPs, egressIP) + } + } + return egressIPs, nil + }}) c.externalIPAllocator.AddEventHandler(func(ipPool string) { c.enqueueEgresses(ipPool) }) diff --git a/pkg/controller/egress/controller_test.go b/pkg/controller/egress/controller_test.go index e0be9bd0edc..b4bdac926f7 100644 --- a/pkg/controller/egress/controller_test.go +++ b/pkg/controller/egress/controller_test.go @@ -67,7 +67,7 @@ var ( eipFoo2 = newExternalIPPool("pool2", "", "2.2.2.10", "2.2.2.20") ) -func newEgress(name, egressIP, externalIPPool string, podSelector, namespaceSelector *metav1.LabelSelector) *v1beta1.Egress { +func newEgress(name, egressIP, externalIPPool string, podSelector, namespaceSelector *metav1.LabelSelector, rateLimit *v1beta1.RateLimit) *v1beta1.Egress { egress := &v1beta1.Egress{ ObjectMeta: metav1.ObjectMeta{Name: name}, Spec: v1beta1.EgressSpec{ @@ -77,6 +77,7 @@ func newEgress(name, egressIP, externalIPPool string, podSelector, namespaceSele }, EgressIP: egressIP, ExternalIPPool: externalIPPool, + RateLimit: rateLimit, }, } return egress diff --git a/pkg/controller/egress/validate.go b/pkg/controller/egress/validate.go index 061758afc01..d73a085c826 100644 --- a/pkg/controller/egress/validate.go +++ b/pkg/controller/egress/validate.go @@ -53,6 +53,21 @@ func (c *EgressController) ValidateEgress(review *admv1.AdmissionReview) *admv1. if len(newEgress.Spec.ExternalIPPools) > 0 { return false, "spec.externalIPPools is not supported yet" } + // Validate if an EgressIP used by a rate-limited Egress is shared. + if newEgress.Spec.EgressIP != "" { + objs, _ := c.egressInformer.Informer().GetIndexer().ByIndex(egressIPIndex, newEgress.Spec.EgressIP) + if newEgress.Spec.RateLimit != nil && len(objs) > 0 { + if len(objs) > 1 || objs[0].(*crdv1beta1.Egress).Name != newEgress.Name { + return false, fmt.Sprintf("Egress with rate-limit can't share EgressIP %s with other Egresses", newEgress.Spec.EgressIP) + } + } + for _, obj := range objs { + egress := obj.(*crdv1beta1.Egress) + if egress.Spec.RateLimit != nil && egress.Name != newEgress.Name { + return false, fmt.Sprintf("EgressIP %s used by other Egress with rate-limit can't be shared", newEgress.Spec.EgressIP) + } + } + } // Allow it if EgressIP and ExternalIPPool don't change. if newEgress.Spec.EgressIP == oldEgress.Spec.EgressIP && newEgress.Spec.ExternalIPPool == oldEgress.Spec.ExternalIPPool { return true, "" diff --git a/pkg/controller/egress/validate_test.go b/pkg/controller/egress/validate_test.go index 29105707337..a4e14d9c372 100644 --- a/pkg/controller/egress/validate_test.go +++ b/pkg/controller/egress/validate_test.go @@ -34,9 +34,16 @@ func marshal(object runtime.Object) []byte { } func TestEgressControllerValidateEgress(t *testing.T) { + var ( + rateLimit = crdv1beta1.RateLimit{ + Peak: 500, + Burst: 1000, + } + ) tests := []struct { name string existingExternalIPPool *crdv1beta1.ExternalIPPool + existingEgress *crdv1beta1.Egress request *admv1.AdmissionRequest expectedResponse *admv1.AdmissionResponse }{ @@ -46,7 +53,7 @@ func TestEgressControllerValidateEgress(t *testing.T) { request: &admv1.AdmissionRequest{ Name: "foo", Operation: "CREATE", - Object: runtime.RawExtension{Raw: marshal(newEgress("foo", "10.10.10.1", "nonExistingPool", nil, nil))}, + Object: runtime.RawExtension{Raw: marshal(newEgress("foo", "10.10.10.1", "nonExistingPool", nil, nil, nil))}, }, expectedResponse: &admv1.AdmissionResponse{ Allowed: false, @@ -61,7 +68,7 @@ func TestEgressControllerValidateEgress(t *testing.T) { request: &admv1.AdmissionRequest{ Name: "foo", Operation: "CREATE", - Object: runtime.RawExtension{Raw: marshal(newEgress("foo", "10.10.11.1", "bar", nil, nil))}, + Object: runtime.RawExtension{Raw: marshal(newEgress("foo", "10.10.11.1", "bar", nil, nil, nil))}, }, expectedResponse: &admv1.AdmissionResponse{ Allowed: false, @@ -76,7 +83,7 @@ func TestEgressControllerValidateEgress(t *testing.T) { request: &admv1.AdmissionRequest{ Name: "foo", Operation: "CREATE", - Object: runtime.RawExtension{Raw: marshal(newEgress("foo", "10.10.10.1", "bar", nil, nil))}, + Object: runtime.RawExtension{Raw: marshal(newEgress("foo", "10.10.10.1", "bar", nil, nil, nil))}, }, expectedResponse: &admv1.AdmissionResponse{Allowed: true}, }, @@ -86,8 +93,8 @@ func TestEgressControllerValidateEgress(t *testing.T) { request: &admv1.AdmissionRequest{ Name: "foo", Operation: "UPDATE", - OldObject: runtime.RawExtension{Raw: marshal(newEgress("foo", "10.10.10.1", "bar", nil, nil))}, - Object: runtime.RawExtension{Raw: marshal(newEgress("foo", "10.10.11.1", "bar", nil, nil))}, + OldObject: runtime.RawExtension{Raw: marshal(newEgress("foo", "10.10.10.1", "bar", nil, nil, nil))}, + Object: runtime.RawExtension{Raw: marshal(newEgress("foo", "10.10.11.1", "bar", nil, nil, nil))}, }, expectedResponse: &admv1.AdmissionResponse{ Allowed: false, @@ -102,8 +109,8 @@ func TestEgressControllerValidateEgress(t *testing.T) { request: &admv1.AdmissionRequest{ Name: "foo", Operation: "UPDATE", - OldObject: runtime.RawExtension{Raw: marshal(newEgress("foo", "10.10.10.1", "bar", nil, nil))}, - Object: runtime.RawExtension{Raw: marshal(newEgress("foo", "10.10.10.2", "bar", nil, nil))}, + OldObject: runtime.RawExtension{Raw: marshal(newEgress("foo", "10.10.10.1", "bar", nil, nil, nil))}, + Object: runtime.RawExtension{Raw: marshal(newEgress("foo", "10.10.10.2", "bar", nil, nil, nil))}, }, expectedResponse: &admv1.AdmissionResponse{Allowed: true}, }, @@ -113,10 +120,10 @@ func TestEgressControllerValidateEgress(t *testing.T) { request: &admv1.AdmissionRequest{ Name: "foo", Operation: "UPDATE", - OldObject: runtime.RawExtension{Raw: marshal(newEgress("foo", "10.10.10.1", "bar", nil, nil))}, + OldObject: runtime.RawExtension{Raw: marshal(newEgress("foo", "10.10.10.1", "bar", nil, nil, nil))}, Object: runtime.RawExtension{Raw: marshal(newEgress("foo", "10.10.10.2", "bar", &metav1.LabelSelector{ MatchLabels: map[string]string{"foo": "bar"}, - }, nil))}, + }, nil, nil))}, }, expectedResponse: &admv1.AdmissionResponse{Allowed: true}, }, @@ -125,7 +132,57 @@ func TestEgressControllerValidateEgress(t *testing.T) { request: &admv1.AdmissionRequest{ Name: "foo", Operation: "DELETE", - Object: runtime.RawExtension{Raw: marshal(newEgress("foo", "10.10.10.2", "bar", nil, nil))}, + Object: runtime.RawExtension{Raw: marshal(newEgress("foo", "10.10.10.2", "bar", nil, nil, nil))}, + }, + expectedResponse: &admv1.AdmissionResponse{Allowed: true}, + }, + { + name: "Creating an Egress with rate-limiting sharing IP with exist Egresses should not be allowed", + request: &admv1.AdmissionRequest{ + Name: "foo", + Operation: "CREATE", + Object: runtime.RawExtension{Raw: marshal(newEgress("foo", "10.10.10.1", "", nil, nil, &rateLimit))}, + }, + existingEgress: newEgress("bar", "10.10.10.1", "", nil, nil, nil), + expectedResponse: &admv1.AdmissionResponse{ + Allowed: false, + Result: &metav1.Status{ + Message: "Egress with rate-limit can't share EgressIP 10.10.10.1 with other Egresses", + }, + }, + }, + { + name: "Creating an Egress sharing IP with an exist Egress with rate-limit should not be allowed", + request: &admv1.AdmissionRequest{ + Name: "foo", + Operation: "CREATE", + Object: runtime.RawExtension{Raw: marshal(newEgress("foo", "10.10.10.1", "", nil, nil, nil))}, + }, + existingEgress: newEgress("bar", "10.10.10.1", "", nil, nil, &rateLimit), + expectedResponse: &admv1.AdmissionResponse{ + Allowed: false, + Result: &metav1.Status{ + Message: "EgressIP 10.10.10.1 used by other Egress with rate-limit can't be shared", + }, + }, + }, + { + name: "Creating an Egress with rate-limiting not sharing IP with exist Egresses should be allowed", + request: &admv1.AdmissionRequest{ + Name: "foo", + Operation: "CREATE", + Object: runtime.RawExtension{Raw: marshal(newEgress("foo", "10.10.10.1", "", nil, nil, &rateLimit))}, + }, + existingEgress: newEgress("bar", "10.10.10.2", "", nil, nil, nil), + expectedResponse: &admv1.AdmissionResponse{Allowed: true}, + }, + { + name: "Update an Egress rate-limiting config should be allowed", + request: &admv1.AdmissionRequest{ + Name: "foo", + Operation: "UPDATE", + OldObject: runtime.RawExtension{Raw: marshal(newEgress("foo", "10.10.10.1", "", nil, nil, &rateLimit))}, + Object: runtime.RawExtension{Raw: marshal(newEgress("foo", "10.10.10.1", "", nil, nil, nil))}, }, expectedResponse: &admv1.AdmissionResponse{Allowed: true}, }, @@ -138,6 +195,9 @@ func TestEgressControllerValidateEgress(t *testing.T) { if tt.existingExternalIPPool != nil { objs = append(objs, tt.existingExternalIPPool) } + if tt.existingEgress != nil { + objs = append(objs, tt.existingEgress) + } controller := newController(nil, objs) controller.informerFactory.Start(stopCh) controller.crdInformerFactory.Start(stopCh) diff --git a/test/e2e/egress_test.go b/test/e2e/egress_test.go index e9c01691779..a533fd091b7 100644 --- a/test/e2e/egress_test.go +++ b/test/e2e/egress_test.go @@ -18,6 +18,7 @@ import ( "context" "fmt" "net" + "strconv" "strings" "testing" "time" @@ -63,6 +64,7 @@ func TestEgress(t *testing.T) { t.Run("testEgressUpdateNodeSelector", func(t *testing.T) { testEgressUpdateNodeSelector(t, data) }) t.Run("testEgressNodeFailure", func(t *testing.T) { testEgressNodeFailure(t, data) }) t.Run("testCreateExternalIPPool", func(t *testing.T) { testCreateExternalIPPool(t, data) }) + t.Run("testUpdateRateLimit", func(t *testing.T) { testEgressUpdateRateLimit(t, data) }) } func testCreateExternalIPPool(t *testing.T, data *TestData) { @@ -203,7 +205,7 @@ func testEgressClientIP(t *testing.T, data *TestData) { Operator: metav1.LabelSelectorOpExists, }, } - egress := data.createEgress(t, "egress-", matchExpressions, nil, "", egressNodeIP) + egress := data.createEgress(t, "egress-", matchExpressions, nil, "", egressNodeIP, nil) defer data.crdClient.CrdV1beta1().Egresses().Delete(context.TODO(), egress.Name, metav1.DeleteOptions{}) assertClientIP(localPod, egressNodeIP) assertClientIP(remotePod, egressNodeIP) @@ -355,7 +357,7 @@ func testEgressCRUD(t *testing.T, data *TestData) { pool := data.createExternalIPPool(t, "crud-pool-", tt.ipRange, tt.nodeSelector.MatchExpressions, tt.nodeSelector.MatchLabels) defer data.crdClient.CrdV1beta1().ExternalIPPools().Delete(context.TODO(), pool.Name, metav1.DeleteOptions{}) - egress := data.createEgress(t, "crud-egress-", nil, map[string]string{"foo": "bar"}, pool.Name, "") + egress := data.createEgress(t, "crud-egress-", nil, map[string]string{"foo": "bar"}, pool.Name, "", nil) defer data.crdClient.CrdV1beta1().Egresses().Delete(context.TODO(), egress.Name, metav1.DeleteOptions{}) // Use Poll to wait the interval before the first run to detect the case that the IP is assigned to any Node // when it's not supposed to. @@ -472,7 +474,7 @@ func testEgressUpdateEgressIP(t *testing.T, data *TestData) { newPool := data.createExternalIPPool(t, "newpool-", tt.newIPRange, nil, map[string]string{v1.LabelHostname: tt.newNode}) defer data.crdClient.CrdV1beta1().ExternalIPPools().Delete(context.TODO(), newPool.Name, metav1.DeleteOptions{}) - egress := data.createEgress(t, "egress-", nil, map[string]string{"foo": "bar"}, originalPool.Name, "") + egress := data.createEgress(t, "egress-", nil, map[string]string{"foo": "bar"}, originalPool.Name, "", nil) defer data.crdClient.CrdV1beta1().Egresses().Delete(context.TODO(), egress.Name, metav1.DeleteOptions{}) egress, err := data.checkEgressState(egress.Name, tt.originalEgressIP, tt.originalNode, "", time.Second) require.NoError(t, err) @@ -620,7 +622,7 @@ func testEgressMigration(t *testing.T, data *TestData, triggerFunc, revertFunc f externalIPPoolTwoNodes := data.createExternalIPPool(t, "pool-with-two-nodes-", *ipRange, matchExpressions, nil) defer data.crdClient.CrdV1beta1().ExternalIPPools().Delete(context.TODO(), externalIPPoolTwoNodes.Name, metav1.DeleteOptions{}) - egress := data.createEgress(t, "migration-egress-", nil, map[string]string{"foo": "bar"}, externalIPPoolTwoNodes.Name, "") + egress := data.createEgress(t, "migration-egress-", nil, map[string]string{"foo": "bar"}, externalIPPoolTwoNodes.Name, "", nil) defer data.crdClient.CrdV1beta1().Egresses().Delete(context.TODO(), egress.Name, metav1.DeleteOptions{}) var err error @@ -664,6 +666,172 @@ func testEgressMigration(t *testing.T, data *TestData, triggerFunc, revertFunc f checkIPNeighbor(fromNode) } +func testEgressUpdateRateLimit(t *testing.T, data *TestData) { + var ( + rateLimit1 = &v1beta1.RateLimit{ + Peak: 100, + Burst: 200, + } + rateLimit2 = &v1beta1.RateLimit{ + Peak: 500, + Burst: 1000, + } + ) + + egressNode := controlPlaneNodeName() + var egressNodeIP string + if clusterInfo.podV4NetworkCIDR != "" { + egressNodeIP = controlPlaneNodeIPv4() + } else { + egressNodeIP = controlPlaneNodeIPv6() + } + egressNodeAntreaPodName, err := data.getAntreaPodOnNode(egressNode) + nonEgressNodeAntreaPodName, err := data.getAntreaPodOnNode(workerNodeName(1)) + if err != nil { + t.Fatalf("Failed to get AntreaPod on Node: %v", err) + } + + if err := data.createBusyboxPodOnNode("local-pod", data.testNamespace, egressNode, false); err != nil { + t.Fatalf("Failed to create local Pod: %v", err) + } + defer deletePodWrapper(t, data, data.testNamespace, "local-pod") + if err := data.podWaitForRunning(defaultTimeout, "local-pod", data.testNamespace); err != nil { + t.Fatalf("Error when waiting for Pod 'local-pod' to be in the Running state") + } + if err := data.createBusyboxPodOnNode("remote-pod", data.testNamespace, workerNodeName(1), false); err != nil { + t.Fatalf("Failed to create remote Pod: %v", err) + } + defer deletePodWrapper(t, data, data.testNamespace, "remote-pod") + if err := data.podWaitForRunning(defaultTimeout, "remote-pod", data.testNamespace); err != nil { + t.Fatalf("Error when waiting for Pod 'remote-pod' to be in the Running state") + } + // Match all Pods created above expression + matchExpressions := []metav1.LabelSelectorRequirement{ + { + Key: "antrea-e2e", + Operator: metav1.LabelSelectorOpExists, + }, + } + + tests := []struct { + name string + originalRateLimit *v1beta1.RateLimit + newRateLimit *v1beta1.RateLimit + }{ + { + name: "add rate-limit", + originalRateLimit: nil, + newRateLimit: rateLimit1, + }, + { + name: "remove rate-limit", + originalRateLimit: rateLimit1, + newRateLimit: nil, + }, + { + name: "change rate-limit", + originalRateLimit: rateLimit1, + newRateLimit: rateLimit2, + }, + } + checkRateLimitInstall := func(rateLimit *v1beta1.RateLimit) { + meterIDOnLocal := data.getEgressMeterID(egressNodeAntreaPodName, rateLimit.Peak, rateLimit.Burst) + if meterIDOnLocal == 0 { + t.Fatalf("Meter installation failed") + } + meterIDOnRemote := data.getEgressMeterID(nonEgressNodeAntreaPodName, rateLimit.Peak, rateLimit.Burst) + if meterIDOnRemote != 0 { + t.Fatalf("Unexpected meters present on non-EgressNode") + } + meterFlowNum := data.getEgressMeterFlowNum(egressNodeAntreaPodName, meterIDOnLocal) + // One local Pod flow and one tunnel flow. + if meterFlowNum != 2 { + t.Fatalf("Flows with meter installation failed") + } + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + egress := data.createEgress(t, "egress-qos-", matchExpressions, nil, "", egressNodeIP, tt.originalRateLimit) + _, err := data.waitForEgressRealized(egress) + if err != nil { + t.Fatalf("Error when waiting for Egress to be realized: %v", err) + } + if tt.originalRateLimit != nil { + checkRateLimitInstall(tt.originalRateLimit) + } + + toUpdate := egress.DeepCopy() + err = retry.RetryOnConflict(retry.DefaultRetry, func() error { + toUpdate.Spec.RateLimit = tt.newRateLimit + _, err := data.crdClient.CrdV1beta1().Egresses().Update(context.TODO(), toUpdate, metav1.UpdateOptions{}) + if err != nil && errors.IsConflict(err) { + toUpdate, _ = data.crdClient.CrdV1beta1().Egresses().Get(context.TODO(), egress.Name, metav1.GetOptions{}) + } + return err + }) + require.NoError(t, err, "Failed to update Egress") + + if tt.originalRateLimit != nil { + meterID := data.getEgressMeterID(egressNodeAntreaPodName, tt.originalRateLimit.Peak, tt.originalRateLimit.Burst) + if meterID != 0 { + t.Fatalf("Meter update/delete failed") + } + + meterFlowNum := data.getEgressMeterFlowNum(egressNodeAntreaPodName, meterID) + if meterFlowNum != 0 { + t.Fatalf("Flows with meter update/delete failed") + } + } + if tt.newRateLimit != nil { + checkRateLimitInstall(tt.newRateLimit) + } + failOnError(data.deleteEgress(egress.Name), t) + }) + } +} + +func (data *TestData) getEgressMeterFlowNum(antreaPodName string, meterID int) int { + // dump-flows output should look like below: + // cookie=0x7040000000000, duration=17.079s, table=EgressMark, n_packets=0, n_bytes=0, idle_age=17, priority=200,ct_state=+new+trk,ip,tun_dst=172.18.0.3 actions=meter:1,set_field:0x1/0xff->pkt_mark,set_field:0x20/0xf0->reg0,goto_table:L2ForwardingCalc + // cookie=0x7040000000000, duration=17.073s, table=EgressMark, n_packets=0, n_bytes=0, idle_age=17, priority=200,ct_state=+new+trk,ip,in_port="se1-5584-d7ca50" actions=meter:1,set_field:0x1/0xff->pkt_mark,set_field:0x20/0xf0->reg0,goto_table:L2ForwardingCalc + cmd := []string{ + "/bin/sh", + "-c", + fmt.Sprintf("ovs-ofctl dump-flows %s table=EgressMark -O openflow15 | grep \"meter:%d\" -c", defaultBridgeName, meterID), + } + stdout, _, _ := data.RunCommandFromPod(antreaNamespace, antreaPodName, ovsContainerName, cmd) + flowNum, err := strconv.Atoi(strings.Trim(stdout, "\n")) + if err != nil { + return -1 + } + return flowNum +} + +func (data *TestData) getEgressMeterID(antreaPodName string, peak, burst uint32) int { + // dump-meters output should look like below: + // meter=1 kbps burst bands= + // type=drop rate=100000 burst_size=200000 + // + // meter=256 pktps burst bands= + // type=drop rate=500 burst_size=1000 + cmd := []string{ + "/bin/sh", + "-c", + fmt.Sprintf("ovs-ofctl dump-meters %s -O openflow15 | grep -B 1 \"%d burst_size=%d\" | grep kbps", defaultBridgeName, peak*1000, burst*1000), + } + stdout, _, _ := data.RunCommandFromPod(antreaNamespace, antreaPodName, ovsContainerName, cmd) + if stdout == "" { + return 0 + } + meterStr := strings.Split(stdout, " ")[0] + meterIDStr := strings.Split(meterStr, "=")[1] + meterID, err := strconv.Atoi(meterIDStr) + if err != nil { + return 0 + } + return meterID +} + func (data *TestData) checkEgressState(egressName, expectedIP, expectedNode, otherNode string, timeout time.Duration) (*v1beta1.Egress, error) { var egress *v1beta1.Egress var expectedNodeHasIP, otherNodeHasIP bool @@ -790,7 +958,7 @@ func (data *TestData) createExternalIPPool(t *testing.T, generateName string, ip return pool } -func (data *TestData) createEgress(t *testing.T, generateName string, matchExpressions []metav1.LabelSelectorRequirement, matchLabels map[string]string, externalPoolName string, egressIP string) *v1beta1.Egress { +func (data *TestData) createEgress(t *testing.T, generateName string, matchExpressions []metav1.LabelSelectorRequirement, matchLabels map[string]string, externalPoolName string, egressIP string, rateLimit *v1beta1.RateLimit) *v1beta1.Egress { egress := &v1beta1.Egress{ ObjectMeta: metav1.ObjectMeta{GenerateName: generateName}, Spec: v1beta1.EgressSpec{ @@ -802,6 +970,7 @@ func (data *TestData) createEgress(t *testing.T, generateName string, matchExpre }, ExternalIPPool: externalPoolName, EgressIP: egressIP, + RateLimit: rateLimit, }, } egress, err := data.crdClient.CrdV1beta1().Egresses().Create(context.TODO(), egress, metav1.CreateOptions{}) @@ -825,3 +994,21 @@ func (data *TestData) waitForEgressRealized(egress *v1beta1.Egress) (*v1beta1.Eg } return egress, nil } + +func (data *TestData) deleteEgress(egressName string) error { + err := data.crdClient.CrdV1beta1().Egresses().Delete(context.TODO(), egressName, metav1.DeleteOptions{}) + if err != nil { + return err + } + err = wait.PollImmediate(200*time.Millisecond, waitEgressRealizedTimeout, func() (done bool, err error) { + _, err = data.crdClient.CrdV1beta1().Egresses().Get(context.TODO(), egressName, metav1.GetOptions{}) + if err != nil && errors.IsNotFound(err) { + return true, nil + } + return false, nil + }) + if err != nil { + return fmt.Errorf("wait for Egress %s to be deleted failed: %v", egressName, err) + } + return nil +} diff --git a/test/e2e/flowaggregator_test.go b/test/e2e/flowaggregator_test.go index 2ff2d7a60ab..e4da3b2491b 100644 --- a/test/e2e/flowaggregator_test.go +++ b/test/e2e/flowaggregator_test.go @@ -553,7 +553,7 @@ func testHelper(t *testing.T, data *TestData, podAIPs, podBIPs, podCIPs, podDIPs } else { egressNodeIP = nodeIPv6(0) } - egress := data.createEgress(t, "test-egress", nil, map[string]string{"app": "busybox"}, "", egressNodeIP) + egress := data.createEgress(t, "test-egress", nil, map[string]string{"app": "busybox"}, "", egressNodeIP, nil) egress, err := data.waitForEgressRealized(egress) if err != nil { t.Fatalf("Error when waiting for Egress to be realized: %v", err) @@ -593,7 +593,7 @@ func testHelper(t *testing.T, data *TestData, podAIPs, podBIPs, podCIPs, podDIPs } else { egressNodeIP = nodeIPv6(1) } - egress := data.createEgress(t, "test-egress", nil, map[string]string{"app": "busybox"}, "", egressNodeIP) + egress := data.createEgress(t, "test-egress", nil, map[string]string{"app": "busybox"}, "", egressNodeIP, nil) egress, err := data.waitForEgressRealized(egress) if err != nil { t.Fatalf("Error when waiting for Egress to be realized: %v", err) diff --git a/test/e2e/framework.go b/test/e2e/framework.go index 744c46de926..08b4db3c327 100644 --- a/test/e2e/framework.go +++ b/test/e2e/framework.go @@ -1760,6 +1760,14 @@ func (data *TestData) RunCommandFromAntreaPodOnNode(nodeName string, cmd []strin return data.RunCommandFromPod(antreaNamespace, antreaPodName, agentContainerName, cmd) } +func (data *TestData) RunCommandFromAntreaOVSOnNode(nodeName string, cmd []string) (string, string, error) { + antreaPodName, err := data.getAntreaPodOnNode(nodeName) + if err != nil { + return "", "", err + } + return data.RunCommandFromPod(antreaNamespace, antreaPodName, agentContainerName, cmd) +} + // getFlowAggregator retrieves the name of the Flow-Aggregator Pod (flow-aggregator-*) running on a specific Node. func (data *TestData) getFlowAggregator() (*corev1.Pod, error) { listOptions := metav1.ListOptions{ diff --git a/test/e2e/proxy_test.go b/test/e2e/proxy_test.go index b08a98631da..a4b0274ada9 100644 --- a/test/e2e/proxy_test.go +++ b/test/e2e/proxy_test.go @@ -423,7 +423,7 @@ func TestNodePortAndEgressWithTheSameBackendPod(t *testing.T) { // Create an Egress whose external IP is on worker Node. egressNodeIP := workerNodeIPv4(1) - egress := data.createEgress(t, "test-egress", nil, map[string]string{"app": "nginx"}, "", egressNodeIP) + egress := data.createEgress(t, "test-egress", nil, map[string]string{"app": "nginx"}, "", egressNodeIP, nil) defer data.crdClient.CrdV1beta1().Egresses().Delete(context.TODO(), egress.Name, metav1.DeleteOptions{}) // Create the backend Pod on control plane Node. diff --git a/test/e2e/traceflow_test.go b/test/e2e/traceflow_test.go index e95bb458609..0e1d37ebd0e 100644 --- a/test/e2e/traceflow_test.go +++ b/test/e2e/traceflow_test.go @@ -2052,7 +2052,7 @@ func testTraceflowEgress(t *testing.T, data *TestData) { }, } - egress := data.createEgress(t, "egress-", matchExpressions, nil, "", egressIP) + egress := data.createEgress(t, "egress-", matchExpressions, nil, "", egressIP, nil) defer data.crdClient.CrdV1beta1().Egresses().Delete(context.TODO(), egress.Name, metav1.DeleteOptions{}) testcaseLocalEgress := testcase{