Skip to content

Commit

Permalink
Use QueryEngineFactory in query API
Browse files Browse the repository at this point in the history
thanos query commands pass `QueryEngineFactory` to query apis
that will use engine based on query params. It will provide more
flexibility to create multiple engines in thanos.

Adds `defaultEngine` CLI flag, A default engine to use if not
specified with query params.

Signed-off-by: Pradyumna Krishna <git@onpy.in>
  • Loading branch information
PradyumnaKrishna committed Mar 27, 2023
1 parent 567d614 commit 88b209b
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 61 deletions.
45 changes: 17 additions & 28 deletions cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,7 @@ import (
"github.com/prometheus/prometheus/discovery/targetgroup"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/promql"
v1 "github.com/prometheus/prometheus/web/api/v1"

"github.com/thanos-community/promql-engine/engine"
"github.com/thanos-community/promql-engine/api"

apiv1 "github.com/thanos-io/thanos/pkg/api/query"
"github.com/thanos-io/thanos/pkg/compact/downsample"
Expand Down Expand Up @@ -102,6 +100,9 @@ func registerQuery(app *extkingpin.App) {
queryTimeout := extkingpin.ModelDuration(cmd.Flag("query.timeout", "Maximum time to process query by query node.").
Default("2m"))

defaultEngine := cmd.Flag("query.promql-engine", "Default PromQL engine to use.").Default(string(apiv1.PromqlEnginePrometheus)).
Enum(string(apiv1.PromqlEnginePrometheus), string(apiv1.PromqlEngineThanos))

promqlQueryMode := cmd.Flag("query.mode", "PromQL query mode. One of: local, distributed.").
Hidden().
Default(string(queryModeLocal)).
Expand Down Expand Up @@ -334,6 +335,7 @@ func registerQuery(app *extkingpin.App) {
*queryTelemetrySeriesQuantiles,
storeRateLimits,
queryMode(*promqlQueryMode),
*defaultEngine,
)
})
}
Expand Down Expand Up @@ -409,6 +411,7 @@ func runQuery(
queryTelemetrySeriesQuantiles []int64,
storeRateLimits store.SeriesSelectLimits,
queryMode queryMode,
defaultEngine string,
) error {
if alertQueryURL == "" {
lastColon := strings.LastIndex(httpBindAddr, ":")
Expand Down Expand Up @@ -658,41 +661,27 @@ func runQuery(
EnableAtModifier: true,
}

thanosEngineOpts := promql.EngineOpts{
Logger: logger,
Reg: extprom.WrapRegistererWithPrefix("tpe_", reg),
// TODO(bwplotka): Expose this as a flag: https://github.com/thanos-io/thanos/issues/703.
MaxSamples: math.MaxInt32,
Timeout: queryTimeout,
LookbackDelta: lookbackDelta,
NoStepSubqueryIntervalFn: func(int64) int64 {
return defaultEvaluationInterval.Milliseconds()
},
EnableNegativeOffset: true,
EnableAtModifier: true,
}

// An active query tracker will be added only if the user specifies a non-default path.
// Otherwise, the nil active query tracker from existing engine options will be used.
if activeQueryDir != "" {
engineOpts.ActiveQueryTracker = promql.NewActiveQueryTracker(activeQueryDir, maxConcurrentQueries, logger)
}

prometheusEngine := promql.NewEngine(engineOpts)

var thanosEngine v1.QueryEngine
if queryMode == queryModeLocal {
thanosEngine = engine.New(engine.Opts{EngineOpts: thanosEngineOpts})
} else {
remoteEngineEndpoints := query.NewRemoteEndpoints(logger, endpoints.GetQueryAPIClients, query.Opts{
var remoteEngineEndpoints api.RemoteEndpoints
if queryMode != queryModeLocal {
remoteEngineEndpoints = query.NewRemoteEndpoints(logger, endpoints.GetQueryAPIClients, query.Opts{
AutoDownsample: enableAutodownsampling,
ReplicaLabels: queryReplicaLabels,
Timeout: queryTimeout,
EnablePartialResponse: enableQueryPartialResponse,
})
thanosEngine = engine.NewDistributedEngine(engine.Opts{EngineOpts: thanosEngineOpts}, remoteEngineEndpoints)
}

engineFactory := apiv1.NewQueryEngineFactory(
engineOpts,
remoteEngineEndpoints,
)

lookbackDeltaCreator := LookbackDeltaFactory(engineOpts, dynamicLookbackDelta)

// Start query API + UI HTTP server.
Expand Down Expand Up @@ -723,8 +712,8 @@ func runQuery(
api := apiv1.NewQueryAPI(
logger,
endpoints.GetEndpointStatus,
prometheusEngine,
thanosEngine,
*engineFactory,
apiv1.PromqlEngineType(defaultEngine),
lookbackDeltaCreator,
queryableCreator,
// NOTE: Will share the same replica label as the query for now.
Expand Down Expand Up @@ -809,7 +798,7 @@ func runQuery(
info.WithQueryAPIInfoFunc(),
)

grpcAPI := apiv1.NewGRPCAPI(time.Now, queryReplicaLabels, queryableCreator, prometheusEngine, thanosEngine, lookbackDeltaCreator, instantDefaultMaxSourceResolution)
grpcAPI := apiv1.NewGRPCAPI(time.Now, queryReplicaLabels, queryableCreator, *engineFactory, apiv1.PromqlEngineType(defaultEngine), lookbackDeltaCreator, instantDefaultMaxSourceResolution)
storeServer := store.NewLimitedStoreServer(store.NewInstrumentedStoreServer(reg, proxy), reg, storeRateLimits)
s := grpcserver.New(logger, reg, tracer, grpcLogOpts, tagOpts, comp, grpcProbe,
grpcserver.WithServer(apiv1.RegisterQueryServer(grpcAPI)),
Expand Down
46 changes: 28 additions & 18 deletions pkg/api/query/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ type GRPCAPI struct {
now func() time.Time
replicaLabels []string
queryableCreate query.QueryableCreator
queryEngine v1.QueryEngine
thanosEngine v1.QueryEngine
engineFactory QueryEngineFactory
defaultEngine PromqlEngineType
lookbackDeltaCreate func(int64) time.Duration
defaultMaxResolutionSeconds time.Duration
}
Expand All @@ -33,17 +33,17 @@ func NewGRPCAPI(
now func() time.Time,
replicaLabels []string,
creator query.QueryableCreator,
queryEngine v1.QueryEngine,
thanosEngine v1.QueryEngine,
engineFactory QueryEngineFactory,
defaultEngine PromqlEngineType,
lookbackDeltaCreate func(int64) time.Duration,
defaultMaxResolutionSeconds time.Duration,
) *GRPCAPI {
return &GRPCAPI{
now: now,
replicaLabels: replicaLabels,
queryableCreate: creator,
queryEngine: queryEngine,
thanosEngine: thanosEngine,
engineFactory: engineFactory,
defaultEngine: defaultEngine,
lookbackDeltaCreate: lookbackDeltaCreate,
defaultMaxResolutionSeconds: defaultMaxResolutionSeconds,
}
Expand Down Expand Up @@ -104,13 +104,18 @@ func (g *GRPCAPI) Query(request *querypb.QueryRequest, server querypb.Query_Quer
)

var engine v1.QueryEngine
switch promqlEngineType(request.Engine) {
case promqlEnginePrometheus:
engine = g.queryEngine
case promqlEngineThanos:
engine = g.thanosEngine
engineParam := PromqlEngineType(request.Engine)
if engineParam == "" {
engineParam = g.defaultEngine
}

switch engineParam {
case PromqlEnginePrometheus:
engine = g.engineFactory.GetPrometheusEngine()
case PromqlEngineThanos:
engine = g.engineFactory.GetThanosEngine()
default:
engine = g.queryEngine
return status.Error(codes.InvalidArgument, "invalid engine parameter")
}
qry, err := engine.NewInstantQuery(queryable, &promql.QueryOpts{LookbackDelta: lookbackDelta}, request.Query, ts)
if err != nil {
Expand Down Expand Up @@ -201,13 +206,18 @@ func (g *GRPCAPI) QueryRange(request *querypb.QueryRangeRequest, srv querypb.Que
interval := time.Duration(request.IntervalSeconds) * time.Second

var engine v1.QueryEngine
switch promqlEngineType(request.Engine) {
case promqlEnginePrometheus:
engine = g.queryEngine
case promqlEngineThanos:
engine = g.thanosEngine
engineParam := PromqlEngineType(request.Engine)
if engineParam == "" {
engineParam = g.defaultEngine
}

switch engineParam {
case PromqlEnginePrometheus:
engine = g.engineFactory.GetPrometheusEngine()
case PromqlEngineThanos:
engine = g.engineFactory.GetThanosEngine()
default:
engine = g.queryEngine
return status.Error(codes.InvalidArgument, "invalid engine parameter")
}
qry, err := engine.NewRangeQuery(queryable, &promql.QueryOpts{LookbackDelta: lookbackDelta}, request.Query, startTime, endTime, interval)
if err != nil {
Expand Down
35 changes: 20 additions & 15 deletions pkg/api/query/v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,14 @@ const (
Stats = "stats"
ShardInfoParam = "shard_info"
LookbackDeltaParam = "lookback_delta"
EngineParam = "engine"
)

type promqlEngineType string
type PromqlEngineType string

const (
promqlEnginePrometheus promqlEngineType = "prometheus"
promqlEngineThanos promqlEngineType = "thanos"
PromqlEnginePrometheus PromqlEngineType = "prometheus"
PromqlEngineThanos PromqlEngineType = "thanos"
)

type QueryEngineFactory struct {
Expand Down Expand Up @@ -132,8 +133,8 @@ type QueryAPI struct {
gate gate.Gate
queryableCreate query.QueryableCreator
// queryEngine returns appropriate promql.Engine for a query with a given step.
queryEngine v1.QueryEngine
thanosEngine v1.QueryEngine
engineFactory QueryEngineFactory
defaultEngine PromqlEngineType
lookbackDeltaCreate func(int64) time.Duration
ruleGroups rules.UnaryClient
targets targets.UnaryClient
Expand Down Expand Up @@ -170,8 +171,8 @@ type seriesQueryPerformanceMetricsAggregator interface {
func NewQueryAPI(
logger log.Logger,
endpointStatus func() []query.EndpointStatus,
qe v1.QueryEngine,
te v1.QueryEngine,
engineFactory QueryEngineFactory,
de PromqlEngineType,
lookbackDeltaCreate func(int64) time.Duration,
c query.QueryableCreator,
ruleGroups rules.UnaryClient,
Expand Down Expand Up @@ -201,8 +202,8 @@ func NewQueryAPI(
return &QueryAPI{
baseAPI: api.NewBaseAPI(logger, disableCORS, flagsMap),
logger: logger,
queryEngine: qe,
thanosEngine: te,
engineFactory: engineFactory,
defaultEngine: de,
lookbackDeltaCreate: lookbackDeltaCreate,
queryableCreate: c,
gate: gate,
Expand Down Expand Up @@ -290,14 +291,18 @@ func (qapi *QueryAPI) parseEnableDedupParam(r *http.Request) (enableDeduplicatio
func (qapi *QueryAPI) parseEngineParam(r *http.Request) (queryEngine v1.QueryEngine, _ *api.ApiError) {
var engine v1.QueryEngine

param := promqlEngineType(r.FormValue("engine"))
param := PromqlEngineType(r.FormValue("engine"))
if param == "" {
param = qapi.defaultEngine
}

switch param {
case promqlEnginePrometheus:
engine = qapi.queryEngine
case promqlEngineThanos:
engine = qapi.thanosEngine
case PromqlEnginePrometheus:
engine = qapi.engineFactory.GetPrometheusEngine()
case PromqlEngineThanos:
engine = qapi.engineFactory.GetThanosEngine()
default:
engine = qapi.queryEngine
return nil, &api.ApiError{Typ: api.ErrorBadData, Err: errors.Errorf("'%s' bad engine", param)}
}

return engine, nil
Expand Down

0 comments on commit 88b209b

Please sign in to comment.