Skip to content

Commit

Permalink
Address comments
Browse files Browse the repository at this point in the history
Signed-off-by: graysonwu <wgrayson@vmware.com>
  • Loading branch information
GraysonWu committed Sep 27, 2023
1 parent d4592a5 commit 169b417
Show file tree
Hide file tree
Showing 15 changed files with 174 additions and 46 deletions.
2 changes: 1 addition & 1 deletion docs/egress.md
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ to one Node manually.

### RateLimit

The `rateLimit` field specifies north-south egress traffic of the Egress. `peak`
The `rateLimit` field impose rate-limiting on egress traffic of the Egress. `peak`
specifies the maximum mbps. `burst` specifies maximum burst mbps for throttle.
All backend workloads selected by a rate-limited Egress share the same bandwidth
while sending egress traffic via this Egress.
Expand Down
14 changes: 9 additions & 5 deletions pkg/agent/controller/egress/egress_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ type EgressController struct {
serviceCIDRUpdateRetryDelay time.Duration

meterTracker map[uint32]*crdv1b1.RateLimit
meterTrackerMutex sync.RWMutex
meterTrackerMutex sync.Mutex
}

func NewEgressController(
Expand Down Expand Up @@ -555,6 +555,7 @@ func (c *EgressController) unrealizeEgressIP(egressName, egressIP string) error
}

func (c *EgressController) uninstallEgressQoSMeter(meterID uint32) error {
// Skip the uninstallation on non-Egress Node.
if meterID == 0 {
return nil
}
Expand Down Expand Up @@ -766,10 +767,13 @@ func (c *EgressController) syncEgress(egressName string) error {
}
}

// rateLimitChanged will be true if this Egress update its rateLimit or this is a new Egress.
rateLimitChanged := eState.hasRateLimit != (egress.Spec.RateLimit != nil)
// rateLimitExistenceChanged will be true if
// 1. a new Egress with rate-limit is created
// 2. an existing Egress removes its rate-limit
// 3. an existing Egress adds rate-limit from a non-rate-limit state
rateLimitExistenceChanged := eState.hasRateLimit != (egress.Spec.RateLimit != nil)
// Realize the latest EgressIP and get the desired mark.
mark, err := c.realizeEgressIP(egressName, desiredEgressIP, egress.Spec.RateLimit, rateLimitChanged)
mark, err := c.realizeEgressIP(egressName, desiredEgressIP, egress.Spec.RateLimit, rateLimitExistenceChanged)
if err != nil {
return err
}
Expand Down Expand Up @@ -828,7 +832,7 @@ func (c *EgressController) syncEgress(egressName string) error {
ofPort := ifaces[0].OFPort
if eState.ofPorts.Has(ofPort) {
staleOFPorts.Delete(ofPort)
if !rateLimitChanged {
if !rateLimitExistenceChanged {
continue
}
}
Expand Down
13 changes: 10 additions & 3 deletions pkg/agent/openflow/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -817,7 +817,7 @@ func (c *client) initialize() error {
}

for _, activeFeature := range c.activatedFeatures {
if err := c.ofEntryOperations.AddOFEntries(activeFeature.initOFEntries()); err != nil {
if err := c.ofEntryOperations.AddOFEntries(activeFeature.initGroups()); err != nil {
return fmt.Errorf("failed to install feature %v initial groups: %v", activeFeature.getFeatureName(), err)
}
if err := c.ofEntryOperations.AddAll(activeFeature.initFlows()); err != nil {
Expand Down Expand Up @@ -1092,8 +1092,15 @@ func (c *client) ReplayFlows() {
}

for _, activeFeature := range c.activatedFeatures {
if err := c.ofEntryOperations.AddOFEntries(activeFeature.replayOFEntries()); err != nil {
klog.ErrorS(err, "Error when replaying feature OF entries", "feature", activeFeature.getFeatureName())
for _, meter := range activeFeature.replayMeters() {
// Openflow bundle message doesn't support meter. Add meter individually instead of
// calling AddOFEntries function.
if err := meter.Add(); err != nil {
klog.ErrorS(err, "Error when replaying feature meters", "feature", activeFeature.getFeatureName())
}
}
if err := c.ofEntryOperations.AddOFEntries(activeFeature.replayGroups()); err != nil {
klog.ErrorS(err, "Error when replaying feature groups", "feature", activeFeature.getFeatureName())
}
if err := c.ofEntryOperations.AddAll(activeFeature.replayFlows()); err != nil {
klog.ErrorS(err, "Error when replaying feature flows", "feature", activeFeature.getFeatureName())
Expand Down
97 changes: 89 additions & 8 deletions pkg/agent/openflow/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1555,29 +1555,52 @@ func Test_client_InstallSNATMarkFlows(t *testing.T) {
testCases := []struct {
name string
snatIP net.IP
meterSupport bool
expectedFlows []string
}{
{
name: "IPv4 SNAT IP",
snatIP: net.ParseIP("192.168.77.100"),
name: "IPv4 SNAT IP",
snatIP: net.ParseIP("192.168.77.100"),
meterSupport: false,
expectedFlows: []string{
"cookie=0x1040000000000, table=EgressMark, priority=200,ct_state=+new+trk,ip,tun_dst=192.168.77.100 actions=set_field:0x64/0xff->pkt_mark,set_field:0x20/0xf0->reg0,goto_table:L2ForwardingCalc",
},
},
{
name: "IPv6 SNAT IP",
snatIP: net.ParseIP("fec0:192:168:77::100"),
name: "IPv6 SNAT IP",
snatIP: net.ParseIP("fec0:192:168:77::100"),
meterSupport: false,
expectedFlows: []string{
"cookie=0x1040000000000, table=EgressMark, priority=200,ct_state=+new+trk,ipv6,tun_ipv6_dst=fec0:192:168:77::100 actions=set_field:0x64/0xff->pkt_mark,set_field:0x20/0xf0->reg0,goto_table:L2ForwardingCalc",
},
},
{
name: "IPv4 SNAT IP meter",
snatIP: net.ParseIP("192.168.77.100"),
meterSupport: true,
expectedFlows: []string{
"cookie=0x1040000000000, table=EgressMark, priority=200,ct_state=+new+trk,ip,tun_dst=192.168.77.100 actions=meter:100,set_field:0x64/0xff->pkt_mark,set_field:0x20/0xf0->reg0,goto_table:L2ForwardingCalc",
},
},
{
name: "IPv6 SNAT IP meter",
snatIP: net.ParseIP("fec0:192:168:77::100"),
meterSupport: true,
expectedFlows: []string{
"cookie=0x1040000000000, table=EgressMark, priority=200,ct_state=+new+trk,ipv6,tun_ipv6_dst=fec0:192:168:77::100 actions=meter:100,set_field:0x64/0xff->pkt_mark,set_field:0x20/0xf0->reg0,goto_table:L2ForwardingCalc",
},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
ctrl := gomock.NewController(t)
m := oftest.NewMockOFEntryOperations(ctrl)

fc := newFakeClient(m, true, true, config.K8sNode, config.TrafficEncapModeEncap)
if tc.meterSupport {
fc.featureEgress.ovsMetersAreSupported = tc.meterSupport
fc.featureEgress.cachedMeter.Store(uint32(100), nil)
}
defer resetPipelines()

m.EXPECT().AddAll(gomock.Any()).Return(nil).Times(1)
Expand All @@ -1602,22 +1625,33 @@ func Test_client_InstallPodSNATFlows(t *testing.T) {

testCases := []struct {
name string
meterSupport bool
snatMark uint32
expectedFlows []string
}{
{
name: "SNAT on Local",
snatMark: uint32(100),
name: "SNAT on Local",
meterSupport: false,
snatMark: uint32(100),
expectedFlows: []string{
"cookie=0x1040000000000, table=EgressMark, priority=200,ct_state=+new+trk,ip,in_port=100 actions=set_field:0x64/0xff->pkt_mark,set_field:0x20/0xf0->reg0,goto_table:L2ForwardingCalc",
},
},
{
name: "SNAT on Remote",
name: "SNAT on Remote",
meterSupport: false,
expectedFlows: []string{
"cookie=0x1040000000000, table=EgressMark, priority=200,ip,in_port=100 actions=set_field:0a:00:00:00:00:01->eth_src,set_field:aa:bb:cc:dd:ee:ff->eth_dst,set_field:192.168.77.101->tun_dst,set_field:0x10/0xf0->reg0,set_field:0x80000/0x80000->reg0,goto_table:L2ForwardingCalc",
},
},
{
name: "SNAT on Local meter",
meterSupport: true,
snatMark: uint32(100),
expectedFlows: []string{
"cookie=0x1040000000000, table=EgressMark, priority=200,ct_state=+new+trk,ip,in_port=100 actions=meter:100,set_field:0x64/0xff->pkt_mark,set_field:0x20/0xf0->reg0,goto_table:L2ForwardingCalc",
},
},
}

for _, tc := range testCases {
Expand All @@ -1626,6 +1660,10 @@ func Test_client_InstallPodSNATFlows(t *testing.T) {
m := oftest.NewMockOFEntryOperations(ctrl)

fc := newFakeClient(m, true, true, config.K8sNode, config.TrafficEncapModeEncap)
if tc.meterSupport {
fc.featureEgress.ovsMetersAreSupported = tc.meterSupport
fc.featureEgress.cachedMeter.Store(uint32(100), nil)
}
defer resetPipelines()

m.EXPECT().AddAll(gomock.Any()).Return(nil).Times(1)
Expand All @@ -1644,6 +1682,35 @@ func Test_client_InstallPodSNATFlows(t *testing.T) {
}
}

func Test_client_InstallEgressQoSMeter(t *testing.T) {
meterID := uint32(100)
meterRate := uint32(100)
meterBurst := uint32(200)

ctrl := gomock.NewController(t)
m := oftest.NewMockOFEntryOperations(ctrl)
bridge := ovsoftest.NewMockBridge(ctrl)
fc := newFakeClient(m, true, true, config.K8sNode, config.TrafficEncapModeEncap)
fc.bridge = bridge
fc.ovsMetersAreSupported = true
defer resetPipelines()

meter := ovsoftest.NewMockMeter(ctrl)
meterBuilder := ovsoftest.NewMockMeterBandBuilder(ctrl)
bridge.EXPECT().NewMeter(binding.MeterIDType(meterID), ofctrl.MeterBurst|ofctrl.MeterKbps).Return(meter).Times(1)
meter.EXPECT().MeterBand().Return(meterBuilder).Times(1)
meterBuilder.EXPECT().MeterType(ofctrl.MeterDrop).Return(meterBuilder).Times(1)
meterBuilder.EXPECT().Rate(1000 * meterRate).Return(meterBuilder).Times(1)
meterBuilder.EXPECT().Burst(1000 * meterBurst).Return(meterBuilder).Times(1)
meterBuilder.EXPECT().Done().Return(meter).Times(1)
meter.EXPECT().Add().Return(nil).Times(1)

assert.NoError(t, fc.InstallEgressQoSMeter(meterID, meterRate, meterBurst))

meter.EXPECT().Delete().Return(nil).Times(1)
assert.NoError(t, fc.UninstallEgressQoSMeter(meterID))
}

func Test_client_InstallTraceflowFlows(t *testing.T) {
type fields struct {
}
Expand Down Expand Up @@ -2606,7 +2673,7 @@ func Test_client_ReplayFlows(t *testing.T) {
"cookie=0x1060000000000, table=L3Forwarding, priority=199,ip,reg0=0x2000/0x2000,nw_dst=192.168.78.101 actions=set_field:0a:00:00:00:00:01->eth_src,set_field:aa:bb:cc:dd:ee:f0->eth_dst,set_field:192.168.78.101->tun_dst,set_field:0x10/0xf0->reg0,goto_table:L3DecTTL",
)
// Feature Network Policy replays flows.
fc.featureNetworkPolicy.initOFEntries()
fc.featureNetworkPolicy.initGroups()
expectedGroups := []string{
"group_id=1,type=all,bucket=bucket_id:0,actions=resubmit:EgressRule,bucket=bucket_id:1,actions=set_field:0x400000/0x600000->reg0,resubmit:Output",
"group_id=2,type=all,bucket=bucket_id:0,actions=resubmit:EgressMetric,bucket=bucket_id:1,actions=set_field:0x400000/0x600000->reg0,resubmit:Output",
Expand Down Expand Up @@ -2707,6 +2774,20 @@ func Test_client_ReplayFlows(t *testing.T) {
meterBuilder.EXPECT().Done().Return(meter).Times(1)
meter.EXPECT().Add().Return(nil).Times(1)
}
egressMeterID := uint32(1)
egressMeterRate := uint32(100)
egressMeterBurst := uint32(200)
egressMeter := fc.genOFMeter(binding.MeterIDType(egressMeterID), ofctrl.MeterBurst|ofctrl.MeterKbps, egressMeterRate, egressMeterBurst)
fc.featureEgress.cachedMeter.Store(egressMeterID, egressMeter)
meter := ovsoftest.NewMockMeter(ctrl)
meterBuilder := ovsoftest.NewMockMeterBandBuilder(ctrl)
bridge.EXPECT().NewMeter(binding.MeterIDType(egressMeterID), ofctrl.MeterBurst|ofctrl.MeterKbps).Return(meter).Times(1)
meter.EXPECT().MeterBand().Return(meterBuilder).Times(1)
meterBuilder.EXPECT().MeterType(ofctrl.MeterDrop).Return(meterBuilder).Times(1)
meterBuilder.EXPECT().Rate(1000 * egressMeterRate).Return(meterBuilder).Times(1)
meterBuilder.EXPECT().Burst(1000 * egressMeterRate).Return(meterBuilder).Times(1)
meterBuilder.EXPECT().Done().Return(meter).Times(1)
meter.EXPECT().Add().Return(nil).Times(1)
}

fc.ReplayFlows()
Expand Down
8 changes: 6 additions & 2 deletions pkg/agent/openflow/egress.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,15 @@ func (f *featureEgress) replayFlows() []*openflow15.FlowMod {
return getCachedFlowMessages(f.cachedFlows)
}

func (f *featureEgress) initOFEntries() []binding.OFEntry {
func (f *featureEgress) initGroups() []binding.OFEntry {
return nil
}

func (f *featureEgress) replayOFEntries() []binding.OFEntry {
func (f *featureEgress) replayGroups() []binding.OFEntry {
return nil
}

func (f *featureEgress) replayMeters() []binding.OFEntry {
var meters []binding.OFEntry
f.cachedMeter.Range(func(id, value interface{}) bool {
meter := value.(binding.Meter)
Expand Down
8 changes: 6 additions & 2 deletions pkg/agent/openflow/externalnode_connectivity.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,11 +153,15 @@ func (f *featureExternalNodeConnectivity) replayFlows() []*openflow15.FlowMod {
return flows
}

func (f *featureExternalNodeConnectivity) initOFEntries() []binding.OFEntry {
func (f *featureExternalNodeConnectivity) initGroups() []binding.OFEntry {
return nil
}

func (f *featureExternalNodeConnectivity) replayOFEntries() []binding.OFEntry {
func (f *featureExternalNodeConnectivity) replayGroups() []binding.OFEntry {
return nil
}

func (f *featureExternalNodeConnectivity) replayMeters() []binding.OFEntry {
return nil
}

Expand Down
10 changes: 6 additions & 4 deletions pkg/agent/openflow/framework.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,14 @@ type feature interface {
getFeatureName() string
// getRequiredTables returns a slice of required tables of the feature.
getRequiredTables() []*Table
// initOFEntries returns the OpenFlow entries, e.g. Groups, Meters, of the feature needed in the initialization.
initOFEntries() []binding.OFEntry
// initGroups returns the OpenFlow groups of the feature needed in the initialization.
initGroups() []binding.OFEntry
// initFlows returns the Openflow messages of initial flows of the feature.
initFlows() []*openflow15.FlowMod
// replayOFEntries returns the fixed and cached Openflow entries, e.g. Groups, Meters, that need to be replayed after OVS is reconnected.
replayOFEntries() []binding.OFEntry
// replayMeters returns the fixed and cached Openflow meters that need to be replayed after OVS is reconnected.
replayMeters() []binding.OFEntry
// replayGroups returns the fixed and cached Openflow groups that need to be replayed after OVS is reconnected.
replayGroups() []binding.OFEntry
// replayFlows returns the Openflow messages of fixed and cached flows that need to be replayed after OVS is reconnected.
replayFlows() []*openflow15.FlowMod
}
Expand Down
8 changes: 6 additions & 2 deletions pkg/agent/openflow/multicast.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ func (f *featureMulticast) multicastPodMetricFlows(podIP net.IP, podOFPort uint3
}
}

func (f *featureMulticast) replayOFEntries() []binding.OFEntry {
func (f *featureMulticast) replayGroups() []binding.OFEntry {
var groups []binding.OFEntry
f.groupCache.Range(func(id, value interface{}) bool {
group := value.(binding.Group)
Expand All @@ -192,7 +192,11 @@ func (f *featureMulticast) replayOFEntries() []binding.OFEntry {
return groups
}

func (f *featureMulticast) initOFEntries() []binding.OFEntry {
func (f *featureMulticast) initGroups() []binding.OFEntry {
return nil
}

func (f *featureMulticast) replayMeters() []binding.OFEntry {
return nil
}

Expand Down
8 changes: 6 additions & 2 deletions pkg/agent/openflow/multicluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,15 @@ func (f *featureMulticluster) replayFlows() []*openflow15.FlowMod {
return getCachedFlowMessages(f.cachedFlows)
}

func (f *featureMulticluster) initOFEntries() []binding.OFEntry {
func (f *featureMulticluster) initGroups() []binding.OFEntry {
return nil
}

func (f *featureMulticluster) replayOFEntries() []binding.OFEntry {
func (f *featureMulticluster) replayGroups() []binding.OFEntry {
return nil
}

func (f *featureMulticluster) replayMeters() []binding.OFEntry {
return nil
}

Expand Down
8 changes: 6 additions & 2 deletions pkg/agent/openflow/network_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -2269,7 +2269,7 @@ func (f *featureNetworkPolicy) initLoggingFlows() []binding.Flow {
return flows
}

func (f *featureNetworkPolicy) initOFEntries() []binding.OFEntry {
func (f *featureNetworkPolicy) initGroups() []binding.OFEntry {
var entries []binding.OFEntry
candidateTables := []*Table{EgressRuleTable, EgressMetricTable, IngressRuleTable, IngressMetricTable}
if f.enableMulticast {
Expand Down Expand Up @@ -2301,12 +2301,16 @@ func (f *featureNetworkPolicy) initOFEntries() []binding.OFEntry {
return entries
}

func (f *featureNetworkPolicy) replayMeters() []binding.OFEntry {
return nil
}

func (f *featureNetworkPolicy) getLoggingAndResubmitGroupID(nextTable uint8) binding.GroupIDType {
groupKey := fmt.Sprintf("%d", nextTable)
group, _ := f.loggingGroupCache.Load(groupKey)
return group.(binding.Group).GetID()
}

func (f *featureNetworkPolicy) replayOFEntries() []binding.OFEntry {
func (f *featureNetworkPolicy) replayGroups() []binding.OFEntry {
return nil
}
4 changes: 3 additions & 1 deletion pkg/agent/openflow/packetin.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,9 @@ const (
PacketInNPStoreDenyOperation = 0b100

// We use OpenFlow Meter for packetIn rate limiting on OVS side.
// Meter Entry ID. Note: 1-255 are reserved for Egress QoS.
// Meter Entry ID.
// 1-255 are reserved for Egress QoS. The Egress QoS meterID leverage the same
// number as the mark allocated to the EgressIP.
PacketInMeterIDNP = 256
PacketInMeterIDTF = 257
PacketInMeterIDDNS = 258
Expand Down
8 changes: 6 additions & 2 deletions pkg/agent/openflow/pod_connectivity.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,10 +275,14 @@ func (f *featurePodConnectivity) trafficControlCommonFlows() []binding.Flow {
}
}

func (f *featurePodConnectivity) initOFEntries() []binding.OFEntry {
func (f *featurePodConnectivity) initGroups() []binding.OFEntry {
return nil
}

func (f *featurePodConnectivity) replayOFEntries() []binding.OFEntry {
func (f *featurePodConnectivity) replayGroups() []binding.OFEntry {
return nil
}

func (f *featurePodConnectivity) replayMeters() []binding.OFEntry {
return nil
}
Loading

0 comments on commit 169b417

Please sign in to comment.