Skip to content

Commit

Permalink
Support protocol-independent NPL annotation
Browse files Browse the repository at this point in the history
* Add protocol string to new NPLAnnotation set the protocols map as a deprecated value.
* Support Node port for UDP and TCP using the different number for a single Pod on Windows.

issue antrea-io#3826

Signed-off-by: Shuyang Xin <gavinx@vmware.com>
  • Loading branch information
XinShuYang committed May 25, 2022
1 parent 5a26fc9 commit 1c47b50
Show file tree
Hide file tree
Showing 12 changed files with 121 additions and 218 deletions.
3 changes: 2 additions & 1 deletion pkg/agent/nodeportlocal/k8s/annotations.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ type NPLAnnotation struct {
PodPort int `json:"podPort"`
NodeIP string `json:"nodeIP"`
NodePort int `json:"nodePort"`
Protocols []string `json:"protocols"`
Protocol string `json:"protocol"`
Protocols []string `json:"protocols"` // deprecated, array with a single member which is equal to the Protocol field
}

func toJSON(serialize interface{}) string {
Expand Down
13 changes: 6 additions & 7 deletions pkg/agent/nodeportlocal/k8s/npl_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,7 @@ func (c *NPLController) handleAddUpdatePod(key string, obj interface{}) error {
}
}

nplAnnotationsRequiredMap := map[int]NPLAnnotation{}
nplAnnotationsRequiredMap := map[string]NPLAnnotation{}
nplAnnotationsRequired := []NPLAnnotation{}

hostPorts := make(map[string]int)
Expand Down Expand Up @@ -491,7 +491,7 @@ func (c *NPLController) handleAddUpdatePod(key string, obj interface{}) error {
return fmt.Errorf("failed to parse port number and protocol from %s for Pod %s: %v", targetPortProto, key, err)
}
podPorts[targetPortProto] = struct{}{}
portData := c.portTable.GetEntry(podIP, port)
portData := c.portTable.GetEntry(podIP, port, protocol)
if portData != nil && !portData.ProtocolInUse(protocol) {
// If the PortTable has an entry for the Pod but does not have an
// entry with protocol, we enforce AddRule for the missing Protocol.
Expand All @@ -510,14 +510,12 @@ func (c *NPLController) handleAddUpdatePod(key string, obj interface{}) error {
nodePort = portData.NodePort
}

if val, ok := nplAnnotationsRequiredMap[nodePort]; ok {
val.Protocols = append(val.Protocols, protocol)
nplAnnotationsRequiredMap[nodePort] = val
} else {
nplAnnotationsRequiredMap[nodePort] = NPLAnnotation{
if _, ok := nplAnnotationsRequiredMap[portcache.NodePortProtoFormat(nodePort, protocol)]; !ok {
nplAnnotationsRequiredMap[portcache.NodePortProtoFormat(nodePort, protocol)] = NPLAnnotation{
PodPort: port,
NodeIP: pod.Status.HostIP,
NodePort: nodePort,
Protocol: protocol,
Protocols: []string{protocol},
}
}
Expand Down Expand Up @@ -604,6 +602,7 @@ func (c *NPLController) waitForRulesInitialization() {
NodePort: npl.NodePort,
PodPort: npl.PodPort,
PodIP: pod.Status.PodIP,
Protocol: npl.Protocol,
Protocols: npl.Protocols,
})
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/agent/nodeportlocal/npl_agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ const (

func newPortTable(mockIPTables rules.PodPortRules, mockPortOpener portcache.LocalPortOpener) *portcache.PortTable {
return &portcache.PortTable{
NodePortTable: make(map[int]*portcache.NodePortData),
NodePortTable: make(map[string]*portcache.NodePortData),
PodEndpointTable: make(map[string]*portcache.NodePortData),
StartPort: defaultStartPort,
EndPort: defaultEndPort,
Expand Down
38 changes: 8 additions & 30 deletions pkg/agent/nodeportlocal/portcache/port_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,26 +25,10 @@ import (
"antrea.io/antrea/pkg/agent/nodeportlocal/rules"
)

var (
supportedProtocols = []string{"tcp", "udp"}
)

// protocolSocketState represents the state of the socket corresponding to a
// given (Node port, protocol) tuple.
type protocolSocketState int

const (
// stateOpen means that a listening socket has been opened for the
// protocol (as a means to reserve the port for this protocol), but no
// NPL rule has been installed for it.
stateOpen protocolSocketState = iota
// stateInUse means that a listening socket has been opened AND a NPL
// rule has been installed.
stateInUse
// stateClosed means that the socket has been closed.
stateClosed
)

type ProtocolSocketData struct {
Protocol string
State protocolSocketState
Expand Down Expand Up @@ -83,7 +67,7 @@ type LocalPortOpener interface {
type localPortOpener struct{}

type PortTable struct {
NodePortTable map[int]*NodePortData
NodePortTable map[string]*NodePortData
PodEndpointTable map[string]*NodePortData
StartPort int
EndPort int
Expand All @@ -95,7 +79,7 @@ type PortTable struct {

func NewPortTable(start, end int) (*PortTable, error) {
ptable := PortTable{
NodePortTable: make(map[int]*NodePortData),
NodePortTable: make(map[string]*NodePortData),
PodEndpointTable: make(map[string]*NodePortData),
StartPort: start,
EndPort: end,
Expand All @@ -112,21 +96,10 @@ func NewPortTable(start, end int) (*PortTable, error) {
func (pt *PortTable) CleanupAllEntries() {
pt.tableLock.Lock()
defer pt.tableLock.Unlock()
pt.NodePortTable = make(map[int]*NodePortData)
pt.NodePortTable = make(map[string]*NodePortData)
pt.PodEndpointTable = make(map[string]*NodePortData)
}

func (pt *PortTable) GetEntry(ip string, port int) *NodePortData {
pt.tableLock.RLock()
defer pt.tableLock.RUnlock()
// Return pointer to copy of data from the PodEndpointTable.
if data := pt.getEntryByPodIPPort(ip, port); data != nil {
dataCopy := *data
return &dataCopy
}
return nil
}

func (pt *PortTable) GetDataForPodIP(ip string) []NodePortData {
pt.tableLock.RLock()
defer pt.tableLock.RUnlock()
Expand Down Expand Up @@ -156,6 +129,11 @@ func (pt *PortTable) RuleExists(podIP string, podPort int, protocol string) bool
return false
}

// nodePortProtoFormat formats the nodeport, protocol to string port:protocol.
func NodePortProtoFormat(nodeport int, protocol string) string {
return fmt.Sprintf("%d:%s", nodeport, protocol)
}

// podIPPortFormat formats the ip, port to string ip:port.
func podIPPortFormat(ip string, port int) string {
return fmt.Sprintf("%s:%d", ip, port)
Expand Down
44 changes: 37 additions & 7 deletions pkg/agent/nodeportlocal/portcache/port_table_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,46 @@ package portcache

import (
"fmt"
"strconv"
"time"

"k8s.io/klog/v2"

"antrea.io/antrea/pkg/agent/nodeportlocal/rules"
)

const (
// stateOpen means that a listening socket has been opened for the
// protocol (as a means to reserve the port for this protocol), but no
// NPL rule has been installed for it.
stateOpen protocolSocketState = iota
// stateInUse means that a listening socket has been opened AND a NPL
// rule has been installed.
stateInUse
// stateClosed means that the socket has been closed.
stateClosed
)

var (
supportedProtocols = []string{"tcp", "udp"}
)

func (pt *PortTable) GetEntry(ip string, port int, protocol string) *NodePortData {
var _ = protocol
pt.tableLock.RLock()
defer pt.tableLock.RUnlock()
// Return pointer to copy of data from the PodEndpointTable.
if data := pt.getEntryByPodIPPort(ip, port); data != nil {
dataCopy := *data
return &dataCopy
}
return nil
}

func openSocketsForPort(localPortOpener LocalPortOpener, port int) ([]ProtocolSocketData, error) {
// port needs to be available for all supported protocols: we want to use the same port
// Port needs to be available for all supported protocols: we want to use the same port
// number for all protocols and we don't know at this point which protocols are needed.
// This is to preserve the legacy behavior of allocating the same nodePort for all protocols.
protocols := make([]ProtocolSocketData, 0, len(supportedProtocols))
for _, protocol := range supportedProtocols {
socket, err := localPortOpener.OpenLocalPort(port, protocol)
Expand All @@ -54,7 +84,7 @@ func (pt *PortTable) getFreePort(podIP string, podPort int) (int, []ProtocolSock
// handle wrap around
port = port - numPorts
}
if _, ok := pt.NodePortTable[port]; ok {
if _, ok := pt.NodePortTable[strconv.Itoa(port)]; ok {
// port is already taken
continue
}
Expand Down Expand Up @@ -172,7 +202,7 @@ func (pt *PortTable) AddRule(podIP string, podPort int, protocol string) (int, e

protocolSocketData.State = stateInUse
if !exists {
pt.NodePortTable[nodePort] = npData
pt.NodePortTable[strconv.Itoa(nodePort)] = npData
pt.PodEndpointTable[podIPPortFormat(podIP, podPort)] = npData
}
return npData.NodePort, nil
Expand Down Expand Up @@ -210,7 +240,7 @@ func (pt *PortTable) DeleteRule(podIP string, podPort int, protocol string) erro
if err := data.CloseSockets(); err != nil {
return err
}
delete(pt.NodePortTable, data.NodePort)
delete(pt.NodePortTable, strconv.Itoa(data.NodePort))
delete(pt.PodEndpointTable, podIPPortFormat(podIP, podPort))
}
return nil
Expand All @@ -231,7 +261,7 @@ func (pt *PortTable) DeleteRulesForPod(podIP string) error {
}
podEntry.Protocols = podEntry.Protocols[1:]
}
delete(pt.NodePortTable, podEntry.NodePort)
delete(pt.NodePortTable, strconv.Itoa(podEntry.NodePort))
delete(pt.PodEndpointTable, podIPPortFormat(podIP, podEntry.PodPort))
}
return nil
Expand Down Expand Up @@ -293,8 +323,8 @@ func (pt *PortTable) RestoreRules(allNPLPorts []rules.PodNodePort, synced chan<-
}
protocolSocketData.State = stateInUse
}
pt.NodePortTable[nplPort.NodePort] = npData
pt.PodEndpointTable[podIPPortFormat(nplPort.PodIP, nplPort.PodPort)] = pt.NodePortTable[nplPort.NodePort]
pt.NodePortTable[strconv.Itoa(nplPort.NodePort)] = npData
pt.PodEndpointTable[podIPPortFormat(nplPort.PodIP, nplPort.PodPort)] = pt.NodePortTable[strconv.Itoa(nplPort.NodePort)]
}
// retry mechanism as iptables-restore can fail if other components (in Antrea or other
// software) are accessing iptables.
Expand Down
2 changes: 1 addition & 1 deletion pkg/agent/nodeportlocal/portcache/port_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ const (

func newPortTable(mockIPTables rules.PodPortRules, mockPortOpener LocalPortOpener) *PortTable {
return &PortTable{
NodePortTable: make(map[int]*NodePortData),
NodePortTable: make(map[string]*NodePortData),
PodEndpointTable: make(map[string]*NodePortData),
StartPort: startPort,
EndPort: endPort,
Expand Down
Loading

0 comments on commit 1c47b50

Please sign in to comment.