Skip to content

Commit

Permalink
Remote read first class: workflow structure
Browse files Browse the repository at this point in the history
Signed-off-by: György Krajcsovits <gyorgy.krajcsovits@grafana.com>
  • Loading branch information
krajorama committed Jun 12, 2024
1 parent b0e7c77 commit 3d19b2e
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 11 deletions.
50 changes: 43 additions & 7 deletions pkg/frontend/querymiddleware/remote_read.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,28 +18,64 @@ import (
"github.com/grafana/mimir/pkg/util"
)

func RemoteReadPreprocessor(req *http.Request) (url.Values, error) {
remoteReadRequest, err := parseRemoteReadRequestWithoutConsumingBody(req)
if remoteReadRequest == nil || err != nil {
return nil, err
}

if err := blockQueries(remoteReadRequest); err != nil {
return nil, err
}

if err := applyLimits(remoteReadRequest); err != nil {
return nil, err
}

return remoteReadRequestToURLValues(remoteReadRequest)
}

// blockQueries blocks queries in the remote read request.
// returns an error if the request contains blocked queries.
func blockQueries(_ *prompb.ReadRequest) error {
return nil
}

// applyLimits applies limits to the remote read request.
// returns an error if the request exceeds the limits.
func applyLimits(_ *prompb.ReadRequest) error {
return nil
}

// ParseRemoteReadRequestWithoutConsumingBody parses a remote read request
// without consuming the body. It does not check the req.Body size, so it is
// the caller's responsibility to ensure that the body is not too large.
func ParseRemoteReadRequestWithoutConsumingBody(req *http.Request) (url.Values, error) {
params := make(url.Values)

func parseRemoteReadRequestWithoutConsumingBody(req *http.Request) (*prompb.ReadRequest, error) {
if req.Body == nil {
return params, nil
return nil, nil
}

bodyBytes, err := util.ReadRequestBodyWithoutConsuming(req)
if err != nil {
return nil, err
}

remoteReadRequest := prompb.ReadRequest{}
remoteReadRequest := &prompb.ReadRequest{}

_, err = util.ParseProtoReader(req.Context(), io.NopCloser(bytes.NewReader(bodyBytes)), int(req.ContentLength), querier.MaxRemoteReadQuerySize, nil, &remoteReadRequest, util.RawSnappy)
_, err = util.ParseProtoReader(req.Context(), io.NopCloser(bytes.NewReader(bodyBytes)), int(req.ContentLength), querier.MaxRemoteReadQuerySize, nil, remoteReadRequest, util.RawSnappy)
if err != nil {
return nil, err
}

return remoteReadRequest, nil
}

func remoteReadRequestToURLValues(remoteReadRequest *prompb.ReadRequest) (url.Values, error) {
if remoteReadRequest == nil {
return nil, nil
}

params := make(url.Values)
add := func(i int, name, value string) { params.Add(name+"_"+strconv.Itoa(i), value) }

queries := remoteReadRequest.GetQueries()
Expand All @@ -66,5 +102,5 @@ func ParseRemoteReadRequestWithoutConsumingBody(req *http.Request) (url.Values,
}
}

return params, err
return params, nil
}
5 changes: 2 additions & 3 deletions pkg/frontend/querymiddleware/remote_read_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
"github.com/stretchr/testify/require"
)

func TestParseRemoteReadRequestWithoutConsumingBody(t *testing.T) {
func TestRemoteReadPreprocessor(t *testing.T) {
testCases := map[string]struct {
reqBody func() io.ReadCloser
contentLength int
Expand All @@ -27,7 +27,6 @@ func TestParseRemoteReadRequestWithoutConsumingBody(t *testing.T) {
reqBody: func() io.ReadCloser {
return nil
},
expectedParams: make(url.Values),
},
"valid body": {
reqBody: func() io.ReadCloser {
Expand Down Expand Up @@ -73,7 +72,7 @@ func TestParseRemoteReadRequestWithoutConsumingBody(t *testing.T) {
req := &http.Request{
Body: tc.reqBody(),
}
params, err := ParseRemoteReadRequestWithoutConsumingBody(req)
params, err := RemoteReadPreprocessor(req)
if err != nil {
if tc.expectedErrorIs != nil {
require.ErrorIs(t, err, tc.expectedErrorIs)
Expand Down
2 changes: 1 addition & 1 deletion pkg/frontend/transport/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
var err error

if r.Header.Get("Content-Type") == "application/x-protobuf" && querymiddleware.IsRemoteReadQuery(r.URL.Path) {
params, err = querymiddleware.ParseRemoteReadRequestWithoutConsumingBody(r)
params, err = querymiddleware.RemoteReadPreprocessor(r)
} else {
params, err = util.ParseRequestFormWithoutConsumingBody(r)
}
Expand Down

0 comments on commit 3d19b2e

Please sign in to comment.