From fbad4f495cc0f641a41b8d7ec4c608f3d1bba9d8 Mon Sep 17 00:00:00 2001 From: Antonio Ojea Date: Tue, 2 Apr 2024 22:04:17 +0000 Subject: [PATCH] allow to reach ports from host on mac This is a pretty simple solution with some limitations, it uses the existing port forwarding capabilities of docker to expose the loadbalancer ports on the host, and then adds the same IP of the container to the loopback address and start proxying from user space the service ports on the service address to the portmapped ports. Limitations: - Service port changes are not updated as docker does not dynamically update forwarded ports - Kind binary needs permissions to add IP address to interfaces and to listen on privileged ports --- go.mod | 2 +- pkg/constants/constants.go | 4 +- pkg/container/container.go | 52 ++++++++++++ pkg/controller/controller.go | 2 +- pkg/loadbalancer/server.go | 55 ++++++++++--- pkg/loadbalancer/tunnel.go | 155 +++++++++++++++++++++++++++++++++++ 6 files changed, 255 insertions(+), 15 deletions(-) create mode 100644 pkg/loadbalancer/tunnel.go diff --git a/go.mod b/go.mod index 61ed4302..1b64ef08 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module sigs.k8s.io/cloud-provider-kind go 1.21 require ( + github.com/google/go-cmp v0.6.0 github.com/pkg/errors v0.9.1 k8s.io/api v0.29.3 k8s.io/apimachinery v0.29.3 @@ -33,7 +34,6 @@ require ( github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect github.com/golang/protobuf v1.5.4 // indirect github.com/google/gnostic-models v0.6.8 // indirect - github.com/google/go-cmp v0.6.0 // indirect github.com/google/gofuzz v1.2.0 // indirect github.com/google/safetext v0.0.0-20240104143208-7a7d9b3d812f // indirect github.com/google/uuid v1.6.0 // indirect diff --git a/pkg/constants/constants.go b/pkg/constants/constants.go index 682307e8..803ca4bf 100644 --- a/pkg/constants/constants.go +++ b/pkg/constants/constants.go @@ -6,8 +6,8 @@ const ( ContainerPrefix = "kindccm" // KIND constants FixedNetworkName = "kind" - // NodeCCMLabelKey - NodeCCMLabelKey = "io.x-k8s.cloud-provider-kind.cluster" + // ClusterLabelKey + ClusterLabelKey = "io.x-k8s.cloud-provider-kind.cluster" // NodeNameLabelKey NodeNameLabelKey = "io.x-k8s.cloud-provider-kind.node.name" ) diff --git a/pkg/container/container.go b/pkg/container/container.go index 9789b7af..a77f133f 100644 --- a/pkg/container/container.go +++ b/pkg/container/container.go @@ -1,11 +1,13 @@ package container import ( + "encoding/json" "fmt" "io" "os/exec" "strings" + "k8s.io/klog/v2" kindexec "sigs.k8s.io/kind/pkg/exec" ) @@ -114,6 +116,56 @@ func IPs(name string) (ipv4 string, ipv6 string, err error) { return ips[0], ips[1], nil } +// return a list with the map of the internal port to the external port +func PortMaps(name string) (map[string]string, error) { + // retrieve the IP address of the node using docker inspect + cmd := kindexec.Command(containerRuntime, "inspect", + "-f", "{{ json .NetworkSettings.Ports }}", + name, // ... against the "node" container + ) + + lines, err := kindexec.OutputLines(cmd) + if err != nil { + return nil, fmt.Errorf("failed to get container details: %w", err) + } + if len(lines) != 1 { + return nil, fmt.Errorf("file should only be one line, got %d lines: %w", len(lines), err) + } + + type portMapping struct { + HostPort string `json:"HostPort"` + HostIP string `json:"HostIp"` + } + + portMappings := make(map[string][]portMapping) + err = json.Unmarshal([]byte(lines[0]), &portMappings) + if err != nil { + return nil, err + } + + result := map[string]string{} + for k, v := range portMappings { + protocol := "tcp" + parts := strings.Split(k, "/") + if len(parts) == 2 { + protocol = strings.ToLower(parts[1]) + } + if protocol != "tcp" { + klog.Infof("skipping protocol %s not supported, only TCP", protocol) + continue + } + + // TODO we just can get the first entry or look for ip families + for _, pm := range v { + if pm.HostPort != "" { + result[parts[0]] = pm.HostPort + break + } + } + } + return result, nil +} + func ListByLabel(label string) ([]string, error) { cmd := kindexec.Command(containerRuntime, "ps", diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 89d38d85..7d3b716f 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -251,7 +251,7 @@ func startCloudControllerManager(ctx context.Context, clusterName string, kubeCl } func cleanup() { - containers, err := container.ListByLabel(constants.NodeCCMLabelKey) + containers, err := container.ListByLabel(constants.NodeNameLabelKey) if err != nil { klog.Errorf("can't list containers: %v", err) return diff --git a/pkg/loadbalancer/server.go b/pkg/loadbalancer/server.go index 154d2bab..13bb9980 100644 --- a/pkg/loadbalancer/server.go +++ b/pkg/loadbalancer/server.go @@ -6,6 +6,7 @@ import ( "encoding/base32" "fmt" "os" + "runtime" "strings" v1 "k8s.io/api/core/v1" @@ -16,12 +17,17 @@ import ( ) type Server struct { + tunnelManager *tunnelManager } var _ cloudprovider.LoadBalancer = &Server{} func NewServer() cloudprovider.LoadBalancer { - return &Server{} + s := &Server{} + if runtime.GOOS == "darwin" { + s.tunnelManager = NewTunnelManager() + } + return s } func (s *Server) GetLoadBalancer(ctx context.Context, clusterName string, service *v1.Service) (*v1.LoadBalancerStatus, bool, error) { @@ -82,7 +88,7 @@ func (s *Server) EnsureLoadBalancer(ctx context.Context, clusterName string, ser } if !container.Exist(name) { klog.V(2).Infof("creating container for loadbalancer") - err := createLoadBalancer(clusterName, service, proxyImage) + err := s.createLoadBalancer(clusterName, service, proxyImage) if err != nil { return nil, err } @@ -108,28 +114,43 @@ func (s *Server) EnsureLoadBalancer(ctx context.Context, clusterName string, ser } func (s *Server) UpdateLoadBalancer(ctx context.Context, clusterName string, service *v1.Service, nodes []*v1.Node) error { - return proxyUpdateLoadBalancer(ctx, clusterName, service, nodes) + err := proxyUpdateLoadBalancer(ctx, clusterName, service, nodes) + if err != nil { + return err + } + if s.tunnelManager != nil { + return s.tunnelManager.setupTunnels(loadBalancerName(clusterName, service)) + } + return nil } func (s *Server) EnsureLoadBalancerDeleted(ctx context.Context, clusterName string, service *v1.Service) error { - return container.Delete(loadBalancerName(clusterName, service)) + containerName := loadBalancerName(clusterName, service) + err := container.Delete(containerName) + if err != nil { + return err + } + if s.tunnelManager != nil { + return s.tunnelManager.removeTunnels(containerName) + } + return nil } // loadbalancer name is a unique name for the loadbalancer container func loadBalancerName(clusterName string, service *v1.Service) string { - hash := sha256.Sum256([]byte(loadBalancerSimpleName(clusterName, service))) + hash := sha256.Sum256([]byte(LoadBalancerSimpleName(clusterName, service.Namespace, service.Name))) encoded := base32.StdEncoding.EncodeToString(hash[:]) name := constants.ContainerPrefix + "-" + encoded[:40] return name } -func loadBalancerSimpleName(clusterName string, service *v1.Service) string { - return clusterName + "-" + service.Namespace + "-" + service.Name +func LoadBalancerSimpleName(clusterName string, namespace, name string) string { + return clusterName + "-" + namespace + "-" + name } // createLoadBalancer create a docker container with a loadbalancer -func createLoadBalancer(clusterName string, service *v1.Service, image string) error { +func (s *Server) createLoadBalancer(clusterName string, service *v1.Service, image string) error { name := loadBalancerName(clusterName, service) networkName := constants.FixedNetworkName @@ -141,9 +162,9 @@ func createLoadBalancer(clusterName string, service *v1.Service, image string) e "--detach", // run the container detached "--tty", // allocate a tty for entrypoint logs // label the node with the cluster ID - "--label", fmt.Sprintf("%s=%s", constants.NodeCCMLabelKey, clusterName), + "--label", fmt.Sprintf("%s=%s", constants.ClusterLabelKey, clusterName), // label the node with the load balancer name - "--label", fmt.Sprintf("%s=%s", constants.NodeNameLabelKey, loadBalancerSimpleName(clusterName, service)), + "--label", fmt.Sprintf("%s=%s", constants.NodeNameLabelKey, LoadBalancerSimpleName(clusterName, service.Namespace, service.Name)), // user a user defined docker network so we get embedded DNS "--net", networkName, "--init=false", @@ -160,9 +181,21 @@ func createLoadBalancer(clusterName string, service *v1.Service, image string) e "--sysctl=net.ipv6.conf.all.disable_ipv6=0", // enable IPv6 "--sysctl=net.ipv6.conf.all.forwarding=1", // allow ipv6 forwarding "--sysctl=net.ipv4.conf.all.rp_filter=0", // disable rp filter - image, } + if s.tunnelManager != nil { + // Forward the Service Ports to the host so they are accessible on Mac and Windows + for _, port := range service.Spec.Ports { + if port.Protocol != v1.ProtocolTCP { + continue + } + args = append(args, fmt.Sprintf("--publish=%d/%s", port.Port, "TCP")) + } + // Publish all ports in the host in random ports + args = append(args, fmt.Sprintf("--publish-all")) + } + + args = append(args, image) err := container.Create(name, args) if err != nil { return fmt.Errorf("failed to create continers %s %v: %w", name, args, err) diff --git a/pkg/loadbalancer/tunnel.go b/pkg/loadbalancer/tunnel.go new file mode 100644 index 00000000..e66553ba --- /dev/null +++ b/pkg/loadbalancer/tunnel.go @@ -0,0 +1,155 @@ +package loadbalancer + +import ( + "fmt" + "io" + "net" + "os/exec" + "sync" + + "k8s.io/klog/v2" + + "sigs.k8s.io/cloud-provider-kind/pkg/container" +) + +const ( + ifaceName = "lo0" +) + +type tunnelManager struct { + mu sync.Mutex + tunnels map[string]map[string]*tunnel // first key is the service namespace/name second key is the servicePort +} + +func NewTunnelManager() *tunnelManager { + t := &tunnelManager{ + tunnels: map[string]map[string]*tunnel{}, + } + return t +} + +func (t *tunnelManager) setupTunnels(containerName string) error { + // get the portmapping from the container and its internal IPs and forward them + // 1. Create the fake IP on the tunnel interface + // 2. Capture the traffic directed to that IP port and forward to the exposed port in the host + portmaps, err := container.PortMaps(containerName) + if err != nil { + return err + } + klog.V(0).Infof("found port maps %v associated to container %s", portmaps, containerName) + + ipv4, _, err := container.IPs(containerName) + if err != nil { + return err + } + + // TODO: IPv6 + klog.V(0).Infof("setting IPv4 address %s associated to container %s", ipv4, containerName) + if err := exec.Command("ifconfig", ifaceName, "alias", ipv4, "netmask", "255.255.255.255", "-arp", "up").Run(); err != nil { + return err + } + + // create tunnel from the ip:svcport to the localhost:portmap + t.mu.Lock() + defer t.mu.Unlock() + // There is one IP per Service and a tunnel per Service Port + for containerPort, hostPort := range portmaps { + tun := NewTunnel(ipv4, containerPort, "localhost", hostPort) + // TODO check if we can leak tunnels + err = tun.Start() + if err != nil { + return err + } + t.tunnels[containerName][containerPort] = tun + } + return nil +} + +func (t *tunnelManager) removeTunnels(containerName string) error { + t.mu.Lock() + defer t.mu.Unlock() + tunnels, ok := t.tunnels[containerName] + if !ok { + return nil + } + + // all tunnels in the same container share the same local IP on the host + var tunnelIP string + for _, tunnel := range tunnels { + if tunnelIP == "" { + tunnelIP = tunnel.localIP + } + tunnel.Stop() + } + + // delete the IP address + if err := exec.Command("ifconfig", ifaceName, "-alias", tunnelIP, "netmask", "255.255.255.255").Run(); err != nil { + return err + } + return nil +} + +// tunnel listens on localIP:localPort and proxies the connection to remoteIP:remotePort +type tunnel struct { + listener net.Listener + localIP string + localPort string + remoteIP string // address:Port + remotePort string +} + +func NewTunnel(localIP, localPort, remoteIP, remotePort string) *tunnel { + return &tunnel{ + localIP: localIP, + localPort: localPort, + remoteIP: remoteIP, + remotePort: remotePort, + } +} + +func (t *tunnel) Start() error { + klog.Infof("Starting tunnel on %s", net.JoinHostPort(t.localIP, t.localPort)) + ln, err := net.Listen("tcp", net.JoinHostPort(t.localIP, t.localPort)) + if err != nil { + return err + } + + for { + conn, err := ln.Accept() + if err != nil { + klog.Infof("unexpected error listening: %v", err) + } else { + go func() { + err := t.handleConnection(conn) + if err != nil { + klog.Infof("unexpected error on connection: %v", err) + } + }() + } + } +} + +func (t *tunnel) Stop() error { + return t.listener.Close() +} + +func (t *tunnel) handleConnection(local net.Conn) error { + remote, err := net.Dial("tcp", net.JoinHostPort(t.remoteIP, t.remotePort)) + if err != nil { + return fmt.Errorf("can't connect to server %q: %v", net.JoinHostPort(t.remoteIP, t.remotePort), err) + } + defer remote.Close() + + wg := &sync.WaitGroup{} + wg.Add(2) + go func() { + defer wg.Done() + io.Copy(local, remote) + }() + go func() { + defer wg.Done() + io.Copy(remote, local) + }() + wg.Wait() + return nil +}