implement model update
Improves object parsing time on big k8s clusters - 1000+ ingress objects referencing 1000+ services - using an updatable haproxy model and object<->model tracking.

The implementation follows this approach:

* track changes on each relevant object type - currently ingress, service, endpoint, secret, configmap, and also pod if drain-support is used;
* every object tracks a host and/or backend in the haproxy model;
* if the object is removed or updated, only the tracked host and/or backend is rebuilt;
* dynupdate no longer compares old/cur state; instead it compares dirty and new host/backend state.

This commit does the majority of the job: notification improvements, object<->model tracking, and converter changes. Still missing: dirty/new state in the haproxy model, concurrency improvements, the dynupdate refactor, and some minor adjustments.
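For illustration, a minimal sketch of the object<->model tracking idea - the type and function names here are made up for the example, not taken from the codebase:

// model stands in for the haproxy model: hosts and backends by ID.
type model struct {
	hosts    map[string]*host
	backends map[string]*backend
}

type host struct{}    // frontend/host configuration, elided
type backend struct{} // backend/server configuration, elided

// tracker records which hosts and backends each k8s object
// (ingress, service, endpoint, secret, ...) contributed to.
type tracker struct {
	hostsOf    map[string][]string // "namespace/name" -> host IDs
	backendsOf map[string][]string // "namespace/name" -> backend IDs
}

// rebuildDirty re-converts only the hosts and backends tracked by the
// changed objects; the rest of the model is reused as-is, which is what
// keeps parsing cheap when only a few of 1000+ objects changed.
func rebuildDirty(t *tracker, m *model, changedKeys []string, convertHost func(id string) *host) {
	for _, key := range changedKeys {
		for _, h := range t.hostsOf[key] {
			m.hosts[h] = convertHost(h)
		}
		for _, b := range t.backendsOf[key] {
			delete(m.backends, b) // recreated by the converter on demand
		}
	}
}

On a change notification, only the keys of the changed objects feed the partial rebuild, so a cluster-wide reparse is avoided.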
jcmoraisjr committed May 2, 2020
1 parent 2fb3005 commit 2a24f12
Showing 19 changed files with 1,601 additions and 146 deletions.
3 changes: 0 additions & 3 deletions pkg/common/ingress/controller/backend_ssl.go
@@ -43,7 +43,6 @@ func (ic *GenericController) SyncSecret(key string) {
cert, err := ic.getPemCertificate(secret)
if err != nil {
glog.V(3).Infof("syncing a non ca/crt secret %v", key)
ic.newctrl.Notify()
return
}

@@ -59,15 +58,13 @@ func (ic *GenericController) SyncSecret(key string) {
ic.sslCertTracker.Update(key, cert)
// this update must trigger an update
// (like an update event from a change in Ingress)
ic.newctrl.Notify()
return
}

glog.Infof("adding secret %v to the local store", key)
ic.sslCertTracker.Add(key, cert)
// this update must trigger an update
// (like an update event from a change in Ingress)
ic.newctrl.Notify()
}

// getPemCertificate receives a secret and returns an ingress.SSLCert.
1 change: 0 additions & 1 deletion pkg/common/ingress/controller/controller.go
@@ -40,7 +40,6 @@ import (
type NewCtrlIntf interface {
GetIngressList() ([]*extensions.Ingress, error)
GetSecret(name string) (*apiv1.Secret, error)
Notify()
}

// GenericController holds the boilerplate code required to build an Ingress controller.
297 changes: 293 additions & 4 deletions pkg/controller/cache.go
@@ -25,18 +25,26 @@ import (
"fmt"
"os"
"strings"
"sync"
"time"

api "k8s.io/api/core/v1"
extensions "k8s.io/api/extensions/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
k8s "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
typedv1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"

"github.com/jcmoraisjr/haproxy-ingress/pkg/acme"
cfile "github.com/jcmoraisjr/haproxy-ingress/pkg/common/file"
"github.com/jcmoraisjr/haproxy-ingress/pkg/common/ingress/controller"
"github.com/jcmoraisjr/haproxy-ingress/pkg/common/net/ssl"
convtypes "github.com/jcmoraisjr/haproxy-ingress/pkg/converters/types"
"github.com/jcmoraisjr/haproxy-ingress/pkg/types"
"github.com/jcmoraisjr/haproxy-ingress/pkg/utils"
)

const dhparamFilename = "dhparam.pem"
@@ -46,11 +54,43 @@ type k8scache struct {
listers *listers
controller *controller.GenericController
crossNS bool
globalConfigMapKey string
tcpConfigMapKey string
acmeSecretKeyName string
acmeTokenConfigmapName string
//
updateQueue utils.Queue
stateMutex sync.RWMutex
clear bool
needResync bool
//
globalConfigMapData map[string]string
tcpConfigMapData map[string]string
newGlobalConfigMapData map[string]string
newTCPConfigMapData map[string]string
//
delIngresses []*extensions.Ingress
updIngresses []*extensions.Ingress
addIngresses []*extensions.Ingress
newEndpoints []*api.Endpoints
delServices []*api.Service
updServices []*api.Service
addServices []*api.Service
delSecrets []*api.Secret
updSecrets []*api.Secret
addSecrets []*api.Secret
newPods []*api.Pod
//
}

func newCache(client k8s.Interface, listers *listers, controller *controller.GenericController) *k8scache {
func createCache(
logger types.Logger,
client k8s.Interface,
controller *controller.GenericController,
updateQueue utils.Queue,
watchNamespace string,
resync time.Duration,
) *k8scache {
namespace := os.Getenv("POD_NAMESPACE")
if namespace == "" {
// TODO implement a smart fallback or error checking
@@ -70,14 +110,36 @@ func newCache(client k8s.Interface, listers *listers, controller *controller.Gen
if !strings.Contains(acmeTokenConfigmapName, "/") {
acmeTokenConfigmapName = namespace + "/" + acmeTokenConfigmapName
}
return &k8scache{
globalConfigMapName := cfg.ConfigMapName
tcpConfigMapName := cfg.TCPConfigMapName
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(logger.Info)
eventBroadcaster.StartRecordingToSink(&typedv1.EventSinkImpl{
Interface: client.CoreV1().Events(watchNamespace),
})
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, api.EventSource{
Component: "ingress-controller",
})
cache := &k8scache{
client: client,
listers: listers,
controller: controller,
crossNS: cfg.AllowCrossNamespace,
globalConfigMapKey: globalConfigMapName,
tcpConfigMapKey: tcpConfigMapName,
acmeSecretKeyName: acmeSecretKeyName,
acmeTokenConfigmapName: acmeTokenConfigmapName,
}
stateMutex: sync.RWMutex{},
updateQueue: updateQueue,
clear: true,
needResync: false,
}
// TODO I'm a circular reference, can you fix me?
cache.listers = createListers(cache, logger, recorder, client, watchNamespace, resync)
return cache
}

func (c *k8scache) RunAsync(stopCh <-chan struct{}) {
c.listers.RunAsync(stopCh)
}

func (c *k8scache) GetIngressPodName() (namespace, podname string, err error) {
@@ -92,6 +154,18 @@ func (c *k8scache) GetIngressPodName() (namespace, podname string, err error) {
return namespace, podname, nil
}

func (c *k8scache) GetIngress(ingressName string) (*extensions.Ingress, error) {
namespace, name, err := cache.SplitMetaNamespaceKey(ingressName)
if err != nil {
return nil, err
}
return c.listers.ingressLister.Ingresses(namespace).Get(name)
}

func (c *k8scache) GetIngressList() ([]*extensions.Ingress, error) {
return c.listers.ingressLister.List(labels.Everything())
}

func (c *k8scache) GetService(serviceName string) (*api.Service, error) {
namespace, name, err := cache.SplitMetaNamespaceKey(serviceName)
if err != nil {
@@ -423,3 +497,218 @@ func (c *k8scache) CreateOrUpdateConfigMap(cm *api.ConfigMap) (err error) {
}
return err
}

// implements ListerEvents
func (c *k8scache) IsValidIngress(ing *extensions.Ingress) bool {
return c.controller.IsValidClass(ing)
}

// implements ListerEvents
func (c *k8scache) IsValidConfigMap(cm *api.ConfigMap) bool {
key := fmt.Sprintf("%s/%s", cm.Namespace, cm.Name)
return key == c.globalConfigMapKey || key == c.tcpConfigMapKey
}

// implements ListerEvents
func (c *k8scache) Notify(old, cur interface{}) {
// IMPLEMENT
// maintain a list of changed objects only if partial parsing
// is being used -- SyncNewObjects() is being called
c.stateMutex.Lock()
defer c.stateMutex.Unlock()
if old == nil && cur == nil {
c.needResync = true
}
if old != nil {
switch old.(type) {
case *extensions.Ingress:
if cur == nil {
c.delIngresses = append(c.delIngresses, old.(*extensions.Ingress))
}
case *api.Service:
if cur == nil {
c.delServices = append(c.delServices, old.(*api.Service))
}
case *api.Secret:
if cur == nil {
secret := old.(*api.Secret)
c.delSecrets = append(c.delSecrets, secret)
c.controller.DeleteSecret(fmt.Sprintf("%s/%s", secret.Namespace, secret.Name))
}
}
}
if cur != nil {
switch cur.(type) {
case *extensions.Ingress:
ing := cur.(*extensions.Ingress)
if old == nil {
c.addIngresses = append(c.addIngresses, ing)
} else {
c.updIngresses = append(c.updIngresses, ing)
}
case *api.Endpoints:
c.newEndpoints = append(c.newEndpoints, cur.(*api.Endpoints))
case *api.Service:
svc := cur.(*api.Service)
if old == nil {
c.addServices = append(c.addServices, svc)
} else {
c.updServices = append(c.updServices, svc)
}
case *api.Secret:
secret := cur.(*api.Secret)
if old == nil {
c.addSecrets = append(c.addSecrets, secret)
} else {
c.updSecrets = append(c.updSecrets, secret)
}
case *api.ConfigMap:
cm := cur.(*api.ConfigMap)
key := fmt.Sprintf("%s/%s", cm.Namespace, cm.Name)
switch key {
case c.globalConfigMapKey:
c.newGlobalConfigMapData = cm.Data
case c.tcpConfigMapKey:
c.newTCPConfigMapData = cm.Data
}
case *api.Pod:
c.newPods = append(c.newPods, cur.(*api.Pod))
}
}
if c.clear {
// Notify after 500ms, giving time to receive
// all/most of the changes of a batch update
// TODO parameterize this delay
time.AfterFunc(500*time.Millisecond, func() { c.updateQueue.Notify() })
}
c.clear = false
}
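The clear flag above acts as a simple debounce: the first change after a sync schedules one delayed notification, and further changes inside the window piggyback on it. A standalone sketch of the same pattern, with hypothetical names:

import (
	"sync"
	"time"
)

// debouncer coalesces a burst of change events into a single
// notification, mirroring the clear/AfterFunc logic above.
type debouncer struct {
	mu     sync.Mutex
	idle   bool          // true while no notification is pending
	notify func()        // wakes up the consumer, e.g. the update queue
	delay  time.Duration // e.g. 500 * time.Millisecond
}

func newDebouncer(delay time.Duration, notify func()) *debouncer {
	// idle starts true, matching clear: true in createCache
	return &debouncer{idle: true, notify: notify, delay: delay}
}

func (d *debouncer) changed() {
	d.mu.Lock()
	defer d.mu.Unlock()
	if d.idle {
		// first change of a batch: schedule exactly one notification
		time.AfterFunc(d.delay, d.notify)
	}
	d.idle = false
}

func (d *debouncer) synced() {
	d.mu.Lock()
	defer d.mu.Unlock()
	d.idle = true // the next change starts a new batch
}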

// implements converters.types.Cache
func (c *k8scache) NeedResync() bool {
c.stateMutex.RLock()
defer c.stateMutex.RUnlock()
return c.needResync
}

// implements converters.types.Cache
func (c *k8scache) GlobalConfig() (cur, new map[string]string) {
return c.globalConfigMapData, c.newGlobalConfigMapData
}

// implements converters.types.Cache
func (c *k8scache) GetDirtyIngresses() (del, upd, add []*extensions.Ingress) {
c.stateMutex.RLock()
defer c.stateMutex.RUnlock()
del = make([]*extensions.Ingress, len(c.delIngresses))
for i := range c.delIngresses {
del[i] = c.delIngresses[i]
}
upd = make([]*extensions.Ingress, len(c.updIngresses))
for i := range c.updIngresses {
upd[i] = c.updIngresses[i]
}
add = make([]*extensions.Ingress, len(c.addIngresses))
for i := range c.addIngresses {
add[i] = c.addIngresses[i]
}
return del, upd, add
}

// implements converters.types.Cache
func (c *k8scache) GetDirtyEndpoints() []*api.Endpoints {
c.stateMutex.RLock()
defer c.stateMutex.RUnlock()
ep := make([]*api.Endpoints, len(c.newEndpoints))
for i := range c.newEndpoints {
ep[i] = c.newEndpoints[i]
}
return ep
}

// implements converters.types.Cache
func (c *k8scache) GetDirtyServices() (del, upd, add []*api.Service) {
c.stateMutex.RLock()
defer c.stateMutex.RUnlock()
del = make([]*api.Service, len(c.delServices))
for i := range c.delServices {
del[i] = c.delServices[i]
}
upd = make([]*api.Service, len(c.updServices))
for i := range c.updServices {
upd[i] = c.updServices[i]
}
add = make([]*api.Service, len(c.addServices))
for i := range c.addServices {
add[i] = c.addServices[i]
}
return del, upd, add
}

// implements converters.types.Cache
func (c *k8scache) GetDirtySecrets() (del, upd, add []*api.Secret) {
c.stateMutex.RLock()
defer c.stateMutex.RUnlock()
del = make([]*api.Secret, len(c.delSecrets))
for i := range c.delSecrets {
del[i] = c.delSecrets[i]
}
upd = make([]*api.Secret, len(c.updSecrets))
for i := range c.updSecrets {
upd[i] = c.updSecrets[i]
}
add = make([]*api.Secret, len(c.addSecrets))
for i := range c.addSecrets {
add[i] = c.addSecrets[i]
}
return del, upd, add
}

// implements converters.types.Cache
func (c *k8scache) GetDirtyPods() []*api.Pod {
c.stateMutex.RLock()
defer c.stateMutex.RUnlock()
pods := make([]*api.Pod, len(c.newPods))
for i := range c.newPods {
pods[i] = c.newPods[i]
}
return pods
}

// implements converters.types.Cache
func (c *k8scache) SyncNewObjects() {
// IMPLEMENT
// lock between the first state reading and this sync
// this will avoid losing unread state changes
c.stateMutex.Lock()
defer c.stateMutex.Unlock()
//
c.newPods = nil
c.newEndpoints = nil
//
// Secrets
//
c.delSecrets = nil
c.updSecrets = nil
c.addSecrets = nil
//
// Ingress
//
c.delIngresses = nil
c.updIngresses = nil
c.addIngresses = nil
//
// ConfigMaps
//
if c.newGlobalConfigMapData != nil {
c.globalConfigMapData = c.newGlobalConfigMapData
c.newGlobalConfigMapData = nil
}
if c.newTCPConfigMapData != nil {
c.tcpConfigMapData = c.newTCPConfigMapData
c.newTCPConfigMapData = nil
}
//
c.clear = true
c.needResync = false
}
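Tying the pieces together, a hedged sketch of the intended read-then-sync protocol; only the cache methods come from this diff, the loop and the converter entry points are illustrative. Note the window the IMPLEMENT comment mentions: changes arriving between the dirty-state reads and SyncNewObjects() could currently be lost.

// syncLoop drains the dirty state and applies a partial update.
// notify stands in for the update queue's wakeup signal;
// fullParse and partialParse are hypothetical converter entry points.
func syncLoop(c *k8scache, notify <-chan struct{},
	fullParse func(), partialParse func(del, upd, add []*extensions.Ingress)) {
	for range notify {
		if c.NeedResync() {
			fullParse() // something invalidated the whole state: rebuild everything
		} else {
			del, upd, add := c.GetDirtyIngresses()
			partialParse(del, upd, add)
			// GetDirtyServices, GetDirtySecrets, GetDirtyEndpoints and
			// GetDirtyPods would be consumed here the same way
		}
		c.SyncNewObjects() // clears the dirty lists and swaps the configmap data
	}
}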