From cfd0659f2229164c179a003d253dd8f118b7823f Mon Sep 17 00:00:00 2001 From: Daniel Hrabovcak Date: Tue, 4 Jun 2024 17:27:28 -0400 Subject: [PATCH] refactor: cleanup rule-evaluator flags --- cmd/rule-evaluator/README.md | 4 +- cmd/rule-evaluator/main.go | 370 ++++++++++++++++++++++------------- 2 files changed, 233 insertions(+), 141 deletions(-) diff --git a/cmd/rule-evaluator/README.md b/cmd/rule-evaluator/README.md index 362bb06543..8ec676ba18 100644 --- a/cmd/rule-evaluator/README.md +++ b/cmd/rule-evaluator/README.md @@ -131,7 +131,7 @@ Flags: ($KUBE_NAME) --query.project-id="" Project ID of the Google Cloud Monitoring scoping project to evaluate rules against. - --query.target-url="https://monitoring.googleapis.com/v1/projects/PROJECT_ID/location/global/prometheus" + --query.target-url=https://monitoring.googleapis.com/v1/projects/PROJECT_ID/location/global/prometheus The address of the Prometheus server query endpoint. (PROJECT_ID is replaced with the --query.project-id flag.) @@ -140,7 +140,7 @@ Flags: alert notification payload. Should point to an instance of a query frontend that accesses the same data as --query.target-url. - --query.credentials-file="" + --query.credentials-file= Credentials file for OAuth2 authentication with --query.target-url. --[no-]query.debug.disable-auth diff --git a/cmd/rule-evaluator/main.go b/cmd/rule-evaluator/main.go index 43beb152aa..cae2dffa9b 100644 --- a/cmd/rule-evaluator/main.go +++ b/cmd/rule-evaluator/main.go @@ -26,6 +26,7 @@ import ( "path/filepath" "runtime" "runtime/debug" + "strconv" "strings" "syscall" "time" @@ -110,32 +111,15 @@ func main() { haOpts := exportsetup.HAOptions{} haOpts.SetupFlags(a) - notifierOptions := notifier.Options{Registerer: reg} - - projectID := a.Flag("query.project-id", "Project ID of the Google Cloud Monitoring scoping project to evaluate rules against."). - Default(defaultProjectID).String() - - targetURL := a.Flag("query.target-url", fmt.Sprintf("The address of the Prometheus server query endpoint. (%s is replaced with the --query.project-id flag.)", projectIDVar)). - Default(fmt.Sprintf("https://monitoring.googleapis.com/v1/projects/%s/location/global/prometheus", projectIDVar)). - String() - - generatorURLStr := a.Flag("query.generator-url", "The base URL used for the generator URL in the alert notification payload. Should point to an instance of a query frontend that accesses the same data as --query.target-url."). - PlaceHolder("").String() - - queryCredentialsFile := a.Flag("query.credentials-file", "Credentials file for OAuth2 authentication with --query.target-url."). - Default("").String() - - disableAuth := a.Flag("query.debug.disable-auth", "Disable authentication (for debugging purposes)."). - Default("false").Bool() - - listenAddress := a.Flag("web.listen-address", "The address to listen on for HTTP requests."). - Default(":9091").String() - - configFile := a.Flag("config.file", "Prometheus configuration file path."). - Default("prometheus.yml").String() - - a.Flag("alertmanager.notification-queue-capacity", "The capacity of the queue for pending Alertmanager notifications."). - Default("10000").IntVar(¬ifierOptions.QueueCapacity) + evaluatorOpts := evaluatorOptions{ + TargetURL: Must(url.Parse(fmt.Sprintf("https://monitoring.googleapis.com/v1/projects/%s/location/global/prometheus", projectIDVar))), + ProjectID: defaultProjectID, + DisableAuth: false, + ListenAddress: ":9091", + ConfigFile: "prometheus.yml", + QueueCapacity: 10000, + } + evaluatorOpts.setupFlags(a) extraArgs, err := exportsetup.ExtraArgs() if err != nil { @@ -148,33 +132,15 @@ func main() { a.Usage(os.Args[1:]) os.Exit(2) } - startTime := time.Now() - - if *projectID == "" { - _ = level.Error(logger).Log("msg", "no --query.project-id was specified or could be derived from the environment") - os.Exit(2) - } - - *targetURL = strings.ReplaceAll(*targetURL, projectIDVar, *projectID) - generatorURL := &url.URL{} - if *generatorURLStr != "" { - var err error - generatorURL, err = url.Parse(*generatorURLStr) - if err != nil { - _ = level.Error(logger).Log("msg", "Invalid --query.generator-url", "err", err) - os.Exit(2) - } - } - - // Don't expand external labels on config file loading. It's a feature we like but we want to remain - // compatible with Prometheus and this is still an experimental feature, which we don't support. - if _, err := config.LoadFile(*configFile, false, false, logger); err != nil { - _ = level.Error(logger).Log("msg", fmt.Sprintf("Error loading config (--config.file=%s)", *configFile), "err", err) - os.Exit(2) + if err := evaluatorOpts.validate(); err != nil { + _ = level.Error(logger).Log("msg", "invalid command line argument", "err", err) + os.Exit(1) } + startTime := time.Now() ctx := context.Background() + metadataOpts.ExtractMetadata(logger, &exporterOpts) lease, err := haOpts.NewLease(logger, reg) if err != nil { @@ -188,70 +154,19 @@ func main() { } destination := export.NewStorage(exporter) - ctxRuleManager := context.Background() - ctxDiscover, cancelDiscover := context.WithCancel(context.Background()) - - opts := []option.ClientOption{ - option.WithScopes("https://www.googleapis.com/auth/monitoring.read"), - option.WithUserAgent(fmt.Sprintf("rule-evaluator/%s", version)), - } - if *queryCredentialsFile != "" { - opts = append(opts, option.WithCredentialsFile(*queryCredentialsFile)) - } - if *disableAuth { - opts = append(opts, - option.WithoutAuthentication(), - option.WithGRPCDialOption(grpc.WithTransportCredentials(insecure.NewCredentials())), - ) - } - transport, err := apihttp.NewTransport(ctxRuleManager, http.DefaultTransport, opts...) - if err != nil { - _ = level.Error(logger).Log("msg", "Creating proxy HTTP transport failed", "err", err) - os.Exit(1) + ctxDiscover, cancelDiscover := context.WithCancel(ctx) + discoveryManager := discovery.NewManager(ctxDiscover, log.With(logger, "component", "discovery manager notify"), discovery.Name("notify")) + notifierOptions := notifier.Options{ + Registerer: reg, + QueueCapacity: evaluatorOpts.QueueCapacity, } - roundTripper := makeInstrumentedRoundTripper(transport, reg) - client, err := api.NewClient(api.Config{ - Address: *targetURL, - RoundTripper: roundTripper, - }) + notificationManager := notifier.NewManager(¬ifierOptions, log.With(logger, "component", "notifier")) + rulesMetrics := rules.NewGroupMetrics(reg) + ruleEvaluator, err := newRuleEvaluator(ctx, logger, reg, &evaluatorOpts, version, destination, notificationManager, rulesMetrics) if err != nil { - _ = level.Error(logger).Log("msg", "Error creating client", "err", err) + _ = level.Error(logger).Log("msg", "Create rule-evaluator", "err", err) os.Exit(1) } - v1api := v1.NewAPI(client) - - queryFunc := func(ctx context.Context, q string, t time.Time) (promql.Vector, error) { - v, warnings, err := QueryFunc(ctx, q, t, v1api) - if len(warnings) > 0 { - _ = level.Warn(logger).Log("msg", "Querying Prometheus instance returned warnings", "warn", warnings) - } - if err != nil { - return nil, fmt.Errorf("execute query: %w", err) - } - vec, ok := v.(promql.Vector) - if !ok { - return nil, fmt.Errorf("Error querying Prometheus, Expected type vector response. Actual type %v", v.Type()) - } - return vec, nil - } - - discoveryManager := discovery.NewManager(ctxDiscover, log.With(logger, "component", "discovery manager notify"), discovery.Name("notify")) - notificationManager := notifier.NewManager(¬ifierOptions, log.With(logger, "component", "notifier")) - - externalStorage := &queryStorage{ - api: v1api, - } - - ruleManager := rules.NewManager(&rules.ManagerOptions{ - ExternalURL: generatorURL, - QueryFunc: queryFunc, - Context: ctxRuleManager, - Appendable: destination, - Queryable: externalStorage, - Logger: logger, - NotifyFunc: sendAlerts(notificationManager, generatorURL.String()), - Metrics: rules.NewGroupMetrics(reg), - }) reloaders := []reloader{ { @@ -270,32 +185,15 @@ func main() { return discoveryManager.ApplyConfig(c) }, }, { - name: "rules", - reloader: func(cfg *config.Config) error { - // Get all rule files matching the configuration paths. - var files []string - for _, pat := range cfg.RuleFiles { - fs, err := filepath.Glob(pat) - if fs == nil || err != nil { - return fmt.Errorf("retrieving rule file: %s", pat) - } - files = append(files, fs...) - } - return ruleManager.Update( - time.Duration(cfg.GlobalConfig.EvaluationInterval), - files, - cfg.GlobalConfig.ExternalLabels, - "", - nil, - ) - }, + name: "rules", + reloader: ruleEvaluator.ApplyConfig, }, } configMetrics := newConfigMetrics(reg) // Do an initial load of the configuration for all components. - if err := reloadConfig(*configFile, logger, configMetrics, reloaders...); err != nil { + if err := reloadConfig(evaluatorOpts.ConfigFile, logger, configMetrics, reloaders...); err != nil { _ = level.Error(logger).Log("msg", "error loading config file.", "err", err) os.Exit(1) } @@ -323,10 +221,10 @@ func main() { { // Rule manager. g.Add(func() error { - ruleManager.Run() + ruleEvaluator.Run() return nil }, func(error) { - ruleManager.Stop() + ruleEvaluator.Stop() }) } { @@ -371,7 +269,7 @@ func main() { reloadCh := make(chan chan error) { // Web Server. - server := &http.Server{Addr: *listenAddress} + server := &http.Server{Addr: evaluatorOpts.ListenAddress} http.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{Registry: reg})) http.HandleFunc("/-/reload", func(w http.ResponseWriter, r *http.Request) { @@ -423,7 +321,7 @@ func main() { } }) g.Add(func() error { - _ = level.Info(logger).Log("msg", "Starting web server", "listen", *listenAddress) + _ = level.Info(logger).Log("msg", "Starting web server", "listen", evaluatorOpts.ListenAddress) return server.ListenAndServe() }, func(error) { ctxServer, cancelServer := context.WithTimeout(ctx, time.Minute) @@ -443,11 +341,11 @@ func main() { for { select { case <-hup: - if err := reloadConfig(*configFile, logger, configMetrics, reloaders...); err != nil { + if err := reloadConfig(evaluatorOpts.ConfigFile, logger, configMetrics, reloaders...); err != nil { _ = level.Error(logger).Log("msg", "Error reloading config", "err", err) } case rc := <-reloadCh: - if err := reloadConfig(*configFile, logger, configMetrics, reloaders...); err != nil { + if err := reloadConfig(evaluatorOpts.ConfigFile, logger, configMetrics, reloaders...); err != nil { _ = level.Error(logger).Log("msg", "Error reloading config", "err", err) rc <- err } else { @@ -467,7 +365,7 @@ func main() { } // Run a test query to check status of rule evaluator. - _, err = queryFunc(ctx, "vector(1)", time.Now()) + _, err = ruleEvaluator.Query(ctx, "vector(1)", time.Now()) if err != nil { _ = level.Error(logger).Log("msg", "Error querying Prometheus instance", "err", err) } @@ -478,6 +376,108 @@ func main() { } } +// Must panics if there's any error. +func Must[T any](value T, err error) T { + if err != nil { + panic(err) + } + return value +} + +type evaluatorOptions struct { + TargetURL *url.URL + ProjectID string + GeneratorURL *url.URL + CredentialsFile string + DisableAuth bool + ListenAddress string + ConfigFile string + QueueCapacity int +} + +func (opts *evaluatorOptions) setupFlags(a *kingpin.Application) { + a.Flag("query.project-id", "Project ID of the Google Cloud Monitoring scoping project to evaluate rules against."). + Default(opts.ProjectID). + StringVar(&opts.ProjectID) + + a.Flag("query.target-url", fmt.Sprintf("The address of the Prometheus server query endpoint. (%s is replaced with the --query.project-id flag.)", projectIDVar)). + Default(opts.TargetURL.String()). + URLVar(&opts.TargetURL) + + a.Flag("query.generator-url", "The base URL used for the generator URL in the alert notification payload. Should point to an instance of a query frontend that accesses the same data as --query.target-url."). + PlaceHolder(""). + URLVar(&opts.GeneratorURL) + + a.Flag("query.credentials-file", "Credentials file for OAuth2 authentication with --query.target-url."). + PlaceHolder(""). + StringVar(&opts.CredentialsFile) + + a.Flag("query.debug.disable-auth", "Disable authentication (for debugging purposes)."). + Default("false"). + BoolVar(&opts.DisableAuth) + + a.Flag("web.listen-address", "The address to listen on for HTTP requests."). + Default(":9091"). + StringVar(&opts.ListenAddress) + + a.Flag("config.file", "Prometheus configuration file path."). + Default(opts.ConfigFile). + StringVar(&opts.ConfigFile) + + a.Flag("alertmanager.notification-queue-capacity", "The capacity of the queue for pending Alertmanager notifications."). + Default(strconv.Itoa(opts.QueueCapacity)). + IntVar(&opts.QueueCapacity) +} + +func (opts *evaluatorOptions) validate() error { + if opts.ProjectID == "" { + return errors.New("no --query.project-id was specified or could be derived from the environment") + } + + targetURL, err := url.Parse(strings.ReplaceAll(opts.TargetURL.String(), projectIDVar, opts.ProjectID)) + if err != nil { + return fmt.Errorf("unable to parse --query.target-url value %q: %w", opts.TargetURL.String(), err) + } + opts.TargetURL = targetURL + + // Don't expand external labels on config file loading. It's a feature we like but we want to + // remain compatible with Prometheus and this is still an experimental feature, which we don't + // support. + if _, err := config.LoadFile(opts.ConfigFile, false, false, log.NewNopLogger()); err != nil { + return fmt.Errorf("loading config %q: %w", opts.ConfigFile, err) + } + return nil +} + +func newAPI(ctx context.Context, opts *evaluatorOptions, version string, roundTripperFunc func(http.RoundTripper) http.RoundTripper) (v1.API, error) { + clientOpts := []option.ClientOption{ + option.WithScopes("https://www.googleapis.com/auth/monitoring.read"), + option.WithUserAgent(fmt.Sprintf("rule-evaluator/%s", version)), + } + if opts.CredentialsFile != "" { + clientOpts = append(clientOpts, option.WithCredentialsFile(opts.CredentialsFile)) + } + if opts.DisableAuth { + clientOpts = append(clientOpts, + option.WithoutAuthentication(), + option.WithGRPCDialOption(grpc.WithTransportCredentials(insecure.NewCredentials())), + ) + } + transport, err := apihttp.NewTransport(ctx, http.DefaultTransport, clientOpts...) + if err != nil { + return nil, err + } + roundTripper := roundTripperFunc(transport) + client, err := api.NewClient(api.Config{ + Address: opts.TargetURL.String(), + RoundTripper: roundTripper, + }) + if err != nil { + return nil, err + } + return v1.NewAPI(client), nil +} + // response wraps all Prometheus API responses. type response struct { Status string `json:"status"` @@ -495,7 +495,7 @@ func QueryFunc(ctx context.Context, q string, t time.Time, v1api v1.API) (parser } // sendAlerts returns the rules.NotifyFunc for a Notifier. -func sendAlerts(s *notifier.Manager, externalURL string) rules.NotifyFunc { +func sendAlerts(s *notifier.Manager, externalURL *url.URL) rules.NotifyFunc { return func(_ context.Context, expr string, alerts ...*rules.Alert) { var res []*notifier.Alert for _, alert := range alerts { @@ -509,8 +509,8 @@ func sendAlerts(s *notifier.Manager, externalURL string) rules.NotifyFunc { } else { a.EndsAt = alert.ValidUntil } - if externalURL != "" { - a.GeneratorURL = externalURL + strutil.TableLinkForExpression(expr) + if externalURL != nil { + a.GeneratorURL = externalURL.String() + strutil.TableLinkForExpression(expr) } res = append(res, a) } @@ -784,3 +784,95 @@ func makeInstrumentedRoundTripper(transport http.RoundTripper, reg prometheus.Re return promhttp.InstrumentRoundTripperCounter(queryCounter, promhttp.InstrumentRoundTripperDuration(queryHistogram, transport)) } + +type ruleEvaluator struct { + queryFunc rules.QueryFunc + rulesManager *rules.Manager +} + +func newRuleEvaluator( + ctx context.Context, + logger log.Logger, + reg prometheus.Registerer, + evaluatorOpts *evaluatorOptions, + version string, + appendable storage.Appendable, + notifierManager *notifier.Manager, + rulesMetrics *rules.Metrics, +) (*ruleEvaluator, error) { + v1api, err := newAPI(ctx, evaluatorOpts, version, func(rt http.RoundTripper) http.RoundTripper { + return makeInstrumentedRoundTripper(rt, reg) + }) + if err != nil { + return nil, fmt.Errorf("query client: %w", err) + } + queryFunc := newQueryFunc(logger, v1api) + + rulesManager := rules.NewManager(&rules.ManagerOptions{ + ExternalURL: evaluatorOpts.GeneratorURL, + QueryFunc: queryFunc, + Context: ctx, + Appendable: appendable, + Queryable: &queryStorage{ + api: v1api, + }, + Logger: logger, + NotifyFunc: sendAlerts(notifierManager, evaluatorOpts.GeneratorURL), + Metrics: rulesMetrics, + }) + + evaluator := ruleEvaluator{ + rulesManager: rulesManager, + queryFunc: queryFunc, + } + + return &evaluator, nil +} + +func (e *ruleEvaluator) ApplyConfig(cfg *config.Config) error { + // Get all rule files matching the configuration paths. + var files []string + for _, pat := range cfg.RuleFiles { + fs, err := filepath.Glob(pat) + if fs == nil || err != nil { + return fmt.Errorf("retrieving rule file: %s", pat) + } + files = append(files, fs...) + } + return e.rulesManager.Update( + time.Duration(cfg.GlobalConfig.EvaluationInterval), + files, + cfg.GlobalConfig.ExternalLabels, + "", + nil, + ) +} + +func (e *ruleEvaluator) Query(ctx context.Context, q string, t time.Time) (promql.Vector, error) { + return e.queryFunc(ctx, q, t) +} + +func (e *ruleEvaluator) Run() { + e.rulesManager.Run() +} + +func (e *ruleEvaluator) Stop() { + e.rulesManager.Stop() +} + +func newQueryFunc(logger log.Logger, v1api v1.API) rules.QueryFunc { + return func(ctx context.Context, q string, t time.Time) (promql.Vector, error) { + v, warnings, err := QueryFunc(ctx, q, t, v1api) + if len(warnings) > 0 { + _ = level.Warn(logger).Log("msg", "Querying Prometheus instance returned warnings", "warn", warnings) + } + if err != nil { + return nil, fmt.Errorf("execute query: %w", err) + } + vec, ok := v.(promql.Vector) + if !ok { + return nil, fmt.Errorf("query Prometheus, Expected type vector response. Actual type %v", v.Type()) + } + return vec, nil + } +}