From 3e3b2015ec2c5f9dbe5e1bd0fee5be65bf0ff5a5 Mon Sep 17 00:00:00 2001 From: Jaana Dogan Date: Fri, 13 Nov 2020 06:02:36 -0800 Subject: [PATCH] Fix the scraper/discover manager coordination on the Prometheus receiver (#2089) * Fix the scraper/discover manager coordination on the Prometheus receiver The receiver contains various unnecessary sections. Rewriting the receiver's Start for better maintainability. Related to #1909. * Use the background context * Remove dead code --- .../prometheusreceiver/internal/ocastore.go | 11 +-- .../prometheusreceiver/metrics_receiver.go | 96 ++++++++----------- service/internal/resources.go | 12 +-- 3 files changed, 51 insertions(+), 68 deletions(-) diff --git a/receiver/prometheusreceiver/internal/ocastore.go b/receiver/prometheusreceiver/internal/ocastore.go index 55e1f53719d8..cbdb7e74a9f4 100644 --- a/receiver/prometheusreceiver/internal/ocastore.go +++ b/receiver/prometheusreceiver/internal/ocastore.go @@ -17,7 +17,6 @@ package internal import ( "context" "io" - "sync" "sync/atomic" "github.com/prometheus/prometheus/pkg/labels" @@ -47,16 +46,17 @@ type OcaStore interface { // OpenCensus Store for prometheus type ocaStore struct { - running int32 - logger *zap.Logger + ctx context.Context + + running int32 // access atomically sink consumer.MetricsConsumer mc *mService - once *sync.Once - ctx context.Context jobsMap *JobsMap useStartTimeMetric bool startTimeMetricRegex string receiverName string + + logger *zap.Logger } // NewOcaStore returns an ocaStore instance, which can be acted as prometheus' scrape.Appendable @@ -66,7 +66,6 @@ func NewOcaStore(ctx context.Context, sink consumer.MetricsConsumer, logger *zap ctx: ctx, sink: sink, logger: logger, - once: &sync.Once{}, jobsMap: jobsMap, useStartTimeMetric: useStartTimeMetric, startTimeMetricRegex: startTimeMetricRegex, diff --git a/receiver/prometheusreceiver/metrics_receiver.go b/receiver/prometheusreceiver/metrics_receiver.go index 639e3546bd35..3c9828b154e3 100644 --- a/receiver/prometheusreceiver/metrics_receiver.go +++ b/receiver/prometheusreceiver/metrics_receiver.go @@ -16,7 +16,6 @@ package prometheusreceiver import ( "context" - "sync" "time" "github.com/prometheus/prometheus/discovery" @@ -25,18 +24,16 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" - "go.opentelemetry.io/collector/obsreport" "go.opentelemetry.io/collector/receiver/prometheusreceiver/internal" ) // pReceiver is the type that provides Prometheus scraper/receiver functionality. type pReceiver struct { - startOnce sync.Once - stopOnce sync.Once - cfg *Config - consumer consumer.MetricsConsumer - cancel context.CancelFunc - logger *zap.Logger + cfg *Config + consumer consumer.MetricsConsumer + cancelFunc context.CancelFunc + + logger *zap.Logger } // New creates a new prometheus.Receiver reference. @@ -51,62 +48,49 @@ func newPrometheusReceiver(logger *zap.Logger, cfg *Config, next consumer.Metric // Start is the method that starts Prometheus scraping and it // is controlled by having previously defined a Configuration using perhaps New. -func (pr *pReceiver) Start(_ context.Context, host component.Host) error { - pr.startOnce.Do(func() { - ctx := context.Background() - c, cancel := context.WithCancel(ctx) - pr.cancel = cancel - c = obsreport.ReceiverContext(c, pr.cfg.Name(), "http") - var jobsMap *internal.JobsMap - if !pr.cfg.UseStartTimeMetric { - jobsMap = internal.NewJobsMap(2 * time.Minute) - } - app := internal.NewOcaStore(c, pr.consumer, pr.logger, jobsMap, pr.cfg.UseStartTimeMetric, pr.cfg.StartTimeMetricRegex, pr.cfg.Name()) - // need to use a logger with the gokitLog interface - l := internal.NewZapToGokitLogAdapter(pr.logger) - scrapeManager := scrape.NewManager(l, app) - app.SetScrapeManager(scrapeManager) - discoveryManagerScrape := discovery.NewManager(ctx, l) - go func() { - if err := discoveryManagerScrape.Run(); err != nil { - host.ReportFatalError(err) - } - }() - if err := scrapeManager.ApplyConfig(pr.cfg.PrometheusConfig); err != nil { - host.ReportFatalError(err) - return - } +func (r *pReceiver) Start(ctx context.Context, host component.Host) error { + discoveryCtx, cancel := context.WithCancel(context.Background()) + r.cancelFunc = cancel - // Run the scrape manager. - syncConfig := make(chan bool) - errsChan := make(chan error, 1) - go func() { - defer close(errsChan) - <-time.After(100 * time.Millisecond) - close(syncConfig) - if err := scrapeManager.Run(discoveryManagerScrape.SyncCh()); err != nil { - errsChan <- err - } - }() - <-syncConfig - // By this point we've given time to the scrape manager - // to start applying its original configuration. + logger := internal.NewZapToGokitLogAdapter(r.logger) - discoveryCfg := make(map[string]discovery.Configs) - for _, scrapeConfig := range pr.cfg.PrometheusConfig.ScrapeConfigs { - discoveryCfg[scrapeConfig.JobName] = scrapeConfig.ServiceDiscoveryConfigs + discoveryManager := discovery.NewManager(discoveryCtx, logger) + discoveryCfg := make(map[string]discovery.Configs) + for _, scrapeConfig := range r.cfg.PrometheusConfig.ScrapeConfigs { + discoveryCfg[scrapeConfig.JobName] = scrapeConfig.ServiceDiscoveryConfigs + } + if err := discoveryManager.ApplyConfig(discoveryCfg); err != nil { + return err + } + go func() { + if err := discoveryManager.Run(); err != nil { + r.logger.Error("Discovery manager failed", zap.Error(err)) + host.ReportFatalError(err) } + }() + + var jobsMap *internal.JobsMap + if !r.cfg.UseStartTimeMetric { + jobsMap = internal.NewJobsMap(2 * time.Minute) + } + ocaStore := internal.NewOcaStore(ctx, r.consumer, r.logger, jobsMap, r.cfg.UseStartTimeMetric, r.cfg.StartTimeMetricRegex, r.cfg.Name()) - // Now trigger the discovery notification to the scrape manager. - if err := discoveryManagerScrape.ApplyConfig(discoveryCfg); err != nil { - errsChan <- err + scrapeManager := scrape.NewManager(logger, ocaStore) + ocaStore.SetScrapeManager(scrapeManager) + if err := scrapeManager.ApplyConfig(r.cfg.PrometheusConfig); err != nil { + return err + } + go func() { + if err := scrapeManager.Run(discoveryManager.SyncCh()); err != nil { + r.logger.Error("Scrape manager failed", zap.Error(err)) + host.ReportFatalError(err) } - }) + }() return nil } // Shutdown stops and cancels the underlying Prometheus scrapers. -func (pr *pReceiver) Shutdown(context.Context) error { - pr.stopOnce.Do(pr.cancel) +func (r *pReceiver) Shutdown(context.Context) error { + r.cancelFunc() return nil } diff --git a/service/internal/resources.go b/service/internal/resources.go index 3777769f2b33..c5e93cd1c43e 100644 --- a/service/internal/resources.go +++ b/service/internal/resources.go @@ -227,7 +227,7 @@ var _escData = map[string]*_escFile{ name: "component_header.html", local: "templates/component_header.html", size: 156, - modtime: 1594178791, + modtime: 1605208512, compressed: ` H4sIAAAAAAAC/1SMsQqDMBRFd7/iIq7q5lBiltKt9B8CPklQX6R1e9x/L6ZQ2vXcc65ZE3AZ0V3ztmcV PW467TnpQVZmzZp0Kfs96VJQizTjw1uyAgAXB+8C4lPmsT4fydqbdY+wCen64F0fB19iWV/yF/54X0en @@ -239,7 +239,7 @@ U3kHAAD//zT+SdCcAAAA name: "extensions_table.html", local: "templates/extensions_table.html", size: 353, - modtime: 1594178791, + modtime: 1605208512, compressed: ` H4sIAAAAAAAC/2SQwU7DMBBE7/2KlemRNJwjxxwQHDnwB248DRbOOnK2tGD531HTQIvqk1fzZjU7Wuw2 gCb5CmjVNiaHVE2j7Tz3DT0osyIiynltqWlp8xSHMTJYntmN0bOUsgDJcg9ap3jw7HC8n7+z5y0epgU7 @@ -252,7 +252,7 @@ oxX5HeETfMGv9NPTkv4i2e6jT3HPrqE7AEui8yaECbdWkzPYUXWlaHFkg++5VR1YkJTRlt4Tdq06HVfK name: "footer.html", local: "templates/footer.html", size: 15, - modtime: 1594178791, + modtime: 1605208512, compressed: ` H4sIAAAAAAAC/7LRT8pPqbTjstHPKMnNsQMEAAD//wEFevAPAAAA `, @@ -262,7 +262,7 @@ H4sIAAAAAAAC/7LRT8pPqbTjstHPKMnNsQMEAAD//wEFevAPAAAA name: "header.html", local: "templates/header.html", size: 467, - modtime: 1594178791, + modtime: 1605208512, compressed: ` H4sIAAAAAAAC/5TRMU8sIRAH8P4+BY/25eC9szGGxUItLIwW11giO7uMB8wG5rxsLvfdDdnTxNhoBeFP fpnM3/y5fbzZPj/dicAp2pVph4guj52ELK0J4Hq7EkIIk4Cd8MGVCtzJPQ/rS3mOGDmCPR7Vtl1OJ6OX @@ -276,7 +276,7 @@ vuDEoocBiqjF/5RszGuV1uhFsCujl0bMC/Vz62vzZe1hY98DAAD//7qRGmLTAQAA name: "pipelines_table.html", local: "templates/pipelines_table.html", size: 1946, - modtime: 1594178791, + modtime: 1605208512, compressed: ` H4sIAAAAAAAC/7SVwXLTMBCG7zyFxnRyIjVcU1scSpnhAMN0eAFZ2gRNlZVmJbdujd+dsWyrTp0LtL5k rOjX/tlv/8hFEJUB5sOjgTKrLCmgrXdCajzs2MeMv2OMsSLQ8DAsFJPWeCew/MSE0QcsDewDLyr+tTbm @@ -293,7 +293,7 @@ QeMmXNC4hCvdNKvQgsYtacFoGWFFxSvCNl+lu3HQFXl8JfO/AQAA//9We3KLmgcAAA== name: "properties_table.html", local: "templates/properties_table.html", size: 420, - modtime: 1594178791, + modtime: 1605208512, compressed: ` H4sIAAAAAAAC/2SRwW7DIBBE7/6KVRr1VMc5u5gfqFT11Ds2U8sqWVuwqRoR/r1yTCpb4YAEO48ZDarV MR7ezQkp1apqdaHEtA4U5OLQ7NrRW/gyTKYbuK/puNMFEVGMtB/Y4pfqho6UUr71hnvk0Qvt4XACyyw6