From 3d491f472444c1c40dbffc033f5d39e0f336b564 Mon Sep 17 00:00:00 2001 From: Vincent Demeester Date: Wed, 7 Aug 2024 17:05:37 +0200 Subject: [PATCH] {taskrun,pipelinerun}metrics: make sure config is up-to-date This updates some metrics package and struct to be able to keep up-to-date the metrics configuration in the background go routines that are used. Signed-off-by: Vincent Demeester --- pkg/pipelinerunmetrics/metrics.go | 26 ++++++++++++++++++------ pkg/reconciler/pipelinerun/controller.go | 8 ++++++-- pkg/reconciler/taskrun/controller.go | 9 ++++++-- pkg/taskrunmetrics/metrics.go | 22 +++++++++++++------- 4 files changed, 48 insertions(+), 17 deletions(-) diff --git a/pkg/pipelinerunmetrics/metrics.go b/pkg/pipelinerunmetrics/metrics.go index 3b6afcfa523..d528681db9f 100644 --- a/pkg/pipelinerunmetrics/metrics.go +++ b/pkg/pipelinerunmetrics/metrics.go @@ -105,6 +105,7 @@ const ( type Recorder struct { mutex sync.Mutex initialized bool + cfg *config.Metrics insertTag func(pipeline, pipelinerun string) []tag.Mutator @@ -261,8 +262,8 @@ func viewUnregister() { runningPRsWaitingOnTaskResolutionView) } -// MetricsOnStore returns a function that checks if metrics are configured for a config.Store, and registers it if so -func MetricsOnStore(logger *zap.SugaredLogger) func(name string, +// OnStore returns a function that checks if metrics are configured for a config.Store, and registers it if so +func OnStore(logger *zap.SugaredLogger, r *Recorder) func(name string, value interface{}) { return func(name string, value interface{}) { if name == config.GetMetricsConfigName() { @@ -271,6 +272,8 @@ func MetricsOnStore(logger *zap.SugaredLogger) func(name string, logger.Error("Failed to do type insertion for extracting metrics config") return } + r.updateConfig(cfg) + // Update metrics according to configuration viewUnregister() err := viewRegister(cfg) if err != nil { @@ -282,8 +285,10 @@ func MetricsOnStore(logger *zap.SugaredLogger) func(name string, } func pipelinerunInsertTag(pipeline, pipelinerun string) []tag.Mutator { - return []tag.Mutator{tag.Insert(pipelineTag, pipeline), - tag.Insert(pipelinerunTag, pipelinerun)} + return []tag.Mutator{ + tag.Insert(pipelineTag, pipeline), + tag.Insert(pipelinerunTag, pipelinerun), + } } func pipelineInsertTag(pipeline, pipelinerun string) []tag.Mutator { @@ -312,6 +317,13 @@ func getPipelineTagName(pr *v1.PipelineRun) string { return pipelineName } +func (r *Recorder) updateConfig(cfg *config.Metrics) { + r.mutex.Lock() + defer r.mutex.Unlock() + + r.cfg = cfg +} + // DurationAndCount logs the duration of PipelineRun execution and // count for number of PipelineRuns succeed or failed // returns an error if its failed to log the metrics @@ -351,8 +363,10 @@ func (r *Recorder) DurationAndCount(pr *v1.PipelineRun, beforeCondition *apis.Co ctx, err := tag.New( context.Background(), - append([]tag.Mutator{tag.Insert(namespaceTag, pr.Namespace), - tag.Insert(statusTag, status), tag.Insert(reasonTag, reason)}, r.insertTag(pipelineName, pr.Name)...)...) + append([]tag.Mutator{ + tag.Insert(namespaceTag, pr.Namespace), + tag.Insert(statusTag, status), tag.Insert(reasonTag, reason), + }, r.insertTag(pipelineName, pr.Name)...)...) if err != nil { return err } diff --git a/pkg/reconciler/pipelinerun/controller.go b/pkg/reconciler/pipelinerun/controller.go index 728cd752882..d47ef8d7760 100644 --- a/pkg/reconciler/pipelinerun/controller.go +++ b/pkg/reconciler/pipelinerun/controller.go @@ -62,8 +62,12 @@ func NewController(opts *pipeline.Options, clock clock.PassiveClock) func(contex verificationpolicyInformer := verificationpolicyinformer.Get(ctx) secretinformer := secretinformer.Get(ctx) tracerProvider := tracing.New(TracerProviderName, logger.Named("tracing")) + pipelinerunmetricsRecorder := pipelinerunmetrics.Get(ctx) //nolint:contextcheck // OnStore methods does not support context as a parameter - configStore := config.NewStore(logger.Named("config-store"), pipelinerunmetrics.MetricsOnStore(logger), tracerProvider.OnStore(secretinformer.Lister())) + configStore := config.NewStore(logger.Named("config-store"), + pipelinerunmetrics.OnStore(logger, pipelinerunmetricsRecorder), + tracerProvider.OnStore(secretinformer.Lister()), + ) configStore.WatchConfigs(cmw) c := &Reconciler{ @@ -76,7 +80,7 @@ func NewController(opts *pipeline.Options, clock clock.PassiveClock) func(contex customRunLister: customRunInformer.Lister(), verificationPolicyLister: verificationpolicyInformer.Lister(), cloudEventClient: cloudeventclient.Get(ctx), - metrics: pipelinerunmetrics.Get(ctx), + metrics: pipelinerunmetricsRecorder, pvcHandler: volumeclaim.NewPVCHandler(kubeclientset, logger), resolutionRequester: resolution.NewCRDRequester(resolutionclient.Get(ctx), resolutionInformer.Lister()), tracerProvider: tracerProvider, diff --git a/pkg/reconciler/taskrun/controller.go b/pkg/reconciler/taskrun/controller.go index 024abd29373..84ab26185d2 100644 --- a/pkg/reconciler/taskrun/controller.go +++ b/pkg/reconciler/taskrun/controller.go @@ -65,8 +65,13 @@ func NewController(opts *pipeline.Options, clock clock.PassiveClock) func(contex secretinformer := secretinformer.Get(ctx) spireClient := spire.GetControllerAPIClient(ctx) tracerProvider := tracing.New(TracerProviderName, logger.Named("tracing")) + taskrunmetricsRecorder := taskrunmetrics.Get(ctx) //nolint:contextcheck // OnStore methods does not support context as a parameter - configStore := config.NewStore(logger.Named("config-store"), taskrunmetrics.MetricsOnStore(logger), spire.OnStore(ctx, logger), tracerProvider.OnStore(secretinformer.Lister())) + configStore := config.NewStore(logger.Named("config-store"), + taskrunmetrics.OnStore(logger, taskrunmetricsRecorder), + spire.OnStore(ctx, logger), + tracerProvider.OnStore(secretinformer.Lister()), + ) configStore.WatchConfigs(cmw) entrypointCache, err := pod.NewEntrypointCache(kubeclientset) @@ -84,7 +89,7 @@ func NewController(opts *pipeline.Options, clock clock.PassiveClock) func(contex limitrangeLister: limitrangeInformer.Lister(), verificationPolicyLister: verificationpolicyInformer.Lister(), cloudEventClient: cloudeventclient.Get(ctx), - metrics: taskrunmetrics.Get(ctx), + metrics: taskrunmetricsRecorder, entrypointCache: entrypointCache, podLister: podInformer.Lister(), pvcHandler: volumeclaim.NewPVCHandler(kubeclientset, logger), diff --git a/pkg/taskrunmetrics/metrics.go b/pkg/taskrunmetrics/metrics.go index b9c705cf7f9..2a4fa887a24 100644 --- a/pkg/taskrunmetrics/metrics.go +++ b/pkg/taskrunmetrics/metrics.go @@ -121,6 +121,7 @@ var ( type Recorder struct { mutex sync.Mutex initialized bool + cfg *config.Metrics ReportingPeriod time.Duration @@ -144,15 +145,15 @@ var ( // to log the TaskRun related metrics func NewRecorder(ctx context.Context) (*Recorder, error) { once.Do(func() { + cfg := config.FromContextOrDefaults(ctx) r = &Recorder{ initialized: true, + cfg: cfg.Metrics, // Default to reporting metrics every 30s. ReportingPeriod: 30 * time.Second, } - cfg := config.FromContextOrDefaults(ctx) - errRegistering = viewRegister(cfg.Metrics) if errRegistering != nil { r.initialized = false @@ -325,9 +326,8 @@ func viewUnregister() { ) } -// MetricsOnStore returns a function that checks if metrics are configured for a config.Store, and registers it if so -func MetricsOnStore(logger *zap.SugaredLogger) func(name string, - value interface{}) { +// OnStore returns a function that checks if metrics are configured for a config.Store, and registers it if so +func OnStore(logger *zap.SugaredLogger, r *Recorder) func(name string, value interface{}) { return func(name string, value interface{}) { if name == config.GetMetricsConfigName() { cfg, ok := value.(*config.Metrics) @@ -335,6 +335,8 @@ func MetricsOnStore(logger *zap.SugaredLogger) func(name string, logger.Error("Failed to do type insertion for extracting metrics config") return } + r.updateConfig(cfg) + // Update metrics according to the configuration viewUnregister() err := viewRegister(cfg) if err != nil { @@ -389,6 +391,13 @@ func getTaskTagName(tr *v1.TaskRun) string { return taskName } +func (r *Recorder) updateConfig(cfg *config.Metrics) { + r.mutex.Lock() + defer r.mutex.Unlock() + + r.cfg = cfg +} + // DurationAndCount logs the duration of TaskRun execution and // count for number of TaskRuns succeed or failed // returns an error if its failed to log the metrics @@ -454,8 +463,7 @@ func (r *Recorder) RunningTaskRuns(ctx context.Context, lister listers.TaskRunLi return err } - cfg := config.FromContextOrDefaults(ctx) - addNamespaceLabelToQuotaThrottleMetric := cfg.Metrics != nil && cfg.Metrics.ThrottleWithNamespace + addNamespaceLabelToQuotaThrottleMetric := r.cfg != nil && r.cfg.ThrottleWithNamespace var runningTrs int trsThrottledByQuota := map[string]int{}