Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New ingress converter and haproxy types (part 2) #295

Merged
merged 8 commits into from
Mar 13, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion pkg/common/ingress/controller/backend_ssl.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,9 @@ func (ic *GenericController) getPemCertificate(secret *apiv1.Secret) (*ingress.S
ca := secret.Data["ca.crt"]

// namespace/secretName -> namespace-secretName
nsSecName := strings.Replace(secretName, "/", "-", -1)
// use `_` instead if v0.8+
sep := map[bool]string{true: "-", false: "_"}
nsSecName := strings.Replace(secretName, "/", sep[ic.cfg.V07], -1)

var s *ingress.SSLCert
var err error
Expand Down
43 changes: 26 additions & 17 deletions pkg/common/ingress/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -1532,27 +1532,31 @@ func (ic GenericController) Stop() error {
return fmt.Errorf("shutdown already in progress")
}

// StartControllers ...
func (ic *GenericController) StartControllers() {
ic.cacheController.Run(ic.stopCh)
}

// Start starts the Ingress controller.
func (ic *GenericController) Start() {
glog.Infof("starting Ingress controller")

ic.cacheController.Run(ic.stopCh)

createDefaultSSLCertificate()

time.Sleep(5 * time.Second)
// initial sync of secrets to avoid unnecessary reloads
glog.Info("running initial sync of secrets")
for _, obj := range ic.listers.Ingress.List() {
ing := obj.(*extensions.Ingress)
if ic.cfg.V07 {
ic.CreateDefaultSSLCertificate()
time.Sleep(5 * time.Second)
// initial sync of secrets to avoid unnecessary reloads
glog.Info("running initial sync of secrets")
for _, obj := range ic.listers.Ingress.List() {
ing := obj.(*extensions.Ingress)

if !class.IsValid(ing, ic.cfg.IngressClass, ic.cfg.DefaultIngressClass) {
a, _ := parser.GetStringAnnotation(class.IngressKey, ing)
glog.V(2).Infof("ignoring add for ingress %v based on annotation %v with value %v", ing.Name, class.IngressKey, a)
continue
}

if !class.IsValid(ing, ic.cfg.IngressClass, ic.cfg.DefaultIngressClass) {
a, _ := parser.GetStringAnnotation(class.IngressKey, ing)
glog.V(2).Infof("ignoring add for ingress %v based on annotation %v with value %v", ing.Name, class.IngressKey, a)
continue
ic.readSecrets(ing)
}

ic.readSecrets(ing)
}

go ic.syncQueue.Run(time.Second, ic.stopCh)
Expand All @@ -1561,7 +1565,9 @@ func (ic *GenericController) Start() {
go ic.syncStatus.Run(ic.stopCh)
}

go wait.Until(ic.checkMissingSecrets, 30*time.Second, ic.stopCh)
if ic.cfg.V07 {
go wait.Until(ic.checkMissingSecrets, 30*time.Second, ic.stopCh)
}

// force initial sync
ic.syncQueue.Enqueue(&extensions.Ingress{})
Expand All @@ -1583,7 +1589,8 @@ func (ic *GenericController) SetForceReload(shouldReload bool) {
}
}

func createDefaultSSLCertificate() {
// CreateDefaultSSLCertificate ...
func (ic *GenericController) CreateDefaultSSLCertificate() (path, hash string) {
defCert, defKey := ssl.GetFakeSSLCert()
c, err := ssl.AddOrUpdateCertAndKey(fakeCertificate, defCert, defKey, []byte{})
if err != nil {
Expand All @@ -1592,4 +1599,6 @@ func createDefaultSSLCertificate() {

fakeCertificateSHA = c.PemSHA
fakeCertificatePath = c.PemFileName

return fakeCertificatePath, fakeCertificateSHA
}
29 changes: 18 additions & 11 deletions pkg/common/ingress/controller/launch.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ func NewIngressController(backend ingress.Controller) *GenericController {
glog.Infof("Watching for ingress class: %s", *ingressClass)
}

if *defaultSvc == "" {
if *v07 && *defaultSvc == "" {
glog.Fatalf("Please specify --default-backend-service")
}

Expand All @@ -157,19 +157,21 @@ func NewIngressController(backend ingress.Controller) *GenericController {
handleFatalInitError(err)
}

ns, name, err := k8s.ParseNameNS(*defaultSvc)
if err != nil {
glog.Fatalf("invalid format for service %v: %v", *defaultSvc, err)
}
if *defaultSvc != "" {
ns, name, err := k8s.ParseNameNS(*defaultSvc)
if err != nil {
glog.Fatalf("invalid format for service %v: %v", *defaultSvc, err)
}

_, err = kubeClient.CoreV1().Services(ns).Get(name, metav1.GetOptions{})
if err != nil {
if strings.Contains(err.Error(), "cannot get services in the namespace") {
glog.Fatalf("✖ It seems the cluster it is running with Authorization enabled (like RBAC) and there is no permissions for the ingress controller. Please check the configuration")
_, err = kubeClient.CoreV1().Services(ns).Get(name, metav1.GetOptions{})
if err != nil {
if strings.Contains(err.Error(), "cannot get services in the namespace") {
glog.Fatalf("✖ It seems the cluster it is running with Authorization enabled (like RBAC) and there is no permissions for the ingress controller. Please check the configuration")
}
glog.Fatalf("no service with name %v found: %v", *defaultSvc, err)
}
glog.Fatalf("no service with name %v found: %v", *defaultSvc, err)
glog.Infof("validated %v as the default backend", *defaultSvc)
}
glog.Infof("validated %v as the default backend", *defaultSvc)

if *publishSvc != "" {
ns, name, err := k8s.ParseNameNS(*publishSvc)
Expand Down Expand Up @@ -199,6 +201,11 @@ func NewIngressController(backend ingress.Controller) *GenericController {
if err != nil {
glog.Fatalf("no watchNamespace with name %v found: %v", *watchNamespace, err)
}
} else {
_, err = kubeClient.CoreV1().Services("default").Get("kubernetes", metav1.GetOptions{})
if err != nil {
glog.Fatalf("error connecting to the apiserver: %v", err)
}
}

if *rateLimitUpdate <= 0 {
Expand Down
72 changes: 66 additions & 6 deletions pkg/controller/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,23 @@ import (

api "k8s.io/api/core/v1"

"github.com/jcmoraisjr/haproxy-ingress/pkg/common/file"
"github.com/jcmoraisjr/haproxy-ingress/pkg/common/ingress"
"github.com/jcmoraisjr/haproxy-ingress/pkg/common/ingress/controller"
"github.com/jcmoraisjr/haproxy-ingress/pkg/common/net/ssl"
ingtypes "github.com/jcmoraisjr/haproxy-ingress/pkg/converters/ingress/types"
)

type cache struct {
listers *ingress.StoreLister
listers *ingress.StoreLister
controller *controller.GenericController
}

func newCache(listers *ingress.StoreLister, controller *controller.GenericController) *cache {
return &cache{
listers: listers,
controller: controller,
}
}

func (c *cache) GetService(serviceName string) (*api.Service, error) {
Expand All @@ -46,14 +58,62 @@ func (c *cache) GetPod(podName string) (*api.Pod, error) {
return c.listers.Pod.GetPod(sname[0], sname[1])
}

func (c *cache) GetTLSSecretPath(secretName string) (string, error) {
return "", fmt.Errorf("implement")
func (c *cache) GetTLSSecretPath(secretName string) (ingtypes.File, error) {
sslCert, err := c.controller.GetCertificate(secretName)
if err != nil {
return ingtypes.File{}, err
}
if sslCert.PemFileName == "" {
return ingtypes.File{}, fmt.Errorf("secret '%s' does not have keys 'tls.crt' and 'tls.key'", secretName)
}
return ingtypes.File{
Filename: sslCert.PemFileName,
SHA1Hash: sslCert.PemSHA,
}, nil
}

func (c *cache) GetCASecretPath(secretName string) (ingtypes.File, error) {
sslCert, err := c.controller.GetCertificate(secretName)
if err != nil {
return ingtypes.File{}, err
}
if sslCert.CAFileName == "" {
return ingtypes.File{}, fmt.Errorf("secret '%s' does not have key 'ca.crt'", secretName)
}
return ingtypes.File{
Filename: sslCert.CAFileName,
SHA1Hash: sslCert.PemSHA,
}, nil
}

func (c *cache) GetCASecretPath(secretName string) (string, error) {
return "", fmt.Errorf("implement")
func (c *cache) GetDHSecretPath(secretName string) (ingtypes.File, error) {
secret, err := c.listers.Secret.GetByName(secretName)
if err != nil {
return ingtypes.File{}, err
}
dh, found := secret.Data[dhparamFilename]
if !found {
return ingtypes.File{}, fmt.Errorf("secret '%s' does not have key '%s'", secretName, dhparamFilename)
}
pem := strings.Replace(secretName, "/", "_", -1)
pemFileName, err := ssl.AddOrUpdateDHParam(pem, dh)
if err != nil {
return ingtypes.File{}, fmt.Errorf("error creating dh-param file '%s': %v", pem, err)
}
return ingtypes.File{
Filename: pemFileName,
SHA1Hash: file.SHA1(pemFileName),
}, nil
}

func (c *cache) GetSecretContent(secretName, keyName string) ([]byte, error) {
return []byte{}, fmt.Errorf("implement")
secret, err := c.listers.Secret.GetByName(secretName)
if err != nil {
return nil, err
}
data, found := secret.Data[keyName]
if !found {
return nil, fmt.Errorf("secret '%s' does not have key '%s'", secretName, keyName)
}
return data, nil
}
64 changes: 55 additions & 9 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ func (hc *HAProxyController) Info() *ingress.BackendInfo {
// Start starts the controller
func (hc *HAProxyController) Start() {
hc.controller = controller.NewIngressController(hc)
hc.controller.StartControllers()
hc.configController()
hc.controller.Start()
}
Expand All @@ -96,20 +97,65 @@ func (hc *HAProxyController) configController() {

// starting v0.8 only config
logger := &logger{depth: 1}
hc.converterOptions = &ingtypes.ConverterOptions{
Logger: logger,
Cache: &cache{listers: hc.storeLister},
AnnotationPrefix: "ingress.kubernetes.io",
DefaultBackend: hc.cfg.DefaultService,
DefaultSSLSecret: hc.cfg.DefaultSSLCertificate,
}
instanceOptions := haproxy.InstanceOptions{
HAProxyCmd: "haproxy",
ReloadCmd: "/haproxy-reload.sh",
HAProxyConfigFile: "/etc/haproxy/haproxy.cfg",
ReloadStrategy: *hc.reloadStrategy,
MaxOldConfigFiles: *hc.maxOldConfigFiles,
}
hc.instance = haproxy.CreateInstance(logger, hc, instanceOptions)
if err := hc.instance.ParseTemplates(); err != nil {
glog.Fatalf("error creating HAProxy instance: %v", err)
}
cache := newCache(hc.storeLister, hc.controller)
hc.converterOptions = &ingtypes.ConverterOptions{
Logger: logger,
Cache: cache,
AnnotationPrefix: "ingress.kubernetes.io",
DefaultBackend: hc.cfg.DefaultService,
DefaultSSLFile: hc.createDefaultSSLFile(cache),
}
}

func (hc *HAProxyController) createDefaultSSLFile(cache *cache) (tlsFile ingtypes.File) {
if hc.cfg.DefaultSSLCertificate != "" {
tlsFile, err := cache.GetTLSSecretPath(hc.cfg.DefaultSSLCertificate)
if err == nil {
return tlsFile
}
glog.Warningf("using auto generated fake certificate due to an error reading default TLS certificate: %v", err)
} else {
glog.Info("using auto generated fake certificate")
}
path, hash := hc.controller.CreateDefaultSSLCertificate()
tlsFile = ingtypes.File{
Filename: path,
SHA1Hash: hash,
}
return tlsFile
}

// CreateX509CertsDir hard link files from certs to a single directory.
func (hc *HAProxyController) CreateX509CertsDir(bindName string, certs []string) (string, error) {
x509dir := "/var/haproxy/certs/" + bindName
if err := os.RemoveAll(x509dir); err != nil {
return "", err
}
if err := os.MkdirAll(x509dir, 0700); err != nil {
return "", err
}
for _, cert := range certs {
srcFile, err := os.Stat(cert)
if err != nil {
return "", err
}
dstFile := x509dir + "/" + srcFile.Name()
if err := os.Link(cert, dstFile); err != nil {
return "", err
}
}
hc.instance = haproxy.CreateInstance(logger, instanceOptions)
return x509dir, nil
}

// Stop shutdown the controller process
Expand Down Expand Up @@ -223,7 +269,7 @@ func (hc *HAProxyController) SyncIngress(item interface{}) error {
}
converter := ingressconverter.NewIngressConverter(
hc.converterOptions,
hc.instance.CreateConfig(),
hc.instance.Config(),
globalConfig,
)
converter.Sync(ingress)
Expand Down
14 changes: 7 additions & 7 deletions pkg/converters/ingress/annotations/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
hatypes "github.com/jcmoraisjr/haproxy-ingress/pkg/haproxy/types"
)

func (c *updater) buildAffinity(d *backData) {
func (c *updater) buildBackendAffinity(d *backData) {
if d.ann.Affinity != "cookie" {
if d.ann.Affinity != "" {
c.logger.Error("unsupported affinity type on %v: %s", d.ann.Source, d.ann.Affinity)
Expand All @@ -50,7 +50,7 @@ func (c *updater) buildAffinity(d *backData) {
d.backend.Cookie.Key = d.ann.CookieKey
}

func (c *updater) buildAuthHTTP(d *backData) {
func (c *updater) buildBackendAuthHTTP(d *backData) {
if d.ann.AuthType != "basic" {
if d.ann.AuthType != "" {
c.logger.Error("unsupported authentication type on %v: %s", d.ann.Source, d.ann.AuthType)
Expand All @@ -71,7 +71,7 @@ func (c *updater) buildAuthHTTP(d *backData) {
return
}
userstr := string(userb)
users, errs := c.buildAuthHTTPExtractUserlist(d.ann.Source.Name, secretName, userstr)
users, errs := c.buildBackendAuthHTTPExtractUserlist(d.ann.Source.Name, secretName, userstr)
for _, err := range errs {
c.logger.Warn("ignoring malformed usr/passwd on secret '%s', declared on %v: %v", secretName, d.ann.Source, err)
}
Expand All @@ -83,7 +83,7 @@ func (c *updater) buildAuthHTTP(d *backData) {
d.backend.HreqValidateUserlist(userlist)
}

func (c *updater) buildAuthHTTPExtractUserlist(source, secret, users string) ([]hatypes.User, []error) {
func (c *updater) buildBackendAuthHTTPExtractUserlist(source, secret, users string) ([]hatypes.User, []error) {
var userlist []hatypes.User
var err []error
for i, usr := range strings.Split(users, "\n") {
Expand Down Expand Up @@ -125,7 +125,7 @@ func (c *updater) buildAuthHTTPExtractUserlist(source, secret, users string) ([]
return userlist, err
}

func (c *updater) buildBlueGreen(d *backData) {
func (c *updater) buildBackendBlueGreen(d *backData) {
balance := d.ann.BlueGreenBalance
if balance == "" {
balance = d.ann.BlueGreenDeploy
Expand Down Expand Up @@ -168,7 +168,7 @@ func (c *updater) buildBlueGreen(d *backData) {
}
for _, ep := range d.backend.Endpoints {
hasLabel := false
if pod, err := c.cache.GetPod(ep.Target); err == nil {
if pod, err := c.cache.GetPod(ep.TargetRef); err == nil {
for _, dw := range deployWeights {
if label, found := pod.Labels[dw.labelName]; found {
if label == dw.labelValue {
Expand All @@ -181,7 +181,7 @@ func (c *updater) buildBlueGreen(d *backData) {
}
}
} else {
if ep.Target == "" {
if ep.TargetRef == "" {
err = fmt.Errorf("endpoint does not reference a pod")
}
c.logger.Warn("endpoint '%s:%d' on %v was removed from balance: %v", ep.IP, ep.Port, d.ann.Source, err)
Expand Down
Loading