Skip to content

Commit

Permalink
Remote read first class: workflow structure
Browse files Browse the repository at this point in the history
Make place for a remote read roundtripper that would
wrap limits and blocking checks.

Signed-off-by: György Krajcsovits <gyorgy.krajcsovits@grafana.com>
  • Loading branch information
krajorama committed Jun 12, 2024
1 parent 041e37b commit f820df2
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 6 deletions.
46 changes: 41 additions & 5 deletions pkg/frontend/querymiddleware/remote_read.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,28 +18,64 @@ import (
"github.com/grafana/mimir/pkg/util"
)

type remoteReadRoundTripper struct {
next http.RoundTripper
}

func newRemoteReadRoundTripper(next http.RoundTripper) http.RoundTripper {
return &remoteReadRoundTripper{
next: next,
}
}

func (r *remoteReadRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
_, err := getRemoteReadRequestWithoutConsumingBody(req)
if err != nil {
return nil, err
}

// apply middlewares kept in 'r' to each individual request

return r.next.RoundTrip(req)
}

// ParseRemoteReadRequestWithoutConsumingBody parses a remote read request
// without consuming the body. It does not check the req.Body size, so it is
// the caller's responsibility to ensure that the body is not too large.
func ParseRemoteReadRequestWithoutConsumingBody(req *http.Request) (url.Values, error) {
params := make(url.Values)
remoteReadRequest, err := getRemoteReadRequestWithoutConsumingBody(req)
if err != nil {
return nil, err
}
return parseRemoteReadRequest(remoteReadRequest)
}

func getRemoteReadRequestWithoutConsumingBody(req *http.Request) (*prompb.ReadRequest, error) {
if req.Body == nil {
return params, nil
return nil, nil
}

bodyBytes, err := util.ReadRequestBodyWithoutConsuming(req)
if err != nil {
return nil, err
}

remoteReadRequest := prompb.ReadRequest{}
remoteReadRequest := &prompb.ReadRequest{}

_, err = util.ParseProtoReader(req.Context(), io.NopCloser(bytes.NewReader(bodyBytes)), int(req.ContentLength), querier.MaxRemoteReadQuerySize, nil, &remoteReadRequest, util.RawSnappy)
_, err = util.ParseProtoReader(req.Context(), io.NopCloser(bytes.NewReader(bodyBytes)), int(req.ContentLength), querier.MaxRemoteReadQuerySize, nil, remoteReadRequest, util.RawSnappy)
if err != nil {
return nil, err
}

return remoteReadRequest, nil
}

func parseRemoteReadRequest(remoteReadRequest *prompb.ReadRequest) (url.Values, error) {
if remoteReadRequest == nil {
return nil, nil
}

params := make(url.Values)
add := func(i int, name, value string) { params.Add(name+"_"+strconv.Itoa(i), value) }

queries := remoteReadRequest.GetQueries()
Expand All @@ -66,5 +102,5 @@ func ParseRemoteReadRequestWithoutConsumingBody(req *http.Request) (url.Values,
}
}

return params, err
return params, nil
}
2 changes: 1 addition & 1 deletion pkg/frontend/querymiddleware/remote_read_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func TestParseRemoteReadRequestWithoutConsumingBody(t *testing.T) {
reqBody: func() io.ReadCloser {
return nil
},
expectedParams: make(url.Values),
expectedParams: nil,
},
"valid body": {
reqBody: func() io.ReadCloser {
Expand Down
3 changes: 3 additions & 0 deletions pkg/frontend/querymiddleware/roundtrip.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,7 @@ func newQueryTripperware(
return func(next http.RoundTripper) http.RoundTripper {
queryrange := newLimitedParallelismRoundTripper(next, codec, limits, queryRangeMiddleware...)
instant := newLimitedParallelismRoundTripper(next, codec, limits, queryInstantMiddleware...)
remoteRead := newRemoteReadRoundTripper(next)

// Wrap next for cardinality, labels queries and all other queries.
// That attempts to parse "start" and "end" from the HTTP request and set them in the request's QueryDetails.
Expand Down Expand Up @@ -358,6 +359,8 @@ func newQueryTripperware(
return activeNativeHistogramMetrics.RoundTrip(r)
case IsLabelsQuery(r.URL.Path):
return labels.RoundTrip(r)
case IsRemoteReadQuery(r.URL.Path):
return remoteRead.RoundTrip(r)
default:
return next.RoundTrip(r)
}
Expand Down

0 comments on commit f820df2

Please sign in to comment.