From ba50f4d23b9800a5efb30175cc870120ffa33ed3 Mon Sep 17 00:00:00 2001 From: Pradyumna Krishna Date: Wed, 22 Mar 2023 04:17:35 +0000 Subject: [PATCH] Query: Switch engines using `engine` param Thanos query has two engines, prometheus (default) and thanos. Only a single engine runs through the thanos query command at a time, and the command has to be re-run to switch between them. This commit adds functionality to run multiple engines at once and switch between them using the `engine` query param in the query API. To avoid duplicate metrics registration, the thanos engine is provided with a different registerer having the prefix `tpe_` (not finalized yet). The `query.promql-engine` command line flag that specified the query engine to run has been removed. Currently this functionality is not implemented on GRPCAPI. Signed-off-by: Pradyumna Krishna --- cmd/thanos/query.go | 62 ++++++++++++++++++++++----------------------- pkg/api/query/v1.go | 40 +++++++++++++++++++++++++++-- 2 files changed, 68 insertions(+), 34 deletions(-) diff --git a/cmd/thanos/query.go b/cmd/thanos/query.go index 5275fef55d3..2160cd1b837 100644 --- a/cmd/thanos/query.go +++ b/cmd/thanos/query.go @@ -66,13 +66,6 @@ const ( queryPushdown = "query-pushdown" ) -type promqlEngineType string - -const ( - promqlEnginePrometheus promqlEngineType = "prometheus" - promqlEngineThanos promqlEngineType = "thanos" -) - type queryMode string const ( @@ -109,9 +102,6 @@ func registerQuery(app *extkingpin.App) { queryTimeout := extkingpin.ModelDuration(cmd.Flag("query.timeout", "Maximum time to process query by query node."). Default("2m")) - promqlEngine := cmd.Flag("query.promql-engine", "PromQL engine to use.").Default(string(promqlEnginePrometheus)). - Enum(string(promqlEnginePrometheus), string(promqlEngineThanos)) - promqlQueryMode := cmd.Flag("query.mode", "PromQL query mode. One of: local, distributed."). Hidden(). Default(string(queryModeLocal)). 
@@ -341,7 +331,6 @@ func registerQuery(app *extkingpin.App) { *queryTelemetryDurationQuantiles, *queryTelemetrySamplesQuantiles, *queryTelemetrySeriesQuantiles, - promqlEngineType(*promqlEngine), storeRateLimits, queryMode(*promqlQueryMode), ) @@ -416,7 +405,6 @@ func runQuery( queryTelemetryDurationQuantiles []float64, queryTelemetrySamplesQuantiles []int64, queryTelemetrySeriesQuantiles []int64, - promqlEngine promqlEngineType, storeRateLimits store.SeriesSelectLimits, queryMode queryMode, ) error { @@ -663,30 +651,39 @@ func runQuery( EnableAtModifier: true, } + thanosEngineOpts := promql.EngineOpts{ + Logger: logger, + Reg: extprom.WrapRegistererWithPrefix("tpe_", reg), + // TODO(bwplotka): Expose this as a flag: https://github.com/thanos-io/thanos/issues/703. + MaxSamples: math.MaxInt32, + Timeout: queryTimeout, + LookbackDelta: lookbackDelta, + NoStepSubqueryIntervalFn: func(int64) int64 { + return defaultEvaluationInterval.Milliseconds() + }, + EnableNegativeOffset: true, + EnableAtModifier: true, + } + // An active query tracker will be added only if the user specifies a non-default path. // Otherwise, the nil active query tracker from existing engine options will be used. 
if activeQueryDir != "" { engineOpts.ActiveQueryTracker = promql.NewActiveQueryTracker(activeQueryDir, maxConcurrentQueries, logger) } - var queryEngine v1.QueryEngine - switch promqlEngine { - case promqlEnginePrometheus: - queryEngine = promql.NewEngine(engineOpts) - case promqlEngineThanos: - if queryMode == queryModeLocal { - queryEngine = engine.New(engine.Opts{EngineOpts: engineOpts}) - } else { - remoteEngineEndpoints := query.NewRemoteEndpoints(logger, endpoints.GetQueryAPIClients, query.Opts{ - AutoDownsample: enableAutodownsampling, - ReplicaLabels: queryReplicaLabels, - Timeout: queryTimeout, - EnablePartialResponse: enableQueryPartialResponse, - }) - queryEngine = engine.NewDistributedEngine(engine.Opts{EngineOpts: engineOpts}, remoteEngineEndpoints) - } - default: - return errors.Errorf("unknown query.promql-engine type %v", promqlEngine) + prometheusEngine := promql.NewEngine(engineOpts) + + var thanosEngine v1.QueryEngine + if queryMode == queryModeLocal { + thanosEngine = engine.New(engine.Opts{EngineOpts: thanosEngineOpts}) + } else { + remoteEngineEndpoints := query.NewRemoteEndpoints(logger, endpoints.GetQueryAPIClients, query.Opts{ + AutoDownsample: enableAutodownsampling, + ReplicaLabels: queryReplicaLabels, + Timeout: queryTimeout, + EnablePartialResponse: enableQueryPartialResponse, + }) + thanosEngine = engine.NewDistributedEngine(engine.Opts{EngineOpts: thanosEngineOpts}, remoteEngineEndpoints) } lookbackDeltaCreator := LookbackDeltaFactory(engineOpts, dynamicLookbackDelta) @@ -719,7 +716,8 @@ func runQuery( api := apiv1.NewQueryAPI( logger, endpoints.GetEndpointStatus, - queryEngine, + prometheusEngine, + thanosEngine, lookbackDeltaCreator, queryableCreator, // NOTE: Will share the same replica label as the query for now. 
@@ -804,7 +802,7 @@ func runQuery( info.WithQueryAPIInfoFunc(), ) - grpcAPI := apiv1.NewGRPCAPI(time.Now, queryReplicaLabels, queryableCreator, queryEngine, lookbackDeltaCreator, instantDefaultMaxSourceResolution) + grpcAPI := apiv1.NewGRPCAPI(time.Now, queryReplicaLabels, queryableCreator, prometheusEngine, lookbackDeltaCreator, instantDefaultMaxSourceResolution) storeServer := store.NewLimitedStoreServer(store.NewInstrumentedStoreServer(reg, proxy), reg, storeRateLimits) s := grpcserver.New(logger, reg, tracer, grpcLogOpts, tagOpts, comp, grpcProbe, grpcserver.WithServer(apiv1.RegisterQueryServer(grpcAPI)), diff --git a/pkg/api/query/v1.go b/pkg/api/query/v1.go index 918bcbf5fdd..1462cf8ff6c 100644 --- a/pkg/api/query/v1.go +++ b/pkg/api/query/v1.go @@ -75,6 +75,13 @@ const ( LookbackDeltaParam = "lookback_delta" ) +type promqlEngineType string + +const ( + promqlEnginePrometheus promqlEngineType = "prometheus" + promqlEngineThanos promqlEngineType = "thanos" +) + // QueryAPI is an API used by Thanos Querier. type QueryAPI struct { baseAPI *api.BaseAPI @@ -83,6 +90,7 @@ type QueryAPI struct { queryableCreate query.QueryableCreator // queryEngine returns appropriate promql.Engine for a query with a given step. 
queryEngine v1.QueryEngine + thanosEngine v1.QueryEngine lookbackDeltaCreate func(int64) time.Duration ruleGroups rules.UnaryClient targets targets.UnaryClient @@ -120,6 +128,7 @@ func NewQueryAPI( logger log.Logger, endpointStatus func() []query.EndpointStatus, qe v1.QueryEngine, + te v1.QueryEngine, lookbackDeltaCreate func(int64) time.Duration, c query.QueryableCreator, ruleGroups rules.UnaryClient, @@ -150,6 +159,7 @@ func NewQueryAPI( baseAPI: api.NewBaseAPI(logger, disableCORS, flagsMap), logger: logger, queryEngine: qe, + thanosEngine: te, lookbackDeltaCreate: lookbackDeltaCreate, queryableCreate: c, gate: gate, @@ -234,6 +244,22 @@ func (qapi *QueryAPI) parseEnableDedupParam(r *http.Request) (enableDeduplicatio return enableDeduplication, nil } +func (qapi *QueryAPI) parseEngineParam(r *http.Request) (queryEngine v1.QueryEngine, _ *api.ApiError) { + var engine v1.QueryEngine + + param := promqlEngineType(r.FormValue("engine")) + switch param { + case promqlEnginePrometheus: + engine = qapi.queryEngine + case promqlEngineThanos: + engine = qapi.thanosEngine + default: + engine = qapi.queryEngine + } + + return engine, nil +} + func (qapi *QueryAPI) parseReplicaLabelsParam(r *http.Request) (replicaLabels []string, _ *api.ApiError) { if err := r.ParseForm(); err != nil { return nil, &api.ApiError{Typ: api.ErrorInternal, Err: errors.Wrap(err, "parse form")} @@ -393,6 +419,11 @@ func (qapi *QueryAPI) query(r *http.Request) (interface{}, []error, *api.ApiErro return nil, nil, apiErr, func() {} } + engine, apiErr := qapi.parseEngineParam(r) + if apiErr != nil { + return nil, nil, apiErr, func() {} + } + lookbackDelta := qapi.lookbackDeltaCreate(maxSourceResolution) // Get custom lookback delta from request. 
lookbackDeltaFromReq, apiErr := qapi.parseLookbackDeltaParam(r) @@ -408,7 +439,7 @@ func (qapi *QueryAPI) query(r *http.Request) (interface{}, []error, *api.ApiErro defer span.Finish() var seriesStats []storepb.SeriesStatsCounter - qry, err := qapi.queryEngine.NewInstantQuery( + qry, err := engine.NewInstantQuery( qapi.queryableCreate( enableDedup, replicaLabels, @@ -541,6 +572,11 @@ func (qapi *QueryAPI) queryRange(r *http.Request) (interface{}, []error, *api.Ap return nil, nil, apiErr, func() {} } + engine, apiErr := qapi.parseEngineParam(r) + if apiErr != nil { + return nil, nil, apiErr, func() {} + } + lookbackDelta := qapi.lookbackDeltaCreate(maxSourceResolution) // Get custom lookback delta from request. lookbackDeltaFromReq, apiErr := qapi.parseLookbackDeltaParam(r) @@ -559,7 +595,7 @@ func (qapi *QueryAPI) queryRange(r *http.Request) (interface{}, []error, *api.Ap defer span.Finish() var seriesStats []storepb.SeriesStatsCounter - qry, err := qapi.queryEngine.NewRangeQuery( + qry, err := engine.NewRangeQuery( qapi.queryableCreate( enableDedup, replicaLabels,