diff --git a/pkg/loki/loki.go b/pkg/loki/loki.go index af3e289687f7d..c811333d66caf 100644 --- a/pkg/loki/loki.go +++ b/pkg/loki/loki.go @@ -511,7 +511,7 @@ func (t *Loki) setupModuleManager() error { Ruler: {Ring, Server, Store, RulerStorage, IngesterQuerier, Overrides, TenantConfigs, UsageReport}, TableManager: {Server, UsageReport}, Compactor: {Server, Overrides, MemberlistKV, UsageReport}, - IndexGateway: {Server, Overrides, UsageReport, MemberlistKV}, + IndexGateway: {Server, Store, Overrides, UsageReport, MemberlistKV}, IngesterQuerier: {Ring}, IndexGatewayRing: {RuntimeConfig, Server, MemberlistKV}, All: {QueryScheduler, QueryFrontend, Querier, Ingester, Distributor, Ruler, Compactor}, diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index efdebddb96605..5d173e32a1199 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -413,7 +413,7 @@ func (t *Loki) initStore() (_ services.Service, err error) { // and queried as part of live data until the cache TTL expires on the index entry. t.Cfg.Ingester.RetainPeriod = t.Cfg.StorageConfig.IndexCacheValidity + 1*time.Minute t.Cfg.StorageConfig.BoltDBShipperConfig.IngesterDBRetainPeriod = boltdbShipperQuerierIndexUpdateDelay(t.Cfg) + 2*time.Minute - case t.Cfg.isModuleEnabled(Querier), t.Cfg.isModuleEnabled(Ruler), t.Cfg.isModuleEnabled(Read): + case t.Cfg.isModuleEnabled(Querier), t.Cfg.isModuleEnabled(Ruler), t.Cfg.isModuleEnabled(Read), t.isModuleActive(IndexGateway): // We do not want query to do any updates to index t.Cfg.StorageConfig.BoltDBShipperConfig.Mode = shipper.ModeReadOnly default: @@ -437,6 +437,9 @@ func (t *Loki) initStore() (_ services.Service, err error) { // Use AsyncStore to query both ingesters local store and chunk store for store queries. // Only queriers should use the AsyncStore, it should never be used in ingesters. asyncStore = true + case t.Cfg.isModuleEnabled(IndexGateway): + // we want to use the actual storage when running the index-gateway, so we remove the Addr from the config + t.Cfg.StorageConfig.BoltDBShipperConfig.IndexGatewayClientConfig.Address = "" case t.Cfg.isModuleEnabled(All): // We want ingester to also query the store when using boltdb-shipper but only when running with target All. // We do not want to use AsyncStore otherwise it would start spiraling around doing queries over and over again to the ingesters and store. @@ -772,26 +775,20 @@ func (t *Loki) initCompactor() (services.Service, error) { } func (t *Loki) initIndexGateway() (services.Service, error) { - t.Cfg.StorageConfig.BoltDBShipperConfig.Mode = shipper.ModeReadOnly t.Cfg.IndexGateway.Ring.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV t.Cfg.IndexGateway.Ring.ListenPort = t.Cfg.Server.GRPCListenPort - objectClient, err := storage.NewObjectClient(t.Cfg.StorageConfig.BoltDBShipperConfig.SharedStoreType, t.Cfg.StorageConfig, t.clientMetrics) - if err != nil { - return nil, err - } - - shipperIndexClient, err := shipper.NewShipper(t.Cfg.StorageConfig.BoltDBShipperConfig, objectClient, t.overrides, prometheus.DefaultRegisterer) + indexClient, err := storage.NewIndexClient(config.BoltDBShipperType, t.Cfg.StorageConfig, t.Cfg.SchemaConfig, t.overrides, t.clientMetrics, prometheus.DefaultRegisterer) if err != nil { return nil, err } - - gateway, err := indexgateway.NewIndexGateway(t.Cfg.IndexGateway, util_log.Logger, prometheus.DefaultRegisterer, shipperIndexClient) + gateway, err := indexgateway.NewIndexGateway(t.Cfg.IndexGateway, util_log.Logger, prometheus.DefaultRegisterer, t.Store, indexClient) if err != nil { return nil, err } t.Server.HTTP.Path("/indexgateway/ring").Methods("GET", "POST").Handler(gateway) + indexgatewaypb.RegisterIndexGatewayServer(t.Server.GRPC, gateway) return gateway, nil } diff --git a/pkg/storage/store.go b/pkg/storage/store.go index db5dbe7b8628c..02e9eb0723fd2 100644 --- a/pkg/storage/store.go +++ b/pkg/storage/store.go @@ -25,6 +25,7 @@ import ( "github.com/grafana/loki/pkg/storage/stores" "github.com/grafana/loki/pkg/storage/stores/series" "github.com/grafana/loki/pkg/storage/stores/series/index" + "github.com/grafana/loki/pkg/storage/stores/shipper" "github.com/grafana/loki/pkg/usagestats" "github.com/grafana/loki/pkg/util" ) @@ -196,8 +197,22 @@ func (s *store) storeForPeriod(p config.PeriodConfig, chunkClient client.Client, schema = index.NewSchemaCaching(schema, time.Duration(s.storeCfg.CacheLookupsOlderThan)) } - return series.NewWriter(f, s.schemaCfg, idx, schema, s.writeDedupeCache, s.storeCfg.DisableIndexDeduplication), - series.NewIndexStore(s.schemaCfg, schema, idx, f, s.cfg.MaxChunkBatchSize), + var ( + writer stores.ChunkWriter = series.NewWriter(f, s.schemaCfg, idx, schema, s.writeDedupeCache, s.storeCfg.DisableIndexDeduplication) + seriesdIndex *series.IndexStore = series.NewIndexStore(s.schemaCfg, schema, idx, f, s.cfg.MaxChunkBatchSize) + index stores.Index = seriesdIndex + ) + if s.cfg.BoltDBShipperConfig.Mode == shipper.ModeReadOnly && (s.cfg.BoltDBShipperConfig.IndexGatewayClientConfig.Address != "" || s.cfg.BoltDBShipperConfig.IndexGatewayClientConfig.Ring != nil) { + // inject the index-gateway client into the index store + gw, err := shipper.NewGatewayClient(s.cfg.BoltDBShipperConfig.IndexGatewayClientConfig, indexClientReg, s.logger) + if err != nil { + return nil, nil, nil, err + } + index = series.NewIndexGatewayClientStore(gw, seriesdIndex) + } + + return writer, + index, func() { chunkClient.Stop() f.Stop() diff --git a/pkg/storage/stores/series/series_index_gateway_store.go b/pkg/storage/stores/series/series_index_gateway_store.go new file mode 100644 index 0000000000000..c10225ecf6f72 --- /dev/null +++ b/pkg/storage/stores/series/series_index_gateway_store.go @@ -0,0 +1,104 @@ +package series + +import ( + "context" + + "github.com/gogo/status" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/labels" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + + "github.com/grafana/loki/pkg/logproto" + "github.com/grafana/loki/pkg/logql/syntax" + "github.com/grafana/loki/pkg/storage/stores/shipper/indexgateway/indexgatewaypb" +) + +type IndexGatewayClientStore struct { + client IndexGatewayClient + *IndexStore +} + +type IndexGatewayClient interface { + GetChunkRef(ctx context.Context, in *indexgatewaypb.GetChunkRefRequest, opts ...grpc.CallOption) (*indexgatewaypb.GetChunkRefResponse, error) + LabelNamesForMetricName(ctx context.Context, in *indexgatewaypb.LabelNamesForMetricNameRequest, opts ...grpc.CallOption) (*indexgatewaypb.LabelResponse, error) + LabelValuesForMetricName(ctx context.Context, in *indexgatewaypb.LabelValuesForMetricNameRequest, opts ...grpc.CallOption) (*indexgatewaypb.LabelResponse, error) +} + +func NewIndexGatewayClientStore(client IndexGatewayClient, index *IndexStore) *IndexGatewayClientStore { + return &IndexGatewayClientStore{ + client: client, + IndexStore: index, + } +} + +func (c *IndexGatewayClientStore) GetChunkRefs(ctx context.Context, userID string, from, through model.Time, allMatchers ...*labels.Matcher) ([]logproto.ChunkRef, error) { + response, err := c.client.GetChunkRef(ctx, &indexgatewaypb.GetChunkRefRequest{ + From: from, + Through: through, + Matchers: (&syntax.MatchersExpr{Mts: allMatchers}).String(), + }) + if err != nil { + if isUnimplementedCallError(err) { + // Handle communication with older index gateways gracefully, by falling back to the index store calls. + return c.IndexStore.GetChunkRefs(ctx, userID, from, through, allMatchers...) + } + return nil, err + } + result := make([]logproto.ChunkRef, len(response.Refs)) + for i, ref := range response.Refs { + result[i] = *ref + } + + return result, nil +} + +func (c *IndexGatewayClientStore) GetSeries(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([]labels.Labels, error) { + refs, err := c.GetChunkRefs(ctx, userID, from, through, matchers...) + if err != nil { + return nil, err + } + return c.chunksToSeries(ctx, refs, matchers) +} + +// LabelNamesForMetricName retrieves all label names for a metric name. +func (c *IndexGatewayClientStore) LabelNamesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string) ([]string, error) { + resp, err := c.client.LabelNamesForMetricName(ctx, &indexgatewaypb.LabelNamesForMetricNameRequest{ + MetricName: metricName, + From: from, + Through: through, + }) + if isUnimplementedCallError(err) { + // Handle communication with older index gateways gracefully, by falling back to the index store calls. + return c.IndexStore.LabelNamesForMetricName(ctx, userID, from, through, metricName) + } + return resp.Values, err +} + +func (c *IndexGatewayClientStore) LabelValuesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string, labelName string, matchers ...*labels.Matcher) ([]string, error) { + resp, err := c.client.LabelValuesForMetricName(ctx, &indexgatewaypb.LabelValuesForMetricNameRequest{ + MetricName: metricName, + LabelName: labelName, + From: from, + Through: through, + Matchers: (&syntax.MatchersExpr{Mts: matchers}).String(), + }) + if isUnimplementedCallError(err) { + // Handle communication with older index gateways gracefully, by falling back to the index store calls. + return c.IndexStore.LabelNamesForMetricName(ctx, userID, from, through, metricName) + } + return resp.Values, err +} + +// isUnimplementedCallError tells if the GRPC error is a gRPC error with code Unimplemented. +func isUnimplementedCallError(err error) bool { + if err == nil { + return false + } + + s, ok := status.FromError(err) + if !ok { + return false + } + return (s.Code() == codes.Unimplemented) +} diff --git a/pkg/storage/stores/series/series_index_gateway_store_test.go b/pkg/storage/stores/series/series_index_gateway_store_test.go new file mode 100644 index 0000000000000..c1bbd65868edc --- /dev/null +++ b/pkg/storage/stores/series/series_index_gateway_store_test.go @@ -0,0 +1,104 @@ +package series + +import ( + "context" + "log" + "net" + "testing" + "time" + + "github.com/grafana/dskit/grpcclient" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/labels" + "github.com/stretchr/testify/require" + "google.golang.org/grpc" + + "github.com/grafana/loki/pkg/storage/chunk/client/testutils" + "github.com/grafana/loki/pkg/storage/config" + "github.com/grafana/loki/pkg/storage/stores/series/index" + "github.com/grafana/loki/pkg/storage/stores/shipper/indexgateway/indexgatewaypb" +) + +type fakeClient struct { + indexgatewaypb.IndexGatewayClient +} + +func (fakeClient) GetChunkRef(ctx context.Context, in *indexgatewaypb.GetChunkRefRequest, opts ...grpc.CallOption) (*indexgatewaypb.GetChunkRefResponse, error) { + return &indexgatewaypb.GetChunkRefResponse{}, nil +} + +func Test_IndexGatewayClient(t *testing.T) { + idx := IndexGatewayClientStore{ + client: fakeClient{}, + IndexStore: &IndexStore{ + chunkBatchSize: 1, + }, + } + _, err := idx.GetSeries(context.Background(), "foo", model.Earliest, model.Latest) + require.NoError(t, err) +} + +func Test_IndexGatewayClient_Fallback(t *testing.T) { + lis, err := net.Listen("tcp", "localhost:0") + require.NoError(t, err) + s := grpc.NewServer() + + // register fake grpc service with missing methods + desc := grpc.ServiceDesc{ + ServiceName: "indexgatewaypb.IndexGateway", + HandlerType: (*indexgatewaypb.IndexGatewayServer)(nil), + Streams: []grpc.StreamDesc{ + { + StreamName: "QueryIndex", + Handler: nil, + ServerStreams: true, + }, + }, + Metadata: "pkg/storage/stores/shipper/indexgateway/indexgatewaypb/gateway.proto", + } + s.RegisterService(&desc, nil) + + go func() { + if err := s.Serve(lis); err != nil { + log.Fatalf("Failed to serve: %v", err) + } + }() + defer func() { + s.GracefulStop() + }() + + cfg := grpcclient.Config{ + MaxRecvMsgSize: 1024, + MaxSendMsgSize: 1024, + } + + dialOpts, err := cfg.DialOption(nil, nil) + require.NoError(t, err) + + conn, err := grpc.Dial(lis.Addr().String(), dialOpts...) + require.NoError(t, err) + defer conn.Close() + schemaCfg := config.SchemaConfig{ + Configs: []config.PeriodConfig{ + {From: config.DayTime{Time: model.Now().Add(-24 * time.Hour)}, Schema: "v12", RowShards: 16}, + }, + } + schema, err := index.CreateSchema(schemaCfg.Configs[0]) + require.NoError(t, err) + testutils.ResetMockStorage() + tm, err := index.NewTableManager(index.TableManagerConfig{}, schemaCfg, 2*time.Hour, testutils.NewMockStorage(), nil, nil, nil) + require.NoError(t, err) + require.NoError(t, tm.SyncTables(context.Background())) + idx := NewIndexGatewayClientStore( + indexgatewaypb.NewIndexGatewayClient(conn), + &IndexStore{ + chunkBatchSize: 1, + schema: schema, + schemaCfg: schemaCfg, + index: testutils.NewMockStorage(), + }, + ) + + _, err = idx.GetSeries(context.Background(), "foo", model.Now(), model.Now().Add(1*time.Hour), labels.MustNewMatcher(labels.MatchEqual, "__name__", "logs")) + require.NoError(t, err) +} diff --git a/pkg/storage/stores/series/series_index_store.go b/pkg/storage/stores/series/series_index_store.go index d7fe51cae7c36..c4fc5b36b56f7 100644 --- a/pkg/storage/stores/series/series_index_store.go +++ b/pkg/storage/stores/series/series_index_store.go @@ -142,9 +142,13 @@ func (c *IndexStore) GetSeries(ctx context.Context, userID string, from, through return nil, err } + return c.chunksToSeries(ctx, chks, matchers) +} + +func (c *IndexStore) chunksToSeries(ctx context.Context, in []logproto.ChunkRef, matchers []*labels.Matcher) ([]labels.Labels, error) { // download one per series and merge // group chunks by series - chunksBySeries, keys := filterChunkRefsByUniqueFingerprint(c.schemaCfg, chks) + chunksBySeries, keys := filterChunkRefsByUniqueFingerprint(c.schemaCfg, in) results := make([]labels.Labels, 0, len(chunksBySeries)) diff --git a/pkg/storage/stores/shipper/gateway_client.go b/pkg/storage/stores/shipper/gateway_client.go index 8efc15266430a..7c11cb1db7a8c 100644 --- a/pkg/storage/stores/shipper/gateway_client.go +++ b/pkg/storage/stores/shipper/gateway_client.go @@ -16,7 +16,6 @@ import ( "github.com/grafana/dskit/tenant" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" "github.com/weaveworks/common/instrument" "google.golang.org/grpc" @@ -96,15 +95,26 @@ type GatewayClient struct { // If it is configured to be in ring mode, a pool of GRPC connections to all Index Gateway instances is created. // Otherwise, it creates a single GRPC connection to an Index Gateway instance running in simple mode. func NewGatewayClient(cfg IndexGatewayClientConfig, r prometheus.Registerer, logger log.Logger) (*GatewayClient, error) { + latency := prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: "loki_boltdb_shipper", + Name: "store_gateway_request_duration_seconds", + Help: "Time (in seconds) spent serving requests when using boltdb shipper store gateway", + Buckets: instrument.DefBuckets, + }, []string{"operation", "status_code"}) + if r != nil { + err := r.Register(latency) + if err != nil { + alreadyErr, ok := err.(prometheus.AlreadyRegisteredError) + if !ok { + return nil, err + } + latency = alreadyErr.ExistingCollector.(*prometheus.HistogramVec) + } + } sgClient := &GatewayClient{ - cfg: cfg, - storeGatewayClientRequestDuration: promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{ - Namespace: "loki_boltdb_shipper", - Name: "store_gateway_request_duration_seconds", - Help: "Time (in seconds) spent serving requests when using boltdb shipper store gateway", - Buckets: instrument.DefBuckets, - }, []string{"operation", "status_code"}), - ring: cfg.Ring, + cfg: cfg, + storeGatewayClientRequestDuration: latency, + ring: cfg.Ring, } dialOpts, err := cfg.GRPCClientConfig.DialOption(grpcclient.Instrument(sgClient.storeGatewayClientRequestDuration)) @@ -158,6 +168,51 @@ func (s *GatewayClient) QueryPages(ctx context.Context, queries []index.Query, c }) } +func (s *GatewayClient) GetChunkRef(ctx context.Context, in *indexgatewaypb.GetChunkRefRequest, opts ...grpc.CallOption) (*indexgatewaypb.GetChunkRefResponse, error) { + if s.cfg.Mode == indexgateway.RingMode { + var ( + resp *indexgatewaypb.GetChunkRefResponse + err error + ) + err = s.ringModeDo(ctx, func(client indexgatewaypb.IndexGatewayClient) error { + resp, err = client.GetChunkRef(ctx, in, opts...) + return err + }) + return resp, err + } + return s.grpcClient.GetChunkRef(ctx, in, opts...) +} + +func (s *GatewayClient) LabelNamesForMetricName(ctx context.Context, in *indexgatewaypb.LabelNamesForMetricNameRequest, opts ...grpc.CallOption) (*indexgatewaypb.LabelResponse, error) { + if s.cfg.Mode == indexgateway.RingMode { + var ( + resp *indexgatewaypb.LabelResponse + err error + ) + err = s.ringModeDo(ctx, func(client indexgatewaypb.IndexGatewayClient) error { + resp, err = client.LabelNamesForMetricName(ctx, in, opts...) + return err + }) + return resp, err + } + return s.grpcClient.LabelNamesForMetricName(ctx, in, opts...) +} + +func (s *GatewayClient) LabelValuesForMetricName(ctx context.Context, in *indexgatewaypb.LabelValuesForMetricNameRequest, opts ...grpc.CallOption) (*indexgatewaypb.LabelResponse, error) { + if s.cfg.Mode == indexgateway.RingMode { + var ( + resp *indexgatewaypb.LabelResponse + err error + ) + err = s.ringModeDo(ctx, func(client indexgatewaypb.IndexGatewayClient) error { + resp, err = client.LabelValuesForMetricName(ctx, in, opts...) + return err + }) + return resp, err + } + return s.grpcClient.LabelValuesForMetricName(ctx, in, opts...) +} + func (s *GatewayClient) doQueries(ctx context.Context, queries []index.Query, callback index.QueryPagesCallback) error { queryKeyQueryMap := make(map[string]index.Query, len(queries)) gatewayQueries := make([]*indexgatewaypb.IndexQuery, 0, len(queries)) @@ -174,7 +229,9 @@ func (s *GatewayClient) doQueries(ctx context.Context, queries []index.Query, ca } if s.cfg.Mode == indexgateway.RingMode { - return s.ringModeDoQueries(ctx, gatewayQueries, queryKeyQueryMap, callback) + return s.ringModeDo(ctx, func(client indexgatewaypb.IndexGatewayClient) error { + return s.clientDoQueries(ctx, gatewayQueries, queryKeyQueryMap, callback, client) + }) } return s.clientDoQueries(ctx, gatewayQueries, queryKeyQueryMap, callback, s.grpcClient) @@ -184,7 +241,8 @@ func (s *GatewayClient) doQueries(ctx context.Context, queries []index.Query, ca // // It is used by both, simple and ring mode. func (s *GatewayClient) clientDoQueries(ctx context.Context, gatewayQueries []*indexgatewaypb.IndexQuery, - queryKeyQueryMap map[string]index.Query, callback index.QueryPagesCallback, client indexgatewaypb.IndexGatewayClient) error { + queryKeyQueryMap map[string]index.Query, callback index.QueryPagesCallback, client indexgatewaypb.IndexGatewayClient, +) error { streamer, err := client.QueryIndex(ctx, &indexgatewaypb.QueryIndexRequest{Queries: gatewayQueries}) if err != nil { return errors.Wrap(err, "query index") @@ -211,15 +269,9 @@ func (s *GatewayClient) clientDoQueries(ctx context.Context, gatewayQueries []*i return nil } -// ringModeDoQueries prepares an index query to be sent to the Index Gateway, and then sends it -// using the clientDoQueries implementation. -// -// The preparation and sending phase includes: -// 1. Extracting the tenant name from the query. -// 2. Fetching different Index Gateway instances assigned to the extracted tenant. -// 3. Iterating in parallel over all fetched Index Gateway instances, getting their gRPC connections -// from the pool and invoking clientDoQueries using their client. -func (s *GatewayClient) ringModeDoQueries(ctx context.Context, gatewayQueries []*indexgatewaypb.IndexQuery, queryKeyQueryMap map[string]index.Query, callback index.QueryPagesCallback) error { +// ringModeDo executes the given function for each Index Gateway instance in the ring mapping to the correct tenant in the index. +// In case of callback failure, we'll try another member of the ring for that tenant ID. +func (s *GatewayClient) ringModeDo(ctx context.Context, callback func(client indexgatewaypb.IndexGatewayClient) error) error { userID, err := tenant.TenantID(ctx) if err != nil { return errors.Wrap(err, "index gateway client get tenant ID") @@ -238,7 +290,7 @@ func (s *GatewayClient) ringModeDoQueries(ctx context.Context, gatewayQueries [] rand.Shuffle(len(addrs), func(i, j int) { addrs[i], addrs[j] = addrs[j], addrs[i] }) - + var lastErr error for _, addr := range addrs { genericClient, err := s.pool.GetClientFor(addr) if err != nil { @@ -247,15 +299,16 @@ func (s *GatewayClient) ringModeDoQueries(ctx context.Context, gatewayQueries [] } client := (genericClient.(indexgatewaypb.IndexGatewayClient)) - if err := s.clientDoQueries(ctx, gatewayQueries, queryKeyQueryMap, callback, client); err != nil { - level.Error(util_log.Logger).Log("msg", fmt.Sprintf("client do queries failed for instance %s", addr), "err", err) + if err := callback(client); err != nil { + lastErr = err + level.Error(util_log.Logger).Log("msg", fmt.Sprintf("client do failed for instance %s", addr), "err", err) continue } return nil } - return fmt.Errorf("index gateway replicationSet clientDoQueries") + return lastErr } func (s *GatewayClient) NewWriteBatch() index.WriteBatch { diff --git a/pkg/storage/stores/shipper/gateway_client_test.go b/pkg/storage/stores/shipper/gateway_client_test.go index cb7ae286dd88a..83c3c53406368 100644 --- a/pkg/storage/stores/shipper/gateway_client_test.go +++ b/pkg/storage/stores/shipper/gateway_client_test.go @@ -52,7 +52,9 @@ const ( numTables = 50 ) -type mockIndexGatewayServer struct{} +type mockIndexGatewayServer struct { + indexgatewaypb.IndexGatewayServer +} func (m mockIndexGatewayServer) QueryIndex(request *indexgatewaypb.QueryIndexRequest, server indexgatewaypb.IndexGateway_QueryIndexServer) error { for i, query := range request.Queries { @@ -100,6 +102,10 @@ func (m mockIndexGatewayServer) QueryIndex(request *indexgatewaypb.QueryIndexReq return nil } +func (m mockIndexGatewayServer) GetChunkRef(context.Context, *indexgatewaypb.GetChunkRefRequest) (*indexgatewaypb.GetChunkRefResponse, error) { + return &indexgatewaypb.GetChunkRefResponse{}, nil +} + func createTestGrpcServer(t *testing.T) (func(), string) { var server mockIndexGatewayServer lis, err := net.Listen("tcp", "localhost:0") @@ -234,7 +240,7 @@ func benchmarkIndexQueries(b *testing.B, queries []index.Query) { var cfg indexgateway.Config flagext.DefaultValues(&cfg) - gw, err := indexgateway.NewIndexGateway(cfg, util_log.Logger, prometheus.DefaultRegisterer, tm) + gw, err := indexgateway.NewIndexGateway(cfg, util_log.Logger, prometheus.DefaultRegisterer, nil, tm) require.NoError(b, err) indexgatewaypb.RegisterIndexGatewayServer(s, gw) go func() { @@ -303,3 +309,18 @@ func Benchmark_QueriesMatchingLargeNumOfRows(b *testing.B) { } benchmarkIndexQueries(b, queries) } + +func TestDoubleRegistration(t *testing.T) { + r := prometheus.NewRegistry() + cleanup, storeAddress := createTestGrpcServer(t) + defer cleanup() + + _, err := NewGatewayClient(IndexGatewayClientConfig{ + Address: storeAddress, + }, r, util_log.Logger) + require.NoError(t, err) + _, err = NewGatewayClient(IndexGatewayClientConfig{ + Address: storeAddress, + }, r, util_log.Logger) + require.NoError(t, err) +} diff --git a/pkg/storage/stores/shipper/indexgateway/gateway.go b/pkg/storage/stores/shipper/indexgateway/gateway.go index a99b9104580f5..ba2e97d3f9eb3 100644 --- a/pkg/storage/stores/shipper/indexgateway/gateway.go +++ b/pkg/storage/stores/shipper/indexgateway/gateway.go @@ -12,9 +12,16 @@ import ( "github.com/grafana/dskit/kv" "github.com/grafana/dskit/ring" "github.com/grafana/dskit/services" + "github.com/grafana/dskit/tenant" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/labels" + "github.com/grafana/loki/pkg/logproto" + "github.com/grafana/loki/pkg/logql/syntax" + "github.com/grafana/loki/pkg/storage/chunk" + "github.com/grafana/loki/pkg/storage/chunk/fetcher" "github.com/grafana/loki/pkg/storage/stores/series/index" "github.com/grafana/loki/pkg/storage/stores/shipper/indexgateway/indexgatewaypb" "github.com/grafana/loki/pkg/storage/stores/shipper/util" @@ -36,6 +43,13 @@ const ( ) type IndexQuerier interface { + GetChunkRefs(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([][]chunk.Chunk, []*fetcher.Fetcher, error) + LabelValuesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string, labelName string, matchers ...*labels.Matcher) ([]string, error) + LabelNamesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string) ([]string, error) + Stop() +} + +type IndexClient interface { QueryPages(ctx context.Context, queries []index.Query, callback index.QueryPagesCallback) error Stop() } @@ -44,8 +58,10 @@ type Gateway struct { services.Service indexQuerier IndexQuerier - cfg Config - log log.Logger + indexClient IndexClient + + cfg Config + log log.Logger shipper IndexQuerier @@ -60,9 +76,10 @@ type Gateway struct { // // In case it is configured to be in ring mode, a Basic Service wrapping the ring client is started. // Otherwise, it starts an Idle Service that doesn't have lifecycle hooks. -func NewIndexGateway(cfg Config, log log.Logger, registerer prometheus.Registerer, indexQuerier IndexQuerier) (*Gateway, error) { +func NewIndexGateway(cfg Config, log log.Logger, registerer prometheus.Registerer, indexQuerier IndexQuerier, indexClient IndexClient) (*Gateway, error) { g := &Gateway{ indexQuerier: indexQuerier, + indexClient: indexClient, cfg: cfg, log: log, } @@ -111,6 +128,7 @@ func NewIndexGateway(cfg Config, log log.Logger, registerer prometheus.Registere } else { g.Service = services.NewIdleService(nil, func(failureCase error) error { g.indexQuerier.Stop() + g.indexClient.Stop() return nil }) } @@ -191,7 +209,10 @@ func (g *Gateway) running(ctx context.Context) error { // Only invoked if the Index Gateway is in ring mode. func (g *Gateway) stopping(_ error) error { level.Debug(util_log.Logger).Log("msg", "stopping index gateway") - defer g.indexQuerier.Stop() + defer func() { + g.indexQuerier.Stop() + g.indexClient.Stop() + }() return services.StopManagerAndAwaitStopped(context.Background(), g.subservices) } @@ -211,7 +232,7 @@ func (g *Gateway) QueryIndex(request *indexgatewaypb.QueryIndexRequest, server i } sendBatchMtx := sync.Mutex{} - outerErr = g.indexQuerier.QueryPages(server.Context(), queries, func(query index.Query, batch index.ReadBatchResult) bool { + outerErr = g.indexClient.QueryPages(server.Context(), queries, func(query index.Query, batch index.ReadBatchResult) bool { innerErr = buildResponses(query, batch, func(response *indexgatewaypb.QueryIndexResponse) error { // do not send grpc responses concurrently. See https://github.com/grpc/grpc-go/blob/master/stream.go#L120-L123. sendBatchMtx.Lock() @@ -269,6 +290,62 @@ func buildResponses(query index.Query, batch index.ReadBatchResult, callback fun return nil } +func (g *Gateway) GetChunkRef(ctx context.Context, req *indexgatewaypb.GetChunkRefRequest) (*indexgatewaypb.GetChunkRefResponse, error) { + instanceID, err := tenant.TenantID(ctx) + if err != nil { + return nil, err + } + matchers, err := syntax.ParseMatchers(req.Matchers) + if err != nil { + return nil, err + } + chunks, _, err := g.indexQuerier.GetChunkRefs(ctx, instanceID, req.From, req.Through, matchers...) + if err != nil { + return nil, err + } + result := &indexgatewaypb.GetChunkRefResponse{ + Refs: make([]*logproto.ChunkRef, 0, len(chunks)), + } + for _, cs := range chunks { + for _, c := range cs { + result.Refs = append(result.Refs, &c.ChunkRef) + } + } + return result, nil +} + +func (g *Gateway) LabelNamesForMetricName(ctx context.Context, req *indexgatewaypb.LabelNamesForMetricNameRequest) (*indexgatewaypb.LabelResponse, error) { + instanceID, err := tenant.TenantID(ctx) + if err != nil { + return nil, err + } + names, err := g.indexQuerier.LabelNamesForMetricName(ctx, instanceID, req.From, req.Through, req.MetricName) + if err != nil { + return nil, err + } + return &indexgatewaypb.LabelResponse{ + Values: names, + }, nil +} + +func (g *Gateway) LabelValuesForMetricName(ctx context.Context, req *indexgatewaypb.LabelValuesForMetricNameRequest) (*indexgatewaypb.LabelResponse, error) { + instanceID, err := tenant.TenantID(ctx) + if err != nil { + return nil, err + } + matchers, err := syntax.ParseMatchers(req.Matchers) + if err != nil { + return nil, err + } + names, err := g.indexQuerier.LabelValuesForMetricName(ctx, instanceID, req.From, req.Through, req.MetricName, req.LabelName, matchers...) + if err != nil { + return nil, err + } + return &indexgatewaypb.LabelResponse{ + Values: names, + }, nil +} + // ServeHTTP serves the HTTP route /indexgateway/ring. func (g *Gateway) ServeHTTP(w http.ResponseWriter, req *http.Request) { if g.cfg.Mode == RingMode { diff --git a/pkg/storage/stores/shipper/indexgateway/gateway_test.go b/pkg/storage/stores/shipper/indexgateway/gateway_test.go index c883e2f3b9d64..c60c4c422bd16 100644 --- a/pkg/storage/stores/shipper/indexgateway/gateway_test.go +++ b/pkg/storage/stores/shipper/indexgateway/gateway_test.go @@ -128,7 +128,7 @@ func TestGateway_QueryIndex(t *testing.T) { } expectedQueryKey = util.QueryKey(query) - gateway.indexQuerier = mockIndexClient{response: &mockBatch{size: responseSize}} + gateway.indexClient = mockIndexClient{response: &mockBatch{size: responseSize}} err := gateway.QueryIndex(&indexgatewaypb.QueryIndexRequest{Queries: []*indexgatewaypb.IndexQuery{{ TableName: query.TableName, HashValue: query.HashValue, diff --git a/pkg/storage/stores/shipper/indexgateway/indexgatewaypb/gateway.pb.go b/pkg/storage/stores/shipper/indexgateway/indexgatewaypb/gateway.pb.go index 84503bd225ed1..65afacd002469 100644 --- a/pkg/storage/stores/shipper/indexgateway/indexgatewaypb/gateway.pb.go +++ b/pkg/storage/stores/shipper/indexgateway/indexgatewaypb/gateway.pb.go @@ -7,7 +7,10 @@ import ( bytes "bytes" context "context" fmt "fmt" + _ "github.com/gogo/protobuf/gogoproto" proto "github.com/gogo/protobuf/proto" + logproto "github.com/grafana/loki/pkg/logproto" + github_com_prometheus_common_model "github.com/prometheus/common/model" grpc "google.golang.org/grpc" codes "google.golang.org/grpc/codes" status "google.golang.org/grpc/status" @@ -29,6 +32,243 @@ var _ = math.Inf // proto package needs to be updated. const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package +type LabelValuesForMetricNameRequest struct { + MetricName string `protobuf:"bytes,1,opt,name=metric_name,json=metricName,proto3" json:"metric_name,omitempty"` + LabelName string `protobuf:"bytes,2,opt,name=label_name,json=labelName,proto3" json:"label_name,omitempty"` + From github_com_prometheus_common_model.Time `protobuf:"varint,3,opt,name=from,proto3,customtype=github.com/prometheus/common/model.Time" json:"from"` + Through github_com_prometheus_common_model.Time `protobuf:"varint,4,opt,name=through,proto3,customtype=github.com/prometheus/common/model.Time" json:"through"` + Matchers string `protobuf:"bytes,5,opt,name=matchers,proto3" json:"matchers,omitempty"` +} + +func (m *LabelValuesForMetricNameRequest) Reset() { *m = LabelValuesForMetricNameRequest{} } +func (*LabelValuesForMetricNameRequest) ProtoMessage() {} +func (*LabelValuesForMetricNameRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_33a7bd4603d312b2, []int{0} +} +func (m *LabelValuesForMetricNameRequest) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *LabelValuesForMetricNameRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_LabelValuesForMetricNameRequest.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *LabelValuesForMetricNameRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_LabelValuesForMetricNameRequest.Merge(m, src) +} +func (m *LabelValuesForMetricNameRequest) XXX_Size() int { + return m.Size() +} +func (m *LabelValuesForMetricNameRequest) XXX_DiscardUnknown() { + xxx_messageInfo_LabelValuesForMetricNameRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_LabelValuesForMetricNameRequest proto.InternalMessageInfo + +func (m *LabelValuesForMetricNameRequest) GetMetricName() string { + if m != nil { + return m.MetricName + } + return "" +} + +func (m *LabelValuesForMetricNameRequest) GetLabelName() string { + if m != nil { + return m.LabelName + } + return "" +} + +func (m *LabelValuesForMetricNameRequest) GetMatchers() string { + if m != nil { + return m.Matchers + } + return "" +} + +type LabelNamesForMetricNameRequest struct { + MetricName string `protobuf:"bytes,1,opt,name=metric_name,json=metricName,proto3" json:"metric_name,omitempty"` + From github_com_prometheus_common_model.Time `protobuf:"varint,2,opt,name=from,proto3,customtype=github.com/prometheus/common/model.Time" json:"from"` + Through github_com_prometheus_common_model.Time `protobuf:"varint,3,opt,name=through,proto3,customtype=github.com/prometheus/common/model.Time" json:"through"` +} + +func (m *LabelNamesForMetricNameRequest) Reset() { *m = LabelNamesForMetricNameRequest{} } +func (*LabelNamesForMetricNameRequest) ProtoMessage() {} +func (*LabelNamesForMetricNameRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_33a7bd4603d312b2, []int{1} +} +func (m *LabelNamesForMetricNameRequest) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *LabelNamesForMetricNameRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_LabelNamesForMetricNameRequest.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *LabelNamesForMetricNameRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_LabelNamesForMetricNameRequest.Merge(m, src) +} +func (m *LabelNamesForMetricNameRequest) XXX_Size() int { + return m.Size() +} +func (m *LabelNamesForMetricNameRequest) XXX_DiscardUnknown() { + xxx_messageInfo_LabelNamesForMetricNameRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_LabelNamesForMetricNameRequest proto.InternalMessageInfo + +func (m *LabelNamesForMetricNameRequest) GetMetricName() string { + if m != nil { + return m.MetricName + } + return "" +} + +type LabelResponse struct { + Values []string `protobuf:"bytes,1,rep,name=values,proto3" json:"values,omitempty"` +} + +func (m *LabelResponse) Reset() { *m = LabelResponse{} } +func (*LabelResponse) ProtoMessage() {} +func (*LabelResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_33a7bd4603d312b2, []int{2} +} +func (m *LabelResponse) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *LabelResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_LabelResponse.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *LabelResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_LabelResponse.Merge(m, src) +} +func (m *LabelResponse) XXX_Size() int { + return m.Size() +} +func (m *LabelResponse) XXX_DiscardUnknown() { + xxx_messageInfo_LabelResponse.DiscardUnknown(m) +} + +var xxx_messageInfo_LabelResponse proto.InternalMessageInfo + +func (m *LabelResponse) GetValues() []string { + if m != nil { + return m.Values + } + return nil +} + +type GetChunkRefRequest struct { + From github_com_prometheus_common_model.Time `protobuf:"varint,1,opt,name=from,proto3,customtype=github.com/prometheus/common/model.Time" json:"from"` + Through github_com_prometheus_common_model.Time `protobuf:"varint,2,opt,name=through,proto3,customtype=github.com/prometheus/common/model.Time" json:"through"` + Matchers string `protobuf:"bytes,3,opt,name=matchers,proto3" json:"matchers,omitempty"` +} + +func (m *GetChunkRefRequest) Reset() { *m = GetChunkRefRequest{} } +func (*GetChunkRefRequest) ProtoMessage() {} +func (*GetChunkRefRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_33a7bd4603d312b2, []int{3} +} +func (m *GetChunkRefRequest) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *GetChunkRefRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_GetChunkRefRequest.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *GetChunkRefRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_GetChunkRefRequest.Merge(m, src) +} +func (m *GetChunkRefRequest) XXX_Size() int { + return m.Size() +} +func (m *GetChunkRefRequest) XXX_DiscardUnknown() { + xxx_messageInfo_GetChunkRefRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_GetChunkRefRequest proto.InternalMessageInfo + +func (m *GetChunkRefRequest) GetMatchers() string { + if m != nil { + return m.Matchers + } + return "" +} + +type GetChunkRefResponse struct { + Refs []*logproto.ChunkRef `protobuf:"bytes,1,rep,name=refs,proto3" json:"refs,omitempty"` +} + +func (m *GetChunkRefResponse) Reset() { *m = GetChunkRefResponse{} } +func (*GetChunkRefResponse) ProtoMessage() {} +func (*GetChunkRefResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_33a7bd4603d312b2, []int{4} +} +func (m *GetChunkRefResponse) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *GetChunkRefResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_GetChunkRefResponse.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *GetChunkRefResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_GetChunkRefResponse.Merge(m, src) +} +func (m *GetChunkRefResponse) XXX_Size() int { + return m.Size() +} +func (m *GetChunkRefResponse) XXX_DiscardUnknown() { + xxx_messageInfo_GetChunkRefResponse.DiscardUnknown(m) +} + +var xxx_messageInfo_GetChunkRefResponse proto.InternalMessageInfo + +func (m *GetChunkRefResponse) GetRefs() []*logproto.ChunkRef { + if m != nil { + return m.Refs + } + return nil +} + type QueryIndexResponse struct { QueryKey string `protobuf:"bytes,1,opt,name=QueryKey,proto3" json:"QueryKey,omitempty"` Rows []*Row `protobuf:"bytes,2,rep,name=rows,proto3" json:"rows,omitempty"` @@ -37,7 +277,7 @@ type QueryIndexResponse struct { func (m *QueryIndexResponse) Reset() { *m = QueryIndexResponse{} } func (*QueryIndexResponse) ProtoMessage() {} func (*QueryIndexResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_33a7bd4603d312b2, []int{0} + return fileDescriptor_33a7bd4603d312b2, []int{5} } func (m *QueryIndexResponse) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -88,7 +328,7 @@ type Row struct { func (m *Row) Reset() { *m = Row{} } func (*Row) ProtoMessage() {} func (*Row) Descriptor() ([]byte, []int) { - return fileDescriptor_33a7bd4603d312b2, []int{1} + return fileDescriptor_33a7bd4603d312b2, []int{6} } func (m *Row) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -138,7 +378,7 @@ type QueryIndexRequest struct { func (m *QueryIndexRequest) Reset() { *m = QueryIndexRequest{} } func (*QueryIndexRequest) ProtoMessage() {} func (*QueryIndexRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_33a7bd4603d312b2, []int{2} + return fileDescriptor_33a7bd4603d312b2, []int{7} } func (m *QueryIndexRequest) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -185,7 +425,7 @@ type IndexQuery struct { func (m *IndexQuery) Reset() { *m = IndexQuery{} } func (*IndexQuery) ProtoMessage() {} func (*IndexQuery) Descriptor() ([]byte, []int) { - return fileDescriptor_33a7bd4603d312b2, []int{3} + return fileDescriptor_33a7bd4603d312b2, []int{8} } func (m *IndexQuery) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -250,6 +490,11 @@ func (m *IndexQuery) GetValueEqual() []byte { } func init() { + proto.RegisterType((*LabelValuesForMetricNameRequest)(nil), "indexgatewaypb.LabelValuesForMetricNameRequest") + proto.RegisterType((*LabelNamesForMetricNameRequest)(nil), "indexgatewaypb.LabelNamesForMetricNameRequest") + proto.RegisterType((*LabelResponse)(nil), "indexgatewaypb.LabelResponse") + proto.RegisterType((*GetChunkRefRequest)(nil), "indexgatewaypb.GetChunkRefRequest") + proto.RegisterType((*GetChunkRefResponse)(nil), "indexgatewaypb.GetChunkRefResponse") proto.RegisterType((*QueryIndexResponse)(nil), "indexgatewaypb.QueryIndexResponse") proto.RegisterType((*Row)(nil), "indexgatewaypb.Row") proto.RegisterType((*QueryIndexRequest)(nil), "indexgatewaypb.QueryIndexRequest") @@ -261,42 +506,62 @@ func init() { } var fileDescriptor_33a7bd4603d312b2 = []byte{ - // 390 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x7c, 0x92, 0xbb, 0x4e, 0xe3, 0x40, - 0x14, 0x86, 0x3d, 0xb9, 0xec, 0x6e, 0xce, 0x46, 0x7b, 0x99, 0xdd, 0xc2, 0x8a, 0x56, 0xa3, 0xac, - 0x9b, 0x8d, 0xb6, 0x88, 0x51, 0x48, 0x47, 0x87, 0x40, 0x28, 0x42, 0x42, 0x30, 0x88, 0x48, 0x94, - 0x13, 0x71, 0x70, 0x2c, 0x42, 0xec, 0xcc, 0xd8, 0x38, 0xe9, 0x78, 0x04, 0x1e, 0x83, 0xa7, 0xa0, - 0xa6, 0x4c, 0x99, 0x92, 0x38, 0x0d, 0x65, 0x1e, 0x01, 0x79, 0x1c, 0xe2, 0x5c, 0x24, 0x2a, 0xfb, - 0x7c, 0xe7, 0xf7, 0x9c, 0xff, 0xfc, 0x1e, 0x38, 0xf0, 0x6f, 0x1c, 0x5b, 0x05, 0x9e, 0x14, 0x0e, - 0xea, 0x27, 0x2a, 0x5b, 0x75, 0x5d, 0xdf, 0x47, 0x69, 0xbb, 0xfd, 0x2b, 0x1c, 0x3a, 0x22, 0xc0, - 0x48, 0x8c, 0xd6, 0x0a, 0xbf, 0x63, 0x2f, 0xde, 0xea, 0xbe, 0xf4, 0x02, 0x8f, 0x7e, 0x5b, 0xef, - 0x5a, 0x97, 0x40, 0xcf, 0x42, 0x94, 0xa3, 0x56, 0x82, 0x39, 0x2a, 0xdf, 0xeb, 0x2b, 0xa4, 0x15, - 0xf8, 0xa2, 0xe9, 0x31, 0x8e, 0x4c, 0x52, 0x25, 0xb5, 0x12, 0x5f, 0xd6, 0xf4, 0x1f, 0x14, 0xa4, - 0x17, 0x29, 0x33, 0x57, 0xcd, 0xd7, 0xbe, 0x36, 0x7e, 0xd5, 0xd7, 0x0f, 0xac, 0x73, 0x2f, 0xe2, - 0x5a, 0x60, 0xed, 0x41, 0x9e, 0x7b, 0x11, 0x65, 0x00, 0x52, 0xf4, 0x1d, 0x6c, 0x8b, 0x5e, 0x88, - 0xfa, 0xb4, 0x32, 0x5f, 0x21, 0xf4, 0x37, 0x14, 0xef, 0x74, 0x2b, 0xa7, 0x5b, 0x69, 0x61, 0xb5, - 0xe0, 0xe7, 0xaa, 0xaf, 0x41, 0x88, 0x2a, 0xa0, 0x4d, 0xf8, 0x9c, 0x40, 0x17, 0x95, 0x49, 0xf4, - 0xf4, 0xca, 0xe6, 0x74, 0x2d, 0xd7, 0x1f, 0xf2, 0x77, 0xa9, 0xf5, 0x44, 0x00, 0x32, 0x4e, 0xff, - 0x40, 0x29, 0x10, 0x9d, 0x1e, 0x9e, 0x88, 0x5b, 0x5c, 0x2c, 0x97, 0x81, 0xa4, 0xdb, 0x15, 0xaa, - 0xdb, 0x5e, 0x3a, 0x2a, 0xf1, 0x0c, 0xd0, 0xff, 0xf0, 0x23, 0x73, 0x7e, 0x2a, 0xf1, 0xda, 0x1d, - 0x9a, 0x79, 0x6d, 0x7b, 0x8b, 0xd3, 0x1a, 0x7c, 0xcf, 0xd8, 0x79, 0x20, 0x64, 0x60, 0x16, 0xb4, - 0x74, 0x13, 0x27, 0x09, 0xe9, 0xa5, 0x0f, 0x07, 0xa1, 0xe8, 0x99, 0xc5, 0x34, 0xa1, 0x8c, 0x34, - 0x10, 0xca, 0xda, 0xff, 0x51, 0xba, 0x26, 0xbd, 0x00, 0xc8, 0xb2, 0xa1, 0x7f, 0x37, 0x33, 0xd8, - 0xca, 0xad, 0x62, 0x7d, 0x24, 0x49, 0x7f, 0xf9, 0x0e, 0xd9, 0x6f, 0x8e, 0xa7, 0xcc, 0x98, 0x4c, - 0x99, 0x31, 0x9f, 0x32, 0x72, 0x1f, 0x33, 0xf2, 0x18, 0x33, 0xf2, 0x1c, 0x33, 0x32, 0x8e, 0x19, - 0x79, 0x89, 0x19, 0x79, 0x8d, 0x99, 0x31, 0x8f, 0x19, 0x79, 0x98, 0x31, 0x63, 0x3c, 0x63, 0xc6, - 0x64, 0xc6, 0x8c, 0xce, 0x27, 0x7d, 0xaf, 0x76, 0xdf, 0x02, 0x00, 0x00, 0xff, 0xff, 0x95, 0x3c, - 0x4e, 0x5e, 0x9f, 0x02, 0x00, 0x00, -} - -func (this *QueryIndexResponse) Equal(that interface{}) bool { + // 705 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xac, 0x55, 0xcd, 0x6e, 0xd3, 0x4e, + 0x10, 0xf7, 0x26, 0xe9, 0x47, 0xa6, 0xf9, 0x7f, 0xb0, 0x45, 0x10, 0x19, 0xba, 0x29, 0x46, 0xa2, + 0x11, 0x12, 0x31, 0x2a, 0xbd, 0x21, 0x2e, 0x2d, 0x50, 0x55, 0x94, 0x0a, 0x16, 0xa8, 0xe0, 0x84, + 0x9c, 0x74, 0x63, 0x87, 0xda, 0xd9, 0x74, 0x6d, 0x93, 0xf6, 0xc6, 0x0b, 0x20, 0x71, 0xe5, 0x0d, + 0x78, 0x0a, 0xc4, 0xb1, 0xc7, 0x72, 0xab, 0x38, 0x54, 0x34, 0xbd, 0x70, 0xec, 0x23, 0xa0, 0x8c, + 0x63, 0x3b, 0x1f, 0x6d, 0x91, 0x4a, 0x4f, 0xd9, 0xf9, 0xcd, 0x6f, 0x66, 0xe7, 0x37, 0xe3, 0xd9, + 0xc0, 0xc3, 0xd6, 0xa6, 0x6d, 0xfa, 0x81, 0x54, 0x96, 0x2d, 0xf0, 0x57, 0xf8, 0xa6, 0xef, 0x34, + 0x5a, 0x2d, 0xa1, 0xcc, 0x46, 0x73, 0x43, 0x6c, 0xdb, 0x56, 0x20, 0xda, 0xd6, 0xce, 0x80, 0xd1, + 0xaa, 0x9a, 0xbd, 0x53, 0xa5, 0xa5, 0x64, 0x20, 0xe9, 0xbf, 0x83, 0x5e, 0xfd, 0x5a, 0x37, 0xab, + 0x2b, 0x6d, 0xf4, 0x26, 0x87, 0x88, 0xac, 0xdf, 0xb1, 0x1b, 0x81, 0x13, 0x56, 0x2b, 0x35, 0xe9, + 0x99, 0xb6, 0xb4, 0xa5, 0x89, 0x70, 0x35, 0xac, 0xa3, 0x15, 0x85, 0x74, 0x4f, 0x11, 0xdd, 0xf8, + 0x98, 0x81, 0xd2, 0xaa, 0x55, 0x15, 0xee, 0xba, 0xe5, 0x86, 0xc2, 0x7f, 0x2c, 0xd5, 0x53, 0x11, + 0xa8, 0x46, 0x6d, 0xcd, 0xf2, 0x04, 0x17, 0x5b, 0xa1, 0xf0, 0x03, 0x5a, 0x82, 0x29, 0x0f, 0xc1, + 0xb7, 0x4d, 0xcb, 0x13, 0x45, 0x32, 0x4b, 0xca, 0x79, 0x0e, 0x5e, 0xc2, 0xa3, 0x33, 0x00, 0x6e, + 0x37, 0x47, 0xe4, 0xcf, 0xa0, 0x3f, 0x8f, 0x08, 0xba, 0x97, 0x20, 0x57, 0x57, 0xd2, 0x2b, 0x66, + 0x67, 0x49, 0x39, 0xbb, 0x68, 0xee, 0x1e, 0x94, 0xb4, 0x1f, 0x07, 0xa5, 0xb9, 0xbe, 0x42, 0x5b, + 0x4a, 0x7a, 0x22, 0x70, 0x44, 0xe8, 0x9b, 0x35, 0xe9, 0x79, 0xb2, 0x69, 0x7a, 0x72, 0x43, 0xb8, + 0x95, 0x97, 0x0d, 0x4f, 0x70, 0x0c, 0xa6, 0x2b, 0x30, 0x11, 0x38, 0x4a, 0x86, 0xb6, 0x53, 0xcc, + 0x9d, 0x2f, 0x4f, 0x1c, 0x4f, 0x75, 0x98, 0xf4, 0xac, 0xa0, 0xe6, 0x08, 0xe5, 0x17, 0xc7, 0xb0, + 0xd8, 0xc4, 0x36, 0xbe, 0x13, 0x60, 0xab, 0x71, 0xe5, 0xe7, 0x6c, 0x47, 0xac, 0x37, 0x73, 0x41, + 0x7a, 0xb3, 0x7f, 0xa7, 0xd7, 0x98, 0x83, 0x7f, 0x50, 0x12, 0x17, 0x7e, 0x4b, 0x36, 0x7d, 0x41, + 0xaf, 0xc0, 0xf8, 0x7b, 0x1c, 0x77, 0x91, 0xcc, 0x66, 0xcb, 0x79, 0xde, 0xb3, 0x8c, 0x6f, 0x04, + 0xe8, 0xb2, 0x08, 0x96, 0x9c, 0xb0, 0xb9, 0xc9, 0x45, 0x3d, 0x16, 0x1c, 0xeb, 0x21, 0x17, 0xa4, + 0x27, 0x73, 0x81, 0xf3, 0xcb, 0x0e, 0xcd, 0xef, 0x01, 0x4c, 0x0f, 0x28, 0xe8, 0x29, 0xbe, 0x05, + 0x39, 0x25, 0xea, 0x91, 0xde, 0xa9, 0x79, 0x5a, 0x49, 0x96, 0x26, 0x61, 0xa2, 0xdf, 0x78, 0x03, + 0xf4, 0x79, 0x28, 0xd4, 0xce, 0x4a, 0x77, 0xe3, 0x92, 0x68, 0x1d, 0x26, 0x11, 0x7d, 0x22, 0x76, + 0x7a, 0xe3, 0x4e, 0x6c, 0x3a, 0x07, 0x39, 0x25, 0xdb, 0x7e, 0x31, 0x83, 0x99, 0xa7, 0x2b, 0x83, + 0xbb, 0x5a, 0xe1, 0xb2, 0xcd, 0x91, 0x60, 0xdc, 0x87, 0x2c, 0x97, 0x6d, 0xca, 0x00, 0x94, 0xd5, + 0xb4, 0x05, 0xee, 0x1b, 0x66, 0x2b, 0xf0, 0x3e, 0x84, 0x5e, 0x86, 0x31, 0x9c, 0x06, 0x76, 0xa9, + 0xc0, 0x23, 0xc3, 0x58, 0x81, 0x4b, 0xfd, 0x75, 0x45, 0x73, 0x59, 0x80, 0x89, 0x2e, 0xd8, 0x10, + 0xb1, 0x2e, 0x7d, 0xf8, 0x76, 0xa4, 0x63, 0x20, 0x8f, 0xa9, 0xc6, 0x57, 0x02, 0x90, 0xe2, 0xf4, + 0x3a, 0xe4, 0x03, 0xab, 0xea, 0x8a, 0xb5, 0xf4, 0x5b, 0x4e, 0x81, 0xae, 0xd7, 0xb1, 0x7c, 0x67, + 0x3d, 0xa9, 0x28, 0xcf, 0x53, 0x80, 0xde, 0x86, 0xff, 0xd3, 0xca, 0x9f, 0x29, 0x51, 0x6f, 0x6c, + 0xe3, 0x40, 0x0a, 0x7c, 0x04, 0xa7, 0x65, 0xf8, 0x2f, 0xc5, 0x5e, 0x04, 0x96, 0x0a, 0x70, 0x8f, + 0x0b, 0x7c, 0x18, 0xee, 0x76, 0x08, 0x45, 0x3f, 0xda, 0x0a, 0x2d, 0x17, 0x17, 0xb4, 0xc0, 0xfb, + 0x90, 0xf9, 0xcf, 0x59, 0x28, 0xa0, 0x80, 0xe5, 0x48, 0x27, 0x7d, 0x05, 0x90, 0x36, 0x87, 0xde, + 0x18, 0x6e, 0xc2, 0x48, 0xe3, 0x74, 0xe3, 0x2c, 0x4a, 0x34, 0xf3, 0xbb, 0x84, 0xbe, 0x86, 0xa9, + 0xbe, 0x4f, 0x89, 0x8e, 0x04, 0x8d, 0x6e, 0x8a, 0x7e, 0xf3, 0x4c, 0x4e, 0x94, 0xd9, 0xd0, 0xe8, + 0x3b, 0xb8, 0x7a, 0xca, 0x1b, 0x43, 0x2b, 0xc3, 0x19, 0xce, 0x7e, 0x8c, 0xf4, 0x99, 0x13, 0xf9, + 0x7d, 0x77, 0xb9, 0x50, 0x3c, 0xed, 0x7d, 0xa7, 0xe6, 0x89, 0xc1, 0xa7, 0xff, 0x13, 0xfc, 0xf1, + 0xb6, 0xc5, 0x85, 0xbd, 0x43, 0xa6, 0xed, 0x1f, 0x32, 0xed, 0xf8, 0x90, 0x91, 0x0f, 0x1d, 0x46, + 0xbe, 0x74, 0x18, 0xd9, 0xed, 0x30, 0xb2, 0xd7, 0x61, 0xe4, 0x67, 0x87, 0x91, 0x5f, 0x1d, 0xa6, + 0x1d, 0x77, 0x18, 0xf9, 0x74, 0xc4, 0xb4, 0xbd, 0x23, 0xa6, 0xed, 0x1f, 0x31, 0xad, 0x3a, 0x8e, + 0xbb, 0x78, 0xef, 0x77, 0x00, 0x00, 0x00, 0xff, 0xff, 0x20, 0x48, 0x2d, 0xb9, 0x2f, 0x07, 0x00, + 0x00, +} + +func (this *LabelValuesForMetricNameRequest) Equal(that interface{}) bool { if that == nil { return this == nil } - that1, ok := that.(*QueryIndexResponse) + that1, ok := that.(*LabelValuesForMetricNameRequest) if !ok { - that2, ok := that.(QueryIndexResponse) + that2, ok := that.(LabelValuesForMetricNameRequest) if ok { that1 = &that2 } else { @@ -308,27 +573,31 @@ func (this *QueryIndexResponse) Equal(that interface{}) bool { } else if this == nil { return false } - if this.QueryKey != that1.QueryKey { + if this.MetricName != that1.MetricName { return false } - if len(this.Rows) != len(that1.Rows) { + if this.LabelName != that1.LabelName { return false } - for i := range this.Rows { - if !this.Rows[i].Equal(that1.Rows[i]) { - return false - } + if !this.From.Equal(that1.From) { + return false + } + if !this.Through.Equal(that1.Through) { + return false + } + if this.Matchers != that1.Matchers { + return false } return true } -func (this *Row) Equal(that interface{}) bool { +func (this *LabelNamesForMetricNameRequest) Equal(that interface{}) bool { if that == nil { return this == nil } - that1, ok := that.(*Row) + that1, ok := that.(*LabelNamesForMetricNameRequest) if !ok { - that2, ok := that.(Row) + that2, ok := that.(LabelNamesForMetricNameRequest) if ok { that1 = &that2 } else { @@ -340,22 +609,25 @@ func (this *Row) Equal(that interface{}) bool { } else if this == nil { return false } - if !bytes.Equal(this.RangeValue, that1.RangeValue) { + if this.MetricName != that1.MetricName { return false } - if !bytes.Equal(this.Value, that1.Value) { + if !this.From.Equal(that1.From) { + return false + } + if !this.Through.Equal(that1.Through) { return false } return true } -func (this *QueryIndexRequest) Equal(that interface{}) bool { +func (this *LabelResponse) Equal(that interface{}) bool { if that == nil { return this == nil } - that1, ok := that.(*QueryIndexRequest) + that1, ok := that.(*LabelResponse) if !ok { - that2, ok := that.(QueryIndexRequest) + that2, ok := that.(LabelResponse) if ok { that1 = &that2 } else { @@ -367,24 +639,24 @@ func (this *QueryIndexRequest) Equal(that interface{}) bool { } else if this == nil { return false } - if len(this.Queries) != len(that1.Queries) { + if len(this.Values) != len(that1.Values) { return false } - for i := range this.Queries { - if !this.Queries[i].Equal(that1.Queries[i]) { + for i := range this.Values { + if this.Values[i] != that1.Values[i] { return false } } return true } -func (this *IndexQuery) Equal(that interface{}) bool { +func (this *GetChunkRefRequest) Equal(that interface{}) bool { if that == nil { return this == nil } - that1, ok := that.(*IndexQuery) + that1, ok := that.(*GetChunkRefRequest) if !ok { - that2, ok := that.(IndexQuery) + that2, ok := that.(GetChunkRefRequest) if ok { that1 = &that2 } else { @@ -396,60 +668,267 @@ func (this *IndexQuery) Equal(that interface{}) bool { } else if this == nil { return false } - if this.TableName != that1.TableName { + if !this.From.Equal(that1.From) { return false } - if this.HashValue != that1.HashValue { + if !this.Through.Equal(that1.Through) { return false } - if !bytes.Equal(this.RangeValuePrefix, that1.RangeValuePrefix) { + if this.Matchers != that1.Matchers { return false } - if !bytes.Equal(this.RangeValueStart, that1.RangeValueStart) { + return true +} +func (this *GetChunkRefResponse) Equal(that interface{}) bool { + if that == nil { + return this == nil + } + + that1, ok := that.(*GetChunkRefResponse) + if !ok { + that2, ok := that.(GetChunkRefResponse) + if ok { + that1 = &that2 + } else { + return false + } + } + if that1 == nil { + return this == nil + } else if this == nil { return false } - if !bytes.Equal(this.ValueEqual, that1.ValueEqual) { + if len(this.Refs) != len(that1.Refs) { return false } + for i := range this.Refs { + if !this.Refs[i].Equal(that1.Refs[i]) { + return false + } + } return true } -func (this *QueryIndexResponse) GoString() string { - if this == nil { - return "nil" +func (this *QueryIndexResponse) Equal(that interface{}) bool { + if that == nil { + return this == nil } - s := make([]string, 0, 6) - s = append(s, "&indexgatewaypb.QueryIndexResponse{") - s = append(s, "QueryKey: "+fmt.Sprintf("%#v", this.QueryKey)+",\n") - if this.Rows != nil { - s = append(s, "Rows: "+fmt.Sprintf("%#v", this.Rows)+",\n") + + that1, ok := that.(*QueryIndexResponse) + if !ok { + that2, ok := that.(QueryIndexResponse) + if ok { + that1 = &that2 + } else { + return false + } } - s = append(s, "}") - return strings.Join(s, "") -} -func (this *Row) GoString() string { - if this == nil { - return "nil" + if that1 == nil { + return this == nil + } else if this == nil { + return false } - s := make([]string, 0, 6) - s = append(s, "&indexgatewaypb.Row{") - s = append(s, "RangeValue: "+fmt.Sprintf("%#v", this.RangeValue)+",\n") - s = append(s, "Value: "+fmt.Sprintf("%#v", this.Value)+",\n") - s = append(s, "}") - return strings.Join(s, "") -} -func (this *QueryIndexRequest) GoString() string { - if this == nil { - return "nil" + if this.QueryKey != that1.QueryKey { + return false } - s := make([]string, 0, 5) - s = append(s, "&indexgatewaypb.QueryIndexRequest{") - if this.Queries != nil { - s = append(s, "Queries: "+fmt.Sprintf("%#v", this.Queries)+",\n") + if len(this.Rows) != len(that1.Rows) { + return false } - s = append(s, "}") - return strings.Join(s, "") -} -func (this *IndexQuery) GoString() string { + for i := range this.Rows { + if !this.Rows[i].Equal(that1.Rows[i]) { + return false + } + } + return true +} +func (this *Row) Equal(that interface{}) bool { + if that == nil { + return this == nil + } + + that1, ok := that.(*Row) + if !ok { + that2, ok := that.(Row) + if ok { + that1 = &that2 + } else { + return false + } + } + if that1 == nil { + return this == nil + } else if this == nil { + return false + } + if !bytes.Equal(this.RangeValue, that1.RangeValue) { + return false + } + if !bytes.Equal(this.Value, that1.Value) { + return false + } + return true +} +func (this *QueryIndexRequest) Equal(that interface{}) bool { + if that == nil { + return this == nil + } + + that1, ok := that.(*QueryIndexRequest) + if !ok { + that2, ok := that.(QueryIndexRequest) + if ok { + that1 = &that2 + } else { + return false + } + } + if that1 == nil { + return this == nil + } else if this == nil { + return false + } + if len(this.Queries) != len(that1.Queries) { + return false + } + for i := range this.Queries { + if !this.Queries[i].Equal(that1.Queries[i]) { + return false + } + } + return true +} +func (this *IndexQuery) Equal(that interface{}) bool { + if that == nil { + return this == nil + } + + that1, ok := that.(*IndexQuery) + if !ok { + that2, ok := that.(IndexQuery) + if ok { + that1 = &that2 + } else { + return false + } + } + if that1 == nil { + return this == nil + } else if this == nil { + return false + } + if this.TableName != that1.TableName { + return false + } + if this.HashValue != that1.HashValue { + return false + } + if !bytes.Equal(this.RangeValuePrefix, that1.RangeValuePrefix) { + return false + } + if !bytes.Equal(this.RangeValueStart, that1.RangeValueStart) { + return false + } + if !bytes.Equal(this.ValueEqual, that1.ValueEqual) { + return false + } + return true +} +func (this *LabelValuesForMetricNameRequest) GoString() string { + if this == nil { + return "nil" + } + s := make([]string, 0, 9) + s = append(s, "&indexgatewaypb.LabelValuesForMetricNameRequest{") + s = append(s, "MetricName: "+fmt.Sprintf("%#v", this.MetricName)+",\n") + s = append(s, "LabelName: "+fmt.Sprintf("%#v", this.LabelName)+",\n") + s = append(s, "From: "+fmt.Sprintf("%#v", this.From)+",\n") + s = append(s, "Through: "+fmt.Sprintf("%#v", this.Through)+",\n") + s = append(s, "Matchers: "+fmt.Sprintf("%#v", this.Matchers)+",\n") + s = append(s, "}") + return strings.Join(s, "") +} +func (this *LabelNamesForMetricNameRequest) GoString() string { + if this == nil { + return "nil" + } + s := make([]string, 0, 7) + s = append(s, "&indexgatewaypb.LabelNamesForMetricNameRequest{") + s = append(s, "MetricName: "+fmt.Sprintf("%#v", this.MetricName)+",\n") + s = append(s, "From: "+fmt.Sprintf("%#v", this.From)+",\n") + s = append(s, "Through: "+fmt.Sprintf("%#v", this.Through)+",\n") + s = append(s, "}") + return strings.Join(s, "") +} +func (this *LabelResponse) GoString() string { + if this == nil { + return "nil" + } + s := make([]string, 0, 5) + s = append(s, "&indexgatewaypb.LabelResponse{") + s = append(s, "Values: "+fmt.Sprintf("%#v", this.Values)+",\n") + s = append(s, "}") + return strings.Join(s, "") +} +func (this *GetChunkRefRequest) GoString() string { + if this == nil { + return "nil" + } + s := make([]string, 0, 7) + s = append(s, "&indexgatewaypb.GetChunkRefRequest{") + s = append(s, "From: "+fmt.Sprintf("%#v", this.From)+",\n") + s = append(s, "Through: "+fmt.Sprintf("%#v", this.Through)+",\n") + s = append(s, "Matchers: "+fmt.Sprintf("%#v", this.Matchers)+",\n") + s = append(s, "}") + return strings.Join(s, "") +} +func (this *GetChunkRefResponse) GoString() string { + if this == nil { + return "nil" + } + s := make([]string, 0, 5) + s = append(s, "&indexgatewaypb.GetChunkRefResponse{") + if this.Refs != nil { + s = append(s, "Refs: "+fmt.Sprintf("%#v", this.Refs)+",\n") + } + s = append(s, "}") + return strings.Join(s, "") +} +func (this *QueryIndexResponse) GoString() string { + if this == nil { + return "nil" + } + s := make([]string, 0, 6) + s = append(s, "&indexgatewaypb.QueryIndexResponse{") + s = append(s, "QueryKey: "+fmt.Sprintf("%#v", this.QueryKey)+",\n") + if this.Rows != nil { + s = append(s, "Rows: "+fmt.Sprintf("%#v", this.Rows)+",\n") + } + s = append(s, "}") + return strings.Join(s, "") +} +func (this *Row) GoString() string { + if this == nil { + return "nil" + } + s := make([]string, 0, 6) + s = append(s, "&indexgatewaypb.Row{") + s = append(s, "RangeValue: "+fmt.Sprintf("%#v", this.RangeValue)+",\n") + s = append(s, "Value: "+fmt.Sprintf("%#v", this.Value)+",\n") + s = append(s, "}") + return strings.Join(s, "") +} +func (this *QueryIndexRequest) GoString() string { + if this == nil { + return "nil" + } + s := make([]string, 0, 5) + s = append(s, "&indexgatewaypb.QueryIndexRequest{") + if this.Queries != nil { + s = append(s, "Queries: "+fmt.Sprintf("%#v", this.Queries)+",\n") + } + s = append(s, "}") + return strings.Join(s, "") +} +func (this *IndexQuery) GoString() string { if this == nil { return "nil" } @@ -487,6 +966,10 @@ type IndexGatewayClient interface { /// QueryIndex reads the indexes required for given query & sends back the batch of rows /// in rpc streams QueryIndex(ctx context.Context, in *QueryIndexRequest, opts ...grpc.CallOption) (IndexGateway_QueryIndexClient, error) + /// GetChunkRef returns chunk reference that match the provided label matchers + GetChunkRef(ctx context.Context, in *GetChunkRefRequest, opts ...grpc.CallOption) (*GetChunkRefResponse, error) + LabelNamesForMetricName(ctx context.Context, in *LabelNamesForMetricNameRequest, opts ...grpc.CallOption) (*LabelResponse, error) + LabelValuesForMetricName(ctx context.Context, in *LabelValuesForMetricNameRequest, opts ...grpc.CallOption) (*LabelResponse, error) } type indexGatewayClient struct { @@ -529,11 +1012,42 @@ func (x *indexGatewayQueryIndexClient) Recv() (*QueryIndexResponse, error) { return m, nil } +func (c *indexGatewayClient) GetChunkRef(ctx context.Context, in *GetChunkRefRequest, opts ...grpc.CallOption) (*GetChunkRefResponse, error) { + out := new(GetChunkRefResponse) + err := c.cc.Invoke(ctx, "/indexgatewaypb.IndexGateway/GetChunkRef", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *indexGatewayClient) LabelNamesForMetricName(ctx context.Context, in *LabelNamesForMetricNameRequest, opts ...grpc.CallOption) (*LabelResponse, error) { + out := new(LabelResponse) + err := c.cc.Invoke(ctx, "/indexgatewaypb.IndexGateway/LabelNamesForMetricName", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *indexGatewayClient) LabelValuesForMetricName(ctx context.Context, in *LabelValuesForMetricNameRequest, opts ...grpc.CallOption) (*LabelResponse, error) { + out := new(LabelResponse) + err := c.cc.Invoke(ctx, "/indexgatewaypb.IndexGateway/LabelValuesForMetricName", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + // IndexGatewayServer is the server API for IndexGateway service. type IndexGatewayServer interface { /// QueryIndex reads the indexes required for given query & sends back the batch of rows /// in rpc streams QueryIndex(*QueryIndexRequest, IndexGateway_QueryIndexServer) error + /// GetChunkRef returns chunk reference that match the provided label matchers + GetChunkRef(context.Context, *GetChunkRefRequest) (*GetChunkRefResponse, error) + LabelNamesForMetricName(context.Context, *LabelNamesForMetricNameRequest) (*LabelResponse, error) + LabelValuesForMetricName(context.Context, *LabelValuesForMetricNameRequest) (*LabelResponse, error) } // UnimplementedIndexGatewayServer can be embedded to have forward compatible implementations. @@ -543,6 +1057,15 @@ type UnimplementedIndexGatewayServer struct { func (*UnimplementedIndexGatewayServer) QueryIndex(req *QueryIndexRequest, srv IndexGateway_QueryIndexServer) error { return status.Errorf(codes.Unimplemented, "method QueryIndex not implemented") } +func (*UnimplementedIndexGatewayServer) GetChunkRef(ctx context.Context, req *GetChunkRefRequest) (*GetChunkRefResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method GetChunkRef not implemented") +} +func (*UnimplementedIndexGatewayServer) LabelNamesForMetricName(ctx context.Context, req *LabelNamesForMetricNameRequest) (*LabelResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method LabelNamesForMetricName not implemented") +} +func (*UnimplementedIndexGatewayServer) LabelValuesForMetricName(ctx context.Context, req *LabelValuesForMetricNameRequest) (*LabelResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method LabelValuesForMetricName not implemented") +} func RegisterIndexGatewayServer(s *grpc.Server, srv IndexGatewayServer) { s.RegisterService(&_IndexGateway_serviceDesc, srv) @@ -569,10 +1092,77 @@ func (x *indexGatewayQueryIndexServer) Send(m *QueryIndexResponse) error { return x.ServerStream.SendMsg(m) } +func _IndexGateway_GetChunkRef_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(GetChunkRefRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(IndexGatewayServer).GetChunkRef(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/indexgatewaypb.IndexGateway/GetChunkRef", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(IndexGatewayServer).GetChunkRef(ctx, req.(*GetChunkRefRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _IndexGateway_LabelNamesForMetricName_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(LabelNamesForMetricNameRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(IndexGatewayServer).LabelNamesForMetricName(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/indexgatewaypb.IndexGateway/LabelNamesForMetricName", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(IndexGatewayServer).LabelNamesForMetricName(ctx, req.(*LabelNamesForMetricNameRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _IndexGateway_LabelValuesForMetricName_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(LabelValuesForMetricNameRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(IndexGatewayServer).LabelValuesForMetricName(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/indexgatewaypb.IndexGateway/LabelValuesForMetricName", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(IndexGatewayServer).LabelValuesForMetricName(ctx, req.(*LabelValuesForMetricNameRequest)) + } + return interceptor(ctx, in, info, handler) +} + var _IndexGateway_serviceDesc = grpc.ServiceDesc{ ServiceName: "indexgatewaypb.IndexGateway", HandlerType: (*IndexGatewayServer)(nil), - Methods: []grpc.MethodDesc{}, + Methods: []grpc.MethodDesc{ + { + MethodName: "GetChunkRef", + Handler: _IndexGateway_GetChunkRef_Handler, + }, + { + MethodName: "LabelNamesForMetricName", + Handler: _IndexGateway_LabelNamesForMetricName_Handler, + }, + { + MethodName: "LabelValuesForMetricName", + Handler: _IndexGateway_LabelValuesForMetricName_Handler, + }, + }, Streams: []grpc.StreamDesc{ { StreamName: "QueryIndex", @@ -583,7 +1173,7 @@ var _IndexGateway_serviceDesc = grpc.ServiceDesc{ Metadata: "pkg/storage/stores/shipper/indexgateway/indexgatewaypb/gateway.proto", } -func (m *QueryIndexResponse) Marshal() (dAtA []byte, err error) { +func (m *LabelValuesForMetricNameRequest) Marshal() (dAtA []byte, err error) { size := m.Size() dAtA = make([]byte, size) n, err := m.MarshalToSizedBuffer(dAtA[:size]) @@ -593,41 +1183,51 @@ func (m *QueryIndexResponse) Marshal() (dAtA []byte, err error) { return dAtA[:n], nil } -func (m *QueryIndexResponse) MarshalTo(dAtA []byte) (int, error) { +func (m *LabelValuesForMetricNameRequest) MarshalTo(dAtA []byte) (int, error) { size := m.Size() return m.MarshalToSizedBuffer(dAtA[:size]) } -func (m *QueryIndexResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) { +func (m *LabelValuesForMetricNameRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) { i := len(dAtA) _ = i var l int _ = l - if len(m.Rows) > 0 { - for iNdEx := len(m.Rows) - 1; iNdEx >= 0; iNdEx-- { - { - size, err := m.Rows[iNdEx].MarshalToSizedBuffer(dAtA[:i]) - if err != nil { - return 0, err - } - i -= size - i = encodeVarintGateway(dAtA, i, uint64(size)) - } - i-- - dAtA[i] = 0x12 - } + if len(m.Matchers) > 0 { + i -= len(m.Matchers) + copy(dAtA[i:], m.Matchers) + i = encodeVarintGateway(dAtA, i, uint64(len(m.Matchers))) + i-- + dAtA[i] = 0x2a } - if len(m.QueryKey) > 0 { - i -= len(m.QueryKey) - copy(dAtA[i:], m.QueryKey) - i = encodeVarintGateway(dAtA, i, uint64(len(m.QueryKey))) + if m.Through != 0 { + i = encodeVarintGateway(dAtA, i, uint64(m.Through)) + i-- + dAtA[i] = 0x20 + } + if m.From != 0 { + i = encodeVarintGateway(dAtA, i, uint64(m.From)) + i-- + dAtA[i] = 0x18 + } + if len(m.LabelName) > 0 { + i -= len(m.LabelName) + copy(dAtA[i:], m.LabelName) + i = encodeVarintGateway(dAtA, i, uint64(len(m.LabelName))) + i-- + dAtA[i] = 0x12 + } + if len(m.MetricName) > 0 { + i -= len(m.MetricName) + copy(dAtA[i:], m.MetricName) + i = encodeVarintGateway(dAtA, i, uint64(len(m.MetricName))) i-- dAtA[i] = 0xa } return len(dAtA) - i, nil } -func (m *Row) Marshal() (dAtA []byte, err error) { +func (m *LabelNamesForMetricNameRequest) Marshal() (dAtA []byte, err error) { size := m.Size() dAtA = make([]byte, size) n, err := m.MarshalToSizedBuffer(dAtA[:size]) @@ -637,26 +1237,219 @@ func (m *Row) Marshal() (dAtA []byte, err error) { return dAtA[:n], nil } -func (m *Row) MarshalTo(dAtA []byte) (int, error) { +func (m *LabelNamesForMetricNameRequest) MarshalTo(dAtA []byte) (int, error) { size := m.Size() return m.MarshalToSizedBuffer(dAtA[:size]) } -func (m *Row) MarshalToSizedBuffer(dAtA []byte) (int, error) { +func (m *LabelNamesForMetricNameRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) { i := len(dAtA) _ = i var l int _ = l - if len(m.Value) > 0 { - i -= len(m.Value) - copy(dAtA[i:], m.Value) - i = encodeVarintGateway(dAtA, i, uint64(len(m.Value))) + if m.Through != 0 { + i = encodeVarintGateway(dAtA, i, uint64(m.Through)) i-- - dAtA[i] = 0x12 + dAtA[i] = 0x18 } - if len(m.RangeValue) > 0 { - i -= len(m.RangeValue) - copy(dAtA[i:], m.RangeValue) + if m.From != 0 { + i = encodeVarintGateway(dAtA, i, uint64(m.From)) + i-- + dAtA[i] = 0x10 + } + if len(m.MetricName) > 0 { + i -= len(m.MetricName) + copy(dAtA[i:], m.MetricName) + i = encodeVarintGateway(dAtA, i, uint64(len(m.MetricName))) + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} + +func (m *LabelResponse) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *LabelResponse) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *LabelResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if len(m.Values) > 0 { + for iNdEx := len(m.Values) - 1; iNdEx >= 0; iNdEx-- { + i -= len(m.Values[iNdEx]) + copy(dAtA[i:], m.Values[iNdEx]) + i = encodeVarintGateway(dAtA, i, uint64(len(m.Values[iNdEx]))) + i-- + dAtA[i] = 0xa + } + } + return len(dAtA) - i, nil +} + +func (m *GetChunkRefRequest) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *GetChunkRefRequest) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *GetChunkRefRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if len(m.Matchers) > 0 { + i -= len(m.Matchers) + copy(dAtA[i:], m.Matchers) + i = encodeVarintGateway(dAtA, i, uint64(len(m.Matchers))) + i-- + dAtA[i] = 0x1a + } + if m.Through != 0 { + i = encodeVarintGateway(dAtA, i, uint64(m.Through)) + i-- + dAtA[i] = 0x10 + } + if m.From != 0 { + i = encodeVarintGateway(dAtA, i, uint64(m.From)) + i-- + dAtA[i] = 0x8 + } + return len(dAtA) - i, nil +} + +func (m *GetChunkRefResponse) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *GetChunkRefResponse) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *GetChunkRefResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if len(m.Refs) > 0 { + for iNdEx := len(m.Refs) - 1; iNdEx >= 0; iNdEx-- { + { + size, err := m.Refs[iNdEx].MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintGateway(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0xa + } + } + return len(dAtA) - i, nil +} + +func (m *QueryIndexResponse) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *QueryIndexResponse) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *QueryIndexResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if len(m.Rows) > 0 { + for iNdEx := len(m.Rows) - 1; iNdEx >= 0; iNdEx-- { + { + size, err := m.Rows[iNdEx].MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintGateway(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x12 + } + } + if len(m.QueryKey) > 0 { + i -= len(m.QueryKey) + copy(dAtA[i:], m.QueryKey) + i = encodeVarintGateway(dAtA, i, uint64(len(m.QueryKey))) + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} + +func (m *Row) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *Row) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *Row) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if len(m.Value) > 0 { + i -= len(m.Value) + copy(dAtA[i:], m.Value) + i = encodeVarintGateway(dAtA, i, uint64(len(m.Value))) + i-- + dAtA[i] = 0x12 + } + if len(m.RangeValue) > 0 { + i -= len(m.RangeValue) + copy(dAtA[i:], m.RangeValue) i = encodeVarintGateway(dAtA, i, uint64(len(m.RangeValue))) i-- dAtA[i] = 0xa @@ -770,6 +1563,101 @@ func encodeVarintGateway(dAtA []byte, offset int, v uint64) int { dAtA[offset] = uint8(v) return base } +func (m *LabelValuesForMetricNameRequest) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = len(m.MetricName) + if l > 0 { + n += 1 + l + sovGateway(uint64(l)) + } + l = len(m.LabelName) + if l > 0 { + n += 1 + l + sovGateway(uint64(l)) + } + if m.From != 0 { + n += 1 + sovGateway(uint64(m.From)) + } + if m.Through != 0 { + n += 1 + sovGateway(uint64(m.Through)) + } + l = len(m.Matchers) + if l > 0 { + n += 1 + l + sovGateway(uint64(l)) + } + return n +} + +func (m *LabelNamesForMetricNameRequest) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = len(m.MetricName) + if l > 0 { + n += 1 + l + sovGateway(uint64(l)) + } + if m.From != 0 { + n += 1 + sovGateway(uint64(m.From)) + } + if m.Through != 0 { + n += 1 + sovGateway(uint64(m.Through)) + } + return n +} + +func (m *LabelResponse) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if len(m.Values) > 0 { + for _, s := range m.Values { + l = len(s) + n += 1 + l + sovGateway(uint64(l)) + } + } + return n +} + +func (m *GetChunkRefRequest) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.From != 0 { + n += 1 + sovGateway(uint64(m.From)) + } + if m.Through != 0 { + n += 1 + sovGateway(uint64(m.Through)) + } + l = len(m.Matchers) + if l > 0 { + n += 1 + l + sovGateway(uint64(l)) + } + return n +} + +func (m *GetChunkRefResponse) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if len(m.Refs) > 0 { + for _, e := range m.Refs { + l = e.Size() + n += 1 + l + sovGateway(uint64(l)) + } + } + return n +} + func (m *QueryIndexResponse) Size() (n int) { if m == nil { return 0 @@ -856,54 +1744,117 @@ func sovGateway(x uint64) (n int) { func sozGateway(x uint64) (n int) { return sovGateway(uint64((x << 1) ^ uint64((int64(x) >> 63)))) } -func (this *QueryIndexResponse) String() string { +func (this *LabelValuesForMetricNameRequest) String() string { if this == nil { return "nil" } - repeatedStringForRows := "[]*Row{" - for _, f := range this.Rows { - repeatedStringForRows += strings.Replace(f.String(), "Row", "Row", 1) + "," - } - repeatedStringForRows += "}" - s := strings.Join([]string{`&QueryIndexResponse{`, - `QueryKey:` + fmt.Sprintf("%v", this.QueryKey) + `,`, - `Rows:` + repeatedStringForRows + `,`, + s := strings.Join([]string{`&LabelValuesForMetricNameRequest{`, + `MetricName:` + fmt.Sprintf("%v", this.MetricName) + `,`, + `LabelName:` + fmt.Sprintf("%v", this.LabelName) + `,`, + `From:` + fmt.Sprintf("%v", this.From) + `,`, + `Through:` + fmt.Sprintf("%v", this.Through) + `,`, + `Matchers:` + fmt.Sprintf("%v", this.Matchers) + `,`, `}`, }, "") return s } -func (this *Row) String() string { +func (this *LabelNamesForMetricNameRequest) String() string { if this == nil { return "nil" } - s := strings.Join([]string{`&Row{`, - `RangeValue:` + fmt.Sprintf("%v", this.RangeValue) + `,`, - `Value:` + fmt.Sprintf("%v", this.Value) + `,`, + s := strings.Join([]string{`&LabelNamesForMetricNameRequest{`, + `MetricName:` + fmt.Sprintf("%v", this.MetricName) + `,`, + `From:` + fmt.Sprintf("%v", this.From) + `,`, + `Through:` + fmt.Sprintf("%v", this.Through) + `,`, `}`, }, "") return s } -func (this *QueryIndexRequest) String() string { +func (this *LabelResponse) String() string { if this == nil { return "nil" } - repeatedStringForQueries := "[]*IndexQuery{" - for _, f := range this.Queries { - repeatedStringForQueries += strings.Replace(f.String(), "IndexQuery", "IndexQuery", 1) + "," - } - repeatedStringForQueries += "}" - s := strings.Join([]string{`&QueryIndexRequest{`, - `Queries:` + repeatedStringForQueries + `,`, + s := strings.Join([]string{`&LabelResponse{`, + `Values:` + fmt.Sprintf("%v", this.Values) + `,`, `}`, }, "") return s } -func (this *IndexQuery) String() string { +func (this *GetChunkRefRequest) String() string { if this == nil { return "nil" } - s := strings.Join([]string{`&IndexQuery{`, - `TableName:` + fmt.Sprintf("%v", this.TableName) + `,`, + s := strings.Join([]string{`&GetChunkRefRequest{`, + `From:` + fmt.Sprintf("%v", this.From) + `,`, + `Through:` + fmt.Sprintf("%v", this.Through) + `,`, + `Matchers:` + fmt.Sprintf("%v", this.Matchers) + `,`, + `}`, + }, "") + return s +} +func (this *GetChunkRefResponse) String() string { + if this == nil { + return "nil" + } + repeatedStringForRefs := "[]*ChunkRef{" + for _, f := range this.Refs { + repeatedStringForRefs += strings.Replace(fmt.Sprintf("%v", f), "ChunkRef", "logproto.ChunkRef", 1) + "," + } + repeatedStringForRefs += "}" + s := strings.Join([]string{`&GetChunkRefResponse{`, + `Refs:` + repeatedStringForRefs + `,`, + `}`, + }, "") + return s +} +func (this *QueryIndexResponse) String() string { + if this == nil { + return "nil" + } + repeatedStringForRows := "[]*Row{" + for _, f := range this.Rows { + repeatedStringForRows += strings.Replace(f.String(), "Row", "Row", 1) + "," + } + repeatedStringForRows += "}" + s := strings.Join([]string{`&QueryIndexResponse{`, + `QueryKey:` + fmt.Sprintf("%v", this.QueryKey) + `,`, + `Rows:` + repeatedStringForRows + `,`, + `}`, + }, "") + return s +} +func (this *Row) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&Row{`, + `RangeValue:` + fmt.Sprintf("%v", this.RangeValue) + `,`, + `Value:` + fmt.Sprintf("%v", this.Value) + `,`, + `}`, + }, "") + return s +} +func (this *QueryIndexRequest) String() string { + if this == nil { + return "nil" + } + repeatedStringForQueries := "[]*IndexQuery{" + for _, f := range this.Queries { + repeatedStringForQueries += strings.Replace(f.String(), "IndexQuery", "IndexQuery", 1) + "," + } + repeatedStringForQueries += "}" + s := strings.Join([]string{`&QueryIndexRequest{`, + `Queries:` + repeatedStringForQueries + `,`, + `}`, + }, "") + return s +} +func (this *IndexQuery) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&IndexQuery{`, + `TableName:` + fmt.Sprintf("%v", this.TableName) + `,`, `HashValue:` + fmt.Sprintf("%v", this.HashValue) + `,`, `RangeValuePrefix:` + fmt.Sprintf("%v", this.RangeValuePrefix) + `,`, `RangeValueStart:` + fmt.Sprintf("%v", this.RangeValueStart) + `,`, @@ -920,6 +1871,611 @@ func valueToStringGateway(v interface{}) string { pv := reflect.Indirect(rv).Interface() return fmt.Sprintf("*%v", pv) } +func (m *LabelValuesForMetricNameRequest) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowGateway + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: LabelValuesForMetricNameRequest: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: LabelValuesForMetricNameRequest: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field MetricName", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowGateway + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthGateway + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthGateway + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.MetricName = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field LabelName", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowGateway + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthGateway + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthGateway + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.LabelName = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 3: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field From", wireType) + } + m.From = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowGateway + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.From |= github_com_prometheus_common_model.Time(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 4: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Through", wireType) + } + m.Through = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowGateway + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Through |= github_com_prometheus_common_model.Time(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 5: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Matchers", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowGateway + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthGateway + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthGateway + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Matchers = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipGateway(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthGateway + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthGateway + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *LabelNamesForMetricNameRequest) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowGateway + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: LabelNamesForMetricNameRequest: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: LabelNamesForMetricNameRequest: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field MetricName", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowGateway + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthGateway + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthGateway + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.MetricName = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 2: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field From", wireType) + } + m.From = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowGateway + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.From |= github_com_prometheus_common_model.Time(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 3: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Through", wireType) + } + m.Through = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowGateway + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Through |= github_com_prometheus_common_model.Time(b&0x7F) << shift + if b < 0x80 { + break + } + } + default: + iNdEx = preIndex + skippy, err := skipGateway(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthGateway + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthGateway + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *LabelResponse) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowGateway + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: LabelResponse: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: LabelResponse: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Values", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowGateway + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthGateway + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthGateway + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Values = append(m.Values, string(dAtA[iNdEx:postIndex])) + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipGateway(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthGateway + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthGateway + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *GetChunkRefRequest) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowGateway + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: GetChunkRefRequest: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: GetChunkRefRequest: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field From", wireType) + } + m.From = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowGateway + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.From |= github_com_prometheus_common_model.Time(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 2: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Through", wireType) + } + m.Through = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowGateway + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Through |= github_com_prometheus_common_model.Time(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Matchers", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowGateway + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthGateway + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthGateway + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Matchers = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipGateway(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthGateway + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthGateway + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *GetChunkRefResponse) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowGateway + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: GetChunkRefResponse: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: GetChunkRefResponse: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Refs", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowGateway + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthGateway + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthGateway + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Refs = append(m.Refs, &logproto.ChunkRef{}) + if err := m.Refs[len(m.Refs)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipGateway(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthGateway + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthGateway + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} func (m *QueryIndexResponse) Unmarshal(dAtA []byte) error { l := len(dAtA) iNdEx := 0 diff --git a/pkg/storage/stores/shipper/indexgateway/indexgatewaypb/gateway.proto b/pkg/storage/stores/shipper/indexgateway/indexgatewaypb/gateway.proto index e4b3d6d0f4ef2..24ab75d135608 100644 --- a/pkg/storage/stores/shipper/indexgateway/indexgatewaypb/gateway.proto +++ b/pkg/storage/stores/shipper/indexgateway/indexgatewaypb/gateway.proto @@ -2,10 +2,46 @@ syntax = "proto3"; package indexgatewaypb; +import "pkg/logproto/logproto.proto"; +import "github.com/gogo/protobuf/gogoproto/gogo.proto"; + + service IndexGateway { /// QueryIndex reads the indexes required for given query & sends back the batch of rows /// in rpc streams rpc QueryIndex(QueryIndexRequest) returns (stream QueryIndexResponse); + /// GetChunkRef returns chunk reference that match the provided label matchers + rpc GetChunkRef(GetChunkRefRequest) returns (GetChunkRefResponse) {}; + rpc LabelNamesForMetricName(LabelNamesForMetricNameRequest) returns (LabelResponse) {}; + rpc LabelValuesForMetricName(LabelValuesForMetricNameRequest) returns (LabelResponse) {}; +} + +message LabelValuesForMetricNameRequest { + string metric_name = 1; + string label_name = 2; + int64 from = 3 [ (gogoproto.customtype) = "github.com/prometheus/common/model.Time", (gogoproto.nullable) = false]; + int64 through = 4 [ (gogoproto.customtype) = "github.com/prometheus/common/model.Time", (gogoproto.nullable) = false]; + string matchers = 5; +} + +message LabelNamesForMetricNameRequest { + string metric_name = 1; + int64 from = 2 [ (gogoproto.customtype) = "github.com/prometheus/common/model.Time", (gogoproto.nullable) = false]; + int64 through = 3 [ (gogoproto.customtype) = "github.com/prometheus/common/model.Time", (gogoproto.nullable) = false]; +} + +message LabelResponse { + repeated string values = 1; +} + +message GetChunkRefRequest { + int64 from = 1 [ (gogoproto.customtype) = "github.com/prometheus/common/model.Time", (gogoproto.nullable) = false]; + int64 through = 2 [ (gogoproto.customtype) = "github.com/prometheus/common/model.Time", (gogoproto.nullable) = false]; + string matchers = 3; +} + +message GetChunkRefResponse { + repeated logproto.ChunkRef refs = 1; } message QueryIndexResponse { @@ -29,4 +65,3 @@ message IndexQuery { bytes rangeValueStart = 4; bytes valueEqual = 5; } -