Skip to content

Commit

Permalink
allow running ccm inside the kind network
Browse files Browse the repository at this point in the history
  • Loading branch information
aojea committed Mar 3, 2024
1 parent 7c0c084 commit 572d9df
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 25 deletions.
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,10 @@ the docker socket inside the container:

```sh
docker build . -t aojea/cloud-provider-kind:v0.1
# using the host network
docker run --rm --network host -v /var/run/docker.sock:/var/run/docker.sock aojea/cloud-provider-kind:v0.1
# or the kind network
docker run --rm --network kind -v /var/run/docker.sock:/var/run/docker.sock aojea/cloud-provider-kind:v0.1
```

## How to use it
Expand Down
119 changes: 94 additions & 25 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ package controller

import (
"context"
"crypto/tls"
"fmt"
"io"
"net/http"
"time"

Expand Down Expand Up @@ -46,20 +49,10 @@ func New(logger log.Logger) *Controller {
}

func (c *Controller) Run(ctx context.Context) {
defer cleanup()
for {
select {
case <-ctx.Done():
// cleanup
containers, err := container.ListByLabel(constants.NodeCCMLabelKey)
if err != nil {
klog.Errorf("can't list containers: %v", err)
return
}
for _, id := range containers {
if err := container.Delete(id); err != nil {
klog.Errorf("can't delete container %s: %v", id, err)
}
}
return
default:
}
Expand All @@ -71,27 +64,20 @@ func (c *Controller) Run(ctx context.Context) {

// add new ones
for _, cluster := range clusters {
select {
case <-ctx.Done():
return
default:
}

klog.V(3).Infof("processing cluster %s", cluster)
_, ok := c.clusters[cluster]
if ok {
klog.V(3).Infof("cluster %s already exist", cluster)
continue
}

// get kubeconfig
kconfig, err := c.kind.KubeConfig(cluster, false)
if err != nil {
klog.Errorf("Failed to get kubeconfig for cluster %s: %v", cluster, err)
continue
}

config, err := clientcmd.RESTConfigFromKubeConfig([]byte(kconfig))
if err != nil {
klog.Errorf("Failed to convert kubeconfig for cluster %s: %v", cluster, err)
continue
}

kubeClient, err := kubernetes.NewForConfig(config)
kubeClient, err := c.getKubeClient(ctx, cluster)
if err != nil {
klog.Errorf("Failed to create kubeClient for cluster %s: %v", cluster, err)
continue
Expand Down Expand Up @@ -121,6 +107,76 @@ func (c *Controller) Run(ctx context.Context) {
}
}

// getKubeClient returns the corresponding kubeclient
// this is needed because the controller can run inside the docker network,
// hence it will need the internal kubeconfig, or can run outside on the host,
// hence it will need the external kubeconfig
// getKubeClient returns the corresponding kubeclient
// this is needed because the controller can run inside the docker network,
// hence it will need the internal kubeconfig, or can run outside on the host,
// hence it will need the external kubeconfig
func (c *Controller) getKubeClient(ctx context.Context, cluster string) (kubernetes.Interface, error) {
	// Probe-only client; the apiserver serves a self-signed certificate,
	// so certificate verification is skipped for the reachability check.
	httpClient := &http.Client{
		Timeout: 5 * time.Second,
		Transport: &http.Transport{
			TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
		},
	}
	// try internal first
	for _, internal := range []bool{true, false} {
		kconfig, err := c.kind.KubeConfig(cluster, internal)
		if err != nil {
			klog.Errorf("Failed to get kubeconfig for cluster %s: %v", cluster, err)
			continue
		}

		config, err := clientcmd.RESTConfigFromKubeConfig([]byte(kconfig))
		if err != nil {
			klog.Errorf("Failed to convert kubeconfig for cluster %s: %v", cluster, err)
			continue
		}

		// check that the apiserver is reachable before continue
		// to fail fast and avoid waiting until the client operations timeout
		var ok bool
		for i := 0; i < 5; i++ {
			select {
			case <-ctx.Done():
				return nil, ctx.Err()
			default:
			}
			if probeHTTP(httpClient, config.Host) {
				ok = true
				break
			}
			// linear backoff: 0s, 1s, 2s, 3s, 4s between attempts
			time.Sleep(time.Second * time.Duration(i))
		}
		if !ok {
			// err is necessarily nil here (every earlier error path hit
			// `continue`), so log the unreachable endpoint instead of a
			// nil error value.
			klog.Errorf("Failed to connect to apiserver %s at %s", cluster, config.Host)
			continue
		}

		kubeClient, err := kubernetes.NewForConfig(config)
		if err != nil {
			klog.Errorf("Failed to create kubeClient for cluster %s: %v", cluster, err)
			continue
		}
		// return a literal nil, not the (nil) err variable
		return kubeClient, nil
	}
	return nil, fmt.Errorf("can not find a working kubernetes clientset")
}

// probeHTTP reports whether an HTTP GET against address succeeds at the
// transport level. The HTTP status code is deliberately not checked:
// we only want to verify connectivity, and the apiserver may not be
// ready yet when it is probed.
func probeHTTP(client *http.Client, address string) bool {
	klog.Infof("probe HTTP address %s", address)
	resp, err := client.Get(address)
	if err != nil {
		klog.Infof("Failed to connect to HTTP address %s: %v", address, err)
		return false
	}
	defer resp.Body.Close()
	// drain the body without buffering it all in memory (io.ReadAll would)
	// so the transport can reuse the underlying connection
	io.Copy(io.Discard, resp.Body) // nolint:errcheck
	return true
}

// TODO: implement leader election to not have problems with multiple providers
// ref: https://github.com/kubernetes/kubernetes/blob/d97ea0f705847f90740cac3bc3dd8f6a4026d0b5/cmd/kube-scheduler/app/server.go#L211
func startCloudControllerManager(ctx context.Context, clusterName string, kubeClient kubernetes.Interface, cloud cloudprovider.Interface) (*ccm, error) {
Expand Down Expand Up @@ -184,3 +240,16 @@ func startCloudControllerManager(ctx context.Context, clusterName string, kubeCl
nodeController: nodeController,
cancelFn: cancel}, nil
}

// cleanup deletes every container carrying the CCM node label.
// A failure to delete one container is logged and does not abort
// the removal of the remaining ones.
func cleanup() {
	ids, err := container.ListByLabel(constants.NodeCCMLabelKey)
	if err != nil {
		klog.Errorf("can't list containers: %v", err)
		return
	}
	for _, id := range ids {
		if delErr := container.Delete(id); delErr != nil {
			klog.Errorf("can't delete container %s: %v", id, delErr)
		}
	}
}

0 comments on commit 572d9df

Please sign in to comment.