diff --git a/build/charts/antrea/conf/antrea-agent.conf b/build/charts/antrea/conf/antrea-agent.conf index 78a64262ce4..e93fe46c4fb 100644 --- a/build/charts/antrea/conf/antrea-agent.conf +++ b/build/charts/antrea/conf/antrea-agent.conf @@ -76,6 +76,9 @@ featureGates: # Allow users to specify the load balancer mode as DSR (Direct Server Return). {{- include "featureGate" (dict "featureGates" .Values.featureGates "name" "LoadBalancerModeDSR" "default" false) }} +# Enable Egress traffic shaping. +{{- include "featureGate" (dict "featureGates" .Values.featureGates "name" "EgressTrafficShaping" "default" false) }} + # Name of the OpenVSwitch bridge antrea-agent will create and use. # Make sure it doesn't conflict with your existing OpenVSwitch bridges. ovsBridge: {{ .Values.ovs.bridgeName | quote }} diff --git a/build/charts/antrea/crds/egress.yaml b/build/charts/antrea/crds/egress.yaml index ab2b7863b34..9953be258d4 100644 --- a/build/charts/antrea/crds/egress.yaml +++ b/build/charts/antrea/crds/egress.yaml @@ -239,6 +239,16 @@ spec: type: array items: type: string + bandwidth: + type: object + required: + - rate + - burst + properties: + rate: + type: string + burst: + type: string status: type: object properties: diff --git a/build/yamls/antrea-aks.yml b/build/yamls/antrea-aks.yml index 1f2e6e1e28b..0b08d80b60a 100644 --- a/build/yamls/antrea-aks.yml +++ b/build/yamls/antrea-aks.yml @@ -2375,6 +2375,16 @@ spec: type: array items: type: string + bandwidth: + type: object + required: + - rate + - burst + properties: + rate: + type: string + burst: + type: string status: type: object properties: @@ -5561,6 +5571,9 @@ data: # Allow users to specify the load balancer mode as DSR (Direct Server Return). # LoadBalancerModeDSR: false + # Enable Egress traffic shaping. + # EgressTrafficShaping: false + # Name of the OpenVSwitch bridge antrea-agent will create and use. # Make sure it doesn't conflict with your existing OpenVSwitch bridges. ovsBridge: "br-int" @@ -6853,7 +6866,7 @@ spec: kubectl.kubernetes.io/default-container: antrea-agent # Automatically restart Pods with a RollingUpdate if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: 77b5f317f3faa10adebca604e145675d41d73631984cc8fa075069b70f9f0419 + checksum/config: e59e0431902646d46cba490279184fea2bdd3c8b486b5a7b1d3ece9a91614634 labels: app: antrea component: antrea-agent @@ -7094,7 +7107,7 @@ spec: annotations: # Automatically restart Pod if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: 77b5f317f3faa10adebca604e145675d41d73631984cc8fa075069b70f9f0419 + checksum/config: e59e0431902646d46cba490279184fea2bdd3c8b486b5a7b1d3ece9a91614634 labels: app: antrea component: antrea-controller diff --git a/build/yamls/antrea-crds.yml b/build/yamls/antrea-crds.yml index 6f82e84e1cd..7f24fc8ee19 100644 --- a/build/yamls/antrea-crds.yml +++ b/build/yamls/antrea-crds.yml @@ -2366,6 +2366,16 @@ spec: type: array items: type: string + bandwidth: + type: object + required: + - rate + - burst + properties: + rate: + type: string + burst: + type: string status: type: object properties: diff --git a/build/yamls/antrea-eks.yml b/build/yamls/antrea-eks.yml index c993eefbf42..1ebcf9995d5 100644 --- a/build/yamls/antrea-eks.yml +++ b/build/yamls/antrea-eks.yml @@ -2375,6 +2375,16 @@ spec: type: array items: type: string + bandwidth: + type: object + required: + - rate + - burst + properties: + rate: + type: string + burst: + type: string status: type: object properties: @@ -5561,6 +5571,9 @@ data: # Allow users to specify the load balancer mode as DSR (Direct Server Return). # LoadBalancerModeDSR: false + # Enable Egress traffic shaping. + # EgressTrafficShaping: false + # Name of the OpenVSwitch bridge antrea-agent will create and use. # Make sure it doesn't conflict with your existing OpenVSwitch bridges. ovsBridge: "br-int" @@ -6853,7 +6866,7 @@ spec: kubectl.kubernetes.io/default-container: antrea-agent # Automatically restart Pods with a RollingUpdate if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: 77b5f317f3faa10adebca604e145675d41d73631984cc8fa075069b70f9f0419 + checksum/config: e59e0431902646d46cba490279184fea2bdd3c8b486b5a7b1d3ece9a91614634 labels: app: antrea component: antrea-agent @@ -7095,7 +7108,7 @@ spec: annotations: # Automatically restart Pod if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: 77b5f317f3faa10adebca604e145675d41d73631984cc8fa075069b70f9f0419 + checksum/config: e59e0431902646d46cba490279184fea2bdd3c8b486b5a7b1d3ece9a91614634 labels: app: antrea component: antrea-controller diff --git a/build/yamls/antrea-gke.yml b/build/yamls/antrea-gke.yml index 04daaf64ef8..572595eb632 100644 --- a/build/yamls/antrea-gke.yml +++ b/build/yamls/antrea-gke.yml @@ -2375,6 +2375,16 @@ spec: type: array items: type: string + bandwidth: + type: object + required: + - rate + - burst + properties: + rate: + type: string + burst: + type: string status: type: object properties: @@ -5561,6 +5571,9 @@ data: # Allow users to specify the load balancer mode as DSR (Direct Server Return). # LoadBalancerModeDSR: false + # Enable Egress traffic shaping. + # EgressTrafficShaping: false + # Name of the OpenVSwitch bridge antrea-agent will create and use. # Make sure it doesn't conflict with your existing OpenVSwitch bridges. ovsBridge: "br-int" @@ -6853,7 +6866,7 @@ spec: kubectl.kubernetes.io/default-container: antrea-agent # Automatically restart Pods with a RollingUpdate if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: 0b761fc6deaf2ebde722c4d34a9898d9e9370e3c99467d40a28009909011b9e9 + checksum/config: 3b1758664de8044af1aa7454c64bd1a4911750e562e1ae9375c9c16a335a469d labels: app: antrea component: antrea-agent @@ -7092,7 +7105,7 @@ spec: annotations: # Automatically restart Pod if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: 0b761fc6deaf2ebde722c4d34a9898d9e9370e3c99467d40a28009909011b9e9 + checksum/config: 3b1758664de8044af1aa7454c64bd1a4911750e562e1ae9375c9c16a335a469d labels: app: antrea component: antrea-controller diff --git a/build/yamls/antrea-ipsec.yml b/build/yamls/antrea-ipsec.yml index 1032a4128a0..a9cc6bd36d3 100644 --- a/build/yamls/antrea-ipsec.yml +++ b/build/yamls/antrea-ipsec.yml @@ -2375,6 +2375,16 @@ spec: type: array items: type: string + bandwidth: + type: object + required: + - rate + - burst + properties: + rate: + type: string + burst: + type: string status: type: object properties: @@ -5574,6 +5584,9 @@ data: # Allow users to specify the load balancer mode as DSR (Direct Server Return). # LoadBalancerModeDSR: false + # Enable Egress traffic shaping. + # EgressTrafficShaping: false + # Name of the OpenVSwitch bridge antrea-agent will create and use. # Make sure it doesn't conflict with your existing OpenVSwitch bridges. ovsBridge: "br-int" @@ -6866,7 +6879,7 @@ spec: kubectl.kubernetes.io/default-container: antrea-agent # Automatically restart Pods with a RollingUpdate if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: 55b17484eb9e47c7af06d7a9367348b851d9de4ad0cdc0e1a3f0b328b08df2d2 + checksum/config: a34de3efa658ac40c9bde28e08832dd897259fdcf639beab9d4e47531d7da948 checksum/ipsec-secret: d0eb9c52d0cd4311b6d252a951126bf9bea27ec05590bed8a394f0f792dcb2a4 labels: app: antrea @@ -7151,7 +7164,7 @@ spec: annotations: # Automatically restart Pod if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: 55b17484eb9e47c7af06d7a9367348b851d9de4ad0cdc0e1a3f0b328b08df2d2 + checksum/config: a34de3efa658ac40c9bde28e08832dd897259fdcf639beab9d4e47531d7da948 labels: app: antrea component: antrea-controller diff --git a/build/yamls/antrea.yml b/build/yamls/antrea.yml index 3c515757511..3227814289f 100644 --- a/build/yamls/antrea.yml +++ b/build/yamls/antrea.yml @@ -2375,6 +2375,16 @@ spec: type: array items: type: string + bandwidth: + type: object + required: + - rate + - burst + properties: + rate: + type: string + burst: + type: string status: type: object properties: @@ -5561,6 +5571,9 @@ data: # Allow users to specify the load balancer mode as DSR (Direct Server Return). # LoadBalancerModeDSR: false + # Enable Egress traffic shaping. + # EgressTrafficShaping: false + # Name of the OpenVSwitch bridge antrea-agent will create and use. # Make sure it doesn't conflict with your existing OpenVSwitch bridges. ovsBridge: "br-int" @@ -6853,7 +6866,7 @@ spec: kubectl.kubernetes.io/default-container: antrea-agent # Automatically restart Pods with a RollingUpdate if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: 5f88b901b90e9499d36fc38364a673d34b6fd6e79344fb63770d65ae3544470a + checksum/config: aa947bf5c403412b9c8cfcbcc335659992f19bd428886e80f43bafa052bac1e6 labels: app: antrea component: antrea-agent @@ -7092,7 +7105,7 @@ spec: annotations: # Automatically restart Pod if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: 5f88b901b90e9499d36fc38364a673d34b6fd6e79344fb63770d65ae3544470a + checksum/config: aa947bf5c403412b9c8cfcbcc335659992f19bd428886e80f43bafa052bac1e6 labels: app: antrea component: antrea-controller diff --git a/cmd/antrea-agent/agent.go b/cmd/antrea-agent/agent.go index 157c0099d6c..d4464951921 100644 --- a/cmd/antrea-agent/agent.go +++ b/cmd/antrea-agent/agent.go @@ -160,6 +160,7 @@ func run(o *Options) error { features.DefaultFeatureGate.Enabled(features.AntreaPolicy), l7NetworkPolicyEnabled, o.enableEgress, + features.DefaultFeatureGate.Enabled(features.EgressTrafficShaping), enableFlowExporter, o.config.AntreaProxy.ProxyAll, features.DefaultFeatureGate.Enabled(features.LoadBalancerModeDSR), @@ -514,6 +515,7 @@ func run(o *Options) error { egressController, err = egress.NewEgressController( ofClient, antreaClientProvider, crdClient, ifaceStore, routeClient, nodeConfig.Name, nodeConfig.NodeTransportInterfaceName, memberlistCluster, egressInformer, nodeInformer, podUpdateChannel, serviceCIDRProvider, o.config.Egress.MaxEgressIPsPerNode, + features.DefaultFeatureGate.Enabled(features.EgressTrafficShaping), ) if err != nil { return fmt.Errorf("error creating new Egress controller: %v", err) diff --git a/docs/egress.md b/docs/egress.md index 447c9c42ba7..b0babd16f37 100644 --- a/docs/egress.md +++ b/docs/egress.md @@ -9,6 +9,7 @@ - [AppliedTo](#appliedto) - [EgressIP](#egressip) - [ExternalIPPool](#externalippool) + - [Bandwidth](#bandwidth) - [The ExternalIPPool resource](#the-externalippool-resource) - [IPRanges](#ipranges) - [NodeSelector](#nodeselector) @@ -127,6 +128,46 @@ The `externalIPPool` field specifies the name of the `ExternalIPPool` that the be assigned to. It can be empty, which means users should assign the `egressIP` to one Node manually. +### Bandwidth + +The `bandwidth` field enables traffic shaping for an Egress, by limiting the +bandwidth for all egress traffic belonging to this Egress. `rate` specifies +the maximum transmission rate. `burst` specifies the maximum burst size when +traffic exceeds the rate. The user-provided values for `rate` and `burst` must +follow the Kubernetes [Quantity](https://kubernetes.io/docs/reference/kubernetes-api/common-definitions/quantity/) format, +e.g. 300k, 100M, 2G. All backend workloads selected by a rate-limited Egress share the +same bandwidth while sending egress traffic via this Egress. If these limits are exceeded, +the traffic will be dropped. + +**Note**: Traffic shaping is currently in alpha version. To use this feature, users should +enable the `EgressTrafficShaping` feature gate. Each Egress IP can be applied one bandwidth only. +If multiple Egresses use the same IP but configure different bandwidths, the effective +bandwidth will be selected randomly from the set of configured bandwidths. The effective use of the `bandwidth` +function requires the OVS datapath to support meters. + +An Egress with traffic shaping example: + +```yaml +apiVersion: crd.antrea.io/v1beta1 +kind: Egress +metadata: + name: egress-prod-web +spec: + appliedTo: + namespaceSelector: + matchLabels: + env: prod + podSelector: + matchLabels: + role: web + egressIP: 10.10.0.8 + bandwidth: + rate: 800M + burst: 2G +status: + egressNode: node01 +``` + ## The ExternalIPPool resource ExternalIPPool defines one or multiple IP ranges that can be used in the diff --git a/docs/feature-gates.md b/docs/feature-gates.md index ba98e1143f0..bec2c838fc1 100644 --- a/docs/feature-gates.md +++ b/docs/feature-gates.md @@ -55,6 +55,7 @@ edit the Agent configuration in the | `SupportBundleCollection` | Agent + Controller | `false` | Alpha | v1.10 | N/A | N/A | Yes | | | `L7NetworkPolicy` | Agent + Controller | `false` | Alpha | v1.10 | N/A | N/A | Yes | | | `AdminNetworkPolicy` | Controller | `false` | Alpha | v1.13 | N/A | N/A | Yes | | +| `EgressTrafficShaping` | Agent | `false` | Alpha | v1.14 | N/A | N/A | Yes | OVS meters should be supported | ## Description and Requirements of Features @@ -402,3 +403,13 @@ this [document](antrea-l7-network-policy.md#prerequisites) for more information The `AdminNetworkPolicy` API (which currently includes the AdminNetworkPolicy and BaselineAdminNetworkPolicy objects) complements the Antrea-native policies and help cluster administrators to set security postures in a portable manner. + +### EgressTrafficShaping + +The `EgressTrafficShaping` feature gate of Antrea Agent enables traffic shaping of Egress, which could limit the +bandwidth for all egress traffic belonging to an Egress. Refer to this [document](egress.md#trafficshaping) + +#### Requirements for this Feature + +This feature leverages OVS meters to do the actual rate-limiting, therefore this feature requires OVS meters +to be supported in the datapath. diff --git a/pkg/agent/controller/egress/egress_controller.go b/pkg/agent/controller/egress/egress_controller.go index e91fa87cd54..00d50d668af 100644 --- a/pkg/agent/controller/egress/egress_controller.go +++ b/pkg/agent/controller/egress/egress_controller.go @@ -25,6 +25,7 @@ import ( corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/labels" @@ -91,6 +92,24 @@ type egressState struct { ofPorts sets.Set[int32] // The actual Pods of the Egress. Used to identify stale Pods when updating or deleting an Egress. pods sets.Set[string] + // Rate-limit of this Egress. + rateLimitMeter *rateLimitMeter +} + +type rateLimitMeter struct { + MeterID uint32 + Rate uint32 + Burst uint32 +} + +func (r *rateLimitMeter) Equals(rateLimit *rateLimitMeter) bool { + if r == nil && rateLimit == nil { + return true + } + if r == nil || rateLimit == nil { + return false + } + return r.MeterID == rateLimit.MeterID && r.Rate == rateLimit.Rate && r.Burst == rateLimit.Burst } // egressIPState keeps the actual state of an Egress IP. It's maintained separately from egressState because @@ -154,6 +173,8 @@ type EgressController struct { serviceCIDRUpdateCh chan struct{} // Declared for testing. serviceCIDRUpdateRetryDelay time.Duration + + trafficShapingEnabled bool } func NewEgressController( @@ -170,7 +191,11 @@ func NewEgressController( podUpdateSubscriber channel.Subscriber, serviceCIDRInterface servicecidr.Interface, maxEgressIPsPerNode int, + trafficShapingEnabled bool, ) (*EgressController, error) { + if trafficShapingEnabled && !openflow.OVSMetersAreSupported() { + klog.Info("EgressTrafficShaping feature gate is enabled, but it is ignored because OVS meters are not supported.") + } c := &EgressController{ ofClient: ofClient, routeClient: routeClient, @@ -193,6 +218,8 @@ func NewEgressController( // One buffer is enough as we just use it to ensure the target handler is executed once. serviceCIDRUpdateCh: make(chan struct{}, 1), serviceCIDRUpdateRetryDelay: 10 * time.Second, + + trafficShapingEnabled: openflow.OVSMetersAreSupported() && trafficShapingEnabled, } ipAssigner, err := newIPAssigner(nodeTransportInterface, egressDummyDevice) if err != nil { @@ -506,6 +533,61 @@ func (c *EgressController) realizeEgressIP(egressName, egressIP string) (uint32, return ipState.mark, nil } +func bandwidthToRateLimitMeter(bandwidth *crdv1b1.Bandwidth, meterID uint32) *rateLimitMeter { + if bandwidth == nil { + return nil + } + rate, err := resource.ParseQuantity(bandwidth.Rate) + if err != nil { + klog.ErrorS(err, "Invalid bandwidth rate configured for Egress", "rate", bandwidth.Rate) + return nil + } + burst, err := resource.ParseQuantity(bandwidth.Burst) + if err != nil { + klog.ErrorS(err, "Invalid bandwidth burst size configured for Egress", "burst", bandwidth.Burst) + return nil + } + return &rateLimitMeter{ + MeterID: meterID, + Rate: uint32(rate.Value() / 1000), + Burst: uint32(burst.Value() / 1000), + } +} + +func (c *EgressController) realizeEgressQoS(egressName string, eState *egressState, mark uint32, bandwidth *crdv1b1.Bandwidth) error { + if !c.trafficShapingEnabled { + if bandwidth != nil { + klog.InfoS("Bandwidth in the Egress is ignored because OVS meters are not supported or trafficShaping is not enabled in Antrea-agent config.", "EgressName", egressName) + } + return nil + } + var desiredRateLimit *rateLimitMeter + // QoS is desired only if the Egress is configured on this Node. + if mark != 0 { + desiredRateLimit = bandwidthToRateLimitMeter(bandwidth, mark) + } + // Nothing changes. + if eState.rateLimitMeter.Equals(desiredRateLimit) { + return nil + } + // It's desired to have QoS on this Node, install/override it. + if desiredRateLimit != nil { + if err := c.ofClient.InstallEgressQoS(mark, desiredRateLimit.Rate, desiredRateLimit.Burst); err != nil { + return err + } + eState.rateLimitMeter = desiredRateLimit + return nil + } + // It's undesired to have QoS on this Node, uninstall it. + if eState.rateLimitMeter != nil { + if err := c.ofClient.UninstallEgressQoS(eState.rateLimitMeter.MeterID); err != nil { + return err + } + eState.rateLimitMeter = nil + } + return nil +} + // unrealizeEgressIP unrealizes an Egress IP, reverts what realizeEgressIP does. // For a local Egress IP, only when the last Egress unrealizes the Egress IP, it will releases the IP's mark and // uninstalls corresponding flows and iptables rule. @@ -782,6 +864,10 @@ func (c *EgressController) syncEgress(egressName string) error { return err } + if err = c.realizeEgressQoS(egressName, eState, mark, egress.Spec.Bandwidth); err != nil { + return err + } + // If the mark changes, uninstall all of the Egress's Pod flows first, then installs them with new mark. // It could happen when the Egress IP is added to or removed from the Node. if eState.mark != mark { @@ -858,6 +944,12 @@ func (c *EgressController) uninstallEgress(egressName string, eState *egressStat if err := c.unrealizeEgressIP(egressName, eState.egressIP); err != nil { return err } + // Uninstall its meter. + if c.trafficShapingEnabled && eState.rateLimitMeter != nil { + if err := c.ofClient.UninstallEgressQoS(eState.rateLimitMeter.MeterID); err != nil { + return err + } + } // Unassign the Egress IP from the local Node if it was assigned by the agent. if err := c.ipAssigner.UnassignIP(eState.egressIP); err != nil { return err diff --git a/pkg/agent/controller/egress/egress_controller_test.go b/pkg/agent/controller/egress/egress_controller_test.go index 83301e06bf1..5f347bf04c7 100644 --- a/pkg/agent/controller/egress/egress_controller_test.go +++ b/pkg/agent/controller/egress/egress_controller_test.go @@ -63,6 +63,17 @@ const ( fakeNode2 = "node2" ) +var ( + fakeBandwidth = crdv1b1.Bandwidth{ + Rate: "500k", + Burst: "500k", + } + newFakeBandwidth = crdv1b1.Bandwidth{ + Rate: "10M", + Burst: "20M", + } +) + type fakeLocalIPDetector struct { localIPs sets.Set[string] } @@ -183,6 +194,7 @@ func newFakeController(t *testing.T, initObjects []runtime.Object) *fakeControll podUpdateChannel, mockServiceCIDRProvider, 255, + true, ) egressController.localIPDetector = localIPDetector return &fakeController{ @@ -215,11 +227,11 @@ func TestSyncEgress(t *testing.T) { name: "Local IP becomes non local", existingEgress: &crdv1b1.Egress{ ObjectMeta: metav1.ObjectMeta{Name: "egressA", UID: "uidA"}, - Spec: crdv1b1.EgressSpec{EgressIP: fakeLocalEgressIP1}, + Spec: crdv1b1.EgressSpec{EgressIP: fakeLocalEgressIP1, Bandwidth: &fakeBandwidth}, }, newEgress: &crdv1b1.Egress{ ObjectMeta: metav1.ObjectMeta{Name: "egressA", UID: "uidA"}, - Spec: crdv1b1.EgressSpec{EgressIP: fakeLocalEgressIP1}, + Spec: crdv1b1.EgressSpec{EgressIP: fakeLocalEgressIP1, Bandwidth: &fakeBandwidth}, }, existingEgressGroup: &cpv1b2.EgressGroup{ ObjectMeta: metav1.ObjectMeta{Name: "egressA", UID: "uidA"}, @@ -239,10 +251,11 @@ func TestSyncEgress(t *testing.T) { expectedEgresses: []*crdv1b1.Egress{ { ObjectMeta: metav1.ObjectMeta{Name: "egressA", UID: "uidA"}, - Spec: crdv1b1.EgressSpec{EgressIP: fakeLocalEgressIP1}, + Spec: crdv1b1.EgressSpec{EgressIP: fakeLocalEgressIP1, Bandwidth: &fakeBandwidth}, }, }, expectedCalls: func(mockOFClient *openflowtest.MockClient, mockRouteClient *routetest.MockInterface, mockIPAssigner *ipassignertest.MockIPAssigner) { + mockOFClient.EXPECT().InstallEgressQoS(uint32(1), uint32(500), uint32(500)) mockOFClient.EXPECT().InstallSNATMarkFlows(net.ParseIP(fakeLocalEgressIP1), uint32(1)) mockOFClient.EXPECT().InstallPodSNATFlows(uint32(1), net.ParseIP(fakeLocalEgressIP1), uint32(1)) mockOFClient.EXPECT().InstallPodSNATFlows(uint32(2), net.ParseIP(fakeLocalEgressIP1), uint32(1)) @@ -254,6 +267,7 @@ func TestSyncEgress(t *testing.T) { mockOFClient.EXPECT().UninstallPodSNATFlows(uint32(1)) mockOFClient.EXPECT().UninstallPodSNATFlows(uint32(2)) mockIPAssigner.EXPECT().UnassignIP(fakeLocalEgressIP1) + mockOFClient.EXPECT().UninstallEgressQoS(uint32(1)) mockOFClient.EXPECT().InstallPodSNATFlows(uint32(1), net.ParseIP(fakeLocalEgressIP1), uint32(0)) mockOFClient.EXPECT().InstallPodSNATFlows(uint32(3), net.ParseIP(fakeLocalEgressIP1), uint32(0)) @@ -264,11 +278,11 @@ func TestSyncEgress(t *testing.T) { name: "Non local IP becomes local", existingEgress: &crdv1b1.Egress{ ObjectMeta: metav1.ObjectMeta{Name: "egressA", UID: "uidA"}, - Spec: crdv1b1.EgressSpec{EgressIP: fakeRemoteEgressIP1}, + Spec: crdv1b1.EgressSpec{EgressIP: fakeRemoteEgressIP1, Bandwidth: &fakeBandwidth}, }, newEgress: &crdv1b1.Egress{ ObjectMeta: metav1.ObjectMeta{Name: "egressA", UID: "uidA"}, - Spec: crdv1b1.EgressSpec{EgressIP: fakeRemoteEgressIP1}, + Spec: crdv1b1.EgressSpec{EgressIP: fakeRemoteEgressIP1, Bandwidth: &fakeBandwidth}, }, existingEgressGroup: &cpv1b2.EgressGroup{ ObjectMeta: metav1.ObjectMeta{Name: "egressA", UID: "uidA"}, @@ -288,7 +302,7 @@ func TestSyncEgress(t *testing.T) { expectedEgresses: []*crdv1b1.Egress{ { ObjectMeta: metav1.ObjectMeta{Name: "egressA", UID: "uidA"}, - Spec: crdv1b1.EgressSpec{EgressIP: fakeRemoteEgressIP1}, + Spec: crdv1b1.EgressSpec{EgressIP: fakeRemoteEgressIP1, Bandwidth: &fakeBandwidth}, Status: crdv1b1.EgressStatus{EgressIP: fakeRemoteEgressIP1, EgressNode: fakeNode}, }, }, @@ -301,6 +315,7 @@ func TestSyncEgress(t *testing.T) { mockOFClient.EXPECT().UninstallPodSNATFlows(uint32(2)) mockIPAssigner.EXPECT().UnassignIP(fakeRemoteEgressIP1) + mockOFClient.EXPECT().InstallEgressQoS(uint32(1), uint32(500), uint32(500)) mockOFClient.EXPECT().InstallSNATMarkFlows(net.ParseIP(fakeRemoteEgressIP1), uint32(1)) mockOFClient.EXPECT().InstallPodSNATFlows(uint32(1), net.ParseIP(fakeRemoteEgressIP1), uint32(1)) mockOFClient.EXPECT().InstallPodSNATFlows(uint32(3), net.ParseIP(fakeRemoteEgressIP1), uint32(1)) @@ -312,11 +327,11 @@ func TestSyncEgress(t *testing.T) { name: "Change from local Egress IP to another local one", existingEgress: &crdv1b1.Egress{ ObjectMeta: metav1.ObjectMeta{Name: "egressA", UID: "uidA"}, - Spec: crdv1b1.EgressSpec{EgressIP: fakeLocalEgressIP1}, + Spec: crdv1b1.EgressSpec{EgressIP: fakeLocalEgressIP1, Bandwidth: &fakeBandwidth}, }, newEgress: &crdv1b1.Egress{ ObjectMeta: metav1.ObjectMeta{Name: "egressA", UID: "uidA"}, - Spec: crdv1b1.EgressSpec{EgressIP: fakeLocalEgressIP2}, + Spec: crdv1b1.EgressSpec{EgressIP: fakeLocalEgressIP2, Bandwidth: &fakeBandwidth}, }, existingEgressGroup: &cpv1b2.EgressGroup{ ObjectMeta: metav1.ObjectMeta{Name: "egressA", UID: "uidA"}, @@ -335,17 +350,19 @@ func TestSyncEgress(t *testing.T) { expectedEgresses: []*crdv1b1.Egress{ { ObjectMeta: metav1.ObjectMeta{Name: "egressA", UID: "uidA"}, - Spec: crdv1b1.EgressSpec{EgressIP: fakeLocalEgressIP2}, + Spec: crdv1b1.EgressSpec{EgressIP: fakeLocalEgressIP2, Bandwidth: &fakeBandwidth}, Status: crdv1b1.EgressStatus{EgressIP: fakeLocalEgressIP2, EgressNode: fakeNode}, }, }, expectedCalls: func(mockOFClient *openflowtest.MockClient, mockRouteClient *routetest.MockInterface, mockIPAssigner *ipassignertest.MockIPAssigner) { + mockOFClient.EXPECT().InstallEgressQoS(uint32(1), uint32(500), uint32(500)) mockOFClient.EXPECT().InstallSNATMarkFlows(net.ParseIP(fakeLocalEgressIP1), uint32(1)) mockOFClient.EXPECT().InstallPodSNATFlows(uint32(1), net.ParseIP(fakeLocalEgressIP1), uint32(1)) mockOFClient.EXPECT().InstallPodSNATFlows(uint32(2), net.ParseIP(fakeLocalEgressIP1), uint32(1)) mockRouteClient.EXPECT().AddSNATRule(net.ParseIP(fakeLocalEgressIP1), uint32(1)) mockIPAssigner.EXPECT().UnassignIP(fakeLocalEgressIP1) + mockOFClient.EXPECT().UninstallEgressQoS(uint32(1)) mockOFClient.EXPECT().UninstallSNATMarkFlows(uint32(1)) mockRouteClient.EXPECT().DeleteSNATRule(uint32(1)) mockOFClient.EXPECT().UninstallPodSNATFlows(uint32(1)) @@ -353,6 +370,7 @@ func TestSyncEgress(t *testing.T) { mockIPAssigner.EXPECT().UnassignIP(fakeLocalEgressIP1) mockIPAssigner.EXPECT().UnassignIP(fakeLocalEgressIP2) + mockOFClient.EXPECT().InstallEgressQoS(uint32(1), uint32(500), uint32(500)) mockOFClient.EXPECT().InstallSNATMarkFlows(net.ParseIP(fakeLocalEgressIP2), uint32(1)) mockOFClient.EXPECT().InstallPodSNATFlows(uint32(1), net.ParseIP(fakeLocalEgressIP2), uint32(1)) mockOFClient.EXPECT().InstallPodSNATFlows(uint32(3), net.ParseIP(fakeLocalEgressIP2), uint32(1)) @@ -364,11 +382,11 @@ func TestSyncEgress(t *testing.T) { name: "Change from local Egress IP to a remote one", existingEgress: &crdv1b1.Egress{ ObjectMeta: metav1.ObjectMeta{Name: "egressA", UID: "uidA"}, - Spec: crdv1b1.EgressSpec{EgressIP: fakeLocalEgressIP1}, + Spec: crdv1b1.EgressSpec{EgressIP: fakeLocalEgressIP1, Bandwidth: &fakeBandwidth}, }, newEgress: &crdv1b1.Egress{ ObjectMeta: metav1.ObjectMeta{Name: "egressA", UID: "uidA"}, - Spec: crdv1b1.EgressSpec{EgressIP: fakeRemoteEgressIP1}, + Spec: crdv1b1.EgressSpec{EgressIP: fakeRemoteEgressIP1, Bandwidth: &fakeBandwidth}, }, existingEgressGroup: &cpv1b2.EgressGroup{ ObjectMeta: metav1.ObjectMeta{Name: "egressA", UID: "uidA"}, @@ -387,10 +405,11 @@ func TestSyncEgress(t *testing.T) { expectedEgresses: []*crdv1b1.Egress{ { ObjectMeta: metav1.ObjectMeta{Name: "egressA", UID: "uidA"}, - Spec: crdv1b1.EgressSpec{EgressIP: fakeRemoteEgressIP1}, + Spec: crdv1b1.EgressSpec{EgressIP: fakeRemoteEgressIP1, Bandwidth: &fakeBandwidth}, }, }, expectedCalls: func(mockOFClient *openflowtest.MockClient, mockRouteClient *routetest.MockInterface, mockIPAssigner *ipassignertest.MockIPAssigner) { + mockOFClient.EXPECT().InstallEgressQoS(uint32(1), uint32(500), uint32(500)) mockOFClient.EXPECT().InstallSNATMarkFlows(net.ParseIP(fakeLocalEgressIP1), uint32(1)) mockOFClient.EXPECT().InstallPodSNATFlows(uint32(1), net.ParseIP(fakeLocalEgressIP1), uint32(1)) mockOFClient.EXPECT().InstallPodSNATFlows(uint32(2), net.ParseIP(fakeLocalEgressIP1), uint32(1)) @@ -403,6 +422,7 @@ func TestSyncEgress(t *testing.T) { mockOFClient.EXPECT().UninstallPodSNATFlows(uint32(2)) mockIPAssigner.EXPECT().UnassignIP(fakeLocalEgressIP1) mockIPAssigner.EXPECT().UnassignIP(fakeRemoteEgressIP1) + mockOFClient.EXPECT().UninstallEgressQoS(uint32(1)) mockOFClient.EXPECT().InstallPodSNATFlows(uint32(1), net.ParseIP(fakeRemoteEgressIP1), uint32(0)) mockOFClient.EXPECT().InstallPodSNATFlows(uint32(3), net.ParseIP(fakeRemoteEgressIP1), uint32(0)) @@ -413,11 +433,11 @@ func TestSyncEgress(t *testing.T) { name: "Change from remote Egress IP to a local one", existingEgress: &crdv1b1.Egress{ ObjectMeta: metav1.ObjectMeta{Name: "egressA", UID: "uidA"}, - Spec: crdv1b1.EgressSpec{EgressIP: fakeRemoteEgressIP1}, + Spec: crdv1b1.EgressSpec{EgressIP: fakeRemoteEgressIP1, Bandwidth: &fakeBandwidth}, }, newEgress: &crdv1b1.Egress{ ObjectMeta: metav1.ObjectMeta{Name: "egressA", UID: "uidA"}, - Spec: crdv1b1.EgressSpec{EgressIP: fakeLocalEgressIP1}, + Spec: crdv1b1.EgressSpec{EgressIP: fakeLocalEgressIP1, Bandwidth: &fakeBandwidth}, }, existingEgressGroup: &cpv1b2.EgressGroup{ ObjectMeta: metav1.ObjectMeta{Name: "egressA", UID: "uidA"}, @@ -436,7 +456,7 @@ func TestSyncEgress(t *testing.T) { expectedEgresses: []*crdv1b1.Egress{ { ObjectMeta: metav1.ObjectMeta{Name: "egressA", UID: "uidA"}, - Spec: crdv1b1.EgressSpec{EgressIP: fakeLocalEgressIP1}, + Spec: crdv1b1.EgressSpec{EgressIP: fakeLocalEgressIP1, Bandwidth: &fakeBandwidth}, Status: crdv1b1.EgressStatus{EgressIP: fakeLocalEgressIP1, EgressNode: fakeNode}, }, }, @@ -450,6 +470,7 @@ func TestSyncEgress(t *testing.T) { mockIPAssigner.EXPECT().UnassignIP(fakeRemoteEgressIP1) mockIPAssigner.EXPECT().UnassignIP(fakeLocalEgressIP1) + mockOFClient.EXPECT().InstallEgressQoS(uint32(1), uint32(500), uint32(500)) mockOFClient.EXPECT().InstallSNATMarkFlows(net.ParseIP(fakeLocalEgressIP1), uint32(1)) mockOFClient.EXPECT().InstallPodSNATFlows(uint32(1), net.ParseIP(fakeLocalEgressIP1), uint32(1)) mockOFClient.EXPECT().InstallPodSNATFlows(uint32(3), net.ParseIP(fakeLocalEgressIP1), uint32(1)) @@ -700,10 +721,126 @@ func TestSyncEgress(t *testing.T) { mockRouteClient.EXPECT().DeleteSNATRule(uint32(1)) }, }, + { + name: "Update Egress from non-rate-limited to rate-limited", + existingEgress: &crdv1b1.Egress{ + ObjectMeta: metav1.ObjectMeta{Name: "egressA", UID: "uidA"}, + Spec: crdv1b1.EgressSpec{EgressIP: fakeLocalEgressIP1}, + }, + newEgress: &crdv1b1.Egress{ + ObjectMeta: metav1.ObjectMeta{Name: "egressA", UID: "uidA"}, + Spec: crdv1b1.EgressSpec{EgressIP: fakeLocalEgressIP1, Bandwidth: &fakeBandwidth}, + }, + existingEgressGroup: &cpv1b2.EgressGroup{ + ObjectMeta: metav1.ObjectMeta{Name: "egressA", UID: "uidA"}, + GroupMembers: []cpv1b2.GroupMember{ + {Pod: &cpv1b2.PodReference{Name: "pod1", Namespace: "ns1"}}, + }, + }, + newEgressGroup: &cpv1b2.EgressGroup{ + ObjectMeta: metav1.ObjectMeta{Name: "egressA", UID: "uidA"}, + GroupMembers: []cpv1b2.GroupMember{ + {Pod: &cpv1b2.PodReference{Name: "pod1", Namespace: "ns1"}}, + }, + }, + expectedEgresses: []*crdv1b1.Egress{ + { + ObjectMeta: metav1.ObjectMeta{Name: "egressA", UID: "uidA"}, + Spec: crdv1b1.EgressSpec{EgressIP: fakeLocalEgressIP1, Bandwidth: &fakeBandwidth}, + Status: crdv1b1.EgressStatus{EgressIP: fakeLocalEgressIP1, EgressNode: fakeNode}, + }, + }, + expectedCalls: func(mockOFClient *openflowtest.MockClient, mockRouteClient *routetest.MockInterface, mockIPAssigner *ipassignertest.MockIPAssigner) { + mockOFClient.EXPECT().InstallSNATMarkFlows(net.ParseIP(fakeLocalEgressIP1), uint32(1)) + mockOFClient.EXPECT().InstallPodSNATFlows(uint32(1), net.ParseIP(fakeLocalEgressIP1), uint32(1)) + mockRouteClient.EXPECT().AddSNATRule(net.ParseIP(fakeLocalEgressIP1), uint32(1)) + mockIPAssigner.EXPECT().UnassignIP(fakeLocalEgressIP1).Times(3) + + mockOFClient.EXPECT().InstallEgressQoS(uint32(1), uint32(500), uint32(500)) + }, + }, + { + name: "Update Egress from rate-limited to non-rate-limited", + existingEgress: &crdv1b1.Egress{ + ObjectMeta: metav1.ObjectMeta{Name: "egressA", UID: "uidA"}, + Spec: crdv1b1.EgressSpec{EgressIP: fakeLocalEgressIP1, Bandwidth: &fakeBandwidth}, + }, + newEgress: &crdv1b1.Egress{ + ObjectMeta: metav1.ObjectMeta{Name: "egressA", UID: "uidA"}, + Spec: crdv1b1.EgressSpec{EgressIP: fakeLocalEgressIP1}, + }, + existingEgressGroup: &cpv1b2.EgressGroup{ + ObjectMeta: metav1.ObjectMeta{Name: "egressA", UID: "uidA"}, + GroupMembers: []cpv1b2.GroupMember{ + {Pod: &cpv1b2.PodReference{Name: "pod1", Namespace: "ns1"}}, + }, + }, + newEgressGroup: &cpv1b2.EgressGroup{ + ObjectMeta: metav1.ObjectMeta{Name: "egressA", UID: "uidA"}, + GroupMembers: []cpv1b2.GroupMember{ + {Pod: &cpv1b2.PodReference{Name: "pod1", Namespace: "ns1"}}, + }, + }, + expectedEgresses: []*crdv1b1.Egress{ + { + ObjectMeta: metav1.ObjectMeta{Name: "egressA", UID: "uidA"}, + Spec: crdv1b1.EgressSpec{EgressIP: fakeLocalEgressIP1}, + Status: crdv1b1.EgressStatus{EgressIP: fakeLocalEgressIP1, EgressNode: fakeNode}, + }, + }, + expectedCalls: func(mockOFClient *openflowtest.MockClient, mockRouteClient *routetest.MockInterface, mockIPAssigner *ipassignertest.MockIPAssigner) { + mockOFClient.EXPECT().InstallEgressQoS(uint32(1), uint32(500), uint32(500)) + mockOFClient.EXPECT().InstallSNATMarkFlows(net.ParseIP(fakeLocalEgressIP1), uint32(1)) + mockOFClient.EXPECT().InstallPodSNATFlows(uint32(1), net.ParseIP(fakeLocalEgressIP1), uint32(1)) + mockRouteClient.EXPECT().AddSNATRule(net.ParseIP(fakeLocalEgressIP1), uint32(1)) + mockIPAssigner.EXPECT().UnassignIP(fakeLocalEgressIP1).Times(3) + + mockOFClient.EXPECT().UninstallEgressQoS(uint32(1)) + }, + }, + { + name: "Update Egress rate-limited config", + existingEgress: &crdv1b1.Egress{ + ObjectMeta: metav1.ObjectMeta{Name: "egressA", UID: "uidA"}, + Spec: crdv1b1.EgressSpec{EgressIP: fakeLocalEgressIP1, Bandwidth: &fakeBandwidth}, + }, + newEgress: &crdv1b1.Egress{ + ObjectMeta: metav1.ObjectMeta{Name: "egressA", UID: "uidA"}, + Spec: crdv1b1.EgressSpec{EgressIP: fakeLocalEgressIP1, Bandwidth: &newFakeBandwidth}, + }, + existingEgressGroup: &cpv1b2.EgressGroup{ + ObjectMeta: metav1.ObjectMeta{Name: "egressA", UID: "uidA"}, + GroupMembers: []cpv1b2.GroupMember{ + {Pod: &cpv1b2.PodReference{Name: "pod1", Namespace: "ns1"}}, + }, + }, + newEgressGroup: &cpv1b2.EgressGroup{ + ObjectMeta: metav1.ObjectMeta{Name: "egressA", UID: "uidA"}, + GroupMembers: []cpv1b2.GroupMember{ + {Pod: &cpv1b2.PodReference{Name: "pod1", Namespace: "ns1"}}, + }, + }, + expectedEgresses: []*crdv1b1.Egress{ + { + ObjectMeta: metav1.ObjectMeta{Name: "egressA", UID: "uidA"}, + Spec: crdv1b1.EgressSpec{EgressIP: fakeLocalEgressIP1, Bandwidth: &newFakeBandwidth}, + Status: crdv1b1.EgressStatus{EgressIP: fakeLocalEgressIP1, EgressNode: fakeNode}, + }, + }, + expectedCalls: func(mockOFClient *openflowtest.MockClient, mockRouteClient *routetest.MockInterface, mockIPAssigner *ipassignertest.MockIPAssigner) { + mockOFClient.EXPECT().InstallEgressQoS(uint32(1), uint32(500), uint32(500)) + mockOFClient.EXPECT().InstallSNATMarkFlows(net.ParseIP(fakeLocalEgressIP1), uint32(1)) + mockOFClient.EXPECT().InstallPodSNATFlows(uint32(1), net.ParseIP(fakeLocalEgressIP1), uint32(1)) + mockRouteClient.EXPECT().AddSNATRule(net.ParseIP(fakeLocalEgressIP1), uint32(1)) + mockIPAssigner.EXPECT().UnassignIP(fakeLocalEgressIP1).Times(3) + mockOFClient.EXPECT().InstallEgressQoS(uint32(1), uint32(10000), uint32(20000)) + }, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { c := newFakeController(t, []runtime.Object{tt.existingEgress}) + c.trafficShapingEnabled = true if tt.maxEgressIPsPerNode > 0 { c.egressIPScheduler.maxEgressIPsPerNode = tt.maxEgressIPsPerNode } diff --git a/pkg/agent/openflow/client.go b/pkg/agent/openflow/client.go index e5f049b2ce2..f336ab209a1 100644 --- a/pkg/agent/openflow/client.go +++ b/pkg/agent/openflow/client.go @@ -163,6 +163,14 @@ type Client interface { // UninstallPodSNATFlows removes the SNAT flows for the local Pod. UninstallPodSNATFlows(ofPort uint32) error + // InstallEgressQoS installs an OF meter with specific meterID, rate + // and burst used for QoS of Egress and a QoS flow that direct packets + // into the meter. + InstallEgressQoS(meterID, rate, burst uint32) error + + // UninstallEgressQoS removes the flow and OF meter used by QoS of Egress. + UninstallEgressQoS(meterID uint32) error + // Disconnect disconnects the connection between client and OFSwitch. Disconnect() error @@ -798,13 +806,13 @@ func (c *client) initialize() error { } if c.ovsMetersAreSupported { - if err := c.genPacketInMeter(PacketInMeterIDNP, uint32(c.packetInRate)).Add(); err != nil { + if err := c.genOFMeter(PacketInMeterIDNP, ofctrl.MeterBurst|ofctrl.MeterPktps, uint32(c.packetInRate), uint32(2*c.packetInRate)).Add(); err != nil { return fmt.Errorf("failed to install OpenFlow meter entry (meterID:%d, rate:%d) for NetworkPolicy packet-in rate limiting: %v", PacketInMeterIDNP, c.packetInRate, err) } - if err := c.genPacketInMeter(PacketInMeterIDTF, uint32(c.packetInRate)).Add(); err != nil { + if err := c.genOFMeter(PacketInMeterIDTF, ofctrl.MeterBurst|ofctrl.MeterPktps, uint32(c.packetInRate), uint32(2*c.packetInRate)).Add(); err != nil { return fmt.Errorf("failed to install OpenFlow meter entry (meterID:%d, rate:%d) for TraceFlow packet-in rate limiting: %v", PacketInMeterIDTF, c.packetInRate, err) } - if err := c.genPacketInMeter(PacketInMeterIDDNS, uint32(c.packetInRate)).Add(); err != nil { + if err := c.genOFMeter(PacketInMeterIDDNS, ofctrl.MeterBurst|ofctrl.MeterPktps, uint32(c.packetInRate), uint32(2*c.packetInRate)).Add(); err != nil { return fmt.Errorf("failed to install OpenFlow meter entry (meterID:%d, rate:%d) for DNS interception packet-in rate limiting: %v", PacketInMeterIDDNS, c.packetInRate, err) } } @@ -930,7 +938,7 @@ func (c *client) generatePipelines() { c.traceableFeatures = append(c.traceableFeatures, c.featureNetworkPolicy) if c.enableEgress { - c.featureEgress = newFeatureEgress(c.cookieAllocator, c.ipProtocols, c.nodeConfig, c.egressConfig) + c.featureEgress = newFeatureEgress(c.cookieAllocator, c.ipProtocols, c.nodeConfig, c.egressConfig, c.ovsMetersAreSupported && c.enableEgressTrafficShaping) c.activatedFeatures = append(c.activatedFeatures, c.featureEgress) } @@ -1036,6 +1044,52 @@ func (c *client) UninstallPodSNATFlows(ofPort uint32) error { return c.deleteFlows(c.featureEgress.cachedFlows, cacheKey) } +func (c *client) InstallEgressQoS(meterID, rate, burst uint32) error { + c.replayMutex.RLock() + defer c.replayMutex.RUnlock() + + // Install Egress QoS meter. + meter := c.genOFMeter(binding.MeterIDType(meterID), ofctrl.MeterBurst|ofctrl.MeterKbps, rate, burst) + _, installed := c.featureEgress.cachedMeter.Load(meterID) + if !installed { + if err := meter.Add(); err != nil { + return fmt.Errorf("error when installing Egress QoS OF Meter %d: %w", meterID, err) + } + } else { + if err := meter.Modify(); err != nil { + return fmt.Errorf("error when modifying Egress QoS OF Meter %d: %w", meterID, err) + } + } + c.featureEgress.cachedMeter.Store(meterID, meter) + + // Install Egress QoS flow. + flow := c.featureEgress.egressQoSFlow(meterID) + cacheKey := fmt.Sprintf("eq%x", meterID) + return c.modifyFlows(c.featureEgress.cachedFlows, cacheKey, []binding.Flow{flow}) +} + +func (c *client) UninstallEgressQoS(meterID uint32) error { + c.replayMutex.RLock() + defer c.replayMutex.RUnlock() + + // Uninstall Egress QoS flow. + cacheKey := fmt.Sprintf("eq%x", meterID) + if err := c.deleteFlows(c.featureEgress.cachedFlows, cacheKey); err != nil { + return err + } + + // Uninstall Egress QoS meter. + mCache, ok := c.featureEgress.cachedMeter.Load(meterID) + if ok { + meter := mCache.(binding.Meter) + if err := meter.Delete(); err != nil { + return fmt.Errorf("error when deleting Egress QoS OF Meter %d: %w", meterID, err) + } + c.featureEgress.cachedMeter.Delete(meterID) + } + return nil +} + func (c *client) ReplayFlows() { c.replayMutex.Lock() defer c.replayMutex.Unlock() @@ -1045,11 +1099,19 @@ func (c *client) ReplayFlows() { } for _, activeFeature := range c.activatedFeatures { + featureName := activeFeature.getFeatureName() + for _, meter := range activeFeature.replayMeters() { + // Openflow bundle message doesn't support meter. Add meter individually instead of + // calling AddOFEntries function. + if err := meter.Add(); err != nil { + klog.ErrorS(err, "Error when replaying feature meters", "feature", featureName) + } + } if err := c.ofEntryOperations.AddOFEntries(activeFeature.replayGroups()); err != nil { - klog.ErrorS(err, "Error when replaying feature groups", "feature", activeFeature.getFeatureName()) + klog.ErrorS(err, "Error when replaying feature groups", "feature", featureName) } if err := c.ofEntryOperations.AddAll(activeFeature.replayFlows()); err != nil { - klog.ErrorS(err, "Error when replaying feature flows", "feature", activeFeature.getFeatureName()) + klog.ErrorS(err, "Error when replaying feature flows", "feature", featureName) } } } diff --git a/pkg/agent/openflow/client_test.go b/pkg/agent/openflow/client_test.go index b588fd4b67f..54453990857 100644 --- a/pkg/agent/openflow/client_test.go +++ b/pkg/agent/openflow/client_test.go @@ -81,16 +81,17 @@ func skipTest(tb testing.TB, skipLinux, skipWindows bool) { } type clientOptions struct { - enableProxy bool - enableAntreaPolicy bool - enableEgress bool - proxyAll bool - enableDSR bool - connectUplinkToBridge bool - enableMulticast bool - enableTrafficControl bool - enableMulticluster bool - enableL7NetworkPolicy bool + enableProxy bool + enableAntreaPolicy bool + enableEgress bool + enableEgressTrafficShaping bool + proxyAll bool + enableDSR bool + connectUplinkToBridge bool + enableMulticast bool + enableTrafficControl bool + enableMulticluster bool + enableL7NetworkPolicy bool } type clientOptionsFn func(*clientOptions) @@ -119,6 +120,10 @@ func disableEgress(o *clientOptions) { o.enableEgress = false } +func enableEgressTrafficShaping(o *clientOptions) { + o.enableEgressTrafficShaping = true +} + func enableConnectUplinkToBridge(o *clientOptions) { o.connectUplinkToBridge = true } @@ -348,16 +353,17 @@ func newFakeClient(mockOFEntryOperations *oftest.MockOFEntryOperations, options ...clientOptionsFn) *client { o := &clientOptions{ - enableProxy: true, - enableAntreaPolicy: true, - enableEgress: true, - proxyAll: false, - enableDSR: false, - connectUplinkToBridge: false, - enableMulticast: false, - enableTrafficControl: false, - enableMulticluster: false, - enableL7NetworkPolicy: false, + enableProxy: true, + enableAntreaPolicy: true, + enableEgress: true, + enableEgressTrafficShaping: false, + proxyAll: false, + enableDSR: false, + connectUplinkToBridge: false, + enableMulticast: false, + enableTrafficControl: false, + enableMulticluster: false, + enableL7NetworkPolicy: false, } for _, fn := range options { fn(o) @@ -370,6 +376,7 @@ func newFakeClient(mockOFEntryOperations *oftest.MockOFEntryOperations, o.enableAntreaPolicy, o.enableL7NetworkPolicy, o.enableEgress, + o.enableEgressTrafficShaping, false, o.proxyAll, o.enableDSR, @@ -1565,31 +1572,58 @@ func Test_client_InstallSNATMarkFlows(t *testing.T) { mark := uint32(100) testCases := []struct { - name string - snatIP net.IP - expectedFlows []string + name string + snatIP net.IP + trafficShapingEnabled bool + expectedFlows []string }{ { - name: "IPv4 SNAT IP", - snatIP: net.ParseIP("192.168.77.100"), + name: "IPv4 SNAT IP", + snatIP: net.ParseIP("192.168.77.100"), + trafficShapingEnabled: false, expectedFlows: []string{ "cookie=0x1040000000000, table=EgressMark, priority=200,ct_state=+new+trk,ip,tun_dst=192.168.77.100 actions=set_field:0x64/0xff->pkt_mark,set_field:0x20/0xf0->reg0,goto_table:L2ForwardingCalc", }, }, { - name: "IPv6 SNAT IP", - snatIP: net.ParseIP("fec0:192:168:77::100"), + name: "IPv6 SNAT IP", + snatIP: net.ParseIP("fec0:192:168:77::100"), + trafficShapingEnabled: false, expectedFlows: []string{ "cookie=0x1040000000000, table=EgressMark, priority=200,ct_state=+new+trk,ipv6,tun_ipv6_dst=fec0:192:168:77::100 actions=set_field:0x64/0xff->pkt_mark,set_field:0x20/0xf0->reg0,goto_table:L2ForwardingCalc", }, }, + { + name: "IPv4 SNAT IP trafficShaping", + snatIP: net.ParseIP("192.168.77.100"), + trafficShapingEnabled: true, + expectedFlows: []string{ + "cookie=0x1040000000000, table=EgressMark, priority=200,ct_state=+trk,ip,tun_dst=192.168.77.100 actions=set_field:0x64/0xff->pkt_mark,set_field:0x20/0xf0->reg0,goto_table:EgressQoS", + }, + }, + { + name: "IPv6 SNAT IP trafficShaping", + snatIP: net.ParseIP("fec0:192:168:77::100"), + trafficShapingEnabled: true, + expectedFlows: []string{ + "cookie=0x1040000000000, table=EgressMark, priority=200,ct_state=+trk,ipv6,tun_ipv6_dst=fec0:192:168:77::100 actions=set_field:0x64/0xff->pkt_mark,set_field:0x20/0xf0->reg0,goto_table:EgressQoS", + }, + }, } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { ctrl := gomock.NewController(t) m := oftest.NewMockOFEntryOperations(ctrl) - - fc := newFakeClient(m, true, true, config.K8sNode, config.TrafficEncapModeEncap) + var fc *client + if tc.trafficShapingEnabled { + if !OVSMetersAreSupported() { + t.Skipf("Skip test because OVS meters are not supported") + } + fc = newFakeClient(m, true, true, config.K8sNode, config.TrafficEncapModeEncap, enableEgressTrafficShaping) + fc.featureEgress.enableEgressTrafficShaping = tc.trafficShapingEnabled + } else { + fc = newFakeClient(m, true, true, config.K8sNode, config.TrafficEncapModeEncap) + } defer resetPipelines() m.EXPECT().AddAll(gomock.Any()).Return(nil).Times(1) @@ -1613,31 +1647,50 @@ func Test_client_InstallPodSNATFlows(t *testing.T) { ofPort := uint32(100) testCases := []struct { - name string - snatMark uint32 - expectedFlows []string + name string + trafficShapingEnabled bool + snatMark uint32 + expectedFlows []string }{ { - name: "SNAT on Local", - snatMark: uint32(100), + name: "SNAT on Local", + trafficShapingEnabled: false, + snatMark: uint32(100), expectedFlows: []string{ "cookie=0x1040000000000, table=EgressMark, priority=200,ct_state=+new+trk,ip,in_port=100 actions=set_field:0x64/0xff->pkt_mark,set_field:0x20/0xf0->reg0,goto_table:L2ForwardingCalc", }, }, { - name: "SNAT on Remote", + name: "SNAT on Remote", + trafficShapingEnabled: false, expectedFlows: []string{ "cookie=0x1040000000000, table=EgressMark, priority=200,ip,in_port=100 actions=set_field:0a:00:00:00:00:01->eth_src,set_field:aa:bb:cc:dd:ee:ff->eth_dst,set_field:192.168.77.101->tun_dst,set_field:0x10/0xf0->reg0,set_field:0x80000/0x80000->reg0,goto_table:L2ForwardingCalc", }, }, + { + name: "SNAT on Local trafficShaping", + trafficShapingEnabled: true, + snatMark: uint32(100), + expectedFlows: []string{ + "cookie=0x1040000000000, table=EgressMark, priority=200,ct_state=+trk,ip,in_port=100 actions=set_field:0x64/0xff->pkt_mark,set_field:0x20/0xf0->reg0,goto_table:EgressQoS", + }, + }, } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { ctrl := gomock.NewController(t) m := oftest.NewMockOFEntryOperations(ctrl) - - fc := newFakeClient(m, true, true, config.K8sNode, config.TrafficEncapModeEncap) + var fc *client + if tc.trafficShapingEnabled { + if !OVSMetersAreSupported() { + t.Skipf("Skip test because OVS meters are not supported") + } + fc = newFakeClient(m, true, true, config.K8sNode, config.TrafficEncapModeEncap, enableEgressTrafficShaping) + fc.featureEgress.enableEgressTrafficShaping = tc.trafficShapingEnabled + } else { + fc = newFakeClient(m, true, true, config.K8sNode, config.TrafficEncapModeEncap) + } defer resetPipelines() m.EXPECT().AddAll(gomock.Any()).Return(nil).Times(1) @@ -1656,6 +1709,50 @@ func Test_client_InstallPodSNATFlows(t *testing.T) { } } +func Test_client_InstallEgressQoS(t *testing.T) { + if !OVSMetersAreSupported() { + t.Skipf("Skip test because OVS meters are not supported") + } + meterID := uint32(100) + meterRate := uint32(100) + meterBurst := uint32(200) + expectedFlows := []string{"cookie=0x1040000000000, table=EgressQoS, priority=200,pkt_mark=0x64/0xff actions=meter:100,goto_table:L2ForwardingCalc"} + + ctrl := gomock.NewController(t) + m := oftest.NewMockOFEntryOperations(ctrl) + bridge := ovsoftest.NewMockBridge(ctrl) + fc := newFakeClient(m, true, true, config.K8sNode, config.TrafficEncapModeEncap, enableEgressTrafficShaping) + fc.bridge = bridge + fc.featureEgress.enableEgressTrafficShaping = true + fc.ovsMetersAreSupported = true + defer resetPipelines() + + m.EXPECT().AddAll(gomock.Any()).Return(nil).Times(1) + m.EXPECT().DeleteAll(gomock.Any()).Return(nil).Times(1) + + meter := ovsoftest.NewMockMeter(ctrl) + meterBuilder := ovsoftest.NewMockMeterBandBuilder(ctrl) + bridge.EXPECT().NewMeter(binding.MeterIDType(meterID), ofctrl.MeterBurst|ofctrl.MeterKbps).Return(meter).Times(1) + meter.EXPECT().MeterBand().Return(meterBuilder).Times(1) + meterBuilder.EXPECT().MeterType(ofctrl.MeterDrop).Return(meterBuilder).Times(1) + meterBuilder.EXPECT().Rate(meterRate).Return(meterBuilder).Times(1) + meterBuilder.EXPECT().Burst(meterBurst).Return(meterBuilder).Times(1) + meterBuilder.EXPECT().Done().Return(meter).Times(1) + meter.EXPECT().Add().Return(nil).Times(1) + + require.NoError(t, fc.InstallEgressQoS(meterID, meterRate, meterBurst)) + + cacheKey := fmt.Sprintf("eq%x", meterID) + fCacheI, ok := fc.featureEgress.cachedFlows.Load(cacheKey) + require.True(t, ok) + assert.ElementsMatch(t, expectedFlows, getFlowStrings(fCacheI)) + + meter.EXPECT().Delete().Return(nil).Times(1) + require.NoError(t, fc.UninstallEgressQoS(meterID)) + _, ok = fc.featureEgress.cachedFlows.Load(cacheKey) + require.False(t, ok) +} + func Test_client_InstallTraceflowFlows(t *testing.T) { type fields struct { } @@ -1911,7 +2008,7 @@ func Test_client_setBasePacketOutBuilder(t *testing.T) { } func prepareSetBasePacketOutBuilder(ctrl *gomock.Controller, success bool) *client { - ofClient := NewClient(bridgeName, bridgeMgmtAddr, nodeiptest.NewFakeNodeIPChecker(), true, true, false, false, false, false, false, false, false, false, false, nil, false, defaultPacketInRate) + ofClient := NewClient(bridgeName, bridgeMgmtAddr, nodeiptest.NewFakeNodeIPChecker(), true, true, false, false, false, false, false, false, false, false, false, false, nil, false, defaultPacketInRate) m := ovsoftest.NewMockBridge(ctrl) ofClient.bridge = m bridge := binding.OFBridge{} @@ -2573,11 +2670,16 @@ func Test_client_ReplayFlows(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() m := oftest.NewMockOFEntryOperations(ctrl) - - fc := newFakeClient(m, true, false, config.K8sNode, config.TrafficEncapModeEncap, enableTrafficControl, enableMulticast, enableMulticluster) + egressTrafficShaping := false + clientOptions := []clientOptionsFn{enableTrafficControl, enableMulticast, enableMulticluster} + if OVSMetersAreSupported() { + egressTrafficShaping = true + clientOptions = append(clientOptions, enableEgressTrafficShaping) + } + fc := newFakeClient(m, true, false, config.K8sNode, config.TrafficEncapModeEncap, clientOptions...) defer resetPipelines() - expectedFlows := append(pipelineDefaultFlows(false, true, true), egressInitFlows(true)...) + expectedFlows := append(pipelineDefaultFlows(egressTrafficShaping, false, true, true), egressInitFlows(true)...) expectedFlows = append(expectedFlows, multicastInitFlows(true)...) expectedFlows = append(expectedFlows, networkPolicyInitFlows(fc.ovsMetersAreSupported, false, false)...) expectedFlows = append(expectedFlows, podConnectivityInitFlows(config.TrafficEncapModeEncap, false, true, true, true)...) @@ -2596,9 +2698,16 @@ func Test_client_ReplayFlows(t *testing.T) { // Feature Egress replays flows. snatIP := net.ParseIP("192.168.77.100") addFlowInCache(fc.featureEgress.cachedFlows, "egressFlows", []binding.Flow{fc.featureEgress.snatIPFromTunnelFlow(snatIP, uint32(100))}) - replayedFlows = append(replayedFlows, - "cookie=0x1040000000000, table=EgressMark, priority=200,ct_state=+new+trk,ip,tun_dst=192.168.77.100 actions=set_field:0x64/0xff->pkt_mark,set_field:0x20/0xf0->reg0,goto_table:L2ForwardingCalc", - ) + if egressTrafficShaping { + replayedFlows = append(replayedFlows, + "cookie=0x1040000000000, table=EgressMark, priority=200,ct_state=+trk,ip,tun_dst=192.168.77.100 actions=set_field:0x64/0xff->pkt_mark,set_field:0x20/0xf0->reg0,goto_table:EgressQoS", + "cookie=0x1040000000000, table=EgressQoS, priority=190 actions=goto_table:L2ForwardingCalc", + ) + } else { + replayedFlows = append(replayedFlows, + "cookie=0x1040000000000, table=EgressMark, priority=200,ct_state=+new+trk,ip,tun_dst=192.168.77.100 actions=set_field:0x64/0xff->pkt_mark,set_field:0x20/0xf0->reg0,goto_table:L2ForwardingCalc", + ) + } // Feature Multicast replays flows. podIP := net.ParseIP("10.10.0.66") podOfPort := uint32(100) @@ -2642,11 +2751,24 @@ func Test_client_ReplayFlows(t *testing.T) { } fc.featureNetworkPolicy.globalConjMatchFlowCache["npMatch"] = context replayedFlows = append(replayedFlows, - "cookie=0x1020000000000, table=IngressRule, priority=200,conj_id=15 actions=set_field:0xf->reg3,set_field:0x400/0x400->reg0,set_field:0x800/0x1800->reg0,set_field:0x2000000/0xfe000000->reg0,set_field:0x1a/0xff->reg2,group:4", "cookie=0x1020000000000, table=IngressRule, priority=200,reg1=0x64 actions=conjunction(15,2/2)", - "cookie=0x1020000000000, table=IngressDefaultRule, priority=200,reg1=0x64 actions=set_field:0x800/0x1800->reg0,set_field:0x2000000/0xfe000000->reg0,set_field:0x400000/0x600000->reg0,set_field:0x1b/0xff->reg2,goto_table:Output", "cookie=0x1020000000000, table=IngressMetric, priority=200,reg0=0x400/0x400,reg3=0xf actions=drop", ) + if egressTrafficShaping { + // When egressTrafficShaping is enabled, EgressQoSTable will be initialized, which + // will cause IDs of tables after EgressQoSTable shifted. + // The tableID stored in PacketInTableField need to be added 1: + // set_field:0x1a/0xff->reg2 => set_field:0x1b/0xff->reg2 + replayedFlows = append(replayedFlows, + "cookie=0x1020000000000, table=IngressRule, priority=200,conj_id=15 actions=set_field:0xf->reg3,set_field:0x400/0x400->reg0,set_field:0x800/0x1800->reg0,set_field:0x2000000/0xfe000000->reg0,set_field:0x1b/0xff->reg2,group:4", + "cookie=0x1020000000000, table=IngressDefaultRule, priority=200,reg1=0x64 actions=set_field:0x800/0x1800->reg0,set_field:0x2000000/0xfe000000->reg0,set_field:0x400000/0x600000->reg0,set_field:0x1c/0xff->reg2,goto_table:Output", + ) + } else { + replayedFlows = append(replayedFlows, + "cookie=0x1020000000000, table=IngressRule, priority=200,conj_id=15 actions=set_field:0xf->reg3,set_field:0x400/0x400->reg0,set_field:0x800/0x1800->reg0,set_field:0x2000000/0xfe000000->reg0,set_field:0x1a/0xff->reg2,group:4", + "cookie=0x1020000000000, table=IngressDefaultRule, priority=200,reg1=0x64 actions=set_field:0x800/0x1800->reg0,set_field:0x2000000/0xfe000000->reg0,set_field:0x400000/0x600000->reg0,set_field:0x1b/0xff->reg2,goto_table:Output", + ) + } // Feature Pod connectivity replays flows. podMAC, _ := net.ParseMAC("00:00:10:10:00:66") @@ -2701,6 +2823,26 @@ func Test_client_ReplayFlows(t *testing.T) { // on the flows and groups. bridge := ovsoftest.NewMockBridge(ctrl) fc.bridge = bridge + egressMeterID := uint32(1) + egressMeterRate := uint32(100) + egressMeterBurst := uint32(200) + expectNewMeter := func(id, rate, burst uint32, unit ofctrl.MeterFlag, isCached bool) { + meter := ovsoftest.NewMockMeter(ctrl) + meterBuilder := ovsoftest.NewMockMeterBandBuilder(ctrl) + bridge.EXPECT().NewMeter(binding.MeterIDType(id), ofctrl.MeterBurst|unit).Return(meter).Times(1) + meter.EXPECT().MeterBand().Return(meterBuilder).Times(1) + meterBuilder.EXPECT().MeterType(ofctrl.MeterDrop).Return(meterBuilder).Times(1) + meterBuilder.EXPECT().Rate(rate).Return(meterBuilder).Times(1) + meterBuilder.EXPECT().Burst(burst).Return(meterBuilder).Times(1) + meterBuilder.EXPECT().Done().Return(meter).Times(1) + if isCached { + meter.EXPECT().Reset().Times(1) + } + meter.EXPECT().Add().Return(nil).Times(1) + } + expectNewMeter(egressMeterID, egressMeterRate, egressMeterBurst, ofctrl.MeterKbps, true) + egressMeter := fc.genOFMeter(binding.MeterIDType(egressMeterID), ofctrl.MeterBurst|ofctrl.MeterKbps, egressMeterRate, egressMeterBurst) + fc.featureEgress.cachedMeter.Store(egressMeterID, egressMeter) for _, meterCfg := range []struct { id binding.MeterIDType rate uint32 @@ -2709,15 +2851,7 @@ func Test_client_ReplayFlows(t *testing.T) { {id: PacketInMeterIDTF, rate: uint32(defaultPacketInRate)}, {id: PacketInMeterIDDNS, rate: uint32(defaultPacketInRate)}, } { - meter := ovsoftest.NewMockMeter(ctrl) - meterBuilder := ovsoftest.NewMockMeterBandBuilder(ctrl) - bridge.EXPECT().NewMeter(meterCfg.id, ofctrl.MeterBurst|ofctrl.MeterPktps).Return(meter).Times(1) - meter.EXPECT().MeterBand().Return(meterBuilder).Times(1) - meterBuilder.EXPECT().MeterType(ofctrl.MeterDrop).Return(meterBuilder).Times(1) - meterBuilder.EXPECT().Rate(meterCfg.rate).Return(meterBuilder).Times(1) - meterBuilder.EXPECT().Burst(2 * meterCfg.rate).Return(meterBuilder).Times(1) - meterBuilder.EXPECT().Done().Return(meter).Times(1) - meter.EXPECT().Add().Return(nil).Times(1) + expectNewMeter(uint32(meterCfg.id), meterCfg.rate, meterCfg.rate*2, ofctrl.MeterPktps, false) } } diff --git a/pkg/agent/openflow/egress.go b/pkg/agent/openflow/egress.go index f5f42d3276e..b2f218ce71a 100644 --- a/pkg/agent/openflow/egress.go +++ b/pkg/agent/openflow/egress.go @@ -16,6 +16,7 @@ package openflow import ( "net" + "sync" "antrea.io/libOpenflow/openflow15" @@ -29,12 +30,14 @@ type featureEgress struct { ipProtocols []binding.Protocol cachedFlows *flowCategoryCache + cachedMeter sync.Map exceptCIDRs map[binding.Protocol][]net.IPNet nodeIPs map[binding.Protocol]net.IP gatewayMAC net.HardwareAddr - category cookie.Category + category cookie.Category + enableEgressTrafficShaping bool } func (f *featureEgress) getFeatureName() string { @@ -44,7 +47,8 @@ func (f *featureEgress) getFeatureName() string { func newFeatureEgress(cookieAllocator cookie.Allocator, ipProtocols []binding.Protocol, nodeConfig *config.NodeConfig, - egressConfig *config.EgressConfig) *featureEgress { + egressConfig *config.EgressConfig, + enableEgressTrafficShaping bool) *featureEgress { exceptCIDRs := make(map[binding.Protocol][]net.IPNet) for _, cidr := range egressConfig.ExceptCIDRs { if cidr.IP.To4() == nil { @@ -63,20 +67,26 @@ func newFeatureEgress(cookieAllocator cookie.Allocator, } } return &featureEgress{ - cachedFlows: newFlowCategoryCache(), - cookieAllocator: cookieAllocator, - exceptCIDRs: exceptCIDRs, - ipProtocols: ipProtocols, - nodeIPs: nodeIPs, - gatewayMAC: nodeConfig.GatewayConfig.MAC, - category: cookie.Egress, + cachedFlows: newFlowCategoryCache(), + cachedMeter: sync.Map{}, + cookieAllocator: cookieAllocator, + exceptCIDRs: exceptCIDRs, + ipProtocols: ipProtocols, + nodeIPs: nodeIPs, + gatewayMAC: nodeConfig.GatewayConfig.MAC, + category: cookie.Egress, + enableEgressTrafficShaping: enableEgressTrafficShaping, } } func (f *featureEgress) initFlows() []*openflow15.FlowMod { // This installs the flows to enable Pods to communicate to the external IP addresses. The flows identify the packets // from local Pods to the external IP address, and mark the packets to be SNAT'd with the configured SNAT IPs. - return GetFlowModMessages(f.externalFlows(), binding.AddMessage) + initialFlows := f.externalFlows() + if f.enableEgressTrafficShaping { + initialFlows = append(initialFlows, f.egressQoSDefaultFlow()) + } + return GetFlowModMessages(initialFlows, binding.AddMessage) } func (f *featureEgress) replayFlows() []*openflow15.FlowMod { @@ -90,3 +100,14 @@ func (f *featureEgress) initGroups() []binding.OFEntry { func (f *featureEgress) replayGroups() []binding.OFEntry { return nil } + +func (f *featureEgress) replayMeters() []binding.OFEntry { + var meters []binding.OFEntry + f.cachedMeter.Range(func(id, value interface{}) bool { + meter := value.(binding.Meter) + meter.Reset() + meters = append(meters, meter) + return true + }) + return meters +} diff --git a/pkg/agent/openflow/externalnode_connectivity.go b/pkg/agent/openflow/externalnode_connectivity.go index 8d1ec850e60..a9cf6e718c5 100644 --- a/pkg/agent/openflow/externalnode_connectivity.go +++ b/pkg/agent/openflow/externalnode_connectivity.go @@ -161,6 +161,10 @@ func (f *featureExternalNodeConnectivity) replayGroups() []binding.OFEntry { return nil } +func (f *featureExternalNodeConnectivity) replayMeters() []binding.OFEntry { + return nil +} + func (f *featureExternalNodeConnectivity) policyBypassFlow(protocol binding.Protocol, ipNet *net.IPNet, port uint16, isIngress bool) binding.Flow { cookieID := f.cookieAllocator.Request(f.category).Raw() var flowBuilder binding.FlowBuilder diff --git a/pkg/agent/openflow/framework.go b/pkg/agent/openflow/framework.go index f26dcf6c952..26320404a48 100644 --- a/pkg/agent/openflow/framework.go +++ b/pkg/agent/openflow/framework.go @@ -61,6 +61,8 @@ type feature interface { initGroups() []binding.OFEntry // initFlows returns the Openflow messages of initial flows of the feature. initFlows() []*openflow15.FlowMod + // replayMeters returns the fixed and cached Openflow meters that need to be replayed after OVS is reconnected. + replayMeters() []binding.OFEntry // replayGroups returns the fixed and cached Openflow groups that need to be replayed after OVS is reconnected. replayGroups() []binding.OFEntry // replayFlows returns the Openflow messages of fixed and cached flows that need to be replayed after OVS is reconnected. @@ -271,10 +273,14 @@ func (f *featureService) getRequiredTables() []*Table { } func (f *featureEgress) getRequiredTables() []*Table { - return []*Table{ + tables := []*Table{ L3ForwardingTable, EgressMarkTable, } + if f.enableEgressTrafficShaping { + tables = append(tables, EgressQoSTable) + } + return tables } func (f *featureMulticast) getRequiredTables() []*Table { diff --git a/pkg/agent/openflow/meters_linux.go b/pkg/agent/openflow/meters_linux.go index 8807b40d3d8..237e1065fb1 100644 --- a/pkg/agent/openflow/meters_linux.go +++ b/pkg/agent/openflow/meters_linux.go @@ -24,7 +24,7 @@ import ( "antrea.io/antrea/pkg/util/runtime" ) -func ovsMetersAreSupported() bool { +func OVSMetersAreSupported() bool { // According to the OVS documentation, meters are supported in the kernel module since 4.15 // (https://docs.openvswitch.org/en/latest/faq/releases/). However, it turns out that // because of a bug meters cannot be used with kernel versions older than 4.18, which is diff --git a/pkg/agent/openflow/meters_others.go b/pkg/agent/openflow/meters_others.go index 61d9a7b6a50..c4cd41185d3 100644 --- a/pkg/agent/openflow/meters_others.go +++ b/pkg/agent/openflow/meters_others.go @@ -17,7 +17,7 @@ package openflow -func ovsMetersAreSupported() bool { +func OVSMetersAreSupported() bool { // TODO: revisit after Windows OVS supports OpenFlow meters. return false } diff --git a/pkg/agent/openflow/multicast.go b/pkg/agent/openflow/multicast.go index 5cc90b79e5c..0cf1384d51c 100644 --- a/pkg/agent/openflow/multicast.go +++ b/pkg/agent/openflow/multicast.go @@ -196,6 +196,10 @@ func (f *featureMulticast) initGroups() []binding.OFEntry { return nil } +func (f *featureMulticast) replayMeters() []binding.OFEntry { + return nil +} + func (f *featureMulticast) multicastRemoteReportFlows(groupID binding.GroupIDType, firstMulticastTable binding.Table) []binding.Flow { return []binding.Flow{ // This flow outputs the IGMP report message sent from Antrea Agent to an OpenFlow group which is expected to diff --git a/pkg/agent/openflow/multicluster.go b/pkg/agent/openflow/multicluster.go index a6ee9b2e909..67dcb2d5a46 100644 --- a/pkg/agent/openflow/multicluster.go +++ b/pkg/agent/openflow/multicluster.go @@ -78,6 +78,10 @@ func (f *featureMulticluster) replayGroups() []binding.OFEntry { return nil } +func (f *featureMulticluster) replayMeters() []binding.OFEntry { + return nil +} + func (f *featureMulticluster) l3FwdFlowToRemoteGateway( localGatewayMAC net.HardwareAddr, peerServiceCIDR net.IPNet, diff --git a/pkg/agent/openflow/network_policy.go b/pkg/agent/openflow/network_policy.go index e2ce2390d70..8a8c96a344d 100644 --- a/pkg/agent/openflow/network_policy.go +++ b/pkg/agent/openflow/network_policy.go @@ -2301,6 +2301,10 @@ func (f *featureNetworkPolicy) initGroups() []binding.OFEntry { return groups } +func (f *featureNetworkPolicy) replayMeters() []binding.OFEntry { + return nil +} + func (f *featureNetworkPolicy) getLoggingAndResubmitGroupID(nextTable uint8) binding.GroupIDType { groupKey := fmt.Sprintf("%d", nextTable) group, _ := f.loggingGroupCache.Load(groupKey) diff --git a/pkg/agent/openflow/network_policy_test.go b/pkg/agent/openflow/network_policy_test.go index 9dce4d3b6a3..c7bff73bffc 100644 --- a/pkg/agent/openflow/network_policy_test.go +++ b/pkg/agent/openflow/network_policy_test.go @@ -1373,13 +1373,13 @@ func networkPolicyInitFlows(ovsMeterSupported, externalNodeEnabled, l7NetworkPol } if ovsMeterSupported { loggingFlows = []string{ - "cookie=0x1020000000000, table=Output, priority=200,reg0=0x2400000/0xfe600000 actions=meter:1,controller(id=32776,reason=no_match,userdata=01.01,max_len=128)", - "cookie=0x1020000000000, table=Output, priority=200,reg0=0x4400000/0xfe600000 actions=meter:1,controller(id=32776,reason=no_match,userdata=01.02,max_len=128)", - "cookie=0x1020000000000, table=Output, priority=200,reg0=0x6400000/0xfe600000 actions=meter:1,controller(id=32776,reason=no_match,userdata=01.03,max_len=128)", - "cookie=0x1020000000000, table=Output, priority=200,reg0=0x8400000/0xfe600000 actions=meter:1,controller(id=32776,reason=no_match,userdata=01.04,max_len=128)", - "cookie=0x1020000000000, table=Output, priority=200,reg0=0xa400000/0xfe600000 actions=meter:1,controller(id=32776,reason=no_match,userdata=01.05,max_len=128)", - "cookie=0x1020000000000, table=Output, priority=200,reg0=0xc400000/0xfe600000 actions=meter:1,controller(id=32776,reason=no_match,userdata=01.06,max_len=128)", - "cookie=0x1020000000000, table=Output, priority=200,reg0=0xe400000/0xfe600000 actions=meter:1,controller(id=32776,reason=no_match,userdata=01.07,max_len=128)", + "cookie=0x1020000000000, table=Output, priority=200,reg0=0x2400000/0xfe600000 actions=meter:256,controller(id=32776,reason=no_match,userdata=01.01,max_len=128)", + "cookie=0x1020000000000, table=Output, priority=200,reg0=0x4400000/0xfe600000 actions=meter:256,controller(id=32776,reason=no_match,userdata=01.02,max_len=128)", + "cookie=0x1020000000000, table=Output, priority=200,reg0=0x6400000/0xfe600000 actions=meter:256,controller(id=32776,reason=no_match,userdata=01.03,max_len=128)", + "cookie=0x1020000000000, table=Output, priority=200,reg0=0x8400000/0xfe600000 actions=meter:256,controller(id=32776,reason=no_match,userdata=01.04,max_len=128)", + "cookie=0x1020000000000, table=Output, priority=200,reg0=0xa400000/0xfe600000 actions=meter:256,controller(id=32776,reason=no_match,userdata=01.05,max_len=128)", + "cookie=0x1020000000000, table=Output, priority=200,reg0=0xc400000/0xfe600000 actions=meter:256,controller(id=32776,reason=no_match,userdata=01.06,max_len=128)", + "cookie=0x1020000000000, table=Output, priority=200,reg0=0xe400000/0xfe600000 actions=meter:256,controller(id=32776,reason=no_match,userdata=01.07,max_len=128)", } } if externalNodeEnabled { @@ -1410,7 +1410,7 @@ func networkPolicyInitFlows(ovsMeterSupported, externalNodeEnabled, l7NetworkPol } func Test_featureNetworkPolicy_initFlows(t *testing.T) { - ovsMetersSupported := ovsMetersAreSupported() + ovsMetersSupported := OVSMetersAreSupported() testCases := []struct { name string nodeType config.NodeType @@ -1447,7 +1447,7 @@ func Test_featureNetworkPolicy_initFlows(t *testing.T) { } func Test_NewDNSPacketInConjunction(t *testing.T) { - ovsMetersSupported := ovsMetersAreSupported() + ovsMetersSupported := OVSMetersAreSupported() ipv4ExpFlows := []string{ "cookie=0x1020000000000, table=AntreaPolicyIngressRule, priority=64991,conj_id=1 actions=controller(id=32776,reason=no_match,userdata=02,max_len=128),goto_table:IngressMetric", "cookie=0x1020000000000, table=AntreaPolicyIngressRule, priority=64991,ct_state=+rpl+trk,udp,tp_src=53 actions=conjunction(1,1/2)", @@ -1455,7 +1455,7 @@ func Test_NewDNSPacketInConjunction(t *testing.T) { } if ovsMetersSupported { ipv4ExpFlows = []string{ - "cookie=0x1020000000000, table=AntreaPolicyIngressRule, priority=64991,conj_id=1 actions=meter:3,controller(id=32776,reason=no_match,userdata=02,max_len=128),goto_table:IngressMetric", + "cookie=0x1020000000000, table=AntreaPolicyIngressRule, priority=64991,conj_id=1 actions=meter:258,controller(id=32776,reason=no_match,userdata=02,max_len=128),goto_table:IngressMetric", "cookie=0x1020000000000, table=AntreaPolicyIngressRule, priority=64991,ct_state=+rpl+trk,udp,tp_src=53 actions=conjunction(1,1/2)", "cookie=0x1020000000000, table=AntreaPolicyIngressRule, priority=64991,ct_state=+rpl+trk,tcp,tp_src=53 actions=conjunction(1,1/2)", } @@ -1467,7 +1467,7 @@ func Test_NewDNSPacketInConjunction(t *testing.T) { } if ovsMetersSupported { ipv6ExpFlows = []string{ - "cookie=0x1020000000000, table=AntreaPolicyIngressRule, priority=64991,conj_id=1 actions=meter:3,controller(id=32776,reason=no_match,userdata=02,max_len=128),goto_table:IngressMetric", + "cookie=0x1020000000000, table=AntreaPolicyIngressRule, priority=64991,conj_id=1 actions=meter:258,controller(id=32776,reason=no_match,userdata=02,max_len=128),goto_table:IngressMetric", "cookie=0x1020000000000, table=AntreaPolicyIngressRule, priority=64991,ct_state=+rpl+trk,udp6,tp_src=53 actions=conjunction(1,1/2)", "cookie=0x1020000000000, table=AntreaPolicyIngressRule, priority=64991,ct_state=+rpl+trk,tcp6,tp_src=53 actions=conjunction(1,1/2)", } @@ -1481,7 +1481,7 @@ func Test_NewDNSPacketInConjunction(t *testing.T) { } if ovsMetersSupported { dsExpFlows = []string{ - "cookie=0x1020000000000, table=AntreaPolicyIngressRule, priority=64991,conj_id=1 actions=meter:3,controller(id=32776,reason=no_match,userdata=02,max_len=128),goto_table:IngressMetric", + "cookie=0x1020000000000, table=AntreaPolicyIngressRule, priority=64991,conj_id=1 actions=meter:258,controller(id=32776,reason=no_match,userdata=02,max_len=128),goto_table:IngressMetric", "cookie=0x1020000000000, table=AntreaPolicyIngressRule, priority=64991,ct_state=+rpl+trk,udp,tp_src=53 actions=conjunction(1,1/2)", "cookie=0x1020000000000, table=AntreaPolicyIngressRule, priority=64991,ct_state=+rpl+trk,tcp,tp_src=53 actions=conjunction(1,1/2)", "cookie=0x1020000000000, table=AntreaPolicyIngressRule, priority=64991,ct_state=+rpl+trk,udp6,tp_src=53 actions=conjunction(1,1/2)", diff --git a/pkg/agent/openflow/packetin.go b/pkg/agent/openflow/packetin.go index e961ba3877e..e4189760722 100644 --- a/pkg/agent/openflow/packetin.go +++ b/pkg/agent/openflow/packetin.go @@ -78,9 +78,12 @@ const ( // We use OpenFlow Meter for packetIn rate limiting on OVS side. // Meter Entry ID. - PacketInMeterIDNP = 1 - PacketInMeterIDTF = 2 - PacketInMeterIDDNS = 3 + // 1-255 are reserved for Egress QoS. The Egress QoS meterID leverage the same + // value as the mark allocated to the EgressIP and Antrea limits the number of + // Egress IPs per Node to 255, hence the reserved meter ID range is 1-255. + PacketInMeterIDNP = 256 + PacketInMeterIDTF = 257 + PacketInMeterIDDNS = 258 ) // RegisterPacketInHandler stores controller handler in a map with category as keys. diff --git a/pkg/agent/openflow/pipeline.go b/pkg/agent/openflow/pipeline.go index 3219d96c1a7..053d9fa6c1f 100644 --- a/pkg/agent/openflow/pipeline.go +++ b/pkg/agent/openflow/pipeline.go @@ -157,6 +157,7 @@ var ( // Tables in stageRouting: L3ForwardingTable = newTable("L3Forwarding", stageRouting, pipelineIP) EgressMarkTable = newTable("EgressMark", stageRouting, pipelineIP) + EgressQoSTable = newTable("EgressQoS", stageRouting, pipelineIP) L3DecTTLTable = newTable("L3DecTTL", stageRouting, pipelineIP) // Tables in stagePostRouting: @@ -419,23 +420,24 @@ type flowCategoryCache struct { } type client struct { - enableProxy bool - proxyAll bool - enableDSR bool - enableAntreaPolicy bool - enableL7NetworkPolicy bool - enableDenyTracking bool - enableEgress bool - enableMulticast bool - enableTrafficControl bool - enableMulticluster bool - enablePrometheusMetrics bool - connectUplinkToBridge bool - nodeType config.NodeType - roundInfo types.RoundInfo - cookieAllocator cookie.Allocator - bridge binding.Bridge - groupIDAllocator GroupAllocator + enableProxy bool + proxyAll bool + enableDSR bool + enableAntreaPolicy bool + enableL7NetworkPolicy bool + enableDenyTracking bool + enableEgress bool + enableEgressTrafficShaping bool + enableMulticast bool + enableTrafficControl bool + enableMulticluster bool + enablePrometheusMetrics bool + connectUplinkToBridge bool + nodeType config.NodeType + roundInfo types.RoundInfo + cookieAllocator cookie.Allocator + bridge binding.Bridge + groupIDAllocator GroupAllocator featurePodConnectivity *featurePodConnectivity featureService *featureService @@ -2311,16 +2313,21 @@ func (f *featureEgress) snatSkipNodeFlow(nodeIP net.IP) binding.Flow { // packet's tunnel destination IP. func (f *featureEgress) snatIPFromTunnelFlow(snatIP net.IP, mark uint32) binding.Flow { ipProtocol := getIPProtocol(snatIP) - return EgressMarkTable.ofTable.BuildFlow(priorityNormal). + fb := EgressMarkTable.ofTable.BuildFlow(priorityNormal). Cookie(f.cookieAllocator.Request(f.category).Raw()). MatchProtocol(ipProtocol). - MatchCTStateNew(true). MatchCTStateTrk(true). MatchTunnelDst(snatIP). Action().LoadPktMarkRange(mark, snatPktMarkRange). - Action().LoadRegMark(ToGatewayRegMark). - Action().GotoStage(stageSwitching). - Done() + Action().LoadRegMark(ToGatewayRegMark) + if f.enableEgressTrafficShaping { + // To apply rate-limit on all traffic, instead of just the first one, remove ct_state=+new. + fb = fb.Action().GotoTable(EgressQoSTable.GetID()) + } else { + fb = fb.MatchCTStateNew(true). + Action().GotoStage(stageSwitching) + } + return fb.Done() } // snatRuleFlow generates the flow that applies the SNAT rule for a local Pod. If the SNAT IP exists on the local Node, @@ -2331,16 +2338,21 @@ func (f *featureEgress) snatRuleFlow(ofPort uint32, snatIP net.IP, snatMark uint ipProtocol := getIPProtocol(snatIP) if snatMark != 0 { // Local SNAT IP. - return EgressMarkTable.ofTable.BuildFlow(priorityNormal). + fb := EgressMarkTable.ofTable.BuildFlow(priorityNormal). Cookie(cookieID). MatchProtocol(ipProtocol). - MatchCTStateNew(true). MatchCTStateTrk(true). MatchInPort(ofPort). Action().LoadPktMarkRange(snatMark, snatPktMarkRange). - Action().LoadRegMark(ToGatewayRegMark). - Action().GotoStage(stageSwitching). - Done() + Action().LoadRegMark(ToGatewayRegMark) + if f.enableEgressTrafficShaping { + // To apply rate-limit on all traffic, instead of just the first one, remove ct_state=+new. + fb = fb.Action().GotoTable(EgressQoSTable.GetID()) + } else { + fb = fb.MatchCTStateNew(true). + Action().GotoStage(stageSwitching) + } + return fb.Done() } // SNAT IP should be on a remote Node. return EgressMarkTable.ofTable.BuildFlow(priorityNormal). @@ -2355,6 +2367,22 @@ func (f *featureEgress) snatRuleFlow(ofPort uint32, snatIP net.IP, snatMark uint Done() } +func (f *featureEgress) egressQoSFlow(mark uint32) binding.Flow { + return EgressQoSTable.ofTable.BuildFlow(priorityNormal). + Cookie(f.cookieAllocator.Request(f.category).Raw()). + MatchPktMark(mark, &types.SNATIPMarkMask). + Action().Meter(mark). + Action().GotoStage(stageSwitching). + Done() +} + +func (f *featureEgress) egressQoSDefaultFlow() binding.Flow { + return EgressQoSTable.ofTable.BuildFlow(priorityLow). + Cookie(f.cookieAllocator.Request(f.category).Raw()). + Action().GotoStage(stageSwitching). + Done() +} + // nodePortMarkFlows generates the flows to mark the first packet of Service NodePort connection with ToNodePortAddressRegMark, // which indicates the Service type is NodePort. func (f *featureService) nodePortMarkFlows() []binding.Flow { @@ -2757,15 +2785,14 @@ func priorityIndexFunc(obj interface{}) ([]string, error) { return conj.ActionFlowPriorities(), nil } -// genPacketInMeter generates a meter entry with specific meterID and rate. -// `rate` is represented as number of packets per second. -// Packets which exceed the rate will be dropped. -func (c *client) genPacketInMeter(meterID binding.MeterIDType, rate uint32) binding.Meter { - meter := c.bridge.NewMeter(meterID, ofctrl.MeterBurst|ofctrl.MeterPktps). +// genOFMeter generates a meter entry with specific meterID, meterFlag, rate and +// burst. Packets which exceed the rate will be dropped. +func (c *client) genOFMeter(meterID binding.MeterIDType, meterFlags ofctrl.MeterFlag, rate uint32, burst uint32) binding.Meter { + meter := c.bridge.NewMeter(meterID, meterFlags). MeterBand(). MeterType(ofctrl.MeterDrop). Rate(rate). - Burst(2 * rate). + Burst(burst). Done() return meter } @@ -2896,6 +2923,7 @@ func NewClient(bridgeName string, enableAntreaPolicy bool, enableL7NetworkPolicy bool, enableEgress bool, + enableEgressTrafficShaping bool, enableDenyTracking bool, proxyAll bool, enableDSR bool, @@ -2909,26 +2937,27 @@ func NewClient(bridgeName string, ) *client { bridge := binding.NewOFBridge(bridgeName, mgmtAddr) c := &client{ - bridge: bridge, - nodeIPChecker: nodeIPCheck, - enableProxy: enableProxy, - proxyAll: proxyAll, - enableDSR: enableDSR, - enableAntreaPolicy: enableAntreaPolicy, - enableL7NetworkPolicy: enableL7NetworkPolicy, - enableDenyTracking: enableDenyTracking, - enableEgress: enableEgress, - enableMulticast: enableMulticast, - enableTrafficControl: enableTrafficControl, - enableMulticluster: enableMulticluster, - enablePrometheusMetrics: enablePrometheusMetrics, - connectUplinkToBridge: connectUplinkToBridge, - pipelines: make(map[binding.PipelineID]binding.Pipeline), - packetInHandlers: map[uint8]PacketInHandler{}, - ovsctlClient: ovsctl.NewClient(bridgeName), - ovsMetersAreSupported: ovsMetersAreSupported(), - packetInRate: packetInRate, - groupIDAllocator: groupIDAllocator, + bridge: bridge, + nodeIPChecker: nodeIPCheck, + enableProxy: enableProxy, + proxyAll: proxyAll, + enableDSR: enableDSR, + enableAntreaPolicy: enableAntreaPolicy, + enableL7NetworkPolicy: enableL7NetworkPolicy, + enableDenyTracking: enableDenyTracking, + enableEgress: enableEgress, + enableEgressTrafficShaping: enableEgressTrafficShaping, + enableMulticast: enableMulticast, + enableTrafficControl: enableTrafficControl, + enableMulticluster: enableMulticluster, + enablePrometheusMetrics: enablePrometheusMetrics, + connectUplinkToBridge: connectUplinkToBridge, + pipelines: make(map[binding.PipelineID]binding.Pipeline), + packetInHandlers: map[uint8]PacketInHandler{}, + ovsctlClient: ovsctl.NewClient(bridgeName), + ovsMetersAreSupported: OVSMetersAreSupported(), + packetInRate: packetInRate, + groupIDAllocator: groupIDAllocator, } c.ofEntryOperations = c return c diff --git a/pkg/agent/openflow/pipeline_test.go b/pkg/agent/openflow/pipeline_test.go index 5bc1d256b4a..b7f5eb307c6 100644 --- a/pkg/agent/openflow/pipeline_test.go +++ b/pkg/agent/openflow/pipeline_test.go @@ -24,7 +24,7 @@ import ( oftest "antrea.io/antrea/pkg/agent/openflow/testing" ) -func pipelineDefaultFlows(externalNodeEnabled, isEncap, isIPv4 bool) []string { +func pipelineDefaultFlows(egressTrafficShapingEnabled, externalNodeEnabled, isEncap, isIPv4 bool) []string { if externalNodeEnabled { return []string{ "cookie=0x1000000000000, table=PipelineRootClassifier, priority=200,ip actions=goto_table:ConntrackZone", @@ -69,7 +69,6 @@ func pipelineDefaultFlows(externalNodeEnabled, isEncap, isIPv4 bool) []string { "cookie=0x1000000000000, table=EgressDefaultRule, priority=0 actions=goto_table:EgressMetric", "cookie=0x1000000000000, table=EgressMetric, priority=0 actions=goto_table:L3Forwarding", "cookie=0x1000000000000, table=L3Forwarding, priority=0 actions=goto_table:EgressMark", - "cookie=0x1000000000000, table=EgressMark, priority=0 actions=goto_table:L3DecTTL", "cookie=0x1000000000000, table=L3DecTTL, priority=0 actions=goto_table:SNATMark", "cookie=0x1000000000000, table=SNATMark, priority=0 actions=goto_table:SNAT", "cookie=0x1000000000000, table=SNAT, priority=0 actions=goto_table:L2ForwardingCalc", @@ -106,6 +105,16 @@ func pipelineDefaultFlows(externalNodeEnabled, isEncap, isIPv4 bool) []string { "cookie=0x1000000000000, table=IPv6, priority=0 actions=goto_table:UnSNAT", ) } + if egressTrafficShapingEnabled { + flows = append(flows, + "cookie=0x1000000000000, table=EgressMark, priority=0 actions=goto_table:EgressQoS", + "cookie=0x1000000000000, table=EgressQoS, priority=0 actions=goto_table:L3DecTTL", + ) + } else { + flows = append(flows, + "cookie=0x1000000000000, table=EgressMark, priority=0 actions=goto_table:L3DecTTL", + ) + } } else { flows = []string{ "cookie=0x1000000000000, table=PipelineRootClassifier, priority=200,arp actions=goto_table:ARPSpoofGuard", @@ -158,13 +167,14 @@ func pipelineDefaultFlows(externalNodeEnabled, isEncap, isIPv4 bool) []string { func Test_client_defaultFlows(t *testing.T) { testCases := []struct { - name string - enableIPv4 bool - enableIPv6 bool - nodeType config.NodeType - trafficEncapMode config.TrafficEncapModeType - clientOptions []clientOptionsFn - expectedFlows []string + name string + enableIPv4 bool + enableIPv6 bool + nodeType config.NodeType + trafficEncapMode config.TrafficEncapModeType + clientOptions []clientOptionsFn + requireMeterSupport bool + expectedFlows []string }{ { name: "IPv4,Encap,K8s Node", @@ -172,7 +182,16 @@ func Test_client_defaultFlows(t *testing.T) { nodeType: config.K8sNode, trafficEncapMode: config.TrafficEncapModeEncap, clientOptions: []clientOptionsFn{enableTrafficControl, enableMulticast, enableMulticluster}, - expectedFlows: pipelineDefaultFlows(false, true, true), + expectedFlows: pipelineDefaultFlows(false, false, true, true), + }, + { + name: "IPv4,Encap,K8s Node,EgressTrafficShaping", + enableIPv4: true, + nodeType: config.K8sNode, + trafficEncapMode: config.TrafficEncapModeEncap, + clientOptions: []clientOptionsFn{enableTrafficControl, enableMulticast, enableMulticluster, enableEgressTrafficShaping}, + requireMeterSupport: true, + expectedFlows: pipelineDefaultFlows(true, false, true, true), }, { name: "IPv4,NoEncap,K8s Node", @@ -180,7 +199,7 @@ func Test_client_defaultFlows(t *testing.T) { nodeType: config.K8sNode, trafficEncapMode: config.TrafficEncapModeNoEncap, clientOptions: []clientOptionsFn{enableTrafficControl, enableMulticast, enableConnectUplinkToBridge}, - expectedFlows: pipelineDefaultFlows(false, false, true), + expectedFlows: pipelineDefaultFlows(false, false, false, true), }, { name: "IPv6,K8s Node", @@ -188,17 +207,20 @@ func Test_client_defaultFlows(t *testing.T) { trafficEncapMode: config.TrafficEncapModeEncap, clientOptions: []clientOptionsFn{enableTrafficControl}, nodeType: config.K8sNode, - expectedFlows: pipelineDefaultFlows(false, true, false), + expectedFlows: pipelineDefaultFlows(false, false, true, false), }, { name: "IPv4,ExternalNode Node", enableIPv4: true, nodeType: config.ExternalNode, - expectedFlows: pipelineDefaultFlows(true, false, false), + expectedFlows: pipelineDefaultFlows(false, true, false, false), }, } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { + if tc.requireMeterSupport && !OVSMetersAreSupported() { + t.Skipf("Skip test because OVS meters are not supported") + } ctrl := gomock.NewController(t) m := oftest.NewMockOFEntryOperations(ctrl) diff --git a/pkg/agent/openflow/pod_connectivity.go b/pkg/agent/openflow/pod_connectivity.go index 98ae8ab82e7..7173225a14c 100644 --- a/pkg/agent/openflow/pod_connectivity.go +++ b/pkg/agent/openflow/pod_connectivity.go @@ -282,3 +282,7 @@ func (f *featurePodConnectivity) initGroups() []binding.OFEntry { func (f *featurePodConnectivity) replayGroups() []binding.OFEntry { return nil } + +func (f *featurePodConnectivity) replayMeters() []binding.OFEntry { + return nil +} diff --git a/pkg/agent/openflow/service.go b/pkg/agent/openflow/service.go index 9c37988dcdd..3a2c372c2ae 100644 --- a/pkg/agent/openflow/service.go +++ b/pkg/agent/openflow/service.go @@ -197,3 +197,7 @@ func (f *featureService) replayGroups() []binding.OFEntry { func (f *featureService) initGroups() []binding.OFEntry { return nil } + +func (f *featureService) replayMeters() []binding.OFEntry { + return nil +} diff --git a/pkg/agent/openflow/testing/mock_openflow.go b/pkg/agent/openflow/testing/mock_openflow.go index 8a21e47e960..af1e9a34cc2 100644 --- a/pkg/agent/openflow/testing/mock_openflow.go +++ b/pkg/agent/openflow/testing/mock_openflow.go @@ -265,6 +265,20 @@ func (mr *MockClientMockRecorder) Initialize(arg0, arg1, arg2, arg3, arg4, arg5 return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Initialize", reflect.TypeOf((*MockClient)(nil).Initialize), arg0, arg1, arg2, arg3, arg4, arg5) } +// InstallEgressQoS mocks base method. +func (m *MockClient) InstallEgressQoS(arg0, arg1, arg2 uint32) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "InstallEgressQoS", arg0, arg1, arg2) + ret0, _ := ret[0].(error) + return ret0 +} + +// InstallEgressQoS indicates an expected call of InstallEgressQoS. +func (mr *MockClientMockRecorder) InstallEgressQoS(arg0, arg1, arg2 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InstallEgressQoS", reflect.TypeOf((*MockClient)(nil).InstallEgressQoS), arg0, arg1, arg2) +} + // InstallEndpointFlows mocks base method. func (m *MockClient) InstallEndpointFlows(arg0 openflow.Protocol, arg1 []proxy.Endpoint) error { m.ctrl.T.Helper() @@ -833,6 +847,20 @@ func (mr *MockClientMockRecorder) SubscribePacketIn(arg0, arg1 any) *gomock.Call return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SubscribePacketIn", reflect.TypeOf((*MockClient)(nil).SubscribePacketIn), arg0, arg1) } +// UninstallEgressQoS mocks base method. +func (m *MockClient) UninstallEgressQoS(arg0 uint32) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "UninstallEgressQoS", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// UninstallEgressQoS indicates an expected call of UninstallEgressQoS. +func (mr *MockClientMockRecorder) UninstallEgressQoS(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UninstallEgressQoS", reflect.TypeOf((*MockClient)(nil).UninstallEgressQoS), arg0) +} + // UninstallEndpointFlows mocks base method. func (m *MockClient) UninstallEndpointFlows(arg0 openflow.Protocol, arg1 []proxy.Endpoint) error { m.ctrl.T.Helper() diff --git a/pkg/agent/openflow/traceflow.go b/pkg/agent/openflow/traceflow.go index 451710d4794..16d446d40f0 100644 --- a/pkg/agent/openflow/traceflow.go +++ b/pkg/agent/openflow/traceflow.go @@ -49,3 +49,7 @@ func (f *featureTraceflow) initGroups() []binding.OFEntry { func (f *featureTraceflow) replayGroups() []binding.OFEntry { return nil } + +func (f *featureTraceflow) replayMeters() []binding.OFEntry { + return nil +} diff --git a/pkg/apis/crd/v1beta1/types.go b/pkg/apis/crd/v1beta1/types.go index 9c94d4ee4a3..08ce6248a22 100644 --- a/pkg/apis/crd/v1beta1/types.go +++ b/pkg/apis/crd/v1beta1/types.go @@ -880,6 +880,15 @@ type EgressSpec struct { // same index in EgressIPs and ExternalIPPools are correlated. // Cannot be set with ExternalIPPool. ExternalIPPools []string `json:"externalIPPools,omitempty"` + // Bandwidth specifies the rate limit of north-south egress traffic of this Egress. + Bandwidth *Bandwidth `json:"bandwidth,omitempty"` +} + +type Bandwidth struct { + // Rate specifies the maximum traffic rate. e.g. 300k, 10M + Rate string `json:"rate"` + // Burst specifies the maximum burst size when traffic exceeds the rate. e.g. 300k, 10M + Burst string `json:"burst"` } // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object diff --git a/pkg/apis/crd/v1beta1/zz_generated.deepcopy.go b/pkg/apis/crd/v1beta1/zz_generated.deepcopy.go index e1b8f17f2a0..25d2e149bb8 100644 --- a/pkg/apis/crd/v1beta1/zz_generated.deepcopy.go +++ b/pkg/apis/crd/v1beta1/zz_generated.deepcopy.go @@ -234,6 +234,22 @@ func (in *AppliedTo) DeepCopy() *AppliedTo { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *Bandwidth) DeepCopyInto(out *Bandwidth) { + *out = *in + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Bandwidth. +func (in *Bandwidth) DeepCopy() *Bandwidth { + if in == nil { + return nil + } + out := new(Bandwidth) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *ClusterGroup) DeepCopyInto(out *ClusterGroup) { *out = *in @@ -518,6 +534,11 @@ func (in *EgressSpec) DeepCopyInto(out *EgressSpec) { *out = make([]string, len(*in)) copy(*out, *in) } + if in.Bandwidth != nil { + in, out := &in.Bandwidth, &out.Bandwidth + *out = new(Bandwidth) + **out = **in + } return } diff --git a/pkg/apiserver/handlers/featuregates/handler_test.go b/pkg/apiserver/handlers/featuregates/handler_test.go index 478d4c78a3e..e7a3d9c1f76 100644 --- a/pkg/apiserver/handlers/featuregates/handler_test.go +++ b/pkg/apiserver/handlers/featuregates/handler_test.go @@ -56,6 +56,7 @@ func Test_getGatesResponse(t *testing.T) { {Component: "agent", Name: "AntreaProxy", Status: "Enabled", Version: "GA"}, {Component: "agent", Name: "CleanupStaleUDPSvcConntrack", Status: "Disabled", Version: "ALPHA"}, {Component: "agent", Name: "Egress", Status: egressStatus, Version: "BETA"}, + {Component: "agent", Name: "EgressTrafficShaping", Status: "Disabled", Version: "ALPHA"}, {Component: "agent", Name: "EndpointSlice", Status: "Enabled", Version: "GA"}, {Component: "agent", Name: "ExternalNode", Status: "Disabled", Version: "ALPHA"}, {Component: "agent", Name: "FlowExporter", Status: "Disabled", Version: "ALPHA"}, diff --git a/pkg/apiserver/openapi/zz_generated.openapi.go b/pkg/apiserver/openapi/zz_generated.openapi.go index 058ae6946b6..ea9e911f76e 100644 --- a/pkg/apiserver/openapi/zz_generated.openapi.go +++ b/pkg/apiserver/openapi/zz_generated.openapi.go @@ -80,6 +80,7 @@ func GetOpenAPIDefinitions(ref common.ReferenceCallback) map[string]common.OpenA "antrea.io/antrea/pkg/apis/crd/v1beta1.AntreaControllerInfo": schema_pkg_apis_crd_v1beta1_AntreaControllerInfo(ref), "antrea.io/antrea/pkg/apis/crd/v1beta1.AntreaControllerInfoList": schema_pkg_apis_crd_v1beta1_AntreaControllerInfoList(ref), "antrea.io/antrea/pkg/apis/crd/v1beta1.AppliedTo": schema_pkg_apis_crd_v1beta1_AppliedTo(ref), + "antrea.io/antrea/pkg/apis/crd/v1beta1.Bandwidth": schema_pkg_apis_crd_v1beta1_Bandwidth(ref), "antrea.io/antrea/pkg/apis/crd/v1beta1.ClusterGroup": schema_pkg_apis_crd_v1beta1_ClusterGroup(ref), "antrea.io/antrea/pkg/apis/crd/v1beta1.ClusterGroupList": schema_pkg_apis_crd_v1beta1_ClusterGroupList(ref), "antrea.io/antrea/pkg/apis/crd/v1beta1.ClusterNetworkPolicy": schema_pkg_apis_crd_v1beta1_ClusterNetworkPolicy(ref), @@ -2989,6 +2990,35 @@ func schema_pkg_apis_crd_v1beta1_AppliedTo(ref common.ReferenceCallback) common. } } +func schema_pkg_apis_crd_v1beta1_Bandwidth(ref common.ReferenceCallback) common.OpenAPIDefinition { + return common.OpenAPIDefinition{ + Schema: spec.Schema{ + SchemaProps: spec.SchemaProps{ + Type: []string{"object"}, + Properties: map[string]spec.Schema{ + "rate": { + SchemaProps: spec.SchemaProps{ + Description: "Rate specifies the maximum traffic rate. e.g. 300k, 10M", + Default: "", + Type: []string{"string"}, + Format: "", + }, + }, + "burst": { + SchemaProps: spec.SchemaProps{ + Description: "Burst specifies the maximum burst size when traffic exceeds the rate. e.g. 300k, 10M", + Default: "", + Type: []string{"string"}, + Format: "", + }, + }, + }, + Required: []string{"rate", "burst"}, + }, + }, + } +} + func schema_pkg_apis_crd_v1beta1_ClusterGroup(ref common.ReferenceCallback) common.OpenAPIDefinition { return common.OpenAPIDefinition{ Schema: spec.Schema{ @@ -3551,12 +3581,18 @@ func schema_pkg_apis_crd_v1beta1_EgressSpec(ref common.ReferenceCallback) common }, }, }, + "bandwidth": { + SchemaProps: spec.SchemaProps{ + Description: "Bandwidth specifies the rate limit of north-south egress traffic of this Egress.", + Ref: ref("antrea.io/antrea/pkg/apis/crd/v1beta1.Bandwidth"), + }, + }, }, Required: []string{"appliedTo"}, }, }, Dependencies: []string{ - "antrea.io/antrea/pkg/apis/crd/v1beta1.AppliedTo"}, + "antrea.io/antrea/pkg/apis/crd/v1beta1.AppliedTo", "antrea.io/antrea/pkg/apis/crd/v1beta1.Bandwidth"}, } } diff --git a/pkg/client/informers/externalversions/crd/v1alpha2/interface.go b/pkg/client/informers/externalversions/crd/v1alpha2/interface.go index cf2fef7feb2..a15d69484b0 100644 --- a/pkg/client/informers/externalversions/crd/v1alpha2/interface.go +++ b/pkg/client/informers/externalversions/crd/v1alpha2/interface.go @@ -1,4 +1,4 @@ -// Copyright 2022 Antrea Authors +// Copyright 2023 Antrea Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/pkg/controller/egress/controller_test.go b/pkg/controller/egress/controller_test.go index 4e8c6392e66..356774503aa 100644 --- a/pkg/controller/egress/controller_test.go +++ b/pkg/controller/egress/controller_test.go @@ -68,7 +68,7 @@ var ( eipFoo2 = newExternalIPPool("pool2", "", "2.2.2.10", "2.2.2.20") ) -func newEgress(name, egressIP, externalIPPool string, podSelector, namespaceSelector *metav1.LabelSelector) *v1beta1.Egress { +func newEgress(name, egressIP, externalIPPool string, podSelector, namespaceSelector *metav1.LabelSelector, bandwidth *v1beta1.Bandwidth) *v1beta1.Egress { egress := &v1beta1.Egress{ ObjectMeta: metav1.ObjectMeta{Name: name}, Spec: v1beta1.EgressSpec{ @@ -78,6 +78,7 @@ func newEgress(name, egressIP, externalIPPool string, podSelector, namespaceSele }, EgressIP: egressIP, ExternalIPPool: externalIPPool, + Bandwidth: bandwidth, }, } return egress diff --git a/pkg/controller/egress/validate.go b/pkg/controller/egress/validate.go index 061758afc01..713c5378f65 100644 --- a/pkg/controller/egress/validate.go +++ b/pkg/controller/egress/validate.go @@ -20,6 +20,7 @@ import ( "net" admv1 "k8s.io/api/admission/v1" + "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/klog/v2" @@ -53,6 +54,17 @@ func (c *EgressController) ValidateEgress(review *admv1.AdmissionReview) *admv1. if len(newEgress.Spec.ExternalIPPools) > 0 { return false, "spec.externalIPPools is not supported yet" } + // Validate Egress trafficShaping + if newEgress.Spec.Bandwidth != nil { + _, err := resource.ParseQuantity(newEgress.Spec.Bandwidth.Rate) + if err != nil { + return false, fmt.Sprintf("Rate %s in Egress %s is invalid: %v", newEgress.Spec.Bandwidth.Rate, newEgress.Name, err) + } + _, err = resource.ParseQuantity(newEgress.Spec.Bandwidth.Burst) + if err != nil { + return false, fmt.Sprintf("Burst %s in Egress %s is invalid: %v", newEgress.Spec.Bandwidth.Burst, newEgress.Name, err) + } + } // Allow it if EgressIP and ExternalIPPool don't change. if newEgress.Spec.EgressIP == oldEgress.Spec.EgressIP && newEgress.Spec.ExternalIPPool == oldEgress.Spec.ExternalIPPool { return true, "" diff --git a/pkg/controller/egress/validate_test.go b/pkg/controller/egress/validate_test.go index 29105707337..16f21f09367 100644 --- a/pkg/controller/egress/validate_test.go +++ b/pkg/controller/egress/validate_test.go @@ -34,6 +34,20 @@ func marshal(object runtime.Object) []byte { } func TestEgressControllerValidateEgress(t *testing.T) { + var ( + bandwidth = crdv1beta1.Bandwidth{ + Rate: "500k", + Burst: "10M", + } + invalidBandwidthRate = crdv1beta1.Bandwidth{ + Rate: "500A", + Burst: "10G", + } + invalidBandwidthBurst = crdv1beta1.Bandwidth{ + Rate: "1.5G", + Burst: "10b", + } + ) tests := []struct { name string existingExternalIPPool *crdv1beta1.ExternalIPPool @@ -46,7 +60,7 @@ func TestEgressControllerValidateEgress(t *testing.T) { request: &admv1.AdmissionRequest{ Name: "foo", Operation: "CREATE", - Object: runtime.RawExtension{Raw: marshal(newEgress("foo", "10.10.10.1", "nonExistingPool", nil, nil))}, + Object: runtime.RawExtension{Raw: marshal(newEgress("foo", "10.10.10.1", "nonExistingPool", nil, nil, nil))}, }, expectedResponse: &admv1.AdmissionResponse{ Allowed: false, @@ -61,7 +75,7 @@ func TestEgressControllerValidateEgress(t *testing.T) { request: &admv1.AdmissionRequest{ Name: "foo", Operation: "CREATE", - Object: runtime.RawExtension{Raw: marshal(newEgress("foo", "10.10.11.1", "bar", nil, nil))}, + Object: runtime.RawExtension{Raw: marshal(newEgress("foo", "10.10.11.1", "bar", nil, nil, nil))}, }, expectedResponse: &admv1.AdmissionResponse{ Allowed: false, @@ -76,7 +90,7 @@ func TestEgressControllerValidateEgress(t *testing.T) { request: &admv1.AdmissionRequest{ Name: "foo", Operation: "CREATE", - Object: runtime.RawExtension{Raw: marshal(newEgress("foo", "10.10.10.1", "bar", nil, nil))}, + Object: runtime.RawExtension{Raw: marshal(newEgress("foo", "10.10.10.1", "bar", nil, nil, nil))}, }, expectedResponse: &admv1.AdmissionResponse{Allowed: true}, }, @@ -86,8 +100,8 @@ func TestEgressControllerValidateEgress(t *testing.T) { request: &admv1.AdmissionRequest{ Name: "foo", Operation: "UPDATE", - OldObject: runtime.RawExtension{Raw: marshal(newEgress("foo", "10.10.10.1", "bar", nil, nil))}, - Object: runtime.RawExtension{Raw: marshal(newEgress("foo", "10.10.11.1", "bar", nil, nil))}, + OldObject: runtime.RawExtension{Raw: marshal(newEgress("foo", "10.10.10.1", "bar", nil, nil, nil))}, + Object: runtime.RawExtension{Raw: marshal(newEgress("foo", "10.10.11.1", "bar", nil, nil, nil))}, }, expectedResponse: &admv1.AdmissionResponse{ Allowed: false, @@ -102,8 +116,8 @@ func TestEgressControllerValidateEgress(t *testing.T) { request: &admv1.AdmissionRequest{ Name: "foo", Operation: "UPDATE", - OldObject: runtime.RawExtension{Raw: marshal(newEgress("foo", "10.10.10.1", "bar", nil, nil))}, - Object: runtime.RawExtension{Raw: marshal(newEgress("foo", "10.10.10.2", "bar", nil, nil))}, + OldObject: runtime.RawExtension{Raw: marshal(newEgress("foo", "10.10.10.1", "bar", nil, nil, nil))}, + Object: runtime.RawExtension{Raw: marshal(newEgress("foo", "10.10.10.2", "bar", nil, nil, nil))}, }, expectedResponse: &admv1.AdmissionResponse{Allowed: true}, }, @@ -113,10 +127,10 @@ func TestEgressControllerValidateEgress(t *testing.T) { request: &admv1.AdmissionRequest{ Name: "foo", Operation: "UPDATE", - OldObject: runtime.RawExtension{Raw: marshal(newEgress("foo", "10.10.10.1", "bar", nil, nil))}, + OldObject: runtime.RawExtension{Raw: marshal(newEgress("foo", "10.10.10.1", "bar", nil, nil, nil))}, Object: runtime.RawExtension{Raw: marshal(newEgress("foo", "10.10.10.2", "bar", &metav1.LabelSelector{ MatchLabels: map[string]string{"foo": "bar"}, - }, nil))}, + }, nil, nil))}, }, expectedResponse: &admv1.AdmissionResponse{Allowed: true}, }, @@ -125,10 +139,57 @@ func TestEgressControllerValidateEgress(t *testing.T) { request: &admv1.AdmissionRequest{ Name: "foo", Operation: "DELETE", - Object: runtime.RawExtension{Raw: marshal(newEgress("foo", "10.10.10.2", "bar", nil, nil))}, + Object: runtime.RawExtension{Raw: marshal(newEgress("foo", "10.10.10.2", "bar", nil, nil, nil))}, }, expectedResponse: &admv1.AdmissionResponse{Allowed: true}, }, + { + name: "Creating an Egress with bandwidth should be allowed", + request: &admv1.AdmissionRequest{ + Name: "foo", + Operation: "CREATE", + Object: runtime.RawExtension{Raw: marshal(newEgress("foo", "10.10.10.1", "", nil, nil, &bandwidth))}, + }, + expectedResponse: &admv1.AdmissionResponse{Allowed: true}, + }, + { + name: "Update an Egress bandwidth config should be allowed", + request: &admv1.AdmissionRequest{ + Name: "foo", + Operation: "UPDATE", + OldObject: runtime.RawExtension{Raw: marshal(newEgress("foo", "10.10.10.1", "", nil, nil, &bandwidth))}, + Object: runtime.RawExtension{Raw: marshal(newEgress("foo", "10.10.10.1", "", nil, nil, nil))}, + }, + expectedResponse: &admv1.AdmissionResponse{Allowed: true}, + }, + { + name: "Create an Egress with invalid bandwidth rate", + request: &admv1.AdmissionRequest{ + Name: "foo", + Operation: "CREATE", + Object: runtime.RawExtension{Raw: marshal(newEgress("foo", "10.10.10.1", "", nil, nil, &invalidBandwidthRate))}, + }, + expectedResponse: &admv1.AdmissionResponse{ + Allowed: false, + Result: &metav1.Status{ + Message: "Rate 500A in Egress foo is invalid: quantities must match the regular expression '^([+-]?[0-9.]+)([eEinumkKMGTP]*[-+]?[0-9]*)$'", + }, + }, + }, + { + name: "Create an Egress with invalid bandwidth burst", + request: &admv1.AdmissionRequest{ + Name: "foo", + Operation: "CREATE", + Object: runtime.RawExtension{Raw: marshal(newEgress("foo", "10.10.10.1", "", nil, nil, &invalidBandwidthBurst))}, + }, + expectedResponse: &admv1.AdmissionResponse{ + Allowed: false, + Result: &metav1.Status{ + Message: "Burst 10b in Egress foo is invalid: quantities must match the regular expression '^([+-]?[0-9.]+)([eEinumkKMGTP]*[-+]?[0-9]*)$'", + }, + }, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { diff --git a/pkg/features/antrea_features.go b/pkg/features/antrea_features.go index 057473f1e04..e2b5cb801c8 100644 --- a/pkg/features/antrea_features.go +++ b/pkg/features/antrea_features.go @@ -142,6 +142,10 @@ const ( // Enable the AdminNetworkPolicy APIs // https://github.com/kubernetes-sigs/network-policy-api AdminNetworkPolicy featuregate.Feature = "AdminNetworkPolicy" + + // alpha: v1.14 + // Enable Egress traffic shaping. + EgressTrafficShaping featuregate.Feature = "EgressTrafficShaping" ) var ( @@ -179,6 +183,7 @@ var ( L7NetworkPolicy: {Default: false, PreRelease: featuregate.Alpha}, LoadBalancerModeDSR: {Default: false, PreRelease: featuregate.Alpha}, AdminNetworkPolicy: {Default: false, PreRelease: featuregate.Alpha}, + EgressTrafficShaping: {Default: false, PreRelease: featuregate.Alpha}, } // AgentGates consists of all known feature gates for the Antrea Agent. @@ -205,6 +210,7 @@ var ( TopologyAwareHints, Traceflow, TrafficControl, + EgressTrafficShaping, ) // ControllerGates consists of all known feature gates for the Antrea Controller. @@ -248,6 +254,7 @@ var ( L7NetworkPolicy: {}, LoadBalancerModeDSR: {}, CleanupStaleUDPSvcConntrack: {}, + EgressTrafficShaping: {}, } // supportedFeaturesOnExternalNode records the features supported on an external // Node. Antrea Agent checks the enabled features if it is running on an diff --git a/test/e2e/egress_test.go b/test/e2e/egress_test.go index 7da9d309080..230013ddae5 100644 --- a/test/e2e/egress_test.go +++ b/test/e2e/egress_test.go @@ -18,6 +18,7 @@ import ( "context" "fmt" "net" + "strconv" "strings" "testing" "time" @@ -64,6 +65,7 @@ func TestEgress(t *testing.T) { t.Run("testEgressUpdateNodeSelector", func(t *testing.T) { testEgressUpdateNodeSelector(t, data) }) t.Run("testEgressNodeFailure", func(t *testing.T) { testEgressNodeFailure(t, data) }) t.Run("testCreateExternalIPPool", func(t *testing.T) { testCreateExternalIPPool(t, data) }) + t.Run("testUpdateBandwidth", func(t *testing.T) { testEgressUpdateBandwidth(t, data) }) } func testCreateExternalIPPool(t *testing.T, data *TestData) { @@ -204,7 +206,7 @@ func testEgressClientIP(t *testing.T, data *TestData) { Operator: metav1.LabelSelectorOpExists, }, } - egress := data.createEgress(t, "egress-", matchExpressions, nil, "", egressNodeIP) + egress := data.createEgress(t, "egress-", matchExpressions, nil, "", egressNodeIP, nil) defer data.crdClient.CrdV1beta1().Egresses().Delete(context.TODO(), egress.Name, metav1.DeleteOptions{}) assertClientIP(localPod, egressNodeIP) assertClientIP(remotePod, egressNodeIP) @@ -373,7 +375,7 @@ func testEgressCRUD(t *testing.T, data *TestData) { pool := data.createExternalIPPool(t, "crud-pool-", tt.ipRange, tt.nodeSelector.MatchExpressions, tt.nodeSelector.MatchLabels) defer data.crdClient.CrdV1beta1().ExternalIPPools().Delete(context.TODO(), pool.Name, metav1.DeleteOptions{}) - egress := data.createEgress(t, "crud-egress-", nil, map[string]string{"foo": "bar"}, pool.Name, "") + egress := data.createEgress(t, "crud-egress-", nil, map[string]string{"foo": "bar"}, pool.Name, "", nil) defer data.crdClient.CrdV1beta1().Egresses().Delete(context.TODO(), egress.Name, metav1.DeleteOptions{}) // Use Poll to wait the interval before the first run to detect the case that the IP is assigned to any Node // when it's not supposed to. @@ -493,7 +495,7 @@ func testEgressUpdateEgressIP(t *testing.T, data *TestData) { newPool := data.createExternalIPPool(t, "newpool-", tt.newIPRange, nil, map[string]string{v1.LabelHostname: tt.newNode}) defer data.crdClient.CrdV1beta1().ExternalIPPools().Delete(context.TODO(), newPool.Name, metav1.DeleteOptions{}) - egress := data.createEgress(t, "egress-", nil, map[string]string{"foo": "bar"}, originalPool.Name, "") + egress := data.createEgress(t, "egress-", nil, map[string]string{"foo": "bar"}, originalPool.Name, "", nil) defer data.crdClient.CrdV1beta1().Egresses().Delete(context.TODO(), egress.Name, metav1.DeleteOptions{}) egress, err := data.checkEgressState(egress.Name, tt.originalEgressIP, tt.originalNode, "", time.Second) require.NoError(t, err) @@ -641,7 +643,7 @@ func testEgressMigration(t *testing.T, data *TestData, triggerFunc, revertFunc f externalIPPoolTwoNodes := data.createExternalIPPool(t, "pool-with-two-nodes-", *ipRange, matchExpressions, nil) defer data.crdClient.CrdV1beta1().ExternalIPPools().Delete(context.TODO(), externalIPPoolTwoNodes.Name, metav1.DeleteOptions{}) - egress := data.createEgress(t, "migration-egress-", nil, map[string]string{"foo": "bar"}, externalIPPoolTwoNodes.Name, "") + egress := data.createEgress(t, "migration-egress-", nil, map[string]string{"foo": "bar"}, externalIPPoolTwoNodes.Name, "", nil) defer data.crdClient.CrdV1beta1().Egresses().Delete(context.TODO(), egress.Name, metav1.DeleteOptions{}) var err error @@ -685,6 +687,62 @@ func testEgressMigration(t *testing.T, data *TestData, triggerFunc, revertFunc f checkIPNeighbor(fromNode) } +func testEgressUpdateBandwidth(t *testing.T, data *TestData) { + skipIfEgressShapingDisabled(t) + skipIfNotIPv4Cluster(t) + skipIfHasWindowsNodes(t) + bandwidth := &v1beta1.Bandwidth{ + Rate: "100M", + Burst: "200M", + } + transMap := map[string]int{ + "100M": 100, + "200M": 200, + } + + egressNode := nodeName(1) + egressNodeIP := nodeIP(1) + + // Create another netns to fake an external network on the host network Pod. + fakeExternalName := "fake-external" + fakeExternalCmd := "iperf3 -s" + cmd, _ := getCommandInFakeExternalNetwork(fakeExternalCmd, 24, "1.1.1.1", "1.1.1.254") + + err := NewPodBuilder(fakeExternalName, data.testNamespace, toolboxImage).OnNode(egressNode).WithCommand([]string{"bash", "-c", cmd}).InHostNetwork().Privileged().Create(data) + require.NoError(t, err, "Failed to create fake external Pod") + defer deletePodWrapper(t, data, data.testNamespace, fakeExternalName) + err = data.podWaitForRunning(defaultTimeout, fakeExternalName, data.testNamespace) + require.NoError(t, err, "Error when waiting for fake external Pod to be in the Running state") + + clientPodName := "client-pod" + err = NewPodBuilder(clientPodName, data.testNamespace, toolboxImage).OnNode(egressNode).Create(data) + require.NoError(t, err, "Failed to create client Pod") + defer deletePodWrapper(t, data, data.testNamespace, clientPodName) + err = data.podWaitForRunning(defaultTimeout, clientPodName, data.testNamespace) + require.NoError(t, err, "Error when waiting for the client Pod to be in the Running state") + + egress := data.createEgress(t, "egress-qos-", nil, map[string]string{"antrea-e2e": clientPodName}, "", egressNodeIP, bandwidth) + _, err = data.waitForEgressRealized(egress) + require.NoError(t, err, "Error when waiting for Egress to be realized") + defer data.crdClient.CrdV1beta1().Egresses().Delete(context.TODO(), egress.Name, metav1.DeleteOptions{}) + + // expectedBandwidth is Mbps + runIperf := func(cmd []string, expectedBandwidth int) { + stdout, _, err := data.RunCommandFromPod(data.testNamespace, clientPodName, "toolbox", cmd) + if err != nil { + t.Fatalf("Error when running iperf3 client: %v", err) + } + stdout = strings.TrimSpace(stdout) + actualBandwidth, _ := strconv.ParseFloat(strings.TrimSpace(stdout), 64) + t.Logf("Actual bandwidth: %v Mbits/sec", actualBandwidth) + // Allow a certain deviation. + assert.InEpsilon(t, actualBandwidth, expectedBandwidth, 0.2) + } + + runIperf([]string{"bash", "-c", "iperf3 -c 1.1.1.1 -f m -t 1|grep sender|awk '{print $7}'"}, transMap[bandwidth.Rate]+transMap[bandwidth.Burst]) + runIperf([]string{"bash", "-c", "iperf3 -c 1.1.1.1 -f m -O 1|grep sender|awk '{print $7}'"}, transMap[bandwidth.Rate]) +} + func (data *TestData) checkEgressState(egressName, expectedIP, expectedNode, otherNode string, timeout time.Duration) (*v1beta1.Egress, error) { var egress *v1beta1.Egress var expectedNodeHasIP, otherNodeHasIP bool @@ -811,7 +869,7 @@ func (data *TestData) createExternalIPPool(t *testing.T, generateName string, ip return pool } -func (data *TestData) createEgress(t *testing.T, generateName string, matchExpressions []metav1.LabelSelectorRequirement, matchLabels map[string]string, externalPoolName string, egressIP string) *v1beta1.Egress { +func (data *TestData) createEgress(t *testing.T, generateName string, matchExpressions []metav1.LabelSelectorRequirement, matchLabels map[string]string, externalPoolName string, egressIP string, bandwidth *v1beta1.Bandwidth) *v1beta1.Egress { egress := &v1beta1.Egress{ ObjectMeta: metav1.ObjectMeta{GenerateName: generateName}, Spec: v1beta1.EgressSpec{ @@ -823,6 +881,7 @@ func (data *TestData) createEgress(t *testing.T, generateName string, matchExpre }, ExternalIPPool: externalPoolName, EgressIP: egressIP, + Bandwidth: bandwidth, }, } egress, err := data.crdClient.CrdV1beta1().Egresses().Create(context.TODO(), egress, metav1.CreateOptions{}) diff --git a/test/e2e/fixtures.go b/test/e2e/fixtures.go index 9c808d98c73..34eb5ac91a7 100644 --- a/test/e2e/fixtures.go +++ b/test/e2e/fixtures.go @@ -180,6 +180,10 @@ func skipIfProxyDisabled(t *testing.T, data *TestData) { } } +func skipIfEgressShapingDisabled(t *testing.T) { + skipIfFeatureDisabled(t, features.EgressTrafficShaping, true /* checkAgent */, false /* checkController */) +} + func skipIfProxyAllDisabled(t *testing.T, data *TestData) { isProxyAll, err := data.isProxyAll() if err != nil { diff --git a/test/e2e/flowaggregator_test.go b/test/e2e/flowaggregator_test.go index 7b8a1f44a1e..6afe450c5a8 100644 --- a/test/e2e/flowaggregator_test.go +++ b/test/e2e/flowaggregator_test.go @@ -553,7 +553,7 @@ func testHelper(t *testing.T, data *TestData, podAIPs, podBIPs, podCIPs, podDIPs } else { egressNodeIP = nodeIPv6(0) } - egress := data.createEgress(t, "test-egress", nil, map[string]string{"app": "busybox"}, "", egressNodeIP) + egress := data.createEgress(t, "test-egress", nil, map[string]string{"app": "busybox"}, "", egressNodeIP, nil) egress, err := data.waitForEgressRealized(egress) if err != nil { t.Fatalf("Error when waiting for Egress to be realized: %v", err) @@ -593,7 +593,7 @@ func testHelper(t *testing.T, data *TestData, podAIPs, podBIPs, podCIPs, podDIPs } else { egressNodeIP = nodeIPv6(1) } - egress := data.createEgress(t, "test-egress", nil, map[string]string{"app": "busybox"}, "", egressNodeIP) + egress := data.createEgress(t, "test-egress", nil, map[string]string{"app": "busybox"}, "", egressNodeIP, nil) egress, err := data.waitForEgressRealized(egress) if err != nil { t.Fatalf("Error when waiting for Egress to be realized: %v", err) diff --git a/test/e2e/proxy_test.go b/test/e2e/proxy_test.go index 8a5d71012ef..76e78aee429 100644 --- a/test/e2e/proxy_test.go +++ b/test/e2e/proxy_test.go @@ -423,7 +423,7 @@ func TestNodePortAndEgressWithTheSameBackendPod(t *testing.T) { // Create an Egress whose external IP is on worker Node. egressNodeIP := workerNodeIPv4(1) - egress := data.createEgress(t, "test-egress", nil, map[string]string{"app": "nginx"}, "", egressNodeIP) + egress := data.createEgress(t, "test-egress", nil, map[string]string{"app": "nginx"}, "", egressNodeIP, nil) defer data.crdClient.CrdV1beta1().Egresses().Delete(context.TODO(), egress.Name, metav1.DeleteOptions{}) // Create the backend Pod on control plane Node. diff --git a/test/e2e/traceflow_test.go b/test/e2e/traceflow_test.go index 125f7e89c06..61701ebe952 100644 --- a/test/e2e/traceflow_test.go +++ b/test/e2e/traceflow_test.go @@ -2052,7 +2052,7 @@ func testTraceflowEgress(t *testing.T, data *TestData) { }, } - egress := data.createEgress(t, "egress-", matchExpressions, nil, "", egressIP) + egress := data.createEgress(t, "egress-", matchExpressions, nil, "", egressIP, nil) defer data.crdClient.CrdV1beta1().Egresses().Delete(context.TODO(), egress.Name, metav1.DeleteOptions{}) testcaseLocalEgress := testcase{ diff --git a/test/integration/agent/openflow_test.go b/test/integration/agent/openflow_test.go index e23bb7b0fad..8f67458f859 100644 --- a/test/integration/agent/openflow_test.go +++ b/test/integration/agent/openflow_test.go @@ -120,7 +120,7 @@ func TestConnectivityFlows(t *testing.T) { antrearuntime.WindowsOS = runtime.GOOS } - c = ofClient.NewClient(br, bridgeMgmtAddr, nodeiptest.NewFakeNodeIPChecker(), true, false, false, true, false, false, false, false, false, false, false, groupIDAllocator, false, defaultPacketInRate) + c = ofClient.NewClient(br, bridgeMgmtAddr, nodeiptest.NewFakeNodeIPChecker(), true, false, false, true, true, false, false, false, false, false, false, false, groupIDAllocator, false, defaultPacketInRate) err := ofTestUtils.PrepareOVSBridge(br) require.Nil(t, err, fmt.Sprintf("Failed to prepare OVS bridge: %v", err)) defer func() { @@ -176,7 +176,7 @@ func TestAntreaFlexibleIPAMConnectivityFlows(t *testing.T) { legacyregistry.Reset() metrics.InitializeOVSMetrics() - c = ofClient.NewClient(br, bridgeMgmtAddr, nodeiptest.NewFakeNodeIPChecker(), true, false, false, true, false, false, false, true, false, false, false, groupIDAllocator, false, defaultPacketInRate) + c = ofClient.NewClient(br, bridgeMgmtAddr, nodeiptest.NewFakeNodeIPChecker(), true, false, false, true, true, false, false, false, true, false, false, false, groupIDAllocator, false, defaultPacketInRate) err := ofTestUtils.PrepareOVSBridge(br) require.Nil(t, err, fmt.Sprintf("Failed to prepare OVS bridge: %v", err)) defer func() { @@ -239,7 +239,7 @@ func TestReplayFlowsConnectivityFlows(t *testing.T) { legacyregistry.Reset() metrics.InitializeOVSMetrics() - c = ofClient.NewClient(br, bridgeMgmtAddr, nodeiptest.NewFakeNodeIPChecker(), true, false, false, true, false, false, false, false, false, false, false, groupIDAllocator, false, defaultPacketInRate) + c = ofClient.NewClient(br, bridgeMgmtAddr, nodeiptest.NewFakeNodeIPChecker(), true, false, false, true, true, false, false, false, false, false, false, false, groupIDAllocator, false, defaultPacketInRate) err := ofTestUtils.PrepareOVSBridge(br) require.Nil(t, err, fmt.Sprintf("Failed to prepare OVS bridge: %v", err)) @@ -281,7 +281,7 @@ func TestReplayFlowsNetworkPolicyFlows(t *testing.T) { legacyregistry.Reset() metrics.InitializeOVSMetrics() - c = ofClient.NewClient(br, bridgeMgmtAddr, nodeiptest.NewFakeNodeIPChecker(), true, false, false, false, false, false, false, false, false, false, false, groupIDAllocator, false, defaultPacketInRate) + c = ofClient.NewClient(br, bridgeMgmtAddr, nodeiptest.NewFakeNodeIPChecker(), true, false, false, false, false, false, false, false, false, false, false, false, groupIDAllocator, false, defaultPacketInRate) err := ofTestUtils.PrepareOVSBridge(br) require.Nil(t, err, fmt.Sprintf("Failed to prepare OVS bridge: %v", err)) @@ -466,7 +466,7 @@ func TestNetworkPolicyFlows(t *testing.T) { legacyregistry.Reset() metrics.InitializeOVSMetrics() - c = ofClient.NewClient(br, bridgeMgmtAddr, nodeiptest.NewFakeNodeIPChecker(), true, false, false, false, false, false, false, false, false, false, false, groupIDAllocator, false, defaultPacketInRate) + c = ofClient.NewClient(br, bridgeMgmtAddr, nodeiptest.NewFakeNodeIPChecker(), true, false, false, false, false, false, false, false, false, false, false, false, groupIDAllocator, false, defaultPacketInRate) err := ofTestUtils.PrepareOVSBridge(br) require.Nil(t, err, fmt.Sprintf("Failed to prepare OVS bridge %s", br)) @@ -580,7 +580,7 @@ func TestIPv6ConnectivityFlows(t *testing.T) { legacyregistry.Reset() metrics.InitializeOVSMetrics() - c = ofClient.NewClient(br, bridgeMgmtAddr, nodeiptest.NewFakeNodeIPChecker(), true, false, false, true, false, false, false, false, false, false, false, groupIDAllocator, false, defaultPacketInRate) + c = ofClient.NewClient(br, bridgeMgmtAddr, nodeiptest.NewFakeNodeIPChecker(), true, false, false, true, true, false, false, false, false, false, false, false, groupIDAllocator, false, defaultPacketInRate) err := ofTestUtils.PrepareOVSBridge(br) require.Nil(t, err, fmt.Sprintf("Failed to prepare OVS bridge: %v", err)) @@ -621,7 +621,7 @@ func TestProxyServiceFlowsAntreaPolicyDisabled(t *testing.T) { legacyregistry.Reset() metrics.InitializeOVSMetrics() - c = ofClient.NewClient(br, bridgeMgmtAddr, nodeiptest.NewFakeNodeIPChecker(), true, false, false, false, false, false, false, false, false, false, false, groupIDAllocator, false, defaultPacketInRate) + c = ofClient.NewClient(br, bridgeMgmtAddr, nodeiptest.NewFakeNodeIPChecker(), true, false, false, false, false, false, false, false, false, false, false, false, groupIDAllocator, false, defaultPacketInRate) err := ofTestUtils.PrepareOVSBridge(br) require.Nil(t, err, fmt.Sprintf("Failed to prepare OVS bridge %s", br)) @@ -711,7 +711,7 @@ func TestProxyServiceFlowsAntreaPoilcyEnabled(t *testing.T) { legacyregistry.Reset() metrics.InitializeOVSMetrics() - c = ofClient.NewClient(br, bridgeMgmtAddr, nodeiptest.NewFakeNodeIPChecker(), true, true, false, false, false, false, false, false, false, false, false, groupIDAllocator, false, defaultPacketInRate) + c = ofClient.NewClient(br, bridgeMgmtAddr, nodeiptest.NewFakeNodeIPChecker(), true, true, false, false, false, false, false, false, false, false, false, false, groupIDAllocator, false, defaultPacketInRate) err := ofTestUtils.PrepareOVSBridge(br) require.Nil(t, err, fmt.Sprintf("Failed to prepare OVS bridge %s", br)) @@ -1704,8 +1704,8 @@ func expectedExternalFlows(ipProtoStr, gwMACStr string) []expectTableFlows { } } -func prepareEgressMarkFlows(snatIP net.IP, mark, podOFPort, podOFPortRemote uint32, vMAC, localGwMAC net.HardwareAddr) []expectTableFlows { - var ipProtoStr, tunDstFieldName string +func prepareEgressMarkFlows(snatIP net.IP, mark, podOFPort, podOFPortRemote uint32, vMAC, localGwMAC net.HardwareAddr, trafficShaping bool) []expectTableFlows { + var ipProtoStr, tunDstFieldName, nextTableName, ctStateMatch string if snatIP.To4() != nil { tunDstFieldName = "tun_dst" ipProtoStr = "ip" @@ -1713,17 +1713,24 @@ func prepareEgressMarkFlows(snatIP net.IP, mark, podOFPort, podOFPortRemote uint tunDstFieldName = "tun_ipv6_dst" ipProtoStr = "ipv6" } + if trafficShaping { + nextTableName = "EgressQoS" + ctStateMatch = "+trk" + } else { + nextTableName = "L2ForwardingCalc" + ctStateMatch = "+new+trk" + } return []expectTableFlows{ { "EgressMark", []*ofTestUtils.ExpectFlow{ { - MatchStr: fmt.Sprintf("priority=200,ct_state=+new+trk,%s,%s=%s", ipProtoStr, tunDstFieldName, snatIP), - ActStr: fmt.Sprintf("set_field:0x%x/0xff->pkt_mark,set_field:0x20/0xf0->reg0,goto_table:L2ForwardingCalc", mark), + MatchStr: fmt.Sprintf("priority=200,ct_state=%s,%s,%s=%s", ctStateMatch, ipProtoStr, tunDstFieldName, snatIP), + ActStr: fmt.Sprintf("set_field:0x%x/0xff->pkt_mark,set_field:0x20/0xf0->reg0,goto_table:%s", mark, nextTableName), }, { - MatchStr: fmt.Sprintf("priority=200,ct_state=+new+trk,%s,in_port=%d", ipProtoStr, podOFPort), - ActStr: fmt.Sprintf("set_field:0x%x/0xff->pkt_mark,set_field:0x20/0xf0->reg0,goto_table:L2ForwardingCalc", mark), + MatchStr: fmt.Sprintf("priority=200,ct_state=%s,%s,in_port=%d", ctStateMatch, ipProtoStr, podOFPort), + ActStr: fmt.Sprintf("set_field:0x%x/0xff->pkt_mark,set_field:0x20/0xf0->reg0,goto_table:%s", mark, nextTableName), }, { MatchStr: fmt.Sprintf("priority=200,%s,in_port=%d", ipProtoStr, podOFPortRemote), @@ -1785,11 +1792,16 @@ func prepareTrafficControlFlows(sourceOFPorts []uint32, targetOFPort, returnOFPo } func TestEgressMarkFlows(t *testing.T) { + testEgressMarkFlows(t, true) + testEgressMarkFlows(t, false) +} + +func testEgressMarkFlows(t *testing.T, trafficShaping bool) { // Reset OVS metrics (Prometheus) and reinitialize them to test. legacyregistry.Reset() metrics.InitializeOVSMetrics() - c = ofClient.NewClient(br, bridgeMgmtAddr, nodeiptest.NewFakeNodeIPChecker(), false, false, false, true, false, false, false, false, false, false, false, groupIDAllocator, false, defaultPacketInRate) + c = ofClient.NewClient(br, bridgeMgmtAddr, nodeiptest.NewFakeNodeIPChecker(), false, false, false, true, trafficShaping, false, false, false, false, false, false, false, groupIDAllocator, false, defaultPacketInRate) err := ofTestUtils.PrepareOVSBridge(br) require.Nil(t, err, fmt.Sprintf("Failed to prepare OVS bridge %s", br)) @@ -1817,8 +1829,8 @@ func TestEgressMarkFlows(t *testing.T) { vMAC := config.globalMAC gwMAC := config.nodeConfig.GatewayConfig.MAC - expectedFlows := append(prepareEgressMarkFlows(snatIP, snatMark, podOFPort, podOFPortRemote, vMAC, gwMAC), - prepareEgressMarkFlows(snatIPV6, snatMarkV6, podOFPortV6, podOFPortRemoteV6, vMAC, gwMAC)...) + expectedFlows := append(prepareEgressMarkFlows(snatIP, snatMark, podOFPort, podOFPortRemote, vMAC, gwMAC, trafficShaping), + prepareEgressMarkFlows(snatIPV6, snatMarkV6, podOFPortV6, podOFPortRemoteV6, vMAC, gwMAC, trafficShaping)...) c.InstallSNATMarkFlows(snatIP, snatMark) c.InstallSNATMarkFlows(snatIPV6, snatMarkV6) @@ -1846,7 +1858,7 @@ func TestTrafficControlFlows(t *testing.T) { legacyregistry.Reset() metrics.InitializeOVSMetrics() - c = ofClient.NewClient(br, bridgeMgmtAddr, nodeiptest.NewFakeNodeIPChecker(), false, false, false, false, false, false, false, false, false, true, false, groupIDAllocator, false, defaultPacketInRate) + c = ofClient.NewClient(br, bridgeMgmtAddr, nodeiptest.NewFakeNodeIPChecker(), false, false, false, false, false, false, false, false, false, false, true, false, groupIDAllocator, false, defaultPacketInRate) err := ofTestUtils.PrepareOVSBridge(br) require.Nil(t, err, fmt.Sprintf("Failed to prepare OVS bridge %s", br))