Skip to content

Commit

Permalink
Merge pull request #295 from jcmoraisjr/jm-v08-controller-2
Browse files Browse the repository at this point in the history
New ingress converter and haproxy types (part 2)
  • Loading branch information
jcmoraisjr authored Mar 13, 2019
2 parents 3fe202c + 8a6be4b commit 7d162f0
Show file tree
Hide file tree
Showing 38 changed files with 3,084 additions and 440 deletions.
4 changes: 3 additions & 1 deletion pkg/common/ingress/controller/backend_ssl.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,9 @@ func (ic *GenericController) getPemCertificate(secret *apiv1.Secret) (*ingress.S
ca := secret.Data["ca.crt"]

// namespace/secretName -> namespace-secretName
nsSecName := strings.Replace(secretName, "/", "-", -1)
// use `_` instead if v0.8+
sep := map[bool]string{true: "-", false: "_"}
nsSecName := strings.Replace(secretName, "/", sep[ic.cfg.V07], -1)

var s *ingress.SSLCert
var err error
Expand Down
43 changes: 26 additions & 17 deletions pkg/common/ingress/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -1532,27 +1532,31 @@ func (ic GenericController) Stop() error {
return fmt.Errorf("shutdown already in progress")
}

// StartControllers ...
func (ic *GenericController) StartControllers() {
ic.cacheController.Run(ic.stopCh)
}

// Start starts the Ingress controller.
func (ic *GenericController) Start() {
glog.Infof("starting Ingress controller")

ic.cacheController.Run(ic.stopCh)

createDefaultSSLCertificate()

time.Sleep(5 * time.Second)
// initial sync of secrets to avoid unnecessary reloads
glog.Info("running initial sync of secrets")
for _, obj := range ic.listers.Ingress.List() {
ing := obj.(*extensions.Ingress)
if ic.cfg.V07 {
ic.CreateDefaultSSLCertificate()
time.Sleep(5 * time.Second)
// initial sync of secrets to avoid unnecessary reloads
glog.Info("running initial sync of secrets")
for _, obj := range ic.listers.Ingress.List() {
ing := obj.(*extensions.Ingress)

if !class.IsValid(ing, ic.cfg.IngressClass, ic.cfg.DefaultIngressClass) {
a, _ := parser.GetStringAnnotation(class.IngressKey, ing)
glog.V(2).Infof("ignoring add for ingress %v based on annotation %v with value %v", ing.Name, class.IngressKey, a)
continue
}

if !class.IsValid(ing, ic.cfg.IngressClass, ic.cfg.DefaultIngressClass) {
a, _ := parser.GetStringAnnotation(class.IngressKey, ing)
glog.V(2).Infof("ignoring add for ingress %v based on annotation %v with value %v", ing.Name, class.IngressKey, a)
continue
ic.readSecrets(ing)
}

ic.readSecrets(ing)
}

go ic.syncQueue.Run(time.Second, ic.stopCh)
Expand All @@ -1561,7 +1565,9 @@ func (ic *GenericController) Start() {
go ic.syncStatus.Run(ic.stopCh)
}

go wait.Until(ic.checkMissingSecrets, 30*time.Second, ic.stopCh)
if ic.cfg.V07 {
go wait.Until(ic.checkMissingSecrets, 30*time.Second, ic.stopCh)
}

// force initial sync
ic.syncQueue.Enqueue(&extensions.Ingress{})
Expand All @@ -1583,7 +1589,8 @@ func (ic *GenericController) SetForceReload(shouldReload bool) {
}
}

func createDefaultSSLCertificate() {
// CreateDefaultSSLCertificate ...
func (ic *GenericController) CreateDefaultSSLCertificate() (path, hash string) {
defCert, defKey := ssl.GetFakeSSLCert()
c, err := ssl.AddOrUpdateCertAndKey(fakeCertificate, defCert, defKey, []byte{})
if err != nil {
Expand All @@ -1592,4 +1599,6 @@ func createDefaultSSLCertificate() {

fakeCertificateSHA = c.PemSHA
fakeCertificatePath = c.PemFileName

return fakeCertificatePath, fakeCertificateSHA
}
29 changes: 18 additions & 11 deletions pkg/common/ingress/controller/launch.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ func NewIngressController(backend ingress.Controller) *GenericController {
glog.Infof("Watching for ingress class: %s", *ingressClass)
}

if *defaultSvc == "" {
if *v07 && *defaultSvc == "" {
glog.Fatalf("Please specify --default-backend-service")
}

Expand All @@ -157,19 +157,21 @@ func NewIngressController(backend ingress.Controller) *GenericController {
handleFatalInitError(err)
}

ns, name, err := k8s.ParseNameNS(*defaultSvc)
if err != nil {
glog.Fatalf("invalid format for service %v: %v", *defaultSvc, err)
}
if *defaultSvc != "" {
ns, name, err := k8s.ParseNameNS(*defaultSvc)
if err != nil {
glog.Fatalf("invalid format for service %v: %v", *defaultSvc, err)
}

_, err = kubeClient.CoreV1().Services(ns).Get(name, metav1.GetOptions{})
if err != nil {
if strings.Contains(err.Error(), "cannot get services in the namespace") {
glog.Fatalf("✖ It seems the cluster it is running with Authorization enabled (like RBAC) and there is no permissions for the ingress controller. Please check the configuration")
_, err = kubeClient.CoreV1().Services(ns).Get(name, metav1.GetOptions{})
if err != nil {
if strings.Contains(err.Error(), "cannot get services in the namespace") {
glog.Fatalf("✖ It seems the cluster it is running with Authorization enabled (like RBAC) and there is no permissions for the ingress controller. Please check the configuration")
}
glog.Fatalf("no service with name %v found: %v", *defaultSvc, err)
}
glog.Fatalf("no service with name %v found: %v", *defaultSvc, err)
glog.Infof("validated %v as the default backend", *defaultSvc)
}
glog.Infof("validated %v as the default backend", *defaultSvc)

if *publishSvc != "" {
ns, name, err := k8s.ParseNameNS(*publishSvc)
Expand Down Expand Up @@ -199,6 +201,11 @@ func NewIngressController(backend ingress.Controller) *GenericController {
if err != nil {
glog.Fatalf("no watchNamespace with name %v found: %v", *watchNamespace, err)
}
} else {
_, err = kubeClient.CoreV1().Services("default").Get("kubernetes", metav1.GetOptions{})
if err != nil {
glog.Fatalf("error connecting to the apiserver: %v", err)
}
}

if *rateLimitUpdate <= 0 {
Expand Down
72 changes: 66 additions & 6 deletions pkg/controller/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,23 @@ import (

api "k8s.io/api/core/v1"

"github.com/jcmoraisjr/haproxy-ingress/pkg/common/file"
"github.com/jcmoraisjr/haproxy-ingress/pkg/common/ingress"
"github.com/jcmoraisjr/haproxy-ingress/pkg/common/ingress/controller"
"github.com/jcmoraisjr/haproxy-ingress/pkg/common/net/ssl"
ingtypes "github.com/jcmoraisjr/haproxy-ingress/pkg/converters/ingress/types"
)

type cache struct {
listers *ingress.StoreLister
listers *ingress.StoreLister
controller *controller.GenericController
}

func newCache(listers *ingress.StoreLister, controller *controller.GenericController) *cache {
return &cache{
listers: listers,
controller: controller,
}
}

func (c *cache) GetService(serviceName string) (*api.Service, error) {
Expand All @@ -46,14 +58,62 @@ func (c *cache) GetPod(podName string) (*api.Pod, error) {
return c.listers.Pod.GetPod(sname[0], sname[1])
}

func (c *cache) GetTLSSecretPath(secretName string) (string, error) {
return "", fmt.Errorf("implement")
func (c *cache) GetTLSSecretPath(secretName string) (ingtypes.File, error) {
sslCert, err := c.controller.GetCertificate(secretName)
if err != nil {
return ingtypes.File{}, err
}
if sslCert.PemFileName == "" {
return ingtypes.File{}, fmt.Errorf("secret '%s' does not have keys 'tls.crt' and 'tls.key'", secretName)
}
return ingtypes.File{
Filename: sslCert.PemFileName,
SHA1Hash: sslCert.PemSHA,
}, nil
}

func (c *cache) GetCASecretPath(secretName string) (ingtypes.File, error) {
sslCert, err := c.controller.GetCertificate(secretName)
if err != nil {
return ingtypes.File{}, err
}
if sslCert.CAFileName == "" {
return ingtypes.File{}, fmt.Errorf("secret '%s' does not have key 'ca.crt'", secretName)
}
return ingtypes.File{
Filename: sslCert.CAFileName,
SHA1Hash: sslCert.PemSHA,
}, nil
}

func (c *cache) GetCASecretPath(secretName string) (string, error) {
return "", fmt.Errorf("implement")
func (c *cache) GetDHSecretPath(secretName string) (ingtypes.File, error) {
secret, err := c.listers.Secret.GetByName(secretName)
if err != nil {
return ingtypes.File{}, err
}
dh, found := secret.Data[dhparamFilename]
if !found {
return ingtypes.File{}, fmt.Errorf("secret '%s' does not have key '%s'", secretName, dhparamFilename)
}
pem := strings.Replace(secretName, "/", "_", -1)
pemFileName, err := ssl.AddOrUpdateDHParam(pem, dh)
if err != nil {
return ingtypes.File{}, fmt.Errorf("error creating dh-param file '%s': %v", pem, err)
}
return ingtypes.File{
Filename: pemFileName,
SHA1Hash: file.SHA1(pemFileName),
}, nil
}

func (c *cache) GetSecretContent(secretName, keyName string) ([]byte, error) {
return []byte{}, fmt.Errorf("implement")
secret, err := c.listers.Secret.GetByName(secretName)
if err != nil {
return nil, err
}
data, found := secret.Data[keyName]
if !found {
return nil, fmt.Errorf("secret '%s' does not have key '%s'", secretName, keyName)
}
return data, nil
}
64 changes: 55 additions & 9 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ func (hc *HAProxyController) Info() *ingress.BackendInfo {
// Start starts the controller
func (hc *HAProxyController) Start() {
hc.controller = controller.NewIngressController(hc)
hc.controller.StartControllers()
hc.configController()
hc.controller.Start()
}
Expand All @@ -96,20 +97,65 @@ func (hc *HAProxyController) configController() {

// starting v0.8 only config
logger := &logger{depth: 1}
hc.converterOptions = &ingtypes.ConverterOptions{
Logger: logger,
Cache: &cache{listers: hc.storeLister},
AnnotationPrefix: "ingress.kubernetes.io",
DefaultBackend: hc.cfg.DefaultService,
DefaultSSLSecret: hc.cfg.DefaultSSLCertificate,
}
instanceOptions := haproxy.InstanceOptions{
HAProxyCmd: "haproxy",
ReloadCmd: "/haproxy-reload.sh",
HAProxyConfigFile: "/etc/haproxy/haproxy.cfg",
ReloadStrategy: *hc.reloadStrategy,
MaxOldConfigFiles: *hc.maxOldConfigFiles,
}
hc.instance = haproxy.CreateInstance(logger, hc, instanceOptions)
if err := hc.instance.ParseTemplates(); err != nil {
glog.Fatalf("error creating HAProxy instance: %v", err)
}
cache := newCache(hc.storeLister, hc.controller)
hc.converterOptions = &ingtypes.ConverterOptions{
Logger: logger,
Cache: cache,
AnnotationPrefix: "ingress.kubernetes.io",
DefaultBackend: hc.cfg.DefaultService,
DefaultSSLFile: hc.createDefaultSSLFile(cache),
}
}

func (hc *HAProxyController) createDefaultSSLFile(cache *cache) (tlsFile ingtypes.File) {
if hc.cfg.DefaultSSLCertificate != "" {
tlsFile, err := cache.GetTLSSecretPath(hc.cfg.DefaultSSLCertificate)
if err == nil {
return tlsFile
}
glog.Warningf("using auto generated fake certificate due to an error reading default TLS certificate: %v", err)
} else {
glog.Info("using auto generated fake certificate")
}
path, hash := hc.controller.CreateDefaultSSLCertificate()
tlsFile = ingtypes.File{
Filename: path,
SHA1Hash: hash,
}
return tlsFile
}

// CreateX509CertsDir hard link files from certs to a single directory.
func (hc *HAProxyController) CreateX509CertsDir(bindName string, certs []string) (string, error) {
x509dir := "/var/haproxy/certs/" + bindName
if err := os.RemoveAll(x509dir); err != nil {
return "", err
}
if err := os.MkdirAll(x509dir, 0700); err != nil {
return "", err
}
for _, cert := range certs {
srcFile, err := os.Stat(cert)
if err != nil {
return "", err
}
dstFile := x509dir + "/" + srcFile.Name()
if err := os.Link(cert, dstFile); err != nil {
return "", err
}
}
hc.instance = haproxy.CreateInstance(logger, instanceOptions)
return x509dir, nil
}

// Stop shutdown the controller process
Expand Down Expand Up @@ -223,7 +269,7 @@ func (hc *HAProxyController) SyncIngress(item interface{}) error {
}
converter := ingressconverter.NewIngressConverter(
hc.converterOptions,
hc.instance.CreateConfig(),
hc.instance.Config(),
globalConfig,
)
converter.Sync(ingress)
Expand Down
14 changes: 7 additions & 7 deletions pkg/converters/ingress/annotations/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
hatypes "github.com/jcmoraisjr/haproxy-ingress/pkg/haproxy/types"
)

func (c *updater) buildAffinity(d *backData) {
func (c *updater) buildBackendAffinity(d *backData) {
if d.ann.Affinity != "cookie" {
if d.ann.Affinity != "" {
c.logger.Error("unsupported affinity type on %v: %s", d.ann.Source, d.ann.Affinity)
Expand All @@ -50,7 +50,7 @@ func (c *updater) buildAffinity(d *backData) {
d.backend.Cookie.Key = d.ann.CookieKey
}

func (c *updater) buildAuthHTTP(d *backData) {
func (c *updater) buildBackendAuthHTTP(d *backData) {
if d.ann.AuthType != "basic" {
if d.ann.AuthType != "" {
c.logger.Error("unsupported authentication type on %v: %s", d.ann.Source, d.ann.AuthType)
Expand All @@ -71,7 +71,7 @@ func (c *updater) buildAuthHTTP(d *backData) {
return
}
userstr := string(userb)
users, errs := c.buildAuthHTTPExtractUserlist(d.ann.Source.Name, secretName, userstr)
users, errs := c.buildBackendAuthHTTPExtractUserlist(d.ann.Source.Name, secretName, userstr)
for _, err := range errs {
c.logger.Warn("ignoring malformed usr/passwd on secret '%s', declared on %v: %v", secretName, d.ann.Source, err)
}
Expand All @@ -83,7 +83,7 @@ func (c *updater) buildAuthHTTP(d *backData) {
d.backend.HreqValidateUserlist(userlist)
}

func (c *updater) buildAuthHTTPExtractUserlist(source, secret, users string) ([]hatypes.User, []error) {
func (c *updater) buildBackendAuthHTTPExtractUserlist(source, secret, users string) ([]hatypes.User, []error) {
var userlist []hatypes.User
var err []error
for i, usr := range strings.Split(users, "\n") {
Expand Down Expand Up @@ -125,7 +125,7 @@ func (c *updater) buildAuthHTTPExtractUserlist(source, secret, users string) ([]
return userlist, err
}

func (c *updater) buildBlueGreen(d *backData) {
func (c *updater) buildBackendBlueGreen(d *backData) {
balance := d.ann.BlueGreenBalance
if balance == "" {
balance = d.ann.BlueGreenDeploy
Expand Down Expand Up @@ -168,7 +168,7 @@ func (c *updater) buildBlueGreen(d *backData) {
}
for _, ep := range d.backend.Endpoints {
hasLabel := false
if pod, err := c.cache.GetPod(ep.Target); err == nil {
if pod, err := c.cache.GetPod(ep.TargetRef); err == nil {
for _, dw := range deployWeights {
if label, found := pod.Labels[dw.labelName]; found {
if label == dw.labelValue {
Expand All @@ -181,7 +181,7 @@ func (c *updater) buildBlueGreen(d *backData) {
}
}
} else {
if ep.Target == "" {
if ep.TargetRef == "" {
err = fmt.Errorf("endpoint does not reference a pod")
}
c.logger.Warn("endpoint '%s:%d' on %v was removed from balance: %v", ep.IP, ep.Port, d.ann.Source, err)
Expand Down
Loading

0 comments on commit 7d162f0

Please sign in to comment.