Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Filter instant queries and shard them. #3984

Merged
merged 29 commits into from
Jul 23, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
916a1e7
Filter instan queries and shard them.
jeschkies Jul 12, 2021
8b6bca1
Merge remote-tracking branch 'grafana/main' into karsten/instant-quer…
jeschkies Jul 12, 2021
63f7aec
[WIP] Test instant query sharding
jeschkies Jul 12, 2021
b7a7a01
Trace casting error.
jeschkies Jul 12, 2021
f6a8f29
WRap LokiInstantRequest for Params interface.
jeschkies Jul 13, 2021
993e398
Convert paras to request.
jeschkies Jul 13, 2021
d4190f6
Convert vector.
jeschkies Jul 13, 2021
2dc6ae8
Use proper query time stamp.
jeschkies Jul 13, 2021
4ec4aca
Assert number of calls.
jeschkies Jul 13, 2021
41695f3
Convert vector to multiple samples.
jeschkies Jul 13, 2021
857f0d4
Format code.
jeschkies Jul 13, 2021
91cefd2
Use type switch.
jeschkies Jul 13, 2021
72f5b39
Format code.
jeschkies Jul 13, 2021
91c08e7
Rename Matrix to proto conversion.
jeschkies Jul 15, 2021
b074567
Remvoe comment on caching.
jeschkies Jul 19, 2021
b56c00f
Update pkg/querier/queryrange/roundtrip_test.go
jeschkies Jul 19, 2021
b0120e1
Remove nil middlware.
jeschkies Jul 20, 2021
810a0a9
Handle LokiInstantRequest correctly.
jeschkies Jul 20, 2021
f640c10
Assert reponse type.
jeschkies Jul 20, 2021
886bbca
Return LokiPromResponse.
jeschkies Jul 20, 2021
d0f0873
Verify err in tests.
jeschkies Jul 20, 2021
dc6fa09
Set shards.
jeschkies Jul 20, 2021
768acb1
Correct result type.
jeschkies Jul 21, 2021
cf4a29f
Use limits middleware.
jeschkies Jul 21, 2021
b8e4f05
Use shars parameter in querier.
jeschkies Jul 21, 2021
060ba66
Return vector.
jeschkies Jul 22, 2021
baf7f0b
Unify params.
jeschkies Jul 22, 2021
c7a15f2
Merge remote-tracking branch 'grafana/main' into karsten/instant-quer…
jeschkies Jul 22, 2021
10f53b6
Change direction to FORWARD.
jeschkies Jul 23, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pkg/loghttp/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,7 @@ type InstantQuery struct {
Ts time.Time
Limit uint32
Direction logproto.Direction
Shards []string
}

// ParseInstantQuery parses an InstantQuery request from an http request.
Expand All @@ -261,6 +262,7 @@ func ParseInstantQuery(r *http.Request) (*InstantQuery, error) {
if err != nil {
return nil, err
}
request.Shards = shards(r)

request.Direction, err = direction(r)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/querier/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func (q *Querier) InstantQueryHandler(w http.ResponseWriter, r *http.Request) {
0,
request.Direction,
request.Limit,
nil,
request.Shards,
)
query := q.engine.Query(params)
result, err := query.Exec(ctx)
Expand Down
209 changes: 184 additions & 25 deletions pkg/querier/queryrange/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,48 @@ func (r *LokiRequest) LogToSpan(sp opentracing.Span) {

func (*LokiRequest) GetCachingOptions() (res queryrange.CachingOptions) { return }

func (r *LokiInstantRequest) GetStep() int64 {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit for later: we can probably serialize a lot of these methods into an embedded struct for reuse

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you mean?

return 0
}

func (r *LokiInstantRequest) GetEnd() int64 {
return r.TimeTs.UnixNano() / (int64(time.Millisecond) / int64(time.Nanosecond))
}

func (r *LokiInstantRequest) GetStart() int64 {
return r.TimeTs.UnixNano() / (int64(time.Millisecond) / int64(time.Nanosecond))
}

func (r *LokiInstantRequest) WithStartEnd(s int64, e int64) queryrange.Request {
new := *r
new.TimeTs = time.Unix(0, s*int64(time.Millisecond))
return &new
}

func (r *LokiInstantRequest) WithQuery(query string) queryrange.Request {
new := *r
new.Query = query
return &new
}

func (r *LokiInstantRequest) WithShards(shards logql.Shards) *LokiInstantRequest {
new := *r
new.Shards = shards.Encode()
return &new
}

func (r *LokiInstantRequest) LogToSpan(sp opentracing.Span) {
sp.LogFields(
otlog.String("query", r.GetQuery()),
otlog.String("ts", timestamp.Time(r.GetStart()).String()),
otlog.Int64("limit", int64(r.GetLimit())),
otlog.String("direction", r.GetDirection().String()),
otlog.String("shards", strings.Join(r.GetShards(), ",")),
)
}

func (*LokiInstantRequest) GetCachingOptions() (res queryrange.CachingOptions) { return }

func (r *LokiSeriesRequest) GetEnd() int64 {
return r.EndTs.UnixNano() / (int64(time.Millisecond) / int64(time.Nanosecond))
}
Expand Down Expand Up @@ -173,6 +215,19 @@ func (Codec) DecodeRequest(_ context.Context, r *http.Request) (queryrange.Reque
Path: r.URL.Path,
Shards: req.Shards,
}, nil
case InstantQueryOp:
req, err := loghttp.ParseInstantQuery(r)
if err != nil {
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
}
return &LokiInstantRequest{
Query: req.Query,
Limit: req.Limit,
Direction: req.Direction,
TimeTs: req.Ts.UTC(),
Path: r.URL.Path,
Shards: req.Shards,
}, nil
case SeriesOp:
req, err := logql.ParseAndValidateSeriesQuery(r)
if err != nil {
Expand Down Expand Up @@ -267,6 +322,29 @@ func (Codec) EncodeRequest(ctx context.Context, r queryrange.Request) (*http.Req
Body: http.NoBody,
Header: http.Header{},
}
return req.WithContext(ctx), nil
case *LokiInstantRequest:
params := url.Values{
"query": []string{request.Query},
"direction": []string{request.Direction.String()},
"limit": []string{fmt.Sprintf("%d", request.Limit)},
}
if len(request.Shards) > 0 {
params["shards"] = request.Shards
}
u := &url.URL{
// the request could come /api/prom/query but we want to only use the new api.
Path: "/loki/api/v1/query",
RawQuery: params.Encode(),
}
req := &http.Request{
Method: "GET",
RequestURI: u.String(), // This is what the httpgrpc code looks at.
URL: u,
Body: http.NoBody,
Header: http.Header{},
}

return req.WithContext(ctx), nil
default:
return nil, httpgrpc.Errorf(http.StatusInternalServerError, "invalid request format")
Expand Down Expand Up @@ -343,27 +421,54 @@ func (Codec) DecodeResponse(ctx context.Context, r *http.Response, req queryrang
Status: resp.Status,
Data: queryrange.PrometheusData{
ResultType: loghttp.ResultTypeMatrix,
Result: toProto(resp.Data.Result.(loghttp.Matrix)),
Result: toProtoMatrix(resp.Data.Result.(loghttp.Matrix)),
},
Headers: convertPrometheusResponseHeadersToPointers(httpResponseHeadersToPromResponseHeaders(r.Header)),
},
Statistics: resp.Data.Statistics,
}, nil
case loghttp.ResultTypeStream:
// This is the same as in querysharding.go
params, err := paramsFromRequest(req)
if err != nil {
return nil, err
}

var path string
switch r := req.(type) {
case *LokiRequest:
path = r.GetPath()
case *LokiInstantRequest:
path = r.GetPath()
default:
return nil, fmt.Errorf("expected *LokiRequest or *LokiInstantRequest, got (%T)", r)
}
return &LokiResponse{
Status: resp.Status,
Direction: req.(*LokiRequest).Direction,
Limit: req.(*LokiRequest).Limit,
Version: uint32(loghttp.GetVersion(req.(*LokiRequest).Path)),
Direction: params.Direction(),
Limit: params.Limit(),
Version: uint32(loghttp.GetVersion(path)),
Statistics: resp.Data.Statistics,
Data: LokiData{
ResultType: loghttp.ResultTypeStream,
Result: resp.Data.Result.(loghttp.Streams).ToProto(),
},
Headers: httpResponseHeadersToPromResponseHeaders(r.Header),
}, nil
case loghttp.ResultTypeVector:
return &LokiPromResponse{
Response: &queryrange.PrometheusResponse{
Status: resp.Status,
Data: queryrange.PrometheusData{
ResultType: loghttp.ResultTypeVector,
Result: toProtoVector(resp.Data.Result.(loghttp.Vector)),
},
Headers: convertPrometheusResponseHeadersToPointers(httpResponseHeadersToPromResponseHeaders(r.Header)),
},
Statistics: resp.Data.Statistics,
}, nil
default:
return nil, httpgrpc.Errorf(http.StatusInternalServerError, "unsupported response type")
return nil, httpgrpc.Errorf(http.StatusInternalServerError, "unsupported response type, got (%s)", string(resp.Data.ResultType))
}
}
}
Expand Down Expand Up @@ -621,7 +726,7 @@ func mergeOrderedNonOverlappingStreams(resps []*LokiResponse, limit uint32, dire
return results
}

func toProto(m loghttp.Matrix) []queryrange.SampleStream {
func toProtoMatrix(m loghttp.Matrix) []queryrange.SampleStream {
if len(m) == 0 {
return nil
}
Expand All @@ -642,6 +747,23 @@ func toProto(m loghttp.Matrix) []queryrange.SampleStream {
return res
}

func toProtoVector(v loghttp.Vector) []queryrange.SampleStream {
jeschkies marked this conversation as resolved.
Show resolved Hide resolved
if len(v) == 0 {
return nil
}
res := make([]queryrange.SampleStream, 0, len(v))
for _, s := range v {
res = append(res, queryrange.SampleStream{
Samples: []cortexpb.Sample{{
Value: float64(s.Value),
TimestampMs: int64(s.Timestamp),
}},
Labels: cortexpb.FromMetricsToLabelAdapters(s.Metric),
})
}
return res
}

func (res LokiResponse) Count() int64 {
var result int64
for _, s := range res.Data.Result {
Expand All @@ -650,38 +772,75 @@ func (res LokiResponse) Count() int64 {
return result
}

type paramsWrapper struct {
func paramsFromRequest(req queryrange.Request) (logql.Params, error) {
switch r := req.(type) {
case *LokiRequest:
return &paramsRangeWrapper{
LokiRequest: r,
}, nil
case *LokiInstantRequest:
return &paramsInstantWrapper{
LokiInstantRequest: r,
}, nil
default:
return nil, fmt.Errorf("expected *LokiRequest or *LokiInstantRequest, got (%T)", r)
}
}

type paramsRangeWrapper struct {
*LokiRequest
}

func paramsFromRequest(req queryrange.Request) *paramsWrapper {
return &paramsWrapper{
LokiRequest: req.(*LokiRequest),
}
func (p paramsRangeWrapper) Query() string {
return p.GetQuery()
}

func (p paramsRangeWrapper) Start() time.Time {
return p.GetStartTs()
}

func (p paramsRangeWrapper) End() time.Time {
return p.GetEndTs()
}

func (p paramsRangeWrapper) Step() time.Duration {
return time.Duration(p.GetStep() * 1e6)
}
func (p paramsRangeWrapper) Interval() time.Duration { return 0 }
func (p paramsRangeWrapper) Direction() logproto.Direction {
return p.GetDirection()
}
func (p paramsRangeWrapper) Limit() uint32 { return p.LokiRequest.Limit }
func (p paramsRangeWrapper) Shards() []string {
return p.GetShards()
}

type paramsInstantWrapper struct {
*LokiInstantRequest
}

func (p paramsWrapper) Query() string {
return p.LokiRequest.Query
func (p paramsInstantWrapper) Query() string {
return p.GetQuery()
}

func (p paramsWrapper) Start() time.Time {
return p.StartTs
func (p paramsInstantWrapper) Start() time.Time {
return p.LokiInstantRequest.GetTimeTs()
}

func (p paramsWrapper) End() time.Time {
return p.EndTs
func (p paramsInstantWrapper) End() time.Time {
return p.LokiInstantRequest.GetTimeTs()
}

func (p paramsWrapper) Step() time.Duration {
return time.Duration(p.LokiRequest.Step * 1e6)
func (p paramsInstantWrapper) Step() time.Duration {
return time.Duration(p.GetStep() * 1e6)
}
func (p paramsWrapper) Interval() time.Duration { return 0 }
func (p paramsWrapper) Direction() logproto.Direction {
return p.LokiRequest.Direction
func (p paramsInstantWrapper) Interval() time.Duration { return 0 }
func (p paramsInstantWrapper) Direction() logproto.Direction {
return p.GetDirection()
}
func (p paramsWrapper) Limit() uint32 { return p.LokiRequest.Limit }
func (p paramsWrapper) Shards() []string {
return p.LokiRequest.Shards
func (p paramsInstantWrapper) Limit() uint32 { return p.LokiInstantRequest.Limit }
func (p paramsInstantWrapper) Shards() []string {
return p.GetShards()
}

func httpResponseHeadersToPromResponseHeaders(httpHeaders http.Header) []queryrange.PrometheusResponseHeader {
Expand Down
17 changes: 14 additions & 3 deletions pkg/querier/queryrange/downstreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,17 @@ type DownstreamHandler struct {
next queryrange.Handler
}

func ParamsToLokiRequest(params logql.Params) *LokiRequest {
func ParamsToLokiRequest(params logql.Params, shards logql.Shards) queryrange.Request {
if params.Start() == params.End() {
return &LokiInstantRequest{
Query: params.Query(),
Limit: params.Limit(),
TimeTs: params.Start(),
Direction: params.Direction(),
Path: "/loki/api/v1/query", // TODO(owen-d): make this derivable
Shards: shards.Encode(),
}
}
return &LokiRequest{
Query: params.Query(),
Limit: params.Limit(),
Expand All @@ -33,6 +43,7 @@ func ParamsToLokiRequest(params logql.Params) *LokiRequest {
EndTs: params.End(),
Direction: params.Direction(),
Path: "/loki/api/v1/query_range", // TODO(owen-d): make this derivable
Shards: shards.Encode(),
}
}

Expand All @@ -58,10 +69,10 @@ type instance struct {

func (in instance) Downstream(ctx context.Context, queries []logql.DownstreamQuery) ([]logqlmodel.Result, error) {
return in.For(ctx, queries, func(qry logql.DownstreamQuery) (logqlmodel.Result, error) {
req := ParamsToLokiRequest(qry.Params).WithShards(qry.Shards).WithQuery(qry.Expr.String()).(*LokiRequest)
req := ParamsToLokiRequest(qry.Params, qry.Shards).WithQuery(qry.Expr.String())
logger, ctx := spanlogger.New(ctx, "DownstreamHandler.instance")
defer logger.Finish()
level.Debug(logger).Log("shards", fmt.Sprintf("%+v", req.Shards), "query", req.Query, "step", req.GetStep())
level.Debug(logger).Log("shards", fmt.Sprintf("%+v", qry.Shards), "query", req.GetQuery(), "step", req.GetStep())
owen-d marked this conversation as resolved.
Show resolved Hide resolved

res, err := in.handler.Do(ctx, req)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/querier/queryrange/downstreamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ func TestInstanceDownstream(t *testing.T) {
// for some reason these seemingly can't be checked in their own goroutines,
// so we assign them to scoped variables for later comparison.
got = req
want = ParamsToLokiRequest(params).WithShards(logql.Shards{{Shard: 0, Of: 2}}).WithQuery(expr.String())
want = ParamsToLokiRequest(params, queries[0].Shards).WithQuery(expr.String())

return expectedResp(), nil
},
Expand Down
Loading