Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

{taskrun,pipelinerun}metrics: make sure config is up-to-date #8187

Merged
merged 1 commit into from
Aug 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 20 additions & 6 deletions pkg/pipelinerunmetrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ const (
type Recorder struct {
mutex sync.Mutex
initialized bool
cfg *config.Metrics

insertTag func(pipeline,
pipelinerun string) []tag.Mutator
Expand Down Expand Up @@ -261,8 +262,8 @@ func viewUnregister() {
runningPRsWaitingOnTaskResolutionView)
}

// MetricsOnStore returns a function that checks if metrics are configured for a config.Store, and registers it if so
func MetricsOnStore(logger *zap.SugaredLogger) func(name string,
// OnStore returns a function that checks if metrics are configured for a config.Store, and registers it if so
func OnStore(logger *zap.SugaredLogger, r *Recorder) func(name string,
value interface{}) {
return func(name string, value interface{}) {
if name == config.GetMetricsConfigName() {
Expand All @@ -271,6 +272,8 @@ func MetricsOnStore(logger *zap.SugaredLogger) func(name string,
logger.Error("Failed to do type insertion for extracting metrics config")
return
}
r.updateConfig(cfg)
// Update metrics according to configuration
viewUnregister()
err := viewRegister(cfg)
if err != nil {
Expand All @@ -282,8 +285,10 @@ func MetricsOnStore(logger *zap.SugaredLogger) func(name string,
}

func pipelinerunInsertTag(pipeline, pipelinerun string) []tag.Mutator {
return []tag.Mutator{tag.Insert(pipelineTag, pipeline),
tag.Insert(pipelinerunTag, pipelinerun)}
return []tag.Mutator{
tag.Insert(pipelineTag, pipeline),
tag.Insert(pipelinerunTag, pipelinerun),
}
}

func pipelineInsertTag(pipeline, pipelinerun string) []tag.Mutator {
Expand Down Expand Up @@ -312,6 +317,13 @@ func getPipelineTagName(pr *v1.PipelineRun) string {
return pipelineName
}

func (r *Recorder) updateConfig(cfg *config.Metrics) {
r.mutex.Lock()
defer r.mutex.Unlock()

r.cfg = cfg
}

// DurationAndCount logs the duration of PipelineRun execution and
// count for number of PipelineRuns succeed or failed
// returns an error if its failed to log the metrics
Expand Down Expand Up @@ -351,8 +363,10 @@ func (r *Recorder) DurationAndCount(pr *v1.PipelineRun, beforeCondition *apis.Co

ctx, err := tag.New(
context.Background(),
append([]tag.Mutator{tag.Insert(namespaceTag, pr.Namespace),
tag.Insert(statusTag, status), tag.Insert(reasonTag, reason)}, r.insertTag(pipelineName, pr.Name)...)...)
append([]tag.Mutator{
tag.Insert(namespaceTag, pr.Namespace),
tag.Insert(statusTag, status), tag.Insert(reasonTag, reason),
}, r.insertTag(pipelineName, pr.Name)...)...)
if err != nil {
return err
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/pipelinerunmetrics/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func TestUninitializedMetrics(t *testing.T) {
}
}

func TestMetricsOnStore(t *testing.T) {
func TestOnStore(t *testing.T) {
log := zap.NewExample()
defer log.Sync()
logger := log.Sugar()
Expand All @@ -81,7 +81,7 @@ func TestMetricsOnStore(t *testing.T) {
}

// We check that there's no change when incorrect config is passed
MetricsOnStore(logger)(config.GetMetricsConfigName(), &config.Store{})
OnStore(logger, metrics)(config.GetMetricsConfigName(), &config.Store{})
// Comparing function assign to struct with the one which should yield same value
if reflect.ValueOf(metrics.insertTag).Pointer() != reflect.ValueOf(pipelinerunInsertTag).Pointer() {
t.Fatal("metrics recorder shouldn't change during this OnStore call")
Expand All @@ -94,7 +94,7 @@ func TestMetricsOnStore(t *testing.T) {
DurationTaskrunType: config.DurationTaskrunTypeHistogram,
DurationPipelinerunType: config.DurationPipelinerunTypeLastValue,
}
MetricsOnStore(logger)(config.GetMetricsConfigName(), cfg)
OnStore(logger, metrics)(config.GetMetricsConfigName(), cfg)
if reflect.ValueOf(metrics.insertTag).Pointer() != reflect.ValueOf(pipelinerunInsertTag).Pointer() {
t.Fatal("metrics recorder shouldn't change during this OnStore call")
}
Expand All @@ -105,7 +105,7 @@ func TestMetricsOnStore(t *testing.T) {
DurationTaskrunType: config.DurationTaskrunTypeHistogram,
DurationPipelinerunType: config.DurationPipelinerunTypeLastValue,
}
MetricsOnStore(logger)(config.GetMetricsConfigName(), cfg)
OnStore(logger, metrics)(config.GetMetricsConfigName(), cfg)
if reflect.ValueOf(metrics.insertTag).Pointer() != reflect.ValueOf(nilInsertTag).Pointer() {
t.Fatal("metrics recorder didn't change during OnStore call")
}
Expand Down
8 changes: 6 additions & 2 deletions pkg/reconciler/pipelinerun/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,12 @@ func NewController(opts *pipeline.Options, clock clock.PassiveClock) func(contex
verificationpolicyInformer := verificationpolicyinformer.Get(ctx)
secretinformer := secretinformer.Get(ctx)
tracerProvider := tracing.New(TracerProviderName, logger.Named("tracing"))
pipelinerunmetricsRecorder := pipelinerunmetrics.Get(ctx)
//nolint:contextcheck // OnStore methods does not support context as a parameter
configStore := config.NewStore(logger.Named("config-store"), pipelinerunmetrics.MetricsOnStore(logger), tracerProvider.OnStore(secretinformer.Lister()))
configStore := config.NewStore(logger.Named("config-store"),
pipelinerunmetrics.OnStore(logger, pipelinerunmetricsRecorder),
tracerProvider.OnStore(secretinformer.Lister()),
)
configStore.WatchConfigs(cmw)

c := &Reconciler{
Expand All @@ -76,7 +80,7 @@ func NewController(opts *pipeline.Options, clock clock.PassiveClock) func(contex
customRunLister: customRunInformer.Lister(),
verificationPolicyLister: verificationpolicyInformer.Lister(),
cloudEventClient: cloudeventclient.Get(ctx),
metrics: pipelinerunmetrics.Get(ctx),
metrics: pipelinerunmetricsRecorder,
pvcHandler: volumeclaim.NewPVCHandler(kubeclientset, logger),
resolutionRequester: resolution.NewCRDRequester(resolutionclient.Get(ctx), resolutionInformer.Lister()),
tracerProvider: tracerProvider,
Expand Down
9 changes: 7 additions & 2 deletions pkg/reconciler/taskrun/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,13 @@ func NewController(opts *pipeline.Options, clock clock.PassiveClock) func(contex
secretinformer := secretinformer.Get(ctx)
spireClient := spire.GetControllerAPIClient(ctx)
tracerProvider := tracing.New(TracerProviderName, logger.Named("tracing"))
taskrunmetricsRecorder := taskrunmetrics.Get(ctx)
//nolint:contextcheck // OnStore methods does not support context as a parameter
configStore := config.NewStore(logger.Named("config-store"), taskrunmetrics.MetricsOnStore(logger), spire.OnStore(ctx, logger), tracerProvider.OnStore(secretinformer.Lister()))
configStore := config.NewStore(logger.Named("config-store"),
taskrunmetrics.OnStore(logger, taskrunmetricsRecorder),
spire.OnStore(ctx, logger),
tracerProvider.OnStore(secretinformer.Lister()),
)
configStore.WatchConfigs(cmw)

entrypointCache, err := pod.NewEntrypointCache(kubeclientset)
Expand All @@ -84,7 +89,7 @@ func NewController(opts *pipeline.Options, clock clock.PassiveClock) func(contex
limitrangeLister: limitrangeInformer.Lister(),
verificationPolicyLister: verificationpolicyInformer.Lister(),
cloudEventClient: cloudeventclient.Get(ctx),
metrics: taskrunmetrics.Get(ctx),
metrics: taskrunmetricsRecorder,
entrypointCache: entrypointCache,
podLister: podInformer.Lister(),
pvcHandler: volumeclaim.NewPVCHandler(kubeclientset, logger),
Expand Down
22 changes: 15 additions & 7 deletions pkg/taskrunmetrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ var (
type Recorder struct {
mutex sync.Mutex
initialized bool
cfg *config.Metrics

ReportingPeriod time.Duration

Expand All @@ -144,15 +145,15 @@ var (
// to log the TaskRun related metrics
func NewRecorder(ctx context.Context) (*Recorder, error) {
once.Do(func() {
cfg := config.FromContextOrDefaults(ctx)
r = &Recorder{
initialized: true,
cfg: cfg.Metrics,

// Default to reporting metrics every 30s.
ReportingPeriod: 30 * time.Second,
}

cfg := config.FromContextOrDefaults(ctx)

errRegistering = viewRegister(cfg.Metrics)
if errRegistering != nil {
r.initialized = false
Expand Down Expand Up @@ -325,16 +326,17 @@ func viewUnregister() {
)
}

// MetricsOnStore returns a function that checks if metrics are configured for a config.Store, and registers it if so
func MetricsOnStore(logger *zap.SugaredLogger) func(name string,
value interface{}) {
// OnStore returns a function that checks if metrics are configured for a config.Store, and registers it if so
func OnStore(logger *zap.SugaredLogger, r *Recorder) func(name string, value interface{}) {
return func(name string, value interface{}) {
if name == config.GetMetricsConfigName() {
cfg, ok := value.(*config.Metrics)
if !ok {
logger.Error("Failed to do type insertion for extracting metrics config")
return
}
r.updateConfig(cfg)
// Update metrics according to the configuration
viewUnregister()
err := viewRegister(cfg)
if err != nil {
Expand Down Expand Up @@ -389,6 +391,13 @@ func getTaskTagName(tr *v1.TaskRun) string {
return taskName
}

func (r *Recorder) updateConfig(cfg *config.Metrics) {
r.mutex.Lock()
defer r.mutex.Unlock()

r.cfg = cfg
}

// DurationAndCount logs the duration of TaskRun execution and
// count for number of TaskRuns succeed or failed
// returns an error if its failed to log the metrics
Expand Down Expand Up @@ -454,8 +463,7 @@ func (r *Recorder) RunningTaskRuns(ctx context.Context, lister listers.TaskRunLi
return err
}

cfg := config.FromContextOrDefaults(ctx)
addNamespaceLabelToQuotaThrottleMetric := cfg.Metrics != nil && cfg.Metrics.ThrottleWithNamespace
addNamespaceLabelToQuotaThrottleMetric := r.cfg != nil && r.cfg.ThrottleWithNamespace

var runningTrs int
trsThrottledByQuota := map[string]int{}
Expand Down
8 changes: 4 additions & 4 deletions pkg/taskrunmetrics/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func TestUninitializedMetrics(t *testing.T) {
}
}

func TestMetricsOnStore(t *testing.T) {
func TestOnStore(t *testing.T) {
log := zap.NewExample()
defer log.Sync()
logger := log.Sugar()
Expand All @@ -92,7 +92,7 @@ func TestMetricsOnStore(t *testing.T) {
}

// We check that there's no change when incorrect config is passed
MetricsOnStore(logger)(config.GetMetricsConfigName(), &config.Store{})
OnStore(logger, metrics)(config.GetMetricsConfigName(), &config.Store{})
// Comparing function assign to struct with the one which should yield same value
if reflect.ValueOf(metrics.insertTaskTag).Pointer() != reflect.ValueOf(taskrunInsertTag).Pointer() {
t.Fatalf("metrics recorder shouldn't change during this OnStore call")
Expand All @@ -107,7 +107,7 @@ func TestMetricsOnStore(t *testing.T) {
}

// We test that there's no change when incorrect values in configmap is passed
MetricsOnStore(logger)(config.GetMetricsConfigName(), cfg)
OnStore(logger, metrics)(config.GetMetricsConfigName(), cfg)
// Comparing function assign to struct with the one which should yield same value
if reflect.ValueOf(metrics.insertTaskTag).Pointer() != reflect.ValueOf(taskrunInsertTag).Pointer() {
t.Fatalf("metrics recorder shouldn't change during this OnStore call")
Expand All @@ -121,7 +121,7 @@ func TestMetricsOnStore(t *testing.T) {
DurationPipelinerunType: config.DurationPipelinerunTypeLastValue,
}

MetricsOnStore(logger)(config.GetMetricsConfigName(), cfg)
OnStore(logger, metrics)(config.GetMetricsConfigName(), cfg)
if reflect.ValueOf(metrics.insertTaskTag).Pointer() != reflect.ValueOf(nilInsertTag).Pointer() {
t.Fatalf("metrics recorder didn't change during OnStore call")
}
Expand Down