Skip to content

Commit

Permalink
Merge pull request kubernetes#98838 from aojea/automated-cherry-pick-…
Browse files Browse the repository at this point in the history
…of-#98755-upstream-release-1.20

Automated cherry pick of kubernetes#98755 upstream release 1.20
  • Loading branch information
k8s-ci-robot authored Feb 10, 2021
2 parents 217a1c4 + 6a31f8d commit a3d4ea9
Show file tree
Hide file tree
Showing 5 changed files with 271 additions and 61 deletions.
15 changes: 9 additions & 6 deletions pkg/kubelet/dockershim/network/hostport/fake_iptables.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,14 +148,14 @@ func (f *fakeIPTables) ensureRule(position utiliptables.RulePosition, tableName
return true, nil
}

if position == utiliptables.Prepend {
switch position {
case utiliptables.Prepend:
chain.rules = append([]string{rule}, chain.rules...)
} else if position == utiliptables.Append {
case utiliptables.Append:
chain.rules = append(chain.rules, rule)
} else {
default:
return false, fmt.Errorf("unknown position argument %q", position)
}

return false, nil
}

Expand Down Expand Up @@ -185,7 +185,7 @@ func normalizeRule(rule string) (string, error) {

// Normalize un-prefixed IP addresses like iptables does
if net.ParseIP(arg) != nil {
arg = arg + "/32"
arg += "/32"
}

if len(normalized) > 0 {
Expand Down Expand Up @@ -281,7 +281,10 @@ func (f *fakeIPTables) restore(restoreTableName utiliptables.Table, data []byte,
if strings.HasPrefix(line, ":") {
chainName := utiliptables.Chain(strings.Split(line[1:], " ")[0])
if flush == utiliptables.FlushTables {
table, chain, _ := f.getChain(tableName, chainName)
table, chain, err := f.getChain(tableName, chainName)
if err != nil {
return err
}
if chain != nil {
delete(table.chains, string(chainName))
}
Expand Down
32 changes: 26 additions & 6 deletions pkg/kubelet/dockershim/network/hostport/hostport.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package hostport
import (
"fmt"
"net"
"strconv"
"strings"

"k8s.io/klog/v2"
Expand Down Expand Up @@ -53,7 +54,18 @@ type PodPortMapping struct {
IP net.IP
}

// ipFamily refers to a specific family if not empty, i.e. "4" or "6".
type ipFamily string

// Constants for valid IPFamily:
const (
IPv4 ipFamily = "4"
IPv6 ipFamily = "6"
)

type hostport struct {
ipFamily ipFamily
ip string
port int32
protocol string
}
Expand All @@ -78,19 +90,23 @@ func openLocalPort(hp *hostport) (closeable, error) {
// bind()ed but not listen()ed, and at least the default debian netcat
// has no way to avoid about 10 seconds of retries.
var socket closeable
// open the socket on the HostIP and HostPort specified
address := net.JoinHostPort(hp.ip, strconv.Itoa(int(hp.port)))
switch hp.protocol {
case "tcp":
listener, err := net.Listen("tcp", fmt.Sprintf(":%d", hp.port))
network := "tcp" + string(hp.ipFamily)
listener, err := net.Listen(network, address)
if err != nil {
return nil, err
}
socket = listener
case "udp":
addr, err := net.ResolveUDPAddr("udp", fmt.Sprintf(":%d", hp.port))
network := "udp" + string(hp.ipFamily)
addr, err := net.ResolveUDPAddr(network, address)
if err != nil {
return nil, err
}
conn, err := net.ListenUDP("udp", addr)
conn, err := net.ListenUDP(network, addr)
if err != nil {
return nil, err
}
Expand All @@ -103,8 +119,10 @@ func openLocalPort(hp *hostport) (closeable, error) {
}

// portMappingToHostport creates hostport structure based on input portmapping
func portMappingToHostport(portMapping *PortMapping) hostport {
func portMappingToHostport(portMapping *PortMapping, family ipFamily) hostport {
return hostport{
ipFamily: family,
ip: portMapping.HostIP,
port: portMapping.HostPort,
protocol: strings.ToLower(string(portMapping.Protocol)),
}
Expand All @@ -124,9 +142,11 @@ func ensureKubeHostportChains(iptables utiliptables.Interface, natInterfaceName
{utiliptables.TableNAT, utiliptables.ChainOutput},
{utiliptables.TableNAT, utiliptables.ChainPrerouting},
}
args := []string{"-m", "comment", "--comment", "kube hostport portals",
args := []string{
"-m", "comment", "--comment", "kube hostport portals",
"-m", "addrtype", "--dst-type", "LOCAL",
"-j", string(kubeHostportsChain)}
"-j", string(kubeHostportsChain),
}
for _, tc := range tableChainsNeedJumpServices {
// KUBE-HOSTPORTS chain needs to be appended to the system chains.
// This ensures KUBE-SERVICES chain gets processed first.
Expand Down
71 changes: 52 additions & 19 deletions pkg/kubelet/dockershim/network/hostport/hostport_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ type hostportManager struct {
mu sync.Mutex
}

// NewHostportManager creates a new HostPortManager
func NewHostportManager(iptables utiliptables.Interface) HostPortManager {
h := &hostportManager{
hostPortMap: make(map[hostport]closeable),
Expand All @@ -78,25 +79,24 @@ func (hm *hostportManager) Add(id string, podPortMapping *PodPortMapping, natInt
return nil
}
podFullName := getPodFullName(podPortMapping)

// skip if there is no hostport needed
hostportMappings := gatherHostportMappings(podPortMapping)
if len(hostportMappings) == 0 {
return nil
}

// IP.To16() returns nil if IP is not a valid IPv4 or IPv6 address
if podPortMapping.IP.To16() == nil {
return fmt.Errorf("invalid or missing IP of pod %s", podFullName)
}
podIP := podPortMapping.IP.String()
isIPv6 := utilnet.IsIPv6(podPortMapping.IP)

// skip if there is no hostport needed
hostportMappings := gatherHostportMappings(podPortMapping, isIPv6)
if len(hostportMappings) == 0 {
return nil
}

if isIPv6 != hm.iptables.IsIPv6() {
return fmt.Errorf("HostPortManager IP family mismatch: %v, isIPv6 - %v", podIP, isIPv6)
}

if err = ensureKubeHostportChains(hm.iptables, natInterfaceName); err != nil {
if err := ensureKubeHostportChains(hm.iptables, natInterfaceName); err != nil {
return err
}

Expand Down Expand Up @@ -152,10 +152,17 @@ func (hm *hostportManager) Add(id string, podPortMapping *PodPortMapping, natInt

// DNAT to the podIP:containerPort
hostPortBinding := net.JoinHostPort(podIP, strconv.Itoa(int(pm.ContainerPort)))
writeLine(natRules, "-A", string(chain),
"-m", "comment", "--comment", fmt.Sprintf(`"%s hostport %d"`, podFullName, pm.HostPort),
"-m", protocol, "-p", protocol,
"-j", "DNAT", fmt.Sprintf("--to-destination=%s", hostPortBinding))
if pm.HostIP == "" || pm.HostIP == "0.0.0.0" || pm.HostIP == "::" {
writeLine(natRules, "-A", string(chain),
"-m", "comment", "--comment", fmt.Sprintf(`"%s hostport %d"`, podFullName, pm.HostPort),
"-m", protocol, "-p", protocol,
"-j", "DNAT", fmt.Sprintf("--to-destination=%s", hostPortBinding))
} else {
writeLine(natRules, "-A", string(chain),
"-m", "comment", "--comment", fmt.Sprintf(`"%s hostport %d"`, podFullName, pm.HostPort),
"-m", protocol, "-p", protocol, "-d", pm.HostIP,
"-j", "DNAT", fmt.Sprintf("--to-destination=%s", hostPortBinding))
}
}

// getHostportChain should be able to provide unique hostport chain name using hash
Expand Down Expand Up @@ -198,8 +205,8 @@ func (hm *hostportManager) Remove(id string, podPortMapping *PodPortMapping) (er
return nil
}

hostportMappings := gatherHostportMappings(podPortMapping)
if len(hostportMappings) <= 0 {
hostportMappings := gatherHostportMappings(podPortMapping, hm.iptables.IsIPv6())
if len(hostportMappings) == 0 {
return nil
}

Expand Down Expand Up @@ -231,6 +238,12 @@ func (hm *hostportManager) Remove(id string, podPortMapping *PodPortMapping) (er
}
}

// exit if there is nothing to remove
// don´t forget to clean up opened pod host ports
if len(existingChainsToRemove) == 0 {
return hm.closeHostports(hostportMappings)
}

natChains := bytes.NewBuffer(nil)
natRules := bytes.NewBuffer(nil)
writeLine(natChains, "*nat")
Expand All @@ -245,7 +258,7 @@ func (hm *hostportManager) Remove(id string, podPortMapping *PodPortMapping) (er
}
writeLine(natRules, "COMMIT")

if err = hm.syncIPTables(append(natChains.Bytes(), natRules.Bytes()...)); err != nil {
if err := hm.syncIPTables(append(natChains.Bytes(), natRules.Bytes()...)); err != nil {
return err
}

Expand Down Expand Up @@ -279,7 +292,12 @@ func (hm *hostportManager) openHostports(podPortMapping *PodPortMapping) (map[ho
continue
}

hp := portMappingToHostport(pm)
// HostIP IP family is not handled by this port opener
if pm.HostIP != "" && utilnet.IsIPv6String(pm.HostIP) != hm.iptables.IsIPv6() {
continue
}

hp := portMappingToHostport(pm, hm.getIPFamily())
socket, err := hm.portOpener(&hp)
if err != nil {
retErr = fmt.Errorf("cannot open hostport %d for pod %s: %v", pm.HostPort, getPodFullName(podPortMapping), err)
Expand All @@ -304,38 +322,53 @@ func (hm *hostportManager) openHostports(podPortMapping *PodPortMapping) (map[ho
func (hm *hostportManager) closeHostports(hostportMappings []*PortMapping) error {
errList := []error{}
for _, pm := range hostportMappings {
hp := portMappingToHostport(pm)
hp := portMappingToHostport(pm, hm.getIPFamily())
if socket, ok := hm.hostPortMap[hp]; ok {
klog.V(2).Infof("Closing host port %s", hp.String())
if err := socket.Close(); err != nil {
errList = append(errList, fmt.Errorf("failed to close host port %s: %v", hp.String(), err))
continue
}
delete(hm.hostPortMap, hp)
} else {
klog.V(5).Infof("host port %s does not have an open socket", hp.String())
}
}
return utilerrors.NewAggregate(errList)
}

// getIPFamily returns the hostPortManager IP family
func (hm *hostportManager) getIPFamily() ipFamily {
family := IPv4
if hm.iptables.IsIPv6() {
family = IPv6
}
return family
}

// getHostportChain takes id, hostport and protocol for a pod and returns associated iptables chain.
// This is computed by hashing (sha256) then encoding to base32 and truncating with the prefix
// "KUBE-HP-". We do this because IPTables Chain Names must be <= 28 chars long, and the longer
// they are the harder they are to read.
// WARNING: Please do not change this function. Otherwise, HostportManager may not be able to
// identify existing iptables chains.
func getHostportChain(id string, pm *PortMapping) utiliptables.Chain {
hash := sha256.Sum256([]byte(id + strconv.Itoa(int(pm.HostPort)) + string(pm.Protocol)))
hash := sha256.Sum256([]byte(id + strconv.Itoa(int(pm.HostPort)) + string(pm.Protocol) + pm.HostIP))
encoded := base32.StdEncoding.EncodeToString(hash[:])
return utiliptables.Chain(kubeHostportChainPrefix + encoded[:16])
}

// gatherHostportMappings returns all the PortMappings which has hostport for a pod
func gatherHostportMappings(podPortMapping *PodPortMapping) []*PortMapping {
// it filters the PortMappings that use HostIP and doesn't match the IP family specified
func gatherHostportMappings(podPortMapping *PodPortMapping, isIPv6 bool) []*PortMapping {
mappings := []*PortMapping{}
for _, pm := range podPortMapping.PortMappings {
if pm.HostPort <= 0 {
continue
}
if pm.HostIP != "" && utilnet.IsIPv6String(pm.HostIP) != isIPv6 {
continue
}
mappings = append(mappings, pm)
}
return mappings
Expand Down
Loading

0 comments on commit a3d4ea9

Please sign in to comment.