Skip to content

Commit

Permalink
Refactor inject-connect command (hashicorp#432)
Browse files Browse the repository at this point in the history
This refactor paves the way for adding the cleanup controller. It
removes the if/else statement around starting the mutating webhook
server when health checks are enabled vs when they're not and instead
starts it the same way in both cases.

It also removes special validation around the CONSUL_HTTP_ADDR because
this was already being discovered earlier in the file.
  • Loading branch information
lkysow authored Feb 10, 2021
1 parent f67e161 commit f5f0878
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 56 deletions.
85 changes: 38 additions & 47 deletions subcommand/inject-connect/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,16 @@ func (c *Command) Run(args []string) int {
if cfg.TLSConfig.CAFile == "" && c.flagConsulCACert != "" {
cfg.TLSConfig.CAFile = c.flagConsulCACert
}
consulURLRaw := cfg.Address
// cfg.Address may or may not be prefixed with scheme.
if !strings.Contains(cfg.Address, "://") {
consulURLRaw = fmt.Sprintf("%s://%s", cfg.Scheme, cfg.Address)
}
consulURL, err := url.Parse(consulURLRaw)
if err != nil {
c.UI.Error(fmt.Sprintf("Error parsing consul address %q: %s", consulURLRaw, err))
return 1
}

// load CA file contents
var consulCACert []byte
Expand Down Expand Up @@ -359,87 +369,68 @@ func (c *Command) Run(args []string) int {
mux.HandleFunc("/mutate", injector.Handle)
mux.HandleFunc("/health/ready", c.handleReady)
var handler http.Handler = mux
serverErrors := make(chan error)
server := &http.Server{
Addr: c.flagListen,
Handler: handler,
TLSConfig: &tls.Config{GetCertificate: c.getCertificate},
}

if c.flagEnableHealthChecks {
// Channel used for health checks
// also check to see if we should enable TLS.
consulAddr := os.Getenv(api.HTTPAddrEnvName)
if consulAddr == "" {
c.UI.Error(fmt.Sprintf("%s is not specified", api.HTTPAddrEnvName))
return 1
}
consulUrl, err := url.Parse(consulAddr)
if err != nil {
c.UI.Error(fmt.Sprintf("Error parsing %s: %s", api.HTTPAddrEnvName, err))
return 1
// Start the mutating webhook server.
go func() {
c.UI.Info(fmt.Sprintf("Listening on %q...", c.flagListen))
if err := server.ListenAndServeTLS("", ""); err != nil {
c.UI.Error(fmt.Sprintf("Error listening: %s", err))
serverErrors <- err
}
}()

serverErrors := make(chan error)
go func() {
c.UI.Info(fmt.Sprintf("Listening on %q...", c.flagListen))
if err := server.ListenAndServeTLS("", ""); err != nil {
c.UI.Error(fmt.Sprintf("Error listening: %s", err))
serverErrors <- err
}
}()

// Start the health checks controller.
ctrlExitCh := make(chan error)
if c.flagEnableHealthChecks {
healthResource := connectinject.HealthCheckResource{
Log: logger.Named("healthCheckResource"),
KubernetesClientset: c.clientset,
ConsulUrl: consulUrl,
ConsulUrl: consulURL,
Ctx: ctx,
ReconcilePeriod: c.flagHealthChecksReconcilePeriod,
}

ctl := &controller.Controller{
healthChecksCtrl := &controller.Controller{
Log: logger.Named("healthCheckController"),
Resource: &healthResource,
}

// Start the health check controller, reconcile is started at the same time
// and new events will queue in the informer.
ctrlExitCh := make(chan error)
go func() {
ctl.Run(ctx.Done())
healthChecksCtrl.Run(ctx.Done())
// If ctl.Run() exits before ctx is cancelled, then our health checks
// controller isn't running. In that case we need to shutdown since
// this is unrecoverable.
if ctx.Err() == nil {
ctrlExitCh <- fmt.Errorf("health checks controller exited unexpectedly")
}
}()
}

select {
// Interrupted/terminated, gracefully exit.
case sig := <-c.sigCh:
c.UI.Info(fmt.Sprintf("%s received, shutting down", sig))
if err := server.Close(); err != nil {
c.UI.Error(fmt.Sprintf("shutting down server: %v", err))
return 1
}
return 0

case <-serverErrors:
return 1

case err := <-ctrlExitCh:
c.UI.Error(fmt.Sprintf("controller error: %v", err))
// Block until we get a signal or something errors.
select {
case sig := <-c.sigCh:
c.UI.Info(fmt.Sprintf("%s received, shutting down", sig))
if err := server.Close(); err != nil {
c.UI.Error(fmt.Sprintf("shutting down server: %v", err))
return 1
}
return 0

} else {
c.UI.Info(fmt.Sprintf("Listening on %q...", c.flagListen))
if err := server.ListenAndServeTLS("", ""); err != nil {
c.UI.Error(fmt.Sprintf("Error listening: %s", err))
return 1
}
case <-serverErrors:
return 1

case err := <-ctrlExitCh:
c.UI.Error(fmt.Sprintf("controller error: %v", err))
return 1
}
return 0
}

func (c *Command) interrupt() {
Expand Down
12 changes: 3 additions & 9 deletions subcommand/inject-connect/command_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,11 +153,6 @@ func TestRun_FlagValidation(t *testing.T) {
},
expErr: "request must be <= limit: -consul-sidecar-cpu-request value of \"50m\" is greater than the -consul-sidecar-cpu-limit value of \"25m\"",
},
{
flags: []string{"-consul-k8s-image", "hashicorpdev/consul-k8s:latest", "-consul-image", "foo", "-envoy-image", "envoy:1.16.0",
"-enable-health-checks-controller=true"},
expErr: "CONSUL_HTTP_ADDR is not specified",
},
}

for _, c := range cases {
Expand Down Expand Up @@ -192,18 +187,18 @@ func TestRun_ResourceLimitDefaults(t *testing.T) {
require.Equal(t, cmd.flagConsulSidecarMemoryLimit, "50Mi")
}

func TestRun_ValidationHealthCheckEnv(t *testing.T) {
func TestRun_ValidationConsulHTTPAddr(t *testing.T) {
cases := []struct {
name string
envVars []string
flags []string
expErr string
}{
{
envVars: []string{api.HTTPAddrEnvName, "0.0.0.0:999999"},
envVars: []string{api.HTTPAddrEnvName, "%"},
flags: []string{"-consul-k8s-image", "hashicorp/consul-k8s", "-consul-image", "foo", "-envoy-image", "envoy:1.16.0",
"-enable-health-checks-controller=true"},
expErr: "Error parsing CONSUL_HTTP_ADDR: parse \"0.0.0.0:999999\": first path segment in URL cannot contain colon",
expErr: "Error parsing consul address \"http://%\": parse \"http://%\": invalid URL escape \"%",
},
}
for _, c := range cases {
Expand Down Expand Up @@ -246,7 +241,6 @@ func TestRun_CommandFailsWithInvalidListener(t *testing.T) {
// Test that when healthchecks are enabled that SIGINT/SIGTERM exits the
// command cleanly.
func TestRun_CommandExitsCleanlyAfterSignal(t *testing.T) {

t.Run("SIGINT", testSignalHandling(syscall.SIGINT))
t.Run("SIGTERM", testSignalHandling(syscall.SIGTERM))
}
Expand Down

0 comments on commit f5f0878

Please sign in to comment.