Skip to content

Commit

Permalink
Use OpenFlow group for Network Policy logging (antrea-io#5061)
Browse files Browse the repository at this point in the history
OpenFlow groups are created for Network Policy logging purpose, which are used
as the OpenFlow action if logging is enabled on the related resource.

Having tested with the worst case that enabling logging on an Antrea-native
policy rule to drop all UDP packets, which expects that all UDP packets are sent
to antrea-agent via packetIn channel, OVS CPU usage is changede from about 1.0%
to 1.3% comparing to the previous implementations in which packet is firstly
dropped by OVS meter before applying the policy actions and logging actions.
The CPU usage increasement introduced by this change is acceptable.

Besides, this change also explicitly set parameter enableLogging as "false" in
function "defaultDropFlow" when it is called by "flowsToTrace". This is to
avoid generating a unnecessary logging packet in a trace flow request.

Signed-off-by: wenyingd <wenyingd@vmware.com>
  • Loading branch information
wenyingd authored Jul 21, 2023
1 parent 9c09f7c commit 31c3289
Show file tree
Hide file tree
Showing 35 changed files with 811 additions and 234 deletions.
3 changes: 2 additions & 1 deletion cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ func run(o *Options) error {
ovsCtlClient := ovsctl.NewClient(o.config.OVSBridge)
ovsBridgeMgmtAddr := ofconfig.GetMgmtAddress(o.config.OVSRunDir, o.config.OVSBridge)
multicastEnabled := features.DefaultFeatureGate.Enabled(features.Multicast) && o.config.Multicast.Enable
groupIDAllocator := openflow.NewGroupAllocator()
ofClient := openflow.NewClient(o.config.OVSBridge,
ovsBridgeMgmtAddr,
nodeIPTracker,
Expand All @@ -162,6 +163,7 @@ func run(o *Options) error {
multicastEnabled,
features.DefaultFeatureGate.Enabled(features.TrafficControl),
enableMulticlusterGW,
groupIDAllocator,
)

var serviceCIDRNet *net.IPNet
Expand Down Expand Up @@ -384,7 +386,6 @@ func run(o *Options) error {

var groupCounters []proxytypes.GroupCounter
groupIDUpdates := make(chan string, 100)
groupIDAllocator := openflow.NewGroupAllocator()
var v4GroupCounter, v6GroupCounter proxytypes.GroupCounter
if v4Enabled {
v4GroupCounter = proxytypes.NewGroupCounter(groupIDAllocator, groupIDUpdates)
Expand Down
2 changes: 1 addition & 1 deletion hack/update-codegen-dockerized.sh
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ MOCKGEN_TARGETS=(
"pkg/controller/querier ControllerQuerier testing"
"pkg/flowaggregator/exporter Interface testing"
"pkg/ipfix IPFIXExportingProcess,IPFIXRegistry,IPFIXCollectingProcess,IPFIXAggregationProcess testing"
"pkg/ovs/openflow Bridge,Table,Flow,Action,CTAction,FlowBuilder,Group,BucketBuilder,PacketOutBuilder testing"
"pkg/ovs/openflow Bridge,Table,Flow,Action,CTAction,FlowBuilder,Group,BucketBuilder,PacketOutBuilder,Meter,MeterBandBuilder testing"
"pkg/ovs/ovsconfig OVSBridgeClient testing"
"pkg/ovs/ovsctl OVSCtlClient testing"
"pkg/ovs/ovsctl OVSOfctlRunner,OVSAppctlRunner ."
Expand Down
2 changes: 1 addition & 1 deletion pkg/agent/controller/networkpolicy/audit_logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ func getNetworkPolicyInfo(pktIn *ofctrl.PacketIn, packet *binding.Packet, c *Con
matchers := pktIn.GetMatches()
var match *ofctrl.MatchField
// Get table name.
tableID := pktIn.TableId
tableID := getPacketInTableID(pktIn)
ob.tableName = openflow.GetFlowTableName(tableID)

var localIP string
Expand Down
74 changes: 70 additions & 4 deletions pkg/agent/controller/networkpolicy/audit_logging_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package networkpolicy

import (
"encoding/binary"
"errors"
"fmt"
"io"
Expand Down Expand Up @@ -210,12 +211,18 @@ func TestRedirectPacketLog(t *testing.T) {
func TestGetNetworkPolicyInfo(t *testing.T) {
prepareMockOFTablesWithCache()
generateMatch := func(regID int, data []byte) openflow15.MatchField {
baseData := make([]byte, 8, 8)
if regID%2 == 0 {
copy(baseData[0:4], data)
} else {
copy(baseData[4:8], data)
}
return openflow15.MatchField{
Class: openflow15.OXM_CLASS_PACKET_REGS,
// convert reg (4-byte) ID to xreg (8-byte) ID
Field: uint8(regID / 2),
HasMask: false,
Value: &openflow15.ByteArrayField{Data: data},
Value: &openflow15.ByteArrayField{Data: baseData},
}
}
testPriority, testRule, testLogLabel := "61800", "test-rule", "test-log-label"
Expand All @@ -225,11 +232,11 @@ func TestGetNetworkPolicyInfo(t *testing.T) {
dropCNPDispositionData := []byte{0x11, 0x00, 0x0c, 0x11}
dropK8sDispositionData := []byte{0x11, 0x00, 0x08, 0x11}
redirectDispositionData := []byte{0x11, 0x10, 0x00, 0x11}
// need 8 bytes (full register) of data for the conjunction
// this will be used for one of the following registers depending on the test case:
// use 4 bytes of data for the conjunction identifier, this will be used for one of
// the following registers depending on the test case:
// openflow.APConjIDField, openflow.TFEgressConjIDField, openflow.TFIngressConjIDField
// the data itself is not relevant
conjunctionData := []byte{0x11, 0x11, 0x11, 0x11, 0x11, 0x11, 0x11, 0x11}
conjunctionData := []byte{0x11, 0x11, 0x11, 0x11}
srcIP := net.ParseIP("192.168.1.1")
destIP := net.ParseIP("192.168.1.2")
testPacket := &binding.Packet{
Expand All @@ -251,6 +258,7 @@ func TestGetNetworkPolicyInfo(t *testing.T) {
OVSPortConfig: &interfacestore.OVSPortConfig{OFPort: 2},
})

antreaIngressRuleTableID := openflow.AntreaPolicyIngressRuleTable.GetID()
tests := []struct {
name string
tableID uint8
Expand All @@ -259,6 +267,7 @@ func TestGetNetworkPolicyInfo(t *testing.T) {
ob *logInfo
wantOb *logInfo
wantErr error
tableIDInReg *uint8
}{
{
name: "ANNP Allow Ingress",
Expand Down Expand Up @@ -370,6 +379,46 @@ func TestGetNetworkPolicyInfo(t *testing.T) {
logLabel: testLogLabel,
},
},
{
name: "Antrea-native Policy Allow from output table",
tableID: openflow.OutputTable.GetID(),
expectedCalls: func(mockClient *openflowtesting.MockClientMockRecorder) {
mockClient.GetPolicyInfoFromConjunction(gomock.Any()).Return(
true, testANNPRef, testPriority, testRule, testLogLabel)
},
dispositionData: allowDispositionData,
wantOb: &logInfo{
tableName: openflow.AntreaPolicyIngressRuleTable.GetName(),
disposition: actionAllow,
npRef: testANNPRef.ToString(),
ofPriority: testPriority,
ruleName: testRule,
direction: "Ingress",
appliedToRef: "default/destPod",
logLabel: testLogLabel,
},
tableIDInReg: &antreaIngressRuleTableID,
},
{
name: "Antrea-native Policy Drop from output table",
tableID: openflow.OutputTable.GetID(),
expectedCalls: func(mockClient *openflowtesting.MockClientMockRecorder) {
mockClient.GetPolicyInfoFromConjunction(gomock.Any()).Return(
true, testANNPRef, testPriority, testRule, testLogLabel)
},
dispositionData: dropCNPDispositionData,
wantOb: &logInfo{
tableName: openflow.AntreaPolicyIngressRuleTable.GetName(),
disposition: actionDrop,
npRef: testANNPRef.ToString(),
ofPriority: testPriority,
ruleName: testRule,
direction: "Ingress",
appliedToRef: "default/destPod",
logLabel: testLogLabel,
},
tableIDInReg: &antreaIngressRuleTableID,
},
}

for _, tc := range tests {
Expand All @@ -390,6 +439,23 @@ func TestGetNetworkPolicyInfo(t *testing.T) {
ingressMatch := generateMatch(regID, conjunctionData)
matchers = append(matchers, ingressMatch)
}
if tc.tableIDInReg != nil {
tableMatchRegID := openflow.PacketInTableField.GetRegID()
tableRegData := make([]byte, 4, 4)
binary.BigEndian.PutUint32(tableRegData[0:], uint32(*tc.tableIDInReg))
found := false
for _, m := range matchers {
if m.Class == openflow15.OXM_CLASS_PACKET_REGS && m.Field == uint8(tableMatchRegID/2) {
copy(m.Value.(*openflow15.ByteArrayField).Data[0:4], tableRegData)
found = true
break
}
}
if !found {
tableMatch := generateMatch(tableMatchRegID, tableRegData)
matchers = append(matchers, tableMatch)
}
}
pktIn := &ofctrl.PacketIn{
PacketIn: &openflow15.PacketIn{
TableId: tc.tableID,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ var mockOFTables = map[*openflow.Table]uint8{
openflow.AntreaPolicyIngressRuleTable: uint8(12),
openflow.IngressRuleTable: uint8(13),
openflow.IngressDefaultTable: uint8(14),
openflow.OutputTable: uint8(28),
}

type antreaClientGetter struct {
Expand Down
22 changes: 21 additions & 1 deletion pkg/agent/controller/networkpolicy/packetin.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func (c *Controller) storeDenyConnection(pktIn *ofctrl.PacketIn) error {
matchers := pktIn.GetMatches()
var match *ofctrl.MatchField
// Get table ID
tableID := pktIn.TableId
tableID := getPacketInTableID(pktIn)
// Get disposition Allow, Drop or Reject
match = getMatchRegField(matchers, openflow.APDispositionField)
id, err := getInfoInReg(match, openflow.APDispositionField.GetRange().ToNXRange())
Expand Down Expand Up @@ -204,3 +204,23 @@ func isAntreaPolicyEgressTable(tableID uint8) bool {
}
return false
}

// getPacketInTableID returns the OVS table ID in which the packet is sent to antrea-agent. Since L2ForwardOutput is
// the table where all Antrea-native policies logging packets are sent to antrea-agent, "PacketInTableField" is used
// to store the real table requiring "sendToController" action. This function first parses the direct table where
// the packet leaves OVS pipeline, then checks whether "PacketInTableField" is set with a valid value or not. The value
// in the field is returned if yes.
func getPacketInTableID(pktIn *ofctrl.PacketIn) uint8 {
tableID := pktIn.TableId
matchers := pktIn.GetMatches()
if match := getMatchRegField(matchers, openflow.PacketInTableField); match != nil {
tableVal, err := getInfoInReg(match, openflow.PacketInTableField.GetRange().ToNXRange())
if err == nil {
return uint8(tableVal)
} else {
// This is not expected, so we log an error.
klog.ErrorS(err, "Unable to parse table ID from PacketInTableField in PacketIn message, using the packetIn.TableId", "table", tableID)
}
}
return tableID
}
4 changes: 2 additions & 2 deletions pkg/agent/controller/traceflow/packetin.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ func (c *Controller) parsePacketIn(pktIn *ofctrl.PacketIn) (*crdv1alpha1.Tracefl
}

// Get output table.
if tableID == openflow.L2ForwardingOutTable.GetID() {
if tableID == openflow.OutputTable.GetID() {
ob := new(crdv1alpha1.Observation)
tunnelDstIP := ""
// decide according to packet.
Expand Down Expand Up @@ -323,7 +323,7 @@ func (c *Controller) parsePacketIn(pktIn *ofctrl.PacketIn) (*crdv1alpha1.Tracefl
// Output port is Pod port, packet is delivered.
ob.Action = crdv1alpha1.ActionDelivered
}
ob.ComponentInfo = openflow.L2ForwardingOutTable.GetName()
ob.ComponentInfo = openflow.OutputTable.GetName()
ob.Component = crdv1alpha1.ComponentForwarding
obs = append(obs, *ob)
}
Expand Down
20 changes: 10 additions & 10 deletions pkg/agent/controller/traceflow/packetin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func prepareMockTables() {
openflow.IngressRuleTable: uint8(13),
openflow.IngressDefaultTable: uint8(14),
openflow.IngressMetricTable: uint8(15),
openflow.L2ForwardingOutTable: uint8(17),
openflow.OutputTable: uint8(17),
})
}

Expand Down Expand Up @@ -94,7 +94,7 @@ func Test_getNetworkPolicyObservation(t *testing.T) {
{
name: "ingress accept",
args: args{
tableID: openflow.L2ForwardingOutTable.GetID(),
tableID: openflow.OutputTable.GetID(),
ingress: true,
},
want: &crdv1alpha1.Observation{
Expand All @@ -118,7 +118,7 @@ func Test_getNetworkPolicyObservation(t *testing.T) {
{
name: "egress accept",
args: args{
tableID: openflow.L2ForwardingOutTable.GetID(),
tableID: openflow.OutputTable.GetID(),
ingress: false,
},
want: &crdv1alpha1.Observation{
Expand Down Expand Up @@ -282,7 +282,7 @@ func TestParsePacketIn(t *testing.T) {
},
pktIn: &ofctrl.PacketIn{
PacketIn: &openflow15.PacketIn{
TableId: openflow.L2ForwardingOutTable.GetID(),
TableId: openflow.OutputTable.GetID(),
Match: openflow15.Match{
Fields: []openflow15.MatchField{*matchOutPort, *matchPktMark},
},
Expand Down Expand Up @@ -321,7 +321,7 @@ func TestParsePacketIn(t *testing.T) {
},
{
Component: crdv1alpha1.ComponentForwarding,
ComponentInfo: openflow.L2ForwardingOutTable.GetName(),
ComponentInfo: openflow.OutputTable.GetName(),
Action: crdv1alpha1.ActionForwardedOutOfOverlay,
},
},
Expand All @@ -345,7 +345,7 @@ func TestParsePacketIn(t *testing.T) {
},
pktIn: &ofctrl.PacketIn{
PacketIn: &openflow15.PacketIn{
TableId: openflow.L2ForwardingOutTable.GetID(),
TableId: openflow.OutputTable.GetID(),
Match: openflow15.Match{
Fields: []openflow15.MatchField{*matchTunDst, *matchOutPort},
},
Expand Down Expand Up @@ -384,7 +384,7 @@ func TestParsePacketIn(t *testing.T) {
},
{
Component: crdv1alpha1.ComponentForwarding,
ComponentInfo: openflow.L2ForwardingOutTable.GetName(),
ComponentInfo: openflow.OutputTable.GetName(),
Action: crdv1alpha1.ActionForwarded,
TunnelDstIP: egressIP,
},
Expand All @@ -408,7 +408,7 @@ func TestParsePacketIn(t *testing.T) {
},
pktIn: &ofctrl.PacketIn{
PacketIn: &openflow15.PacketIn{
TableId: openflow.L2ForwardingOutTable.GetID(),
TableId: openflow.OutputTable.GetID(),
Match: openflow15.Match{
Fields: []openflow15.MatchField{*matchOutPort, *matchTunDst, *matchPktMark},
},
Expand Down Expand Up @@ -446,7 +446,7 @@ func TestParsePacketIn(t *testing.T) {
},
{
Component: crdv1alpha1.ComponentForwarding,
ComponentInfo: openflow.L2ForwardingOutTable.GetName(),
ComponentInfo: openflow.OutputTable.GetName(),
Action: crdv1alpha1.ActionForwardedOutOfOverlay,
},
},
Expand Down Expand Up @@ -490,7 +490,7 @@ func TestParsePacketInLiveDuplicates(t *testing.T) {
}
pktIn := &ofctrl.PacketIn{
PacketIn: &openflow15.PacketIn{
TableId: openflow.L2ForwardingOutTable.GetID(),
TableId: openflow.OutputTable.GetID(),
Data: util.NewBuffer(getTestPacketBytes()),
},
}
Expand Down
31 changes: 16 additions & 15 deletions pkg/agent/openflow/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -789,12 +789,6 @@ func (c *client) initialize() error {
return fmt.Errorf("failed to install default flows: %v", err)
}

for _, activeFeature := range c.activatedFeatures {
if err := c.ofEntryOperations.AddAll(activeFeature.initFlows()); err != nil {
return fmt.Errorf("failed to install feature %v initial flows: %v", activeFeature.getFeatureName(), err)
}
}

if c.ovsMetersAreSupported {
if err := c.genPacketInMeter(PacketInMeterIDNP, PacketInMeterRateNP).Add(); err != nil {
return fmt.Errorf("failed to install OpenFlow meter entry (meterID:%d, rate:%d) for NetworkPolicy packet-in rate limiting: %v", PacketInMeterIDNP, PacketInMeterRateNP, err)
Expand All @@ -803,6 +797,16 @@ func (c *client) initialize() error {
return fmt.Errorf("failed to install OpenFlow meter entry (meterID:%d, rate:%d) for TraceFlow packet-in rate limiting: %v", PacketInMeterIDTF, PacketInMeterRateTF, err)
}
}

for _, activeFeature := range c.activatedFeatures {
if err := c.ofEntryOperations.AddOFEntries(activeFeature.initGroups()); err != nil {
return fmt.Errorf("failed to install feature %v initial groups: %v", activeFeature.getFeatureName(), err)
}
if err := c.ofEntryOperations.AddAll(activeFeature.initFlows()); err != nil {
return fmt.Errorf("failed to install feature %v initial flows: %v", activeFeature.getFeatureName(), err)
}
}

return nil
}

Expand Down Expand Up @@ -909,7 +913,8 @@ func (c *client) generatePipelines() {
c.enableMulticast,
c.proxyAll,
c.connectUplinkToBridge,
c.nodeType)
c.nodeType,
c.groupIDAllocator)
c.activatedFeatures = append(c.activatedFeatures, c.featureNetworkPolicy)
c.traceableFeatures = append(c.traceableFeatures, c.featureNetworkPolicy)

Expand Down Expand Up @@ -1018,14 +1023,10 @@ func (c *client) ReplayFlows() {
klog.Errorf("Error during flow replay: %v", err)
}

if c.featureService != nil {
c.featureService.replayGroups()
}
if c.enableMulticast {
c.featureMulticast.replayGroups()
}

for _, activeFeature := range c.activatedFeatures {
if err := c.ofEntryOperations.AddOFEntries(activeFeature.replayGroups()); err != nil {
klog.ErrorS(err, "Error when replaying feature groups", "feature", activeFeature.getFeatureName())
}
if err := c.ofEntryOperations.AddAll(activeFeature.replayFlows()); err != nil {
klog.ErrorS(err, "Error when replaying feature flows", "feature", activeFeature.getFeatureName())
}
Expand Down Expand Up @@ -1484,7 +1485,7 @@ func (c *client) InstallMulticlusterGatewayFlows(clusterID string,
// to set its target output port as 'antrea-tun0'. This flow will be on both Gateway and regular Node.
// - One flow in ClassifierTable for the tunnel traffic if it's not Encap mode.
// - One flow to match MC virtual MAC 'aa:bb:cc:dd:ee:f0' in ClassifierTable for Gateway only.
// - One flow in L2ForwardingOutTable to allow multicluster hairpin traffic for Gateway only.
// - One flow in OutputTable to allow multicluster hairpin traffic for Gateway only.
func (c *client) InstallMulticlusterClassifierFlows(tunnelOFPort uint32, isGateway bool) error {
c.replayMutex.RLock()
defer c.replayMutex.RUnlock()
Expand Down
Loading

0 comments on commit 31c3289

Please sign in to comment.