From 1c47b502de3d7796fa7db4ff1d10f4143926cdda Mon Sep 17 00:00:00 2001 From: Shuyang Xin Date: Sun, 15 May 2022 18:20:26 +0800 Subject: [PATCH] Support protocol-independent NPL annotation * Add protocol string to new NPLAnnotation set the protocols map as a deprecated value. * Support Node port for UDP and TCP using the different number for a single Pod on Windows. issue antrea-io#3826 Signed-off-by: Shuyang Xin --- pkg/agent/nodeportlocal/k8s/annotations.go | 3 +- pkg/agent/nodeportlocal/k8s/npl_controller.go | 13 +- pkg/agent/nodeportlocal/npl_agent_test.go | 2 +- .../nodeportlocal/portcache/port_table.go | 38 +--- .../portcache/port_table_linux.go | 44 +++- .../portcache/port_table_test.go | 2 +- .../portcache/port_table_windows.go | 191 +++++------------- pkg/agent/nodeportlocal/rules/netnat_rule.go | 10 +- pkg/agent/nodeportlocal/rules/types.go | 1 + .../nodeportlocal/testing/annotations.go | 21 +- test/e2e/framework.go | 4 +- test/e2e/nodeportlocal_test.go | 10 +- 12 files changed, 121 insertions(+), 218 deletions(-) diff --git a/pkg/agent/nodeportlocal/k8s/annotations.go b/pkg/agent/nodeportlocal/k8s/annotations.go index b8dc82d3a2d..c5a69a73df8 100644 --- a/pkg/agent/nodeportlocal/k8s/annotations.go +++ b/pkg/agent/nodeportlocal/k8s/annotations.go @@ -39,7 +39,8 @@ type NPLAnnotation struct { PodPort int `json:"podPort"` NodeIP string `json:"nodeIP"` NodePort int `json:"nodePort"` - Protocols []string `json:"protocols"` + Protocol string `json:"protocol"` + Protocols []string `json:"protocols"` // deprecated, array with a single member which is equal to the Protocol field } func toJSON(serialize interface{}) string { diff --git a/pkg/agent/nodeportlocal/k8s/npl_controller.go b/pkg/agent/nodeportlocal/k8s/npl_controller.go index 4f7ddf81660..362dfa2bf63 100644 --- a/pkg/agent/nodeportlocal/k8s/npl_controller.go +++ b/pkg/agent/nodeportlocal/k8s/npl_controller.go @@ -447,7 +447,7 @@ func (c *NPLController) handleAddUpdatePod(key string, obj interface{}) error { } } - nplAnnotationsRequiredMap := map[int]NPLAnnotation{} + nplAnnotationsRequiredMap := map[string]NPLAnnotation{} nplAnnotationsRequired := []NPLAnnotation{} hostPorts := make(map[string]int) @@ -491,7 +491,7 @@ func (c *NPLController) handleAddUpdatePod(key string, obj interface{}) error { return fmt.Errorf("failed to parse port number and protocol from %s for Pod %s: %v", targetPortProto, key, err) } podPorts[targetPortProto] = struct{}{} - portData := c.portTable.GetEntry(podIP, port) + portData := c.portTable.GetEntry(podIP, port, protocol) if portData != nil && !portData.ProtocolInUse(protocol) { // If the PortTable has an entry for the Pod but does not have an // entry with protocol, we enforce AddRule for the missing Protocol. @@ -510,14 +510,12 @@ func (c *NPLController) handleAddUpdatePod(key string, obj interface{}) error { nodePort = portData.NodePort } - if val, ok := nplAnnotationsRequiredMap[nodePort]; ok { - val.Protocols = append(val.Protocols, protocol) - nplAnnotationsRequiredMap[nodePort] = val - } else { - nplAnnotationsRequiredMap[nodePort] = NPLAnnotation{ + if _, ok := nplAnnotationsRequiredMap[portcache.NodePortProtoFormat(nodePort, protocol)]; !ok { + nplAnnotationsRequiredMap[portcache.NodePortProtoFormat(nodePort, protocol)] = NPLAnnotation{ PodPort: port, NodeIP: pod.Status.HostIP, NodePort: nodePort, + Protocol: protocol, Protocols: []string{protocol}, } } @@ -604,6 +602,7 @@ func (c *NPLController) waitForRulesInitialization() { NodePort: npl.NodePort, PodPort: npl.PodPort, PodIP: pod.Status.PodIP, + Protocol: npl.Protocol, Protocols: npl.Protocols, }) } diff --git a/pkg/agent/nodeportlocal/npl_agent_test.go b/pkg/agent/nodeportlocal/npl_agent_test.go index c577e7e99b0..2c1efbf0579 100644 --- a/pkg/agent/nodeportlocal/npl_agent_test.go +++ b/pkg/agent/nodeportlocal/npl_agent_test.go @@ -66,7 +66,7 @@ const ( func newPortTable(mockIPTables rules.PodPortRules, mockPortOpener portcache.LocalPortOpener) *portcache.PortTable { return &portcache.PortTable{ - NodePortTable: make(map[int]*portcache.NodePortData), + NodePortTable: make(map[string]*portcache.NodePortData), PodEndpointTable: make(map[string]*portcache.NodePortData), StartPort: defaultStartPort, EndPort: defaultEndPort, diff --git a/pkg/agent/nodeportlocal/portcache/port_table.go b/pkg/agent/nodeportlocal/portcache/port_table.go index b4d77aef076..a2923f74689 100644 --- a/pkg/agent/nodeportlocal/portcache/port_table.go +++ b/pkg/agent/nodeportlocal/portcache/port_table.go @@ -25,26 +25,10 @@ import ( "antrea.io/antrea/pkg/agent/nodeportlocal/rules" ) -var ( - supportedProtocols = []string{"tcp", "udp"} -) - // protocolSocketState represents the state of the socket corresponding to a // given (Node port, protocol) tuple. type protocolSocketState int -const ( - // stateOpen means that a listening socket has been opened for the - // protocol (as a means to reserve the port for this protocol), but no - // NPL rule has been installed for it. - stateOpen protocolSocketState = iota - // stateInUse means that a listening socket has been opened AND a NPL - // rule has been installed. - stateInUse - // stateClosed means that the socket has been closed. - stateClosed -) - type ProtocolSocketData struct { Protocol string State protocolSocketState @@ -83,7 +67,7 @@ type LocalPortOpener interface { type localPortOpener struct{} type PortTable struct { - NodePortTable map[int]*NodePortData + NodePortTable map[string]*NodePortData PodEndpointTable map[string]*NodePortData StartPort int EndPort int @@ -95,7 +79,7 @@ type PortTable struct { func NewPortTable(start, end int) (*PortTable, error) { ptable := PortTable{ - NodePortTable: make(map[int]*NodePortData), + NodePortTable: make(map[string]*NodePortData), PodEndpointTable: make(map[string]*NodePortData), StartPort: start, EndPort: end, @@ -112,21 +96,10 @@ func NewPortTable(start, end int) (*PortTable, error) { func (pt *PortTable) CleanupAllEntries() { pt.tableLock.Lock() defer pt.tableLock.Unlock() - pt.NodePortTable = make(map[int]*NodePortData) + pt.NodePortTable = make(map[string]*NodePortData) pt.PodEndpointTable = make(map[string]*NodePortData) } -func (pt *PortTable) GetEntry(ip string, port int) *NodePortData { - pt.tableLock.RLock() - defer pt.tableLock.RUnlock() - // Return pointer to copy of data from the PodEndpointTable. - if data := pt.getEntryByPodIPPort(ip, port); data != nil { - dataCopy := *data - return &dataCopy - } - return nil -} - func (pt *PortTable) GetDataForPodIP(ip string) []NodePortData { pt.tableLock.RLock() defer pt.tableLock.RUnlock() @@ -156,6 +129,11 @@ func (pt *PortTable) RuleExists(podIP string, podPort int, protocol string) bool return false } +// nodePortProtoFormat formats the nodeport, protocol to string port:protocol. +func NodePortProtoFormat(nodeport int, protocol string) string { + return fmt.Sprintf("%d:%s", nodeport, protocol) +} + // podIPPortFormat formats the ip, port to string ip:port. func podIPPortFormat(ip string, port int) string { return fmt.Sprintf("%s:%d", ip, port) diff --git a/pkg/agent/nodeportlocal/portcache/port_table_linux.go b/pkg/agent/nodeportlocal/portcache/port_table_linux.go index fb55b60480e..dbaa5a32b86 100644 --- a/pkg/agent/nodeportlocal/portcache/port_table_linux.go +++ b/pkg/agent/nodeportlocal/portcache/port_table_linux.go @@ -19,6 +19,7 @@ package portcache import ( "fmt" + "strconv" "time" "k8s.io/klog/v2" @@ -26,9 +27,38 @@ import ( "antrea.io/antrea/pkg/agent/nodeportlocal/rules" ) +const ( + // stateOpen means that a listening socket has been opened for the + // protocol (as a means to reserve the port for this protocol), but no + // NPL rule has been installed for it. + stateOpen protocolSocketState = iota + // stateInUse means that a listening socket has been opened AND a NPL + // rule has been installed. + stateInUse + // stateClosed means that the socket has been closed. + stateClosed +) + +var ( + supportedProtocols = []string{"tcp", "udp"} +) + +func (pt *PortTable) GetEntry(ip string, port int, protocol string) *NodePortData { + var _ = protocol + pt.tableLock.RLock() + defer pt.tableLock.RUnlock() + // Return pointer to copy of data from the PodEndpointTable. + if data := pt.getEntryByPodIPPort(ip, port); data != nil { + dataCopy := *data + return &dataCopy + } + return nil +} + func openSocketsForPort(localPortOpener LocalPortOpener, port int) ([]ProtocolSocketData, error) { - // port needs to be available for all supported protocols: we want to use the same port + // Port needs to be available for all supported protocols: we want to use the same port // number for all protocols and we don't know at this point which protocols are needed. + // This is to preserve the legacy behavior of allocating the same nodePort for all protocols. protocols := make([]ProtocolSocketData, 0, len(supportedProtocols)) for _, protocol := range supportedProtocols { socket, err := localPortOpener.OpenLocalPort(port, protocol) @@ -54,7 +84,7 @@ func (pt *PortTable) getFreePort(podIP string, podPort int) (int, []ProtocolSock // handle wrap around port = port - numPorts } - if _, ok := pt.NodePortTable[port]; ok { + if _, ok := pt.NodePortTable[strconv.Itoa(port)]; ok { // port is already taken continue } @@ -172,7 +202,7 @@ func (pt *PortTable) AddRule(podIP string, podPort int, protocol string) (int, e protocolSocketData.State = stateInUse if !exists { - pt.NodePortTable[nodePort] = npData + pt.NodePortTable[strconv.Itoa(nodePort)] = npData pt.PodEndpointTable[podIPPortFormat(podIP, podPort)] = npData } return npData.NodePort, nil @@ -210,7 +240,7 @@ func (pt *PortTable) DeleteRule(podIP string, podPort int, protocol string) erro if err := data.CloseSockets(); err != nil { return err } - delete(pt.NodePortTable, data.NodePort) + delete(pt.NodePortTable, strconv.Itoa(data.NodePort)) delete(pt.PodEndpointTable, podIPPortFormat(podIP, podPort)) } return nil @@ -231,7 +261,7 @@ func (pt *PortTable) DeleteRulesForPod(podIP string) error { } podEntry.Protocols = podEntry.Protocols[1:] } - delete(pt.NodePortTable, podEntry.NodePort) + delete(pt.NodePortTable, strconv.Itoa(podEntry.NodePort)) delete(pt.PodEndpointTable, podIPPortFormat(podIP, podEntry.PodPort)) } return nil @@ -293,8 +323,8 @@ func (pt *PortTable) RestoreRules(allNPLPorts []rules.PodNodePort, synced chan<- } protocolSocketData.State = stateInUse } - pt.NodePortTable[nplPort.NodePort] = npData - pt.PodEndpointTable[podIPPortFormat(nplPort.PodIP, nplPort.PodPort)] = pt.NodePortTable[nplPort.NodePort] + pt.NodePortTable[strconv.Itoa(nplPort.NodePort)] = npData + pt.PodEndpointTable[podIPPortFormat(nplPort.PodIP, nplPort.PodPort)] = pt.NodePortTable[strconv.Itoa(nplPort.NodePort)] } // retry mechanism as iptables-restore can fail if other components (in Antrea or other // software) are accessing iptables. diff --git a/pkg/agent/nodeportlocal/portcache/port_table_test.go b/pkg/agent/nodeportlocal/portcache/port_table_test.go index 6b681455a75..271ffcc2853 100644 --- a/pkg/agent/nodeportlocal/portcache/port_table_test.go +++ b/pkg/agent/nodeportlocal/portcache/port_table_test.go @@ -35,7 +35,7 @@ const ( func newPortTable(mockIPTables rules.PodPortRules, mockPortOpener LocalPortOpener) *PortTable { return &PortTable{ - NodePortTable: make(map[int]*NodePortData), + NodePortTable: make(map[string]*NodePortData), PodEndpointTable: make(map[string]*NodePortData), StartPort: startPort, EndPort: endPort, diff --git a/pkg/agent/nodeportlocal/portcache/port_table_windows.go b/pkg/agent/nodeportlocal/portcache/port_table_windows.go index 8fe5a48ce90..ef5d6ee539a 100644 --- a/pkg/agent/nodeportlocal/portcache/port_table_windows.go +++ b/pkg/agent/nodeportlocal/portcache/port_table_windows.go @@ -25,36 +25,48 @@ import ( "antrea.io/antrea/pkg/agent/nodeportlocal/rules" ) +const ( + // stateInUse means that the NPL rule has been installed. + stateInUse protocolSocketState = 1 +) + +// podIPPortFormat formats the ip, port to string ip:port. +func podIPPortProtoFormat(ip string, port int, protocol string) string { + return fmt.Sprintf("%s:%d:%s", ip, port, protocol) +} + +func (pt *PortTable) getEntryByPodIPPortProto(ip string, port int, protocol string) *NodePortData { + return pt.PodEndpointTable[podIPPortProtoFormat(ip, port, protocol)] +} + +func (pt *PortTable) GetEntry(ip string, port int, protocol string) *NodePortData { + pt.tableLock.RLock() + defer pt.tableLock.RUnlock() + // Return pointer to copy of data from the PodEndpointTable. + if data := pt.getEntryByPodIPPortProto(ip, port, protocol); data != nil { + dataCopy := *data + return &dataCopy + } + return nil +} + func addRuleForPort(podPortRules rules.PodPortRules, port int, podIP string, podPort int, protocol string) ([]ProtocolSocketData, error) { - // port needs to be available for all supported protocols: all available protocols - // should be returned if any NetNatStaticMapping rule can be inserted to an unused port. - protocols := make([]ProtocolSocketData, 0, len(supportedProtocols)) + // Only the protocol used here should be returned if NetNatStaticMapping rule + // can be inserted to an unused protocol port. + protocols := make([]ProtocolSocketData, 0, 1) err := podPortRules.AddRule(port, podIP, podPort, protocol) if err != nil { klog.ErrorS(err, "Local port cannot be opened", "port", port, "protocol", protocol) return nil, err } - for _, proto := range supportedProtocols { - protocols = append(protocols, ProtocolSocketData{ - Protocol: proto, - State: stateOpen, - socket: nil, - }) - } + protocols = append(protocols, ProtocolSocketData{ + Protocol: protocol, + State: stateInUse, + socket: nil, + }) return protocols, nil } -func updateCloseState(protocols []ProtocolSocketData) error { - for idx := range protocols { - protocolSocketData := &protocols[idx] - if protocolSocketData.State != stateOpen { - continue - } - protocolSocketData.State = stateClosed - } - return nil -} - func (pt *PortTable) addRuleforFreePort(podIP string, podPort int, protocol string) (int, []ProtocolSocketData, error) { klog.V(2).InfoS("Looking for free Node port on Windows", "podIP", podIP, "podPort", podPort, "protocol", protocol) numPorts := pt.EndPort - pt.StartPort + 1 @@ -64,8 +76,8 @@ func (pt *PortTable) addRuleforFreePort(podIP string, podPort int, protocol stri // handle wrap around port = port - numPorts } - if _, ok := pt.NodePortTable[port]; ok { - // port is already taken + if _, ok := pt.NodePortTable[NodePortProtoFormat(port, protocol)]; ok { + // protocol port is already taken continue } @@ -87,11 +99,11 @@ func (pt *PortTable) addRuleforFreePort(podIP string, podPort int, protocol stri func (pt *PortTable) AddRule(podIP string, podPort int, protocol string) (int, error) { pt.tableLock.Lock() defer pt.tableLock.Unlock() - npData := pt.getEntryByPodIPPort(podIP, podPort) + npData := pt.getEntryByPodIPPortProto(podIP, podPort, protocol) exists := (npData != nil) if !exists { nodePort, protocols, err := pt.addRuleforFreePort(podIP, podPort, protocol) - //success means port, protocol available, state is open. + //success means port, protocol available. if err != nil { return 0, err } @@ -101,73 +113,27 @@ func (pt *PortTable) AddRule(podIP string, podPort int, protocol string) (int, e PodPort: podPort, Protocols: protocols, } - protocolSocketData := npData.FindProtocol(protocol) - protocolSocketData.State = stateInUse - pt.NodePortTable[nodePort] = npData - pt.PodEndpointTable[podIPPortFormat(podIP, podPort)] = npData + pt.NodePortTable[NodePortProtoFormat(nodePort, protocol)] = npData + pt.PodEndpointTable[podIPPortProtoFormat(podIP, podPort, protocol)] = npData } else { - protocolSocketData := npData.FindProtocol(protocol) - if protocolSocketData == nil { - return 0, fmt.Errorf("unknown protocol %s", protocol) - } - if protocolSocketData.State == stateInUse { - return 0, fmt.Errorf("rule for %s:%d:%s already exists", podIP, podPort, protocol) - } - if protocolSocketData.State == stateClosed { - return 0, fmt.Errorf("invalid socket state for %s:%d:%s", podIP, podPort, protocol) - } - - nodePort := npData.NodePort - if err := pt.PodPortRules.AddRule(nodePort, podIP, podPort, protocol); err != nil { - return 0, err - } - - protocolSocketData.State = stateInUse + // Only add rules for if the entry does not exist. + return 0, fmt.Errorf("existed windows nodeport entry for %s:%d:%s", podIP, podPort, protocol) } return npData.NodePort, nil } -// syncRules ensures that contents of the port table matches the netnat rules present on the Node. -func (pt *PortTable) syncRules() error { - pt.tableLock.Lock() - defer pt.tableLock.Unlock() - nplPorts := make([]rules.PodNodePort, 0, len(pt.NodePortTable)) - for _, npData := range pt.NodePortTable { - protocols := make([]string, 0, len(supportedProtocols)) - for _, protocol := range npData.Protocols { - if protocol.State == stateInUse { - protocols = append(protocols, protocol.Protocol) - } - } - nplPorts = append(nplPorts, rules.PodNodePort{ - NodePort: npData.NodePort, - PodPort: npData.PodPort, - PodIP: npData.PodIP, - Protocols: protocols, - }) - } - if err := pt.PodPortRules.AddAllRules(nplPorts); err != nil { - return err - } - return nil -} - -// RestoreRules should be called at Antrea Agent startup to restore a set of NPL rules. It is non-blocking but -// takes a channel parameter - synced, which will be closed when the necessary rules have been -// restored successfully. No other operations should be performed on the PortTable until the channel -// is closed. +// RestoreRules should be called at Antrea Agent startup to restore a set of NPL rules. func (pt *PortTable) RestoreRules(allNPLPorts []rules.PodNodePort, synced chan<- struct{}) error { pt.tableLock.Lock() defer pt.tableLock.Unlock() for _, nplPort := range allNPLPorts { - protocols, err := addRuleForPort(pt.PodPortRules, nplPort.NodePort, nplPort.PodIP, nplPort.PodPort, nplPort.Protocols[0]) + protocols, err := addRuleForPort(pt.PodPortRules, nplPort.NodePort, nplPort.PodIP, nplPort.PodPort, nplPort.Protocol) if err != nil { // This will be handled gracefully by the NPL controller: if there is an // annotation using this port, it will be removed and replaced with a new // one with a valid port mapping. klog.ErrorS(err, "Cannot bind to local port, skipping it", "port", nplPort.NodePort) - updateCloseState(protocols) continue } @@ -177,79 +143,31 @@ func (pt *PortTable) RestoreRules(allNPLPorts []rules.PodNodePort, synced chan<- PodIP: nplPort.PodIP, Protocols: protocols, } - for _, protocol := range nplPort.Protocols { - protocolSocketData := npData.FindProtocol(protocol) - if protocolSocketData == nil { - return fmt.Errorf("unknown protocol %s", protocol) - } - protocolSocketData.State = stateInUse - } - pt.NodePortTable[nplPort.NodePort] = npData - pt.PodEndpointTable[podIPPortFormat(nplPort.PodIP, nplPort.PodPort)] = pt.NodePortTable[nplPort.NodePort] - } - - go func() { - defer close(synced) - if err := pt.syncRules(); err != nil { - klog.ErrorS(err, "Failed to restore netnat rules") - } - }() - return nil -} - -func (d *NodePortData) UpdateCloseStates() error { - for idx := range d.Protocols { - protocolSocketData := &d.Protocols[idx] - switch protocolSocketData.State { - case stateClosed: - // already closed - continue - case stateInUse: - // should not happen - return fmt.Errorf("protocol %s is still in use, cannot update status to closed", protocolSocketData.Protocol) - case stateOpen: - protocolSocketData.State = stateClosed - default: - return fmt.Errorf("invalid protocol state") - } + pt.PodEndpointTable[podIPPortProtoFormat(nplPort.PodIP, nplPort.PodPort, nplPort.Protocol)] = pt.NodePortTable[NodePortProtoFormat(nplPort.NodePort, nplPort.Protocol)] + pt.NodePortTable[NodePortProtoFormat(nplPort.NodePort, nplPort.Protocol)] = npData } + // No need to sync up again because addRuleForPort has updated all rules on Windows + close(synced) return nil } func (pt *PortTable) DeleteRule(podIP string, podPort int, protocol string) error { pt.tableLock.Lock() defer pt.tableLock.Unlock() - data := pt.getEntryByPodIPPort(podIP, podPort) + data := pt.getEntryByPodIPPortProto(podIP, podPort, protocol) if data == nil { // Delete not required when the PortTable entry does not exist return nil } - numProtocolsInUse := 0 var protocolSocketData *ProtocolSocketData - for idx, pData := range data.Protocols { - if pData.State != stateInUse { - continue - } - numProtocolsInUse++ - if pData.Protocol == protocol { - protocolSocketData = &data.Protocols[idx] - } - } + protocolSocketData = &data.Protocols[0] if protocolSocketData != nil { if err := pt.PodPortRules.DeleteRule(data.NodePort, podIP, podPort, protocol); err != nil { return err } - protocolSocketData.State = stateOpen - numProtocolsInUse-- - } - if numProtocolsInUse == 0 { - // Node port is not needed anymore: status update - if err := data.UpdateCloseStates(); err != nil { - return err - } - delete(pt.NodePortTable, data.NodePort) - delete(pt.PodEndpointTable, podIPPortFormat(podIP, podPort)) } + delete(pt.NodePortTable, NodePortProtoFormat(data.NodePort, protocol)) + delete(pt.PodEndpointTable, podIPPortProtoFormat(podIP, podPort, protocol)) return nil } @@ -258,15 +176,14 @@ func (pt *PortTable) DeleteRulesForPod(podIP string) error { defer pt.tableLock.Unlock() podEntries := pt.getDataForPodIP(podIP) for _, podEntry := range podEntries { - for len(podEntry.Protocols) > 0 { + if len(podEntry.Protocols) > 0 { protocolSocketData := podEntry.Protocols[0] if err := pt.PodPortRules.DeleteRule(podEntry.NodePort, podIP, podEntry.PodPort, protocolSocketData.Protocol); err != nil { return err } - podEntry.Protocols = podEntry.Protocols[1:] + delete(pt.PodEndpointTable, podIPPortProtoFormat(podIP, podEntry.PodPort, protocolSocketData.Protocol)) + delete(pt.NodePortTable, NodePortProtoFormat(podEntry.NodePort, protocolSocketData.Protocol)) } - delete(pt.NodePortTable, podEntry.NodePort) - delete(pt.PodEndpointTable, podIPPortFormat(podIP, podEntry.PodPort)) } return nil } diff --git a/pkg/agent/nodeportlocal/rules/netnat_rule.go b/pkg/agent/nodeportlocal/rules/netnat_rule.go index 784fd559f52..e5c36effffc 100644 --- a/pkg/agent/nodeportlocal/rules/netnat_rule.go +++ b/pkg/agent/nodeportlocal/rules/netnat_rule.go @@ -81,14 +81,8 @@ func (nn *netnatRules) AddRule(nodePort int, podIP string, podPort int, protocol // AddAllRules constructs a list of NPL rules and performs NetNatStaticMapping replacement. func (nn *netnatRules) AddAllRules(nplList []PodNodePort) error { for _, nplData := range nplList { - for _, protocol := range nplData.Protocols { - nodePort16 := util.PortToUint16(nplData.NodePort) - podPort16 := util.PortToUint16(nplData.PodPort) - podAddr := fmt.Sprintf("%s:%d", nplData.PodIP, podPort16) - if err := util.ReplaceNetNatStaticMapping(antreaNatNPL, "0.0.0.0", nodePort16, nplData.PodIP, podPort16, protocol); err != nil { - return err - } - klog.InfoS("Successfully added NetNatStaticMapping rule", "podAddr", podAddr, "nodePort", nodePort16, "protocol", protocol) + if err := nn.AddRule(nplData.NodePort, nplData.PodIP, nplData.PodPort, nplData.Protocol); err != nil { + return err } } return nil diff --git a/pkg/agent/nodeportlocal/rules/types.go b/pkg/agent/nodeportlocal/rules/types.go index 0ebd4626391..21b89e5eb82 100644 --- a/pkg/agent/nodeportlocal/rules/types.go +++ b/pkg/agent/nodeportlocal/rules/types.go @@ -19,5 +19,6 @@ type PodNodePort struct { NodePort int PodPort int PodIP string + Protocol string Protocols []string } diff --git a/pkg/agent/nodeportlocal/testing/annotations.go b/pkg/agent/nodeportlocal/testing/annotations.go index d7ff30e2b25..415a231af0f 100644 --- a/pkg/agent/nodeportlocal/testing/annotations.go +++ b/pkg/agent/nodeportlocal/testing/annotations.go @@ -40,24 +40,18 @@ func NewExpectedNPLAnnotations(nodeIP *string, nplStartPort, nplEndPort int) *Ex } } -func (a *ExpectedNPLAnnotations) find(podPort int) *nplk8s.NPLAnnotation { +func (a *ExpectedNPLAnnotations) find(podPort int, protocol string) *nplk8s.NPLAnnotation { for _, annotation := range a.annotations { - if annotation.PodPort == podPort { + if annotation.PodPort == podPort && annotation.Protocol == protocol { return &annotation } } return nil } -func (a *ExpectedNPLAnnotations) Add(nodePort *int, podPort int, protocols ...string) *ExpectedNPLAnnotations { - for i, annotation := range a.annotations { - if annotation.PodPort == podPort { - annotation.Protocols = append(annotation.Protocols, protocols...) - a.annotations[i] = annotation - return a - } - } - annotation := nplk8s.NPLAnnotation{PodPort: podPort, Protocols: protocols} +func (a *ExpectedNPLAnnotations) Add(nodePort *int, podPort int, protocol string) *ExpectedNPLAnnotations { + protocols := []string{protocol} + annotation := nplk8s.NPLAnnotation{PodPort: podPort, Protocol: protocol, Protocols: protocols} if nodePort != nil { annotation.NodePort = *nodePort } @@ -70,11 +64,8 @@ func (a *ExpectedNPLAnnotations) Add(nodePort *int, podPort int, protocols ...st func (a *ExpectedNPLAnnotations) Check(t *testing.T, nplValue []nplk8s.NPLAnnotation) { assert.Equal(t, len(a.annotations), len(nplValue), "Invalid number of NPL annotations") - nodePorts := make(map[int]bool) for _, nplAnnotation := range nplValue { - assert.NotContains(t, nodePorts, nplAnnotation.NodePort, "Duplicate Node ports in NPL annotations") - nodePorts[nplAnnotation.NodePort] = true - expectedAnnotation := a.find(nplAnnotation.PodPort) + expectedAnnotation := a.find(nplAnnotation.PodPort, nplAnnotation.Protocol) if !assert.NotNilf(t, expectedAnnotation, "Unexpected annotation with PodPort %d", nplAnnotation.PodPort) { continue } diff --git a/test/e2e/framework.go b/test/e2e/framework.go index 33917e81f40..844af4a2f61 100644 --- a/test/e2e/framework.go +++ b/test/e2e/framework.go @@ -1782,7 +1782,7 @@ func (data *TestData) runNetcatCommandFromTestPod(podName string, ns string, ser return data.runNetcatCommandFromTestPodWithProtocol(podName, ns, busyboxContainerName, server, port, "tcp") } -func (data *TestData) runNetcatCommandFromTestPodWithProtocol(podName string, ns string, ctrName string, server string, port int32, protocol string) error { +func (data *TestData) runNetcatCommandFromTestPodWithProtocol(podName string, ns string, containerName string, server string, port int32, protocol string) error { // No parameter required for TCP connections. protocolOption := "" if protocol == "udp" { @@ -1796,7 +1796,7 @@ func (data *TestData) runNetcatCommandFromTestPodWithProtocol(podName string, ns protocolOption, server, port), } - stdout, stderr, err := data.RunCommandFromPod(ns, podName, ctrName, cmd) + stdout, stderr, err := data.RunCommandFromPod(ns, podName, containerName, cmd) if err == nil { return nil } diff --git a/test/e2e/nodeportlocal_test.go b/test/e2e/nodeportlocal_test.go index 107754c6688..6168bb3d959 100644 --- a/test/e2e/nodeportlocal_test.go +++ b/test/e2e/nodeportlocal_test.go @@ -188,7 +188,6 @@ func checkNPLRulesForWindowsPod(t *testing.T, data *TestData, r *require.Asserti } } checkForNPLRuleInNetNat(t, data, r, antreaPod, nodeName, rules, present) - //don't need to check listening socket on windows } func buildRuleForPod(rule nplRuleData) []string { @@ -335,14 +334,7 @@ func deleteNPLRuleFromIPTables(t *testing.T, data *TestData, r *require.Assertio func deleteNPLRuleFromNetNat(t *testing.T, data *TestData, r *require.Assertions, antreaPod string, rule nplRuleData) { t.Logf("Deleting Netnat rule for %v", rule) - const timeout = 30 * time.Second - err := wait.Poll(time.Second, timeout, func() (bool, error) { - _, _, _, err := data.RunCommandOnNode(rule.nodeIP, "Remove-NetNatStaticMapping -NatName antrea-nat -StaticMappingID 1 -Confirm:$false") - if err != nil { - return false, fmt.Errorf("Error when deleting Netnat rule: %v", err) - } - return true, nil - }) + _, _, _, err := data.RunCommandOnNode(rule.nodeIP, "Remove-NetNatStaticMapping -NatName antrea-nat -StaticMappingID 1 -Confirm:$false") r.NoError(err, "Error when deleting Netnat rule") }