allow reaching ports from the host on macOS
This is a fairly simple solution with some limitations: it uses
docker's existing port-forwarding capabilities to expose the
load balancer ports on the host, adds the container's IP as an
alias on the loopback interface, and then proxies, in user space,
the Service ports on the Service address to the port-mapped host
ports.

Limitations:
- Service port changes are not picked up, because docker does not
  dynamically update forwarded ports
- The cloud-provider-kind binary needs permissions to add IP addresses
  to interfaces and to listen on privileged ports
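
For illustration only, here is a condensed sketch of that flow for a single TCP Service port on macOS. The helper name and its arguments are hypothetical; the real implementation is in this commit's pkg/loadbalancer/tunnel.go:

package sketch

import (
	"io"
	"net"
	"os/exec"
)

// exposeOnHost assumes docker already publishes the load balancer port on
// localhost:hostPort; it aliases the container IP on the loopback interface
// and proxies lbIP:servicePort to localhost:hostPort in user space.
func exposeOnHost(lbIP, servicePort, hostPort string) error {
	// add the container IP as a loopback alias so the host can address it
	if err := exec.Command("ifconfig", "lo0", "alias", lbIP, "netmask", "255.255.255.255", "-arp", "up").Run(); err != nil {
		return err
	}
	ln, err := net.Listen("tcp", net.JoinHostPort(lbIP, servicePort))
	if err != nil {
		return err
	}
	go func() {
		for {
			local, err := ln.Accept()
			if err != nil {
				return // listener closed
			}
			go func() {
				remote, err := net.Dial("tcp", net.JoinHostPort("localhost", hostPort))
				if err != nil {
					local.Close()
					return
				}
				// copy bytes in both directions until either side closes
				go func() { io.Copy(remote, local); remote.Close() }()
				io.Copy(local, remote)
				local.Close()
			}()
		}
	}()
	return nil
}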
aojea committed Apr 5, 2024
1 parent 95142c4 commit fbad4f4
Showing 6 changed files with 255 additions and 15 deletions.
2 changes: 1 addition & 1 deletion go.mod
@@ -3,6 +3,7 @@ module sigs.k8s.io/cloud-provider-kind
go 1.21

require (
github.com/google/go-cmp v0.6.0
github.com/pkg/errors v0.9.1
k8s.io/api v0.29.3
k8s.io/apimachinery v0.29.3
@@ -33,7 +34,6 @@ require (
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/google/gnostic-models v0.6.8 // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/google/gofuzz v1.2.0 // indirect
github.com/google/safetext v0.0.0-20240104143208-7a7d9b3d812f // indirect
github.com/google/uuid v1.6.0 // indirect
4 changes: 2 additions & 2 deletions pkg/constants/constants.go
@@ -6,8 +6,8 @@ const (
ContainerPrefix = "kindccm"
// KIND constants
FixedNetworkName = "kind"
// NodeCCMLabelKey
NodeCCMLabelKey = "io.x-k8s.cloud-provider-kind.cluster"
// ClusterLabelKey
ClusterLabelKey = "io.x-k8s.cloud-provider-kind.cluster"
// NodeNameLabelKey
NodeNameLabelKey = "io.x-k8s.cloud-provider-kind.node.name"
)
52 changes: 52 additions & 0 deletions pkg/container/container.go
@@ -1,11 +1,13 @@
package container

import (
"encoding/json"
"fmt"
"io"
"os/exec"
"strings"

"k8s.io/klog/v2"
kindexec "sigs.k8s.io/kind/pkg/exec"
)

@@ -114,6 +116,56 @@ func IPs(name string) (ipv4 string, ipv6 string, err error) {
return ips[0], ips[1], nil
}

// PortMaps returns a map of the container's internal ports to the host ports published for them
func PortMaps(name string) (map[string]string, error) {
// retrieve the port mappings of the container using docker inspect
cmd := kindexec.Command(containerRuntime, "inspect",
"-f", "{{ json .NetworkSettings.Ports }}",
name, // ... against the "node" container
)

lines, err := kindexec.OutputLines(cmd)
if err != nil {
return nil, fmt.Errorf("failed to get container details: %w", err)
}
if len(lines) != 1 {
return nil, fmt.Errorf("file should only be one line, got %d lines: %w", len(lines), err)
}

type portMapping struct {
HostPort string `json:"HostPort"`
HostIP string `json:"HostIp"`
}

portMappings := make(map[string][]portMapping)
err = json.Unmarshal([]byte(lines[0]), &portMappings)
if err != nil {
return nil, err
}

result := map[string]string{}
for k, v := range portMappings {
protocol := "tcp"
parts := strings.Split(k, "/")
if len(parts) == 2 {
protocol = strings.ToLower(parts[1])
}
if protocol != "tcp" {
klog.Infof("skipping protocol %s not supported, only TCP", protocol)
continue
}

// TODO: we just take the first host port entry; we may need to handle IP families
for _, pm := range v {
if pm.HostPort != "" {
result[parts[0]] = pm.HostPort
break
}
}
}
return result, nil
}
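
// Illustrative example (assumed values): for a container started with
// "--publish-all", the inspect output parsed above looks roughly like
//
//	{"80/tcp":[{"HostIp":"0.0.0.0","HostPort":"55001"}],"443/tcp":[{"HostIp":"0.0.0.0","HostPort":"55002"}]}
//
// and PortMaps would return map[string]string{"80": "55001", "443": "55002"}.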

func ListByLabel(label string) ([]string, error) {
cmd := kindexec.Command(containerRuntime,
"ps",
2 changes: 1 addition & 1 deletion pkg/controller/controller.go
@@ -251,7 +251,7 @@ func startCloudControllerManager(ctx context.Context, clusterName string, kubeCl
}

func cleanup() {
containers, err := container.ListByLabel(constants.NodeCCMLabelKey)
containers, err := container.ListByLabel(constants.NodeNameLabelKey)
if err != nil {
klog.Errorf("can't list containers: %v", err)
return
55 changes: 44 additions & 11 deletions pkg/loadbalancer/server.go
@@ -6,6 +6,7 @@ import (
"encoding/base32"
"fmt"
"os"
"runtime"
"strings"

v1 "k8s.io/api/core/v1"
@@ -16,12 +17,17 @@ import (
)

type Server struct {
tunnelManager *tunnelManager
}

var _ cloudprovider.LoadBalancer = &Server{}

func NewServer() cloudprovider.LoadBalancer {
return &Server{}
s := &Server{}
if runtime.GOOS == "darwin" {
s.tunnelManager = NewTunnelManager()
}
return s
}

func (s *Server) GetLoadBalancer(ctx context.Context, clusterName string, service *v1.Service) (*v1.LoadBalancerStatus, bool, error) {
@@ -82,7 +88,7 @@ func (s *Server) EnsureLoadBalancer(ctx context.Context, clusterName string, ser
}
if !container.Exist(name) {
klog.V(2).Infof("creating container for loadbalancer")
err := createLoadBalancer(clusterName, service, proxyImage)
err := s.createLoadBalancer(clusterName, service, proxyImage)
if err != nil {
return nil, err
}
@@ -108,28 +114,43 @@ func (s *Server) EnsureLoadBalancer(ctx context.Context, clusterName string, ser
}

func (s *Server) UpdateLoadBalancer(ctx context.Context, clusterName string, service *v1.Service, nodes []*v1.Node) error {
return proxyUpdateLoadBalancer(ctx, clusterName, service, nodes)
err := proxyUpdateLoadBalancer(ctx, clusterName, service, nodes)
if err != nil {
return err
}
if s.tunnelManager != nil {
return s.tunnelManager.setupTunnels(loadBalancerName(clusterName, service))
}
return nil
}

func (s *Server) EnsureLoadBalancerDeleted(ctx context.Context, clusterName string, service *v1.Service) error {
return container.Delete(loadBalancerName(clusterName, service))
containerName := loadBalancerName(clusterName, service)
err := container.Delete(containerName)
if err != nil {
return err
}
if s.tunnelManager != nil {
return s.tunnelManager.removeTunnels(containerName)
}
return nil
}

// loadbalancer name is a unique name for the loadbalancer container
func loadBalancerName(clusterName string, service *v1.Service) string {
hash := sha256.Sum256([]byte(loadBalancerSimpleName(clusterName, service)))
hash := sha256.Sum256([]byte(LoadBalancerSimpleName(clusterName, service.Namespace, service.Name)))
encoded := base32.StdEncoding.EncodeToString(hash[:])
name := constants.ContainerPrefix + "-" + encoded[:40]

return name
}

func loadBalancerSimpleName(clusterName string, service *v1.Service) string {
return clusterName + "-" + service.Namespace + "-" + service.Name
func LoadBalancerSimpleName(clusterName string, namespace, name string) string {
return clusterName + "-" + namespace + "-" + name
}

// createLoadBalancer creates a docker container with a loadbalancer
func createLoadBalancer(clusterName string, service *v1.Service, image string) error {
func (s *Server) createLoadBalancer(clusterName string, service *v1.Service, image string) error {
name := loadBalancerName(clusterName, service)

networkName := constants.FixedNetworkName
@@ -141,9 +162,9 @@ func createLoadBalancer(clusterName string, service *v1.Service, image string) e
"--detach", // run the container detached
"--tty", // allocate a tty for entrypoint logs
// label the node with the cluster ID
"--label", fmt.Sprintf("%s=%s", constants.NodeCCMLabelKey, clusterName),
"--label", fmt.Sprintf("%s=%s", constants.ClusterLabelKey, clusterName),
// label the node with the load balancer name
"--label", fmt.Sprintf("%s=%s", constants.NodeNameLabelKey, loadBalancerSimpleName(clusterName, service)),
"--label", fmt.Sprintf("%s=%s", constants.NodeNameLabelKey, LoadBalancerSimpleName(clusterName, service.Namespace, service.Name)),
// use a user-defined docker network so we get embedded DNS
"--net", networkName,
"--init=false",
@@ -160,9 +181,21 @@ func createLoadBalancer(clusterName string, service *v1.Service, image string) e
"--sysctl=net.ipv6.conf.all.disable_ipv6=0", // enable IPv6
"--sysctl=net.ipv6.conf.all.forwarding=1", // allow ipv6 forwarding
"--sysctl=net.ipv4.conf.all.rp_filter=0", // disable rp filter
image,
}

if s.tunnelManager != nil {
// Forward the Service Ports to the host so they are accessible on Mac and Windows
for _, port := range service.Spec.Ports {
if port.Protocol != v1.ProtocolTCP {
continue
}
args = append(args, fmt.Sprintf("--publish=%d/%s", port.Port, "TCP"))
}
// Publish all remaining exposed ports on random host ports
args = append(args, fmt.Sprintf("--publish-all"))

Check failure on line 195 in pkg/loadbalancer/server.go [GitHub Actions / test (1.22.x)]: S1039: unnecessary use of fmt.Sprintf (gosimple)
}

args = append(args, image)
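
// Illustrative example (assumed values): with the tunnel manager enabled the
// args built above amount to roughly
//
//	docker run --detach --tty --net kind ... --publish=80/TCP --publish-all <proxy-image>
//
// so docker picks a free host port for every exposed port, and PortMaps can
// later read those assignments back via docker inspect.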
err := container.Create(name, args)
if err != nil {
return fmt.Errorf("failed to create continers %s %v: %w", name, args, err)
155 changes: 155 additions & 0 deletions pkg/loadbalancer/tunnel.go
@@ -0,0 +1,155 @@
package loadbalancer

import (
"fmt"
"io"
"net"
"os/exec"
"sync"

"k8s.io/klog/v2"

"sigs.k8s.io/cloud-provider-kind/pkg/container"
)

const (
ifaceName = "lo0"
)

type tunnelManager struct {
mu sync.Mutex
tunnels map[string]map[string]*tunnel // first key is the load balancer container name, second key is the container port
}

func NewTunnelManager() *tunnelManager {
t := &tunnelManager{
tunnels: map[string]map[string]*tunnel{},
}
return t
}

func (t *tunnelManager) setupTunnels(containerName string) error {
// get the portmapping from the container and its internal IPs and forward them
// 1. Create the fake IP on the tunnel interface
// 2. Capture the traffic directed to that IP port and forward to the exposed port in the host
portmaps, err := container.PortMaps(containerName)
if err != nil {
return err
}
klog.V(0).Infof("found port maps %v associated to container %s", portmaps, containerName)

ipv4, _, err := container.IPs(containerName)
if err != nil {
return err
}

// TODO: IPv6
klog.V(0).Infof("setting IPv4 address %s associated to container %s", ipv4, containerName)
if err := exec.Command("ifconfig", ifaceName, "alias", ipv4, "netmask", "255.255.255.255", "-arp", "up").Run(); err != nil {
return err
}

// create a tunnel from ip:servicePort to localhost:hostPort for each mapping
t.mu.Lock()
defer t.mu.Unlock()
// There is one IP per Service and a tunnel per Service Port
if t.tunnels[containerName] == nil {
t.tunnels[containerName] = map[string]*tunnel{}
}
for containerPort, hostPort := range portmaps {
tun := NewTunnel(ipv4, containerPort, "localhost", hostPort)
// TODO check if we can leak tunnels
err = tun.Start()
if err != nil {
return err
}
t.tunnels[containerName][containerPort] = tun
}
return nil
}

func (t *tunnelManager) removeTunnels(containerName string) error {
t.mu.Lock()
defer t.mu.Unlock()
tunnels, ok := t.tunnels[containerName]
if !ok {
return nil
}

// all tunnels for the same container share the same local IP on the host
var tunnelIP string
for _, tunnel := range tunnels {
if tunnelIP == "" {
tunnelIP = tunnel.localIP
}
tunnel.Stop()

Check failure on line 82 in pkg/loadbalancer/tunnel.go [GitHub Actions / test (1.22.x)]: Error return value of `tunnel.Stop` is not checked (errcheck)
}

// delete the IP address
if err := exec.Command("ifconfig", ifaceName, "-alias", tunnelIP, "netmask", "255.255.255.255").Run(); err != nil {
return err
}
return nil
}

// tunnel listens on localIP:localPort and proxies the connection to remoteIP:remotePort
type tunnel struct {
listener net.Listener
localIP string
localPort string
remoteIP string // host or IP to dial
remotePort string
}

func NewTunnel(localIP, localPort, remoteIP, remotePort string) *tunnel {
return &tunnel{
localIP: localIP,
localPort: localPort,
remoteIP: remoteIP,
remotePort: remotePort,
}
}
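
// Illustrative usage (values assumed): proxy port 80 on the load balancer's
// loopback-aliased IP to the docker-published port 55001 on localhost:
//
//	tun := NewTunnel("192.168.8.2", "80", "localhost", "55001")
//	if err := tun.Start(); err != nil {
//		// handle error
//	}
//	defer tun.Stop()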

func (t *tunnel) Start() error {
klog.Infof("Starting tunnel on %s", net.JoinHostPort(t.localIP, t.localPort))
ln, err := net.Listen("tcp", net.JoinHostPort(t.localIP, t.localPort))
if err != nil {
return err
}
t.listener = ln

// accept connections in the background so Start does not block the caller
go func() {
for {
conn, err := ln.Accept()
if err != nil {
// the listener was closed by Stop or hit a fatal error
klog.Infof("stopping tunnel listener: %v", err)
return
}
go func() {
err := t.handleConnection(conn)
if err != nil {
klog.Infof("unexpected error on connection: %v", err)
}
}()
}
}()
return nil
}

func (t *tunnel) Stop() error {
return t.listener.Close()
}

func (t *tunnel) handleConnection(local net.Conn) error {
remote, err := net.Dial("tcp", net.JoinHostPort(t.remoteIP, t.remotePort))
if err != nil {
return fmt.Errorf("can't connect to server %q: %v", net.JoinHostPort(t.remoteIP, t.remotePort), err)
}
defer remote.Close()

wg := &sync.WaitGroup{}
wg.Add(2)
go func() {
defer wg.Done()
io.Copy(local, remote)

Check failure on line 147 in pkg/loadbalancer/tunnel.go [GitHub Actions / test (1.22.x)]: Error return value of `io.Copy` is not checked (errcheck)
}()
go func() {
defer wg.Done()
io.Copy(remote, local)

Check failure on line 151 in pkg/loadbalancer/tunnel.go [GitHub Actions / test (1.22.x)]: Error return value of `io.Copy` is not checked (errcheck)
}()
wg.Wait()
return nil
}
