From f5f087861fd37241320322b0c4d07f133be5b89b Mon Sep 17 00:00:00 2001 From: Luke Kysow <1034429+lkysow@users.noreply.github.com> Date: Tue, 9 Feb 2021 16:31:10 -0800 Subject: [PATCH] Refactor inject-connect command (#432) This refactor paves the way for adding the cleanup controller. It removes the if/else statement around starting the mutating webhook server when health checks are enabled vs when they're not and instead starts it the same way in both cases. It also removes special validation around the CONSUL_HTTP_ADDR because this was already being discovered earlier in the file. --- subcommand/inject-connect/command.go | 85 ++++++++++------------- subcommand/inject-connect/command_test.go | 12 +--- 2 files changed, 41 insertions(+), 56 deletions(-) diff --git a/subcommand/inject-connect/command.go b/subcommand/inject-connect/command.go index 51eb61d544c7..6711d8776ddf 100644 --- a/subcommand/inject-connect/command.go +++ b/subcommand/inject-connect/command.go @@ -282,6 +282,16 @@ func (c *Command) Run(args []string) int { if cfg.TLSConfig.CAFile == "" && c.flagConsulCACert != "" { cfg.TLSConfig.CAFile = c.flagConsulCACert } + consulURLRaw := cfg.Address + // cfg.Address may or may not be prefixed with scheme. + if !strings.Contains(cfg.Address, "://") { + consulURLRaw = fmt.Sprintf("%s://%s", cfg.Scheme, cfg.Address) + } + consulURL, err := url.Parse(consulURLRaw) + if err != nil { + c.UI.Error(fmt.Sprintf("Error parsing consul address %q: %s", consulURLRaw, err)) + return 1 + } // load CA file contents var consulCACert []byte @@ -359,53 +369,42 @@ func (c *Command) Run(args []string) int { mux.HandleFunc("/mutate", injector.Handle) mux.HandleFunc("/health/ready", c.handleReady) var handler http.Handler = mux + serverErrors := make(chan error) server := &http.Server{ Addr: c.flagListen, Handler: handler, TLSConfig: &tls.Config{GetCertificate: c.getCertificate}, } - if c.flagEnableHealthChecks { - // Channel used for health checks - // also check to see if we should enable TLS. - consulAddr := os.Getenv(api.HTTPAddrEnvName) - if consulAddr == "" { - c.UI.Error(fmt.Sprintf("%s is not specified", api.HTTPAddrEnvName)) - return 1 - } - consulUrl, err := url.Parse(consulAddr) - if err != nil { - c.UI.Error(fmt.Sprintf("Error parsing %s: %s", api.HTTPAddrEnvName, err)) - return 1 + // Start the mutating webhook server. + go func() { + c.UI.Info(fmt.Sprintf("Listening on %q...", c.flagListen)) + if err := server.ListenAndServeTLS("", ""); err != nil { + c.UI.Error(fmt.Sprintf("Error listening: %s", err)) + serverErrors <- err } + }() - serverErrors := make(chan error) - go func() { - c.UI.Info(fmt.Sprintf("Listening on %q...", c.flagListen)) - if err := server.ListenAndServeTLS("", ""); err != nil { - c.UI.Error(fmt.Sprintf("Error listening: %s", err)) - serverErrors <- err - } - }() - + // Start the health checks controller. + ctrlExitCh := make(chan error) + if c.flagEnableHealthChecks { healthResource := connectinject.HealthCheckResource{ Log: logger.Named("healthCheckResource"), KubernetesClientset: c.clientset, - ConsulUrl: consulUrl, + ConsulUrl: consulURL, Ctx: ctx, ReconcilePeriod: c.flagHealthChecksReconcilePeriod, } - ctl := &controller.Controller{ + healthChecksCtrl := &controller.Controller{ Log: logger.Named("healthCheckController"), Resource: &healthResource, } // Start the health check controller, reconcile is started at the same time // and new events will queue in the informer. - ctrlExitCh := make(chan error) go func() { - ctl.Run(ctx.Done()) + healthChecksCtrl.Run(ctx.Done()) // If ctl.Run() exits before ctx is cancelled, then our health checks // controller isn't running. In that case we need to shutdown since // this is unrecoverable. @@ -413,33 +412,25 @@ func (c *Command) Run(args []string) int { ctrlExitCh <- fmt.Errorf("health checks controller exited unexpectedly") } }() + } - select { - // Interrupted/terminated, gracefully exit. - case sig := <-c.sigCh: - c.UI.Info(fmt.Sprintf("%s received, shutting down", sig)) - if err := server.Close(); err != nil { - c.UI.Error(fmt.Sprintf("shutting down server: %v", err)) - return 1 - } - return 0 - - case <-serverErrors: - return 1 - - case err := <-ctrlExitCh: - c.UI.Error(fmt.Sprintf("controller error: %v", err)) + // Block until we get a signal or something errors. + select { + case sig := <-c.sigCh: + c.UI.Info(fmt.Sprintf("%s received, shutting down", sig)) + if err := server.Close(); err != nil { + c.UI.Error(fmt.Sprintf("shutting down server: %v", err)) return 1 } + return 0 - } else { - c.UI.Info(fmt.Sprintf("Listening on %q...", c.flagListen)) - if err := server.ListenAndServeTLS("", ""); err != nil { - c.UI.Error(fmt.Sprintf("Error listening: %s", err)) - return 1 - } + case <-serverErrors: + return 1 + + case err := <-ctrlExitCh: + c.UI.Error(fmt.Sprintf("controller error: %v", err)) + return 1 } - return 0 } func (c *Command) interrupt() { diff --git a/subcommand/inject-connect/command_test.go b/subcommand/inject-connect/command_test.go index 438fbe036576..40ff4cf3482a 100644 --- a/subcommand/inject-connect/command_test.go +++ b/subcommand/inject-connect/command_test.go @@ -153,11 +153,6 @@ func TestRun_FlagValidation(t *testing.T) { }, expErr: "request must be <= limit: -consul-sidecar-cpu-request value of \"50m\" is greater than the -consul-sidecar-cpu-limit value of \"25m\"", }, - { - flags: []string{"-consul-k8s-image", "hashicorpdev/consul-k8s:latest", "-consul-image", "foo", "-envoy-image", "envoy:1.16.0", - "-enable-health-checks-controller=true"}, - expErr: "CONSUL_HTTP_ADDR is not specified", - }, } for _, c := range cases { @@ -192,7 +187,7 @@ func TestRun_ResourceLimitDefaults(t *testing.T) { require.Equal(t, cmd.flagConsulSidecarMemoryLimit, "50Mi") } -func TestRun_ValidationHealthCheckEnv(t *testing.T) { +func TestRun_ValidationConsulHTTPAddr(t *testing.T) { cases := []struct { name string envVars []string @@ -200,10 +195,10 @@ func TestRun_ValidationHealthCheckEnv(t *testing.T) { expErr string }{ { - envVars: []string{api.HTTPAddrEnvName, "0.0.0.0:999999"}, + envVars: []string{api.HTTPAddrEnvName, "%"}, flags: []string{"-consul-k8s-image", "hashicorp/consul-k8s", "-consul-image", "foo", "-envoy-image", "envoy:1.16.0", "-enable-health-checks-controller=true"}, - expErr: "Error parsing CONSUL_HTTP_ADDR: parse \"0.0.0.0:999999\": first path segment in URL cannot contain colon", + expErr: "Error parsing consul address \"http://%\": parse \"http://%\": invalid URL escape \"%", }, } for _, c := range cases { @@ -246,7 +241,6 @@ func TestRun_CommandFailsWithInvalidListener(t *testing.T) { // Test that when healthchecks are enabled that SIGINT/SIGTERM exits the // command cleanly. func TestRun_CommandExitsCleanlyAfterSignal(t *testing.T) { - t.Run("SIGINT", testSignalHandling(syscall.SIGINT)) t.Run("SIGTERM", testSignalHandling(syscall.SIGTERM)) }