Skip to content

Commit

Permalink
Query: Switch engines using engine param
Browse files Browse the repository at this point in the history
Thanos query has two engine, prometheus (default) and thanos.
A single engine runs through thanos query command at a time, and
have to re run the command to switch between.

This commit adds a functionality to run multiple engines at once
and switch between them using `engine` query param inq query api.

To avoid duplicate matrics registration, the thanos engine is
provided with a different registerer having prefix `tpe_` (not
been finalized yet).

promql-engine command line flag has been removed that specifies
the query engine to run.

Currently this functionality not implemented on GRPCAPI.

Signed-off-by: Pradyumna Krishna <git@onpy.in>
  • Loading branch information
PradyumnaKrishna committed Mar 22, 2023
1 parent 36de497 commit ba50f4d
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 34 deletions.
62 changes: 30 additions & 32 deletions cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,6 @@ const (
queryPushdown = "query-pushdown"
)

type promqlEngineType string

const (
promqlEnginePrometheus promqlEngineType = "prometheus"
promqlEngineThanos promqlEngineType = "thanos"
)

type queryMode string

const (
Expand Down Expand Up @@ -109,9 +102,6 @@ func registerQuery(app *extkingpin.App) {
queryTimeout := extkingpin.ModelDuration(cmd.Flag("query.timeout", "Maximum time to process query by query node.").
Default("2m"))

promqlEngine := cmd.Flag("query.promql-engine", "PromQL engine to use.").Default(string(promqlEnginePrometheus)).
Enum(string(promqlEnginePrometheus), string(promqlEngineThanos))

promqlQueryMode := cmd.Flag("query.mode", "PromQL query mode. One of: local, distributed.").
Hidden().
Default(string(queryModeLocal)).
Expand Down Expand Up @@ -341,7 +331,6 @@ func registerQuery(app *extkingpin.App) {
*queryTelemetryDurationQuantiles,
*queryTelemetrySamplesQuantiles,
*queryTelemetrySeriesQuantiles,
promqlEngineType(*promqlEngine),
storeRateLimits,
queryMode(*promqlQueryMode),
)
Expand Down Expand Up @@ -416,7 +405,6 @@ func runQuery(
queryTelemetryDurationQuantiles []float64,
queryTelemetrySamplesQuantiles []int64,
queryTelemetrySeriesQuantiles []int64,
promqlEngine promqlEngineType,
storeRateLimits store.SeriesSelectLimits,
queryMode queryMode,
) error {
Expand Down Expand Up @@ -663,30 +651,39 @@ func runQuery(
EnableAtModifier: true,
}

thanosEngineOpts := promql.EngineOpts{
Logger: logger,
Reg: extprom.WrapRegistererWithPrefix("tpe_", reg),
// TODO(bwplotka): Expose this as a flag: https://github.com/thanos-io/thanos/issues/703.
MaxSamples: math.MaxInt32,
Timeout: queryTimeout,
LookbackDelta: lookbackDelta,
NoStepSubqueryIntervalFn: func(int64) int64 {
return defaultEvaluationInterval.Milliseconds()
},
EnableNegativeOffset: true,
EnableAtModifier: true,
}

// An active query tracker will be added only if the user specifies a non-default path.
// Otherwise, the nil active query tracker from existing engine options will be used.
if activeQueryDir != "" {
engineOpts.ActiveQueryTracker = promql.NewActiveQueryTracker(activeQueryDir, maxConcurrentQueries, logger)
}

var queryEngine v1.QueryEngine
switch promqlEngine {
case promqlEnginePrometheus:
queryEngine = promql.NewEngine(engineOpts)
case promqlEngineThanos:
if queryMode == queryModeLocal {
queryEngine = engine.New(engine.Opts{EngineOpts: engineOpts})
} else {
remoteEngineEndpoints := query.NewRemoteEndpoints(logger, endpoints.GetQueryAPIClients, query.Opts{
AutoDownsample: enableAutodownsampling,
ReplicaLabels: queryReplicaLabels,
Timeout: queryTimeout,
EnablePartialResponse: enableQueryPartialResponse,
})
queryEngine = engine.NewDistributedEngine(engine.Opts{EngineOpts: engineOpts}, remoteEngineEndpoints)
}
default:
return errors.Errorf("unknown query.promql-engine type %v", promqlEngine)
prometheusEngine := promql.NewEngine(engineOpts)

var thanosEngine v1.QueryEngine
if queryMode == queryModeLocal {
thanosEngine = engine.New(engine.Opts{EngineOpts: thanosEngineOpts})
} else {
remoteEngineEndpoints := query.NewRemoteEndpoints(logger, endpoints.GetQueryAPIClients, query.Opts{
AutoDownsample: enableAutodownsampling,
ReplicaLabels: queryReplicaLabels,
Timeout: queryTimeout,
EnablePartialResponse: enableQueryPartialResponse,
})
thanosEngine = engine.NewDistributedEngine(engine.Opts{EngineOpts: thanosEngineOpts}, remoteEngineEndpoints)
}

lookbackDeltaCreator := LookbackDeltaFactory(engineOpts, dynamicLookbackDelta)
Expand Down Expand Up @@ -719,7 +716,8 @@ func runQuery(
api := apiv1.NewQueryAPI(
logger,
endpoints.GetEndpointStatus,
queryEngine,
prometheusEngine,
thanosEngine,
lookbackDeltaCreator,
queryableCreator,
// NOTE: Will share the same replica label as the query for now.
Expand Down Expand Up @@ -804,7 +802,7 @@ func runQuery(
info.WithQueryAPIInfoFunc(),
)

grpcAPI := apiv1.NewGRPCAPI(time.Now, queryReplicaLabels, queryableCreator, queryEngine, lookbackDeltaCreator, instantDefaultMaxSourceResolution)
grpcAPI := apiv1.NewGRPCAPI(time.Now, queryReplicaLabels, queryableCreator, prometheusEngine, lookbackDeltaCreator, instantDefaultMaxSourceResolution)
storeServer := store.NewLimitedStoreServer(store.NewInstrumentedStoreServer(reg, proxy), reg, storeRateLimits)
s := grpcserver.New(logger, reg, tracer, grpcLogOpts, tagOpts, comp, grpcProbe,
grpcserver.WithServer(apiv1.RegisterQueryServer(grpcAPI)),
Expand Down
40 changes: 38 additions & 2 deletions pkg/api/query/v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,13 @@ const (
LookbackDeltaParam = "lookback_delta"
)

type promqlEngineType string

const (
promqlEnginePrometheus promqlEngineType = "prometheus"
promqlEngineThanos promqlEngineType = "thanos"
)

// QueryAPI is an API used by Thanos Querier.
type QueryAPI struct {
baseAPI *api.BaseAPI
Expand All @@ -83,6 +90,7 @@ type QueryAPI struct {
queryableCreate query.QueryableCreator
// queryEngine returns appropriate promql.Engine for a query with a given step.
queryEngine v1.QueryEngine
thanosEngine v1.QueryEngine
lookbackDeltaCreate func(int64) time.Duration
ruleGroups rules.UnaryClient
targets targets.UnaryClient
Expand Down Expand Up @@ -120,6 +128,7 @@ func NewQueryAPI(
logger log.Logger,
endpointStatus func() []query.EndpointStatus,
qe v1.QueryEngine,
te v1.QueryEngine,
lookbackDeltaCreate func(int64) time.Duration,
c query.QueryableCreator,
ruleGroups rules.UnaryClient,
Expand Down Expand Up @@ -150,6 +159,7 @@ func NewQueryAPI(
baseAPI: api.NewBaseAPI(logger, disableCORS, flagsMap),
logger: logger,
queryEngine: qe,
thanosEngine: te,
lookbackDeltaCreate: lookbackDeltaCreate,
queryableCreate: c,
gate: gate,
Expand Down Expand Up @@ -234,6 +244,22 @@ func (qapi *QueryAPI) parseEnableDedupParam(r *http.Request) (enableDeduplicatio
return enableDeduplication, nil
}

func (qapi *QueryAPI) parseEngineParam(r *http.Request) (queryEngine v1.QueryEngine, _ *api.ApiError) {
var engine v1.QueryEngine

param := promqlEngineType(r.FormValue("engine"))
switch param {
case promqlEnginePrometheus:
engine = qapi.queryEngine
case promqlEngineThanos:
engine = qapi.thanosEngine
default:
engine = qapi.queryEngine
}

return engine, nil
}

func (qapi *QueryAPI) parseReplicaLabelsParam(r *http.Request) (replicaLabels []string, _ *api.ApiError) {
if err := r.ParseForm(); err != nil {
return nil, &api.ApiError{Typ: api.ErrorInternal, Err: errors.Wrap(err, "parse form")}
Expand Down Expand Up @@ -393,6 +419,11 @@ func (qapi *QueryAPI) query(r *http.Request) (interface{}, []error, *api.ApiErro
return nil, nil, apiErr, func() {}
}

engine, apiErr := qapi.parseEngineParam(r)
if apiErr != nil {
return nil, nil, apiErr, func() {}
}

lookbackDelta := qapi.lookbackDeltaCreate(maxSourceResolution)
// Get custom lookback delta from request.
lookbackDeltaFromReq, apiErr := qapi.parseLookbackDeltaParam(r)
Expand All @@ -408,7 +439,7 @@ func (qapi *QueryAPI) query(r *http.Request) (interface{}, []error, *api.ApiErro
defer span.Finish()

var seriesStats []storepb.SeriesStatsCounter
qry, err := qapi.queryEngine.NewInstantQuery(
qry, err := engine.NewInstantQuery(
qapi.queryableCreate(
enableDedup,
replicaLabels,
Expand Down Expand Up @@ -541,6 +572,11 @@ func (qapi *QueryAPI) queryRange(r *http.Request) (interface{}, []error, *api.Ap
return nil, nil, apiErr, func() {}
}

engine, apiErr := qapi.parseEngineParam(r)
if apiErr != nil {
return nil, nil, apiErr, func() {}
}

lookbackDelta := qapi.lookbackDeltaCreate(maxSourceResolution)
// Get custom lookback delta from request.
lookbackDeltaFromReq, apiErr := qapi.parseLookbackDeltaParam(r)
Expand All @@ -559,7 +595,7 @@ func (qapi *QueryAPI) queryRange(r *http.Request) (interface{}, []error, *api.Ap
defer span.Finish()

var seriesStats []storepb.SeriesStatsCounter
qry, err := qapi.queryEngine.NewRangeQuery(
qry, err := engine.NewRangeQuery(
qapi.queryableCreate(
enableDedup,
replicaLabels,
Expand Down

0 comments on commit ba50f4d

Please sign in to comment.