diff --git a/Gopkg.lock b/Gopkg.lock index 1b1866764884a..35077a3a2d6bb 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -222,7 +222,7 @@ [[projects]] branch = "master" - digest = "1:2f0846dd85df3365a80c32ff994eb1fcee5eec2c51a812ceec182398f3ef85f4" + digest = "1:5a07b5363e4c2aa127a3afd1e8e323d3a288ba1d90d37793d2e14843f5b5b82e" name = "github.com/cortexproject/cortex" packages = [ "pkg/chunk", @@ -248,7 +248,7 @@ "pkg/util/validation", ] pruneopts = "UT" - revision = "e1ab5495e8a846891e3b6b8e757e63201b886bec" + revision = "ef492f6bbafb185bbe61ae7a6955b7a4af5f3d9a" [[projects]] digest = "1:ffe9824d294da03b391f44e1ae8281281b4afc1bdaa9588c9097785e3af10cec" @@ -560,39 +560,6 @@ revision = "66b9c49e59c6c48f0ffce28c2d8b8a5678502c6d" version = "v1.4.0" -[[projects]] - branch = "master" - digest = "1:1f4181cfeacebef71babf22e99d727c1667e1f620982787c7035653d6e887dbb" - name = "github.com/grafana/loki" - packages = [ - "pkg/chunkenc", - "pkg/distributor", - "pkg/helpers", - "pkg/ingester", - "pkg/ingester/client", - "pkg/iter", - "pkg/logentry/metric", - "pkg/logentry/stages", - "pkg/logproto", - "pkg/logql", - "pkg/loki", - "pkg/promtail", - "pkg/promtail/api", - "pkg/promtail/client", - "pkg/promtail/client/fake", - "pkg/promtail/config", - "pkg/promtail/positions", - "pkg/promtail/scrape", - "pkg/promtail/server", - "pkg/promtail/server/ui", - "pkg/promtail/targets", - "pkg/querier", - "pkg/util", - "pkg/util/flagext", - ] - pruneopts = "UT" - revision = "4c7138231f77997909564616efc5d0cdbcb1ead8" - [[projects]] digest = "1:1168584a5881d371e96cb0e66ef6db71d7cef0856cc7f311490bc856627f8328" name = "github.com/grpc-ecosystem/go-grpc-middleware" @@ -1619,30 +1586,6 @@ "github.com/golang/snappy", "github.com/gorilla/mux", "github.com/gorilla/websocket", - "github.com/grafana/loki/pkg/chunkenc", - "github.com/grafana/loki/pkg/distributor", - "github.com/grafana/loki/pkg/helpers", - "github.com/grafana/loki/pkg/ingester", - "github.com/grafana/loki/pkg/ingester/client", - "github.com/grafana/loki/pkg/iter", - "github.com/grafana/loki/pkg/logentry/metric", - "github.com/grafana/loki/pkg/logentry/stages", - "github.com/grafana/loki/pkg/logproto", - "github.com/grafana/loki/pkg/logql", - "github.com/grafana/loki/pkg/loki", - "github.com/grafana/loki/pkg/promtail", - "github.com/grafana/loki/pkg/promtail/api", - "github.com/grafana/loki/pkg/promtail/client", - "github.com/grafana/loki/pkg/promtail/client/fake", - "github.com/grafana/loki/pkg/promtail/config", - "github.com/grafana/loki/pkg/promtail/positions", - "github.com/grafana/loki/pkg/promtail/scrape", - "github.com/grafana/loki/pkg/promtail/server", - "github.com/grafana/loki/pkg/promtail/server/ui", - "github.com/grafana/loki/pkg/promtail/targets", - "github.com/grafana/loki/pkg/querier", - "github.com/grafana/loki/pkg/util", - "github.com/grafana/loki/pkg/util/flagext", "github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc", "github.com/hpcloud/tail", "github.com/jmespath/go-jmespath", diff --git a/docs/api.md b/docs/api.md index 14b9b40ffcc1c..51e1544f0a72d 100644 --- a/docs/api.md +++ b/docs/api.md @@ -39,7 +39,7 @@ The Loki server has the following API endpoints (_Note:_ Authentication is out o Responses looks like this: - ``` + ```json { "streams": [ { @@ -59,11 +59,14 @@ The Loki server has the following API endpoints (_Note:_ Authentication is out o - `GET /api/prom/label` - For retrieving the names of the labels one can query on. 
+   Label name queries accept the following parameters in the query-string:
+
+   - `start`: the start time for the query, as a nanosecond Unix epoch (nanoseconds since 1970). Default is 6 hours ago.
+   - `end`: the end time for the query, as a nanosecond Unix epoch (nanoseconds since 1970). Default is the current time.
 
   Responses looks like this:
 
-   ```
+   ```json
   {
     "values": [
       "instance",
@@ -74,11 +77,15 @@ The Loki server has the following API endpoints (_Note:_ Authentication is out o
   ```
 
 - `GET /api/prom/label/<name>/values` - For retrieving the label values one can query on.
+
+   Label values queries accept the following parameters in the query-string:
+
+   - `start`: the start time for the query, as a nanosecond Unix epoch (nanoseconds since 1970). Default is 6 hours ago.
+   - `end`: the end time for the query, as a nanosecond Unix epoch (nanoseconds since 1970). Default is the current time.
 
   Responses looks like this:
 
-   ```
+   ```json
   {
     "values": [
       "default",
diff --git a/pkg/logproto/logproto.pb.go b/pkg/logproto/logproto.pb.go
index d2c0e3ed96c25..e63820f2282b2 100644
--- a/pkg/logproto/logproto.pb.go
+++ b/pkg/logproto/logproto.pb.go
@@ -6,17 +6,18 @@ package logproto
 import (
 	context "context"
 	fmt "fmt"
-	_ "github.com/gogo/protobuf/gogoproto"
-	proto "github.com/gogo/protobuf/proto"
-	_ "github.com/gogo/protobuf/types"
-	github_com_gogo_protobuf_types "github.com/gogo/protobuf/types"
-	grpc "google.golang.org/grpc"
 	io "io"
 	math "math"
 	reflect "reflect"
 	strconv "strconv"
 	strings "strings"
 	time "time"
+
+	_ "github.com/gogo/protobuf/gogoproto"
+	proto "github.com/gogo/protobuf/proto"
+	_ "github.com/gogo/protobuf/types"
+	github_com_gogo_protobuf_types "github.com/gogo/protobuf/types"
+	grpc "google.golang.org/grpc"
 )
 
 // Reference imports to suppress errors if they are not otherwise used.
@@ -257,8 +258,10 @@ func (m *QueryResponse) GetStreams() []*Stream { } type LabelRequest struct { - Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` - Values bool `protobuf:"varint,2,opt,name=values,proto3" json:"values,omitempty"` + Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` + Values bool `protobuf:"varint,2,opt,name=values,proto3" json:"values,omitempty"` + Start *time.Time `protobuf:"bytes,3,opt,name=start,proto3,stdtime" json:"start,omitempty"` + End *time.Time `protobuf:"bytes,4,opt,name=end,proto3,stdtime" json:"end,omitempty"` } func (m *LabelRequest) Reset() { *m = LabelRequest{} } @@ -307,6 +310,20 @@ func (m *LabelRequest) GetValues() bool { return false } +func (m *LabelRequest) GetStart() *time.Time { + if m != nil { + return m.Start + } + return nil +} + +func (m *LabelRequest) GetEnd() *time.Time { + if m != nil { + return m.End + } + return nil +} + type LabelResponse struct { Values []string `protobuf:"bytes,1,rep,name=values,proto3" json:"values,omitempty"` } @@ -838,6 +855,20 @@ func (this *LabelRequest) Equal(that interface{}) bool { if this.Values != that1.Values { return false } + if that1.Start == nil { + if this.Start != nil { + return false + } + } else if !this.Start.Equal(*that1.Start) { + return false + } + if that1.End == nil { + if this.End != nil { + return false + } + } else if !this.End.Equal(*that1.End) { + return false + } return true } func (this *LabelResponse) Equal(that interface{}) bool { @@ -1072,10 +1103,12 @@ func (this *LabelRequest) GoString() string { if this == nil { return "nil" } - s := make([]string, 0, 6) + s := make([]string, 0, 8) s = append(s, "&logproto.LabelRequest{") s = append(s, "Name: "+fmt.Sprintf("%#v", this.Name)+",\n") s = append(s, "Values: "+fmt.Sprintf("%#v", this.Values)+",\n") + s = append(s, "Start: "+fmt.Sprintf("%#v", this.Start)+",\n") + s = append(s, "End: "+fmt.Sprintf("%#v", this.End)+",\n") s = append(s, "}") return strings.Join(s, "") } @@ -1587,6 +1620,26 @@ func (m *LabelRequest) MarshalTo(dAtA []byte) (int, error) { } i++ } + if m.Start != nil { + dAtA[i] = 0x1a + i++ + i = encodeVarintLogproto(dAtA, i, uint64(github_com_gogo_protobuf_types.SizeOfStdTime(*m.Start))) + n3, err3 := github_com_gogo_protobuf_types.StdTimeMarshalTo(*m.Start, dAtA[i:]) + if err3 != nil { + return 0, err3 + } + i += n3 + } + if m.End != nil { + dAtA[i] = 0x22 + i++ + i = encodeVarintLogproto(dAtA, i, uint64(github_com_gogo_protobuf_types.SizeOfStdTime(*m.End))) + n4, err4 := github_com_gogo_protobuf_types.StdTimeMarshalTo(*m.End, dAtA[i:]) + if err4 != nil { + return 0, err4 + } + i += n4 + } return i, nil } @@ -1677,11 +1730,11 @@ func (m *Entry) MarshalTo(dAtA []byte) (int, error) { dAtA[i] = 0xa i++ i = encodeVarintLogproto(dAtA, i, uint64(github_com_gogo_protobuf_types.SizeOfStdTime(m.Timestamp))) - n3, err3 := github_com_gogo_protobuf_types.StdTimeMarshalTo(m.Timestamp, dAtA[i:]) - if err3 != nil { - return 0, err3 + n5, err5 := github_com_gogo_protobuf_types.StdTimeMarshalTo(m.Timestamp, dAtA[i:]) + if err5 != nil { + return 0, err5 } - i += n3 + i += n5 if len(m.Line) > 0 { dAtA[i] = 0x12 i++ @@ -1894,6 +1947,14 @@ func (m *LabelRequest) Size() (n int) { if m.Values { n += 2 } + if m.Start != nil { + l = github_com_gogo_protobuf_types.SizeOfStdTime(*m.Start) + n += 1 + l + sovLogproto(uint64(l)) + } + if m.End != nil { + l = github_com_gogo_protobuf_types.SizeOfStdTime(*m.End) + n += 1 + l + sovLogproto(uint64(l)) + } return n } @@ -2076,6 +2137,8 @@ func (this 
*LabelRequest) String() string { s := strings.Join([]string{`&LabelRequest{`, `Name:` + fmt.Sprintf("%v", this.Name) + `,`, `Values:` + fmt.Sprintf("%v", this.Values) + `,`, + `Start:` + strings.Replace(fmt.Sprintf("%v", this.Start), "Timestamp", "types.Timestamp", 1) + `,`, + `End:` + strings.Replace(fmt.Sprintf("%v", this.End), "Timestamp", "types.Timestamp", 1) + `,`, `}`, }, "") return s @@ -2694,6 +2757,78 @@ func (m *LabelRequest) Unmarshal(dAtA []byte) error { } } m.Values = bool(v != 0) + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Start", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowLogproto + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthLogproto + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthLogproto + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Start == nil { + m.Start = new(time.Time) + } + if err := github_com_gogo_protobuf_types.StdTimeUnmarshal(m.Start, dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 4: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field End", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowLogproto + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthLogproto + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthLogproto + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.End == nil { + m.End = new(time.Time) + } + if err := github_com_gogo_protobuf_types.StdTimeUnmarshal(m.End, dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex default: iNdEx = preIndex skippy, err := skipLogproto(dAtA[iNdEx:]) diff --git a/pkg/logproto/logproto.proto b/pkg/logproto/logproto.proto index d0d8388babb0c..d97dabcc70db2 100644 --- a/pkg/logproto/logproto.proto +++ b/pkg/logproto/logproto.proto @@ -43,6 +43,8 @@ message QueryResponse { message LabelRequest { string name = 1; bool values = 2; // True to fetch label values, false for fetch labels names. + google.protobuf.Timestamp start = 3 [(gogoproto.stdtime) = true, (gogoproto.nullable) = true]; + google.protobuf.Timestamp end = 4 [(gogoproto.stdtime) = true, (gogoproto.nullable) = true]; } message LabelResponse { diff --git a/pkg/querier/http.go b/pkg/querier/http.go index 4d1cbedeba2e3..f1df00b1ec393 100644 --- a/pkg/querier/http.go +++ b/pkg/querier/http.go @@ -143,10 +143,27 @@ func (q *Querier) QueryHandler(w http.ResponseWriter, r *http.Request) { // LabelHandler is a http.HandlerFunc for handling label queries. 
func (q *Querier) LabelHandler(w http.ResponseWriter, r *http.Request) { name, ok := mux.Vars(r)["name"] + params := r.URL.Query() + now := time.Now() req := &logproto.LabelRequest{ Values: ok, Name: name, } + + end, err := unixNanoTimeParam(params, "end", now) + if err != nil { + http.Error(w, httpgrpc.Errorf(http.StatusBadRequest, err.Error()).Error(), http.StatusBadRequest) + return + } + req.End = &end + + start, err := unixNanoTimeParam(params, "start", end.Add(-6*time.Hour)) + if err != nil { + http.Error(w, httpgrpc.Errorf(http.StatusBadRequest, err.Error()).Error(), http.StatusBadRequest) + return + } + req.Start = &start + resp, err := q.Label(r.Context(), req) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index 5a2a5e2ff18bd..eacf22fb0edd6 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -10,6 +10,7 @@ import ( "github.com/cortexproject/cortex/pkg/ring" "github.com/cortexproject/cortex/pkg/util" token_util "github.com/grafana/loki/pkg/util" + "github.com/prometheus/common/model" "github.com/weaveworks/common/user" "google.golang.org/grpc/health/grpc_health_v1" @@ -148,10 +149,25 @@ func (q *Querier) Label(ctx context.Context, req *logproto.LabelRequest) (*logpr return nil, err } + from, through := model.TimeFromUnixNano(req.Start.UnixNano()), model.TimeFromUnixNano(req.End.UnixNano()) + var storeValues []string + if req.Values { + storeValues, err = q.store.LabelValuesForMetricName(ctx, from, through, "logs", req.Name) + if err != nil { + return nil, err + } + } else { + storeValues, err = q.store.LabelNamesForMetricName(ctx, from, through, "logs") + if err != nil { + return nil, err + } + } + results := make([][]string, 0, len(resps)) for _, resp := range resps { results = append(results, resp.response.(*logproto.LabelResponse).Values) } + results = append(results, storeValues) return &logproto.LabelResponse{ Values: mergeLists(results...), diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/chunk_store.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/chunk_store.go index 7ae86fc3de429..f93472c09cfcf 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/chunk/chunk_store.go +++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/chunk_store.go @@ -64,11 +64,15 @@ type StoreConfig struct { // Limits query start time to be greater than now() - MaxLookBackPeriod, if set. MaxLookBackPeriod time.Duration `yaml:"max_look_back_period"` + + // Not visible in yaml because the setting shouldn't be common between ingesters and queriers + chunkCacheStubs bool // don't write the full chunk to cache, just a stub entry } // RegisterFlags adds the flags required to config this to the given FlagSet func (cfg *StoreConfig) RegisterFlags(f *flag.FlagSet) { cfg.ChunkCacheConfig.RegisterFlagsWithPrefix("", "Cache config for chunks. ", f) + f.BoolVar(&cfg.chunkCacheStubs, "store.chunk-cache-stubs", false, "If true, don't write the full chunk to cache, just a stub entry.") cfg.WriteDedupeCacheConfig.RegisterFlagsWithPrefix("store.index-cache-write.", "Cache config for index entry writing. 
", f) f.DurationVar(&cfg.MinChunkAge, "store.min-chunk-age", 0, "Minimum time between chunk update and being saved to the store.") @@ -92,7 +96,7 @@ type store struct { } func newStore(cfg StoreConfig, schema Schema, index IndexClient, chunks ObjectClient, limits *validation.Overrides) (Store, error) { - fetcher, err := NewChunkFetcher(cfg.ChunkCacheConfig, chunks) + fetcher, err := NewChunkFetcher(cfg.ChunkCacheConfig, cfg.chunkCacheStubs, chunks) if err != nil { return nil, err } @@ -228,12 +232,44 @@ func (c *store) LabelValuesForMetricName(ctx context.Context, from, through mode } result = append(result, string(labelValue)) } - sort.Strings(result) result = uniqueStrings(result) return result, nil } +// LabelNamesForMetricName retrieves all label names for a metric name. +func (c *store) LabelNamesForMetricName(ctx context.Context, from, through model.Time, metricName string) ([]string, error) { + log, ctx := spanlogger.New(ctx, "ChunkStore.LabelNamesForMetricName") + defer log.Span.Finish() + level.Debug(log).Log("from", from, "through", through, "metricName", metricName) + + shortcut, err := c.validateQueryTimeRange(ctx, &from, &through) + if err != nil { + return nil, err + } else if shortcut { + return nil, nil + } + + chunks, err := c.lookupChunksByMetricName(ctx, from, through, nil, metricName) + if err != nil { + return nil, err + } + level.Debug(log).Log("msg", "Chunks in index", "chunks", len(chunks)) + + // Filter out chunks that are not in the selected time range and keep a single chunk per fingerprint + filtered := filterChunksByTime(from, through, chunks) + filtered, keys := filterChunksByUniqueFingerprint(filtered) + level.Debug(log).Log("msg", "Chunks post filtering", "chunks", len(chunks)) + + // Now fetch the actual chunk data from Memcache / S3 + allChunks, err := c.FetchChunks(ctx, filtered, keys) + if err != nil { + level.Error(log).Log("msg", "FetchChunks", "err", err) + return nil, err + } + return labelNamesFromChunks(allChunks), nil +} + func (c *store) validateQueryTimeRange(ctx context.Context, from *model.Time, through *model.Time) (bool, error) { log, ctx := spanlogger.New(ctx, "store.validateQueryTimeRange") defer log.Span.Finish() diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/chunk_store_utils.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/chunk_store_utils.go index c5e7cade69b63..49506be76a382 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/chunk/chunk_store_utils.go +++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/chunk_store_utils.go @@ -2,6 +2,7 @@ package chunk import ( "context" + "sort" "sync" "github.com/go-kit/kit/log/level" @@ -36,6 +37,39 @@ func keysFromChunks(chunks []Chunk) []string { return keys } +func labelNamesFromChunks(chunks []Chunk) []string { + keys := map[string]struct{}{} + var result []string + for _, c := range chunks { + for _, l := range c.Metric { + if l.Name != model.MetricNameLabel { + if _, ok := keys[string(l.Name)]; !ok { + keys[string(l.Name)] = struct{}{} + result = append(result, string(l.Name)) + } + } + } + } + sort.Strings(result) + return result +} + +func filterChunksByUniqueFingerprint(chunks []Chunk) ([]Chunk, []string) { + filtered := make([]Chunk, 0, len(chunks)) + keys := make([]string, 0, len(chunks)) + uniqueFp := map[model.Fingerprint]struct{}{} + + for _, chunk := range chunks { + if _, ok := uniqueFp[chunk.Fingerprint]; ok { + continue + } + filtered = append(filtered, chunk) + keys = append(keys, chunk.ExternalKey()) + uniqueFp[chunk.Fingerprint] = struct{}{} + } + 
return filtered, keys +} + func filterChunksByMatchers(chunks []Chunk, filters []*labels.Matcher) []Chunk { filteredChunks := make([]Chunk, 0, len(chunks)) outer: @@ -54,8 +88,9 @@ outer: // and writing back any misses to the cache. Also responsible for decoding // chunks from the cache, in parallel. type Fetcher struct { - storage ObjectClient - cache cache.Cache + storage ObjectClient + cache cache.Cache + cacheStubs bool wait sync.WaitGroup decodeRequests chan decodeRequest @@ -72,7 +107,7 @@ type decodeResponse struct { } // NewChunkFetcher makes a new ChunkFetcher. -func NewChunkFetcher(cfg cache.Config, storage ObjectClient) (*Fetcher, error) { +func NewChunkFetcher(cfg cache.Config, cacheStubs bool, storage ObjectClient) (*Fetcher, error) { cache, err := cache.New(cfg) if err != nil { return nil, err @@ -81,6 +116,7 @@ func NewChunkFetcher(cfg cache.Config, storage ObjectClient) (*Fetcher, error) { c := &Fetcher{ storage: storage, cache: cache, + cacheStubs: cacheStubs, decodeRequests: make(chan decodeRequest), } @@ -149,10 +185,14 @@ func (c *Fetcher) writeBackCache(ctx context.Context, chunks []Chunk) error { keys := make([]string, 0, len(chunks)) bufs := make([][]byte, 0, len(chunks)) for i := range chunks { - encoded, err := chunks[i].Encoded() - // TODO don't fail, just log and conitnue? - if err != nil { - return err + var encoded []byte + var err error + if !c.cacheStubs { + encoded, err = chunks[i].Encoded() + // TODO don't fail, just log and conitnue? + if err != nil { + return err + } } keys = append(keys, chunks[i].ExternalKey()) diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/composite_store.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/composite_store.go index 762aa424e3625..c00c3da8dba46 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/chunk/composite_store.go +++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/composite_store.go @@ -19,6 +19,7 @@ type Store interface { // using the corresponding Fetcher (fetchers[i].FetchChunks(ctx, chunks[i], ...) GetChunkRefs(ctx context.Context, from, through model.Time, matchers ...*labels.Matcher) ([][]Chunk, []*Fetcher, error) LabelValuesForMetricName(ctx context.Context, from, through model.Time, metricName string, labelName string) ([]string, error) + LabelNamesForMetricName(ctx context.Context, from, through model.Time, metricName string) ([]string, error) Stop() } @@ -106,6 +107,20 @@ func (c compositeStore) LabelValuesForMetricName(ctx context.Context, from, thro return result, err } +// LabelNamesForMetricName retrieves all label names for a metric name. +func (c compositeStore) LabelNamesForMetricName(ctx context.Context, from, through model.Time, metricName string) ([]string, error) { + var result []string + err := c.forStores(from, through, func(from, through model.Time, store Store) error { + labelNames, err := store.LabelNamesForMetricName(ctx, from, through, metricName) + if err != nil { + return err + } + result = append(result, labelNames...) 
+ return nil + }) + return result, err +} + func (c compositeStore) GetChunkRefs(ctx context.Context, from, through model.Time, matchers ...*labels.Matcher) ([][]Chunk, []*Fetcher, error) { chunkIDs := [][]Chunk{} fetchers := []*Fetcher{} diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/series_store.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/series_store.go index 5462a0fc38fcf..38bd313a51cf3 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/chunk/series_store.go +++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/series_store.go @@ -68,7 +68,7 @@ type seriesStore struct { } func newSeriesStore(cfg StoreConfig, schema Schema, index IndexClient, chunks ObjectClient, limits *validation.Overrides) (Store, error) { - fetcher, err := NewChunkFetcher(cfg.ChunkCacheConfig, chunks) + fetcher, err := NewChunkFetcher(cfg.ChunkCacheConfig, cfg.chunkCacheStubs, chunks) if err != nil { return nil, err } @@ -191,6 +191,61 @@ func (c *seriesStore) GetChunkRefs(ctx context.Context, from, through model.Time return [][]Chunk{chunks}, []*Fetcher{c.store.Fetcher}, nil } +// LabelNamesForMetricName retrieves all label names for a metric name. +func (c *seriesStore) LabelNamesForMetricName(ctx context.Context, from, through model.Time, metricName string) ([]string, error) { + log, ctx := spanlogger.New(ctx, "SeriesStore.LabelNamesForMetricName") + defer log.Span.Finish() + + userID, err := user.ExtractOrgID(ctx) + if err != nil { + return nil, err + } + + shortcut, err := c.validateQueryTimeRange(ctx, &from, &through) + if err != nil { + return nil, err + } else if shortcut { + return nil, nil + } + level.Debug(log).Log("metric", metricName) + + // Fetch the series IDs from the index + seriesIDs, err := c.lookupSeriesByMetricNameMatchers(ctx, from, through, userID, metricName, nil) + if err != nil { + return nil, err + } + level.Debug(log).Log("series-ids", len(seriesIDs)) + + // Lookup the series in the index to get the chunks. + chunkIDs, err := c.lookupChunksBySeries(ctx, from, through, userID, seriesIDs) + if err != nil { + level.Error(log).Log("msg", "lookupChunksBySeries", "err", err) + return nil, err + } + level.Debug(log).Log("chunk-ids", len(chunkIDs)) + + chunks, err := c.convertChunkIDsToChunks(ctx, userID, chunkIDs) + if err != nil { + level.Error(log).Log("err", "convertChunkIDsToChunks", "err", err) + return nil, err + } + + // Filter out chunks that are not in the selected time range and keep a single chunk per fingerprint + filtered := filterChunksByTime(from, through, chunks) + filtered, keys := filterChunksByUniqueFingerprint(filtered) + level.Debug(log).Log("Chunks post filtering", len(chunks)) + + chunksPerQuery.Observe(float64(len(filtered))) + + // Now fetch the actual chunk data from Memcache / S3 + allChunks, err := c.FetchChunks(ctx, filtered, keys) + if err != nil { + level.Error(log).Log("msg", "FetchChunks", "err", err) + return nil, err + } + return labelNamesFromChunks(allChunks), nil +} + func (c *seriesStore) lookupSeriesByMetricNameMatchers(ctx context.Context, from, through model.Time, userID, metricName string, matchers []*labels.Matcher) ([]string, error) { log, ctx := spanlogger.New(ctx, "SeriesStore.lookupSeriesByMetricNameMatchers", "metricName", metricName, "matchers", len(matchers)) defer log.Span.Finish()
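A note on using the API change above: the new `start` and `end` parameters on `GET /api/prom/label` (and `GET /api/prom/label/<name>/values`) are plain nanosecond Unix epochs in the query string, as documented in `docs/api.md`. A minimal client-side sketch in Go — the Loki address is an assumption and error handling is abbreviated:

```go
package main

import (
	"fmt"
	"io/ioutil"
	"net/http"
	"net/url"
	"strconv"
	"time"
)

func main() {
	base := "http://localhost:3100" // assumed Loki address

	end := time.Now()
	start := end.Add(-6 * time.Hour) // mirrors the documented default window

	q := url.Values{}
	q.Set("start", strconv.FormatInt(start.UnixNano(), 10)) // nanosecond Unix epoch
	q.Set("end", strconv.FormatInt(end.UnixNano(), 10))

	resp, err := http.Get(base + "/api/prom/label?" + q.Encode())
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	body, _ := ioutil.ReadAll(resp.Body)
	fmt.Println(string(body)) // e.g. {"values":["instance","job"]}
}
```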
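On the wire, `logproto.LabelRequest` now carries optional `Start`/`End` timestamps (generated as `*time.Time` via `gogoproto.stdtime`). A short sketch of building such a request in Go; wiring up an actual gRPC Querier client is omitted:

```go
package main

import (
	"fmt"
	"time"

	"github.com/grafana/loki/pkg/logproto"
)

func main() {
	end := time.Now()
	start := end.Add(-6 * time.Hour)

	req := &logproto.LabelRequest{
		Name:   "job",
		Values: true, // true = fetch values of label "job", false = fetch label names
		Start:  &start,
		End:    &end,
	}
	fmt.Printf("label request: %+v\n", req)
}
```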
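`LabelHandler` fills those fields using a `unixNanoTimeParam` helper whose implementation is not part of this diff. The sketch below is only a plausible stand-in (the name and behaviour are assumptions, not the repository's actual code): fall back to a default when the parameter is absent, otherwise parse it as an int64 nanosecond epoch.

```go
package main

import (
	"fmt"
	"net/url"
	"strconv"
	"time"
)

// parseUnixNanoParam is a hypothetical stand-in for the helper used by LabelHandler:
// it returns the named query parameter as a nanosecond Unix epoch, or def when absent.
func parseUnixNanoParam(values url.Values, name string, def time.Time) (time.Time, error) {
	raw := values.Get(name)
	if raw == "" {
		return def, nil
	}
	ns, err := strconv.ParseInt(raw, 10, 64)
	if err != nil {
		return time.Time{}, fmt.Errorf("invalid %s: %v", name, err)
	}
	return time.Unix(0, ns), nil
}

func main() {
	q := url.Values{"start": []string{"1546300800000000000"}} // 2019-01-01T00:00:00Z

	now := time.Now()
	end, _ := parseUnixNanoParam(q, "end", now)                       // absent: defaults to now
	start, _ := parseUnixNanoParam(q, "start", end.Add(-6*time.Hour)) // present: parsed
	fmt.Println(start.UTC(), end.UTC())
}
```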
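On the read path, `Querier.Label` now appends the chunk store's `LabelValuesForMetricName` / `LabelNamesForMetricName` results to the per-ingester responses before merging. Loki's `mergeLists` helper is not shown in this diff, so the following is only an illustrative sketch of the merge-and-dedup step it performs:

```go
package main

import (
	"fmt"
	"sort"
)

// mergeUnique flattens several label lists into one sorted, de-duplicated list.
func mergeUnique(lists ...[]string) []string {
	seen := map[string]struct{}{}
	out := []string{}
	for _, list := range lists {
		for _, v := range list {
			if _, ok := seen[v]; ok {
				continue
			}
			seen[v] = struct{}{}
			out = append(out, v)
		}
	}
	sort.Strings(out)
	return out
}

func main() {
	fromIngesters := []string{"filename", "instance", "job"}
	fromStore := []string{"instance", "job", "namespace"}
	fmt.Println(mergeUnique(fromIngesters, fromStore))
	// Output: [filename instance job namespace]
}
```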