Skip to content

Commit

Permalink
frontend: implement cache control (#1974)
Browse files Browse the repository at this point in the history
* querier: do not cache results if requested

Add an extra `Headers` field to the `PrometheusResponse` message which
contains the headers and their values that came from Prometheus.

Use them in other places to deduce if the response should be cached. If
`Cache-Control` is equal to `no-store` then it is *not* cached.

This will be used by the Thanos project to indicate when a partial
response has been returned. In such cases the result should not be
cached so that invalid data would not be stored there.

Signed-off-by: Giedrius Statkevičius <giedriuswork@gmail.com>

* queryrange: factor out cache checking + add tests

Signed-off-by: Giedrius Statkevičius <giedriuswork@gmail.com>

* querier: gofmt

Signed-off-by: Giedrius Statkevičius <giedriuswork@gmail.com>

* querier: tests: add missing member

Signed-off-by: Giedrius Statkevičius <giedriuswork@gmail.com>

* querier: fix logical mistake

Signed-off-by: Giedrius Statkevičius <giedriuswork@gmail.com>

* queryrange: fix generated code

Signed-off-by: Giedrius Statkevičius <giedriuswork@gmail.com>

* querier: test adjustments

Signed-off-by: Giedrius Statkevičius <giedriuswork@gmail.com>

* querier: test adjustments

Signed-off-by: Giedrius Statkevičius <giedriuswork@gmail.com>

* querier: results_cache: cache by default

Signed-off-by: Giedrius Statkevičius <giedriuswork@gmail.com>

* queryrange: cache: check all header values

`Cache-Control` might contain more than one value so check all of them.

Signed-off-by: Giedrius Statkevičius <giedriuswork@gmail.com>

* Update according to Goutham's comments

Signed-off-by: Giedrius Statkevičius <giedriuswork@gmail.com>

* CHANGELOG: add full stop, PR's number

Signed-off-by: Giedrius Statkevičius <giedriuswork@gmail.com>
  • Loading branch information
GiedriusS authored and pracucci committed Jan 17, 2020
1 parent 5365b84 commit 8af6675
Show file tree
Hide file tree
Showing 7 changed files with 523 additions and 59 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

## master / unreleased

* [CHANGE] The frontend component now does not cache results if it finds a `Cache-Control` header and if one of its values is `no-store`. #1974
* [ENHANCEMENT] metric `cortex_ingester_flush_reasons` gets a new `reason` value: `Spread`, when `-ingester.spread-flushes` option is enabled.

* [CHANGE] Flags changed with transition to upstream Prometheus rules manager:
* `ruler.client-timeout` is now `ruler.configs.client-timeout` in order to match `ruler.configs.url`
* `ruler.group-timeout`has been removed
Expand Down
7 changes: 7 additions & 0 deletions pkg/querier/queryrange/query_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ var (

// PrometheusCodec is a codec to encode and decode Prometheus query range requests and responses.
PrometheusCodec Codec = &prometheusCodec{}

// Name of the cache control header.
cachecontrolHeader = "Cache-Control"
)

// Codec is used to encode/decode query range requests and responses so they can be passed down to middlewares.
Expand Down Expand Up @@ -221,6 +224,10 @@ func (prometheusCodec) DecodeResponse(ctx context.Context, r *http.Response, _ R
if err := json.Unmarshal(buf, &resp); err != nil {
return nil, httpgrpc.Errorf(http.StatusInternalServerError, "error decoding response: %v", err)
}

for h, hv := range r.Header {
resp.Headers = append(resp.Headers, &PrometheusResponseHeader{Name: h, Values: hv})
}
return &resp, nil
}

Expand Down
4 changes: 3 additions & 1 deletion pkg/querier/queryrange/query_range_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,13 +74,15 @@ func TestRequest(t *testing.T) {
}

func TestResponse(t *testing.T) {
r := *parsedResponse
r.Headers = respHeaders
for i, tc := range []struct {
body string
expected *PrometheusResponse
}{
{
body: responseBody,
expected: parsedResponse,
expected: &r,
},
} {
t.Run(strconv.Itoa(i), func(t *testing.T) {
Expand Down
471 changes: 414 additions & 57 deletions pkg/querier/queryrange/queryrange.pb.go

Large diffs are not rendered by default.

6 changes: 6 additions & 0 deletions pkg/querier/queryrange/queryrange.proto
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,17 @@ message PrometheusRequest {
string query = 6;
}

message PrometheusResponseHeader {
string Name = 1 [(gogoproto.jsontag) = "-"];
repeated string Values = 2 [(gogoproto.jsontag) = "-"];
}

message PrometheusResponse {
string Status = 1 [(gogoproto.jsontag) = "status"];
PrometheusData Data = 2 [(gogoproto.nullable) = false, (gogoproto.jsontag) = "data,omitempty"];
string ErrorType = 3 [(gogoproto.jsontag) = "errorType,omitempty"];
string Error = 4 [(gogoproto.jsontag) = "error,omitempty"];
repeated PrometheusResponseHeader Headers = 5 [(gogoproto.jsontag) = "-"];
}

message PrometheusData {
Expand Down
35 changes: 35 additions & 0 deletions pkg/querier/queryrange/results_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ import (
"github.com/cortexproject/cortex/pkg/util/spanlogger"
)

var (
// Value that cachecontrolHeader has if the response indicates that the results should not be cached.
noCacheValue = "no-store"
)

// ResultsCacheConfig is the config for the results cache.
type ResultsCacheConfig struct {
CacheConfig cache.Config `yaml:"cache"`
Expand Down Expand Up @@ -59,6 +64,7 @@ var PrometheusResponseExtractor = ExtractorFunc(func(start, end int64, from Resp
ResultType: promRes.Data.ResultType,
Result: extractMatrix(start, end, promRes.Data.Result),
},
Headers: promRes.Headers,
}
})

Expand Down Expand Up @@ -133,12 +139,41 @@ func (s resultsCache) Do(ctx context.Context, r Request) (Response, error) {
return response, err
}

// shouldCacheResponse says whether the response should be cached or not.
func shouldCacheResponse(r Response) bool {
if promResp, ok := r.(*PrometheusResponse); ok {
shouldCache := true
outer:
for _, hv := range promResp.Headers {
if hv == nil {
continue
}
if hv.Name != cachecontrolHeader {
continue
}
for _, v := range hv.Values {
if v == noCacheValue {
shouldCache = false
break outer
}
}
}
return shouldCache
}
return true
}

func (s resultsCache) handleMiss(ctx context.Context, r Request) (Response, []Extent, error) {
response, err := s.next.Do(ctx, r)
if err != nil {
return nil, nil, err
}

if !shouldCacheResponse(response) {
level.Debug(s.logger).Log("msg", fmt.Sprintf("%s header in response is equal to %s, not caching the response", cachecontrolHeader, noCacheValue))
return response, []Extent{}, nil
}

extent, err := toExtent(ctx, r, response)
if err != nil {
return nil, nil, err
Expand Down
57 changes: 57 additions & 0 deletions pkg/querier/queryrange/results_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,12 @@ var (
Step: 120 * 1e3,
Query: "sum(container_memory_rss) by (namespace)",
}
respHeaders = []*PrometheusResponseHeader{
{
Name: "Content-Type",
Values: []string{"application/json"},
},
}
parsedResponse = &PrometheusResponse{
Status: "success",
Data: PrometheusData{
Expand Down Expand Up @@ -108,6 +114,57 @@ func mkExtent(start, end int64) Extent {
}
}

func TestShouldCache(t *testing.T) {
for i, tc := range []struct {
input Response
expected bool
}{
// Does not contain the needed header.
{
input: Response(&PrometheusResponse{
Headers: []*PrometheusResponseHeader{
{
Name: "meaninglessheader",
Values: []string{},
},
},
}),
expected: true,
},
// Does contain the header which has the value.
{
input: Response(&PrometheusResponse{
Headers: []*PrometheusResponseHeader{
{
Name: cachecontrolHeader,
Values: []string{noCacheValue},
},
},
}),
expected: false,
},
// Header contains extra values but still good.
{
input: Response(&PrometheusResponse{
Headers: []*PrometheusResponseHeader{
{
Name: cachecontrolHeader,
Values: []string{"foo", noCacheValue},
},
},
}),
expected: false,
},
} {
{
t.Run(strconv.Itoa(i), func(t *testing.T) {
ret := shouldCacheResponse(tc.input)
require.Equal(t, tc.expected, ret)
})
}
}
}

func TestPartiton(t *testing.T) {
for i, tc := range []struct {
input Request
Expand Down

0 comments on commit 8af6675

Please sign in to comment.