fix(controller): Enable dummy metrics server on non-leader workflow controller #11295

Merged · 6 commits · Jul 7, 2023

Summary (from the diff): with leader election enabled, non-leader workflow-controller replicas previously ran no metrics endpoint at all. This change starts a dummy metrics server on non-leader replicas that answers every request on the metrics path with 200 OK and an empty body, and swaps in the real Prometheus metrics server when the replica becomes leader.
9 changes: 9 additions & 0 deletions cmd/workflow-controller/main.go
@@ -117,6 +117,7 @@ func NewRootCommand() *cobra.Command {
                 log.Info("Leader election is turned off. Running in single-instance mode")
                 log.WithField("id", "single-instance").Info("starting leading")
                 go wfController.Run(ctx, workflowWorkers, workflowTTLWorkers, podCleanupWorkers)
+                go wfController.RunMetricsServer(ctx, false)
             } else {
                 nodeID, ok := os.LookupEnv("LEADER_ELECTION_IDENTITY")
                 if !ok {
@@ -128,6 +129,11 @@ func NewRootCommand() *cobra.Command {
                     leaderName = fmt.Sprintf("%s-%s", leaderName, wfController.Config.InstanceID)
                 }
 
+                // for controlling the dummy metrics server
+                dummyCtx, dummyCancel := context.WithCancel(context.Background())
+                defer dummyCancel()
+                go wfController.RunMetricsServer(dummyCtx, true)
+
                 go leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
                     Lock: &resourcelock.LeaseLock{
                         LeaseMeta: metav1.ObjectMeta{Name: leaderName, Namespace: namespace}, Client: kubeclientset.CoordinationV1(),
@@ -139,11 +145,14 @@ func NewRootCommand() *cobra.Command {
                     RetryPeriod: env.LookupEnvDurationOr("LEADER_ELECTION_RETRY_PERIOD", 5*time.Second),
                     Callbacks: leaderelection.LeaderCallbacks{
                         OnStartedLeading: func(ctx context.Context) {
+                            dummyCancel()
                             go wfController.Run(ctx, workflowWorkers, workflowTTLWorkers, podCleanupWorkers)
+                            go wfController.RunMetricsServer(ctx, false)
                         },
                         OnStoppedLeading: func() {
                             log.WithField("id", nodeID).Info("stopped leading")
                             cancel()
+                            go wfController.RunMetricsServer(dummyCtx, true)
                         },
                         OnNewLeader: func(identity string) {
                             log.WithField("leader", identity).Info("new leader")
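The control flow above is easier to follow in isolation: while a replica is a follower, only the dummy server runs, under its own cancellable context; winning the election cancels that context and starts the real server. Below is a minimal, self-contained sketch of the same handoff pattern. It is illustrative only: the port, path, handler, and timing are hypothetical, and the leader election is simulated with a sleep rather than client-go's leaderelection package.

// A self-contained sketch of the follower/leader handoff (hypothetical names;
// the real controller wires this up via wfController.RunMetricsServer).
package main

import (
    "context"
    "fmt"
    "net/http"
    "time"
)

// serveMetrics stands in for RunMetricsServer: in dummy mode it answers every
// request with 200 OK and no body; otherwise it serves stand-in metrics text.
func serveMetrics(ctx context.Context, addr string, dummy bool) {
    mux := http.NewServeMux()
    mux.HandleFunc("/metrics", func(w http.ResponseWriter, r *http.Request) {
        if dummy {
            w.WriteHeader(http.StatusOK) // probe-friendly, but no metrics data
            return
        }
        fmt.Fprintln(w, "# HELP example_metric A stand-in for real metrics")
    })
    srv := &http.Server{Addr: addr, Handler: mux}
    go func() {
        <-ctx.Done() // canceling the context shuts the server down
        shutdownCtx, cancel := context.WithTimeout(context.Background(), time.Second)
        defer cancel()
        _ = srv.Shutdown(shutdownCtx)
    }()
    if err := srv.ListenAndServe(); err != http.ErrServerClosed {
        panic(err)
    }
}

func main() {
    // Follower phase: only the dummy server runs, under its own cancellable context.
    dummyCtx, dummyCancel := context.WithCancel(context.Background())
    go serveMetrics(dummyCtx, ":9090", true)

    time.Sleep(2 * time.Second) // pretend the leader election is won here

    // OnStartedLeading: stop the dummy server, then start the real one on the same port.
    dummyCancel()
    time.Sleep(100 * time.Millisecond) // crude wait for the dummy to release the port
    go serveMetrics(context.Background(), ":9090", false)

    select {} // keep serving
}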
7 changes: 4 additions & 3 deletions workflow/controller/controller.go
@@ -298,9 +298,6 @@ func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, workflowTTLWorkers, podCleanupWorkers int) {
         log.Fatal("Timed out waiting for caches to sync")
     }
 
-    // Start the metrics server
-    go wfc.metrics.RunServer(ctx)
-
     for i := 0; i < podCleanupWorkers; i++ {
         go wait.UntilWithContext(ctx, wfc.runPodCleanup, time.Second)
     }
@@ -323,6 +320,10 @@ func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, workflowTTLWorkers, podCleanupWorkers int) {
     <-ctx.Done()
 }
 
+func (wfc *WorkflowController) RunMetricsServer(ctx context.Context, isDummy bool) {
+    go wfc.metrics.RunServer(ctx, isDummy)
+}
+
 // Create and initialize the Synchronization Manager
 func (wfc *WorkflowController) createSynchronizationManager(ctx context.Context) {
     getSyncLimit := func(lockKey string) (int, error) {
33 changes: 23 additions & 10 deletions workflow/metrics/server.go
@@ -18,7 +18,8 @@ import (
 )
 
 // RunServer starts a metrics server
-func (m *Metrics) RunServer(ctx context.Context) {
+// If 'isDummy' is set to true, the dummy metrics server is started; if false, the Prometheus metrics server is started.
+func (m *Metrics) RunServer(ctx context.Context, isDummy bool) {
     defer runtimeutil.HandleCrash(runtimeutil.PanicHandlers...)
 
     if !m.metricsConfig.Enabled {
@@ -36,23 +37,33 @@ func (m *Metrics) RunServer(ctx context.Context) {
         // If the telemetry server is different -- and it's enabled -- run each on its own instance
         telemetryRegistry := prometheus.NewRegistry()
         telemetryRegistry.MustRegister(collectors.NewGoCollector())
-        go runServer(m.telemetryConfig, telemetryRegistry, ctx)
+        go runServer(m.telemetryConfig, telemetryRegistry, ctx, isDummy)
     }
 
     // Run the metrics server
-    go runServer(m.metricsConfig, metricsRegistry, ctx)
+    go runServer(m.metricsConfig, metricsRegistry, ctx, isDummy)
 
     go m.garbageCollector(ctx)
 }
 
-func runServer(config ServerConfig, registry *prometheus.Registry, ctx context.Context) {
+func runServer(config ServerConfig, registry *prometheus.Registry, ctx context.Context, isDummy bool) {
     var handlerOpts promhttp.HandlerOpts
     if config.IgnoreErrors {
         handlerOpts.ErrorHandling = promhttp.ContinueOnError
     }
 
+    name := ""
     mux := http.NewServeMux()
-    mux.Handle(config.Path, promhttp.HandlerFor(registry, handlerOpts))
+    if isDummy {
+        // dummy metrics server responds to all requests with a 200 status, but without providing any metrics data
+        name = "dummy metrics server"
+        mux.HandleFunc(config.Path, func(w http.ResponseWriter, r *http.Request) {
+            w.WriteHeader(http.StatusOK)
+        })
+    } else {
+        name = "prometheus metrics server"
+        mux.Handle(config.Path, promhttp.HandlerFor(registry, handlerOpts))
+    }
     srv := &http.Server{Addr: fmt.Sprintf(":%v", config.Port), Handler: mux}
 
     if config.Secure {
@@ -67,15 +78,15 @@ func runServer(config ServerConfig, registry *prometheus.Registry, ctx context.Context) {
         }
         srv.TLSConfig = tlsConfig
         go func() {
-            log.Infof("Starting prometheus metrics server at localhost:%v%s", config.Port, config.Path)
-            if err := srv.ListenAndServeTLS("", ""); err != nil {
+            log.Infof("Starting %s at localhost:%v%s", name, config.Port, config.Path)
+            if err := srv.ListenAndServeTLS("", ""); err != http.ErrServerClosed {
                 panic(err)
             }
         }()
     } else {
         go func() {
-            log.Infof("Starting prometheus metrics server at localhost:%v%s", config.Port, config.Path)
-            if err := srv.ListenAndServe(); err != nil {
+            log.Infof("Starting %s at localhost:%v%s", name, config.Port, config.Path)
+            if err := srv.ListenAndServe(); err != http.ErrServerClosed {
                 panic(err)
             }
         }()
@@ -88,7 +99,9 @@ func runServer(config ServerConfig, registry *prometheus.Registry, ctx context.Context) {
     ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
     defer cancel()
     if err := srv.Shutdown(ctx); err != nil {
-        log.Infof("Unable to shutdown metrics server at localhost:%v%s", config.Port, config.Path)
+        log.Infof("Unable to shutdown %s at localhost:%v%s", name, config.Port, config.Path)
+    } else {
+        log.Infof("Successfully shutdown %s at localhost:%v%s", name, config.Port, config.Path)
     }
 }

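What a scraper or probe observes from a follower is just the status line: the dummy handler registered above returns 200 OK with an empty body. A quick standalone check of that behavior using net/http/httptest (a hypothetical harness, not part of this PR):

// A standalone check of the dummy handler's observable behavior.
package main

import (
    "fmt"
    "io"
    "net/http"
    "net/http/httptest"
)

func main() {
    // The dummy handler from the diff: always 200 OK, empty body.
    dummy := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
        w.WriteHeader(http.StatusOK)
    })

    srv := httptest.NewServer(dummy)
    defer srv.Close()

    resp, err := http.Get(srv.URL + "/metrics")
    if err != nil {
        panic(err)
    }
    defer resp.Body.Close()

    body, _ := io.ReadAll(resp.Body)
    fmt.Printf("status=%d, body bytes=%d\n", resp.StatusCode, len(body)) // status=200, body bytes=0
}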
87 changes: 87 additions & 0 deletions workflow/metrics/server_test.go
@@ -0,0 +1,87 @@
+package metrics
+
+import (
+    "context"
+    "fmt"
+    "io"
+    "net/http"
+    "testing"
+    "time"
+
+    "github.com/stretchr/testify/assert"
+)
+
+func TestDisableMetricsServer(t *testing.T) {
+    config := ServerConfig{
+        Enabled: false,
+        Path:    DefaultMetricsServerPath,
+        Port:    DefaultMetricsServerPort,
+    }
+    m := New(config, config)
+
+    ctx, cancel := context.WithCancel(context.Background())
+    defer cancel()
+
+    m.RunServer(ctx, false)
+    time.Sleep(1 * time.Second) // to confirm that the server doesn't start, even if we wait
+    resp, err := http.Get(fmt.Sprintf("http://localhost:%d%s", DefaultMetricsServerPort, DefaultMetricsServerPath))
+    if resp != nil {
+        defer resp.Body.Close()
+    }
+
+    assert.Error(t, err)
+    assert.Contains(t, err.Error(), "connection refused") // expect the metrics server not to start
+}
+
+func TestMetricsServer(t *testing.T) {
+    config := ServerConfig{
+        Enabled: true,
+        Path:    DefaultMetricsServerPath,
+        Port:    DefaultMetricsServerPort,
+    }
+    m := New(config, config)
+
+    ctx, cancel := context.WithCancel(context.Background())
+    defer cancel()
+
+    m.RunServer(ctx, false)
+    time.Sleep(1 * time.Second)
+    resp, err := http.Get(fmt.Sprintf("http://localhost:%d%s", DefaultMetricsServerPort, DefaultMetricsServerPath))
+    assert.NoError(t, err)
+    assert.Equal(t, http.StatusOK, resp.StatusCode)
+
+    defer resp.Body.Close()
+
+    bodyBytes, err := io.ReadAll(resp.Body)
+    assert.NoError(t, err)
+
+    bodyString := string(bodyBytes)
+    assert.NotEmpty(t, bodyString)
+}
+
+func TestDummyMetricsServer(t *testing.T) {
+    config := ServerConfig{
+        Enabled: true,
+        Path:    DefaultMetricsServerPath,
+        Port:    DefaultMetricsServerPort,
+    }
+    m := New(config, config)
+
+    ctx, cancel := context.WithCancel(context.Background())
+    defer cancel()
+
+    m.RunServer(ctx, true)
+    time.Sleep(1 * time.Second)
+    resp, err := http.Get(fmt.Sprintf("http://localhost:%d%s", DefaultMetricsServerPort, DefaultMetricsServerPath))
+    assert.NoError(t, err)
+    assert.Equal(t, http.StatusOK, resp.StatusCode)
+
+    defer resp.Body.Close()
+
+    bodyBytes, err := io.ReadAll(resp.Body)
+    assert.NoError(t, err)
+
+    bodyString := string(bodyBytes)
+
+    assert.Empty(t, bodyString) // expect the dummy metrics server to return no metrics data
+}