Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce a more scalable index-gateway API. #5892

Merged
merged 8 commits into from
Apr 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/loki/loki.go
Original file line number Diff line number Diff line change
Expand Up @@ -511,7 +511,7 @@ func (t *Loki) setupModuleManager() error {
Ruler: {Ring, Server, Store, RulerStorage, IngesterQuerier, Overrides, TenantConfigs, UsageReport},
TableManager: {Server, UsageReport},
Compactor: {Server, Overrides, MemberlistKV, UsageReport},
IndexGateway: {Server, Overrides, UsageReport, MemberlistKV},
IndexGateway: {Server, Store, Overrides, UsageReport, MemberlistKV},
IngesterQuerier: {Ring},
IndexGatewayRing: {RuntimeConfig, Server, MemberlistKV},
All: {QueryScheduler, QueryFrontend, Querier, Ingester, Distributor, Ruler, Compactor},
Expand Down
17 changes: 7 additions & 10 deletions pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,7 @@ func (t *Loki) initStore() (_ services.Service, err error) {
// and queried as part of live data until the cache TTL expires on the index entry.
t.Cfg.Ingester.RetainPeriod = t.Cfg.StorageConfig.IndexCacheValidity + 1*time.Minute
t.Cfg.StorageConfig.BoltDBShipperConfig.IngesterDBRetainPeriod = boltdbShipperQuerierIndexUpdateDelay(t.Cfg) + 2*time.Minute
case t.Cfg.isModuleEnabled(Querier), t.Cfg.isModuleEnabled(Ruler), t.Cfg.isModuleEnabled(Read):
case t.Cfg.isModuleEnabled(Querier), t.Cfg.isModuleEnabled(Ruler), t.Cfg.isModuleEnabled(Read), t.isModuleActive(IndexGateway):
// We do not want query to do any updates to index
t.Cfg.StorageConfig.BoltDBShipperConfig.Mode = shipper.ModeReadOnly
default:
Expand All @@ -437,6 +437,9 @@ func (t *Loki) initStore() (_ services.Service, err error) {
// Use AsyncStore to query both ingesters local store and chunk store for store queries.
// Only queriers should use the AsyncStore, it should never be used in ingesters.
asyncStore = true
case t.Cfg.isModuleEnabled(IndexGateway):
// we want to use the actual storage when running the index-gateway, so we remove the Addr from the config
t.Cfg.StorageConfig.BoltDBShipperConfig.IndexGatewayClientConfig.Address = ""
DylanGuedes marked this conversation as resolved.
Show resolved Hide resolved
case t.Cfg.isModuleEnabled(All):
// We want ingester to also query the store when using boltdb-shipper but only when running with target All.
// We do not want to use AsyncStore otherwise it would start spiraling around doing queries over and over again to the ingesters and store.
Expand Down Expand Up @@ -772,26 +775,20 @@ func (t *Loki) initCompactor() (services.Service, error) {
}

func (t *Loki) initIndexGateway() (services.Service, error) {
t.Cfg.StorageConfig.BoltDBShipperConfig.Mode = shipper.ModeReadOnly
t.Cfg.IndexGateway.Ring.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV
t.Cfg.IndexGateway.Ring.ListenPort = t.Cfg.Server.GRPCListenPort

objectClient, err := storage.NewObjectClient(t.Cfg.StorageConfig.BoltDBShipperConfig.SharedStoreType, t.Cfg.StorageConfig, t.clientMetrics)
if err != nil {
return nil, err
}

shipperIndexClient, err := shipper.NewShipper(t.Cfg.StorageConfig.BoltDBShipperConfig, objectClient, t.overrides, prometheus.DefaultRegisterer)
indexClient, err := storage.NewIndexClient(config.BoltDBShipperType, t.Cfg.StorageConfig, t.Cfg.SchemaConfig, t.overrides, t.clientMetrics, prometheus.DefaultRegisterer)
if err != nil {
return nil, err
}

gateway, err := indexgateway.NewIndexGateway(t.Cfg.IndexGateway, util_log.Logger, prometheus.DefaultRegisterer, shipperIndexClient)
gateway, err := indexgateway.NewIndexGateway(t.Cfg.IndexGateway, util_log.Logger, prometheus.DefaultRegisterer, t.Store, indexClient)
if err != nil {
return nil, err
}

t.Server.HTTP.Path("/indexgateway/ring").Methods("GET", "POST").Handler(gateway)

indexgatewaypb.RegisterIndexGatewayServer(t.Server.GRPC, gateway)
return gateway, nil
}
Expand Down
19 changes: 17 additions & 2 deletions pkg/storage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/grafana/loki/pkg/storage/stores"
"github.com/grafana/loki/pkg/storage/stores/series"
"github.com/grafana/loki/pkg/storage/stores/series/index"
"github.com/grafana/loki/pkg/storage/stores/shipper"
"github.com/grafana/loki/pkg/usagestats"
"github.com/grafana/loki/pkg/util"
)
Expand Down Expand Up @@ -196,8 +197,22 @@ func (s *store) storeForPeriod(p config.PeriodConfig, chunkClient client.Client,
schema = index.NewSchemaCaching(schema, time.Duration(s.storeCfg.CacheLookupsOlderThan))
}

return series.NewWriter(f, s.schemaCfg, idx, schema, s.writeDedupeCache, s.storeCfg.DisableIndexDeduplication),
series.NewIndexStore(s.schemaCfg, schema, idx, f, s.cfg.MaxChunkBatchSize),
var (
writer stores.ChunkWriter = series.NewWriter(f, s.schemaCfg, idx, schema, s.writeDedupeCache, s.storeCfg.DisableIndexDeduplication)
seriesdIndex *series.IndexStore = series.NewIndexStore(s.schemaCfg, schema, idx, f, s.cfg.MaxChunkBatchSize)
index stores.Index = seriesdIndex
)
if s.cfg.BoltDBShipperConfig.Mode == shipper.ModeReadOnly && (s.cfg.BoltDBShipperConfig.IndexGatewayClientConfig.Address != "" || s.cfg.BoltDBShipperConfig.IndexGatewayClientConfig.Ring != nil) {
// inject the index-gateway client into the index store
gw, err := shipper.NewGatewayClient(s.cfg.BoltDBShipperConfig.IndexGatewayClientConfig, indexClientReg, s.logger)
if err != nil {
return nil, nil, nil, err
}
index = series.NewIndexGatewayClientStore(gw, seriesdIndex)
}

return writer,
index,
func() {
chunkClient.Stop()
f.Stop()
Expand Down
104 changes: 104 additions & 0 deletions pkg/storage/stores/series/series_index_gateway_store.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
package series

import (
"context"

"github.com/gogo/status"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"

"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql/syntax"
"github.com/grafana/loki/pkg/storage/stores/shipper/indexgateway/indexgatewaypb"
)

type IndexGatewayClientStore struct {
client IndexGatewayClient
*IndexStore
}

type IndexGatewayClient interface {
GetChunkRef(ctx context.Context, in *indexgatewaypb.GetChunkRefRequest, opts ...grpc.CallOption) (*indexgatewaypb.GetChunkRefResponse, error)
LabelNamesForMetricName(ctx context.Context, in *indexgatewaypb.LabelNamesForMetricNameRequest, opts ...grpc.CallOption) (*indexgatewaypb.LabelResponse, error)
LabelValuesForMetricName(ctx context.Context, in *indexgatewaypb.LabelValuesForMetricNameRequest, opts ...grpc.CallOption) (*indexgatewaypb.LabelResponse, error)
}

func NewIndexGatewayClientStore(client IndexGatewayClient, index *IndexStore) *IndexGatewayClientStore {
return &IndexGatewayClientStore{
client: client,
IndexStore: index,
}
}

func (c *IndexGatewayClientStore) GetChunkRefs(ctx context.Context, userID string, from, through model.Time, allMatchers ...*labels.Matcher) ([]logproto.ChunkRef, error) {
response, err := c.client.GetChunkRef(ctx, &indexgatewaypb.GetChunkRefRequest{
From: from,
Through: through,
Matchers: (&syntax.MatchersExpr{Mts: allMatchers}).String(),
})
if err != nil {
if isUnimplementedCallError(err) {
// Handle communication with older index gateways gracefully, by falling back to the index store calls.
return c.IndexStore.GetChunkRefs(ctx, userID, from, through, allMatchers...)
}
return nil, err
}
result := make([]logproto.ChunkRef, len(response.Refs))
for i, ref := range response.Refs {
result[i] = *ref
}

return result, nil
}

func (c *IndexGatewayClientStore) GetSeries(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([]labels.Labels, error) {
refs, err := c.GetChunkRefs(ctx, userID, from, through, matchers...)
if err != nil {
return nil, err
}
return c.chunksToSeries(ctx, refs, matchers)
}

// LabelNamesForMetricName retrieves all label names for a metric name.
func (c *IndexGatewayClientStore) LabelNamesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string) ([]string, error) {
resp, err := c.client.LabelNamesForMetricName(ctx, &indexgatewaypb.LabelNamesForMetricNameRequest{
MetricName: metricName,
From: from,
Through: through,
})
if isUnimplementedCallError(err) {
// Handle communication with older index gateways gracefully, by falling back to the index store calls.
return c.IndexStore.LabelNamesForMetricName(ctx, userID, from, through, metricName)
}
return resp.Values, err
}

func (c *IndexGatewayClientStore) LabelValuesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string, labelName string, matchers ...*labels.Matcher) ([]string, error) {
resp, err := c.client.LabelValuesForMetricName(ctx, &indexgatewaypb.LabelValuesForMetricNameRequest{
MetricName: metricName,
LabelName: labelName,
From: from,
Through: through,
Matchers: (&syntax.MatchersExpr{Mts: matchers}).String(),
})
if isUnimplementedCallError(err) {
// Handle communication with older index gateways gracefully, by falling back to the index store calls.
return c.IndexStore.LabelNamesForMetricName(ctx, userID, from, through, metricName)
}
return resp.Values, err
}

// isUnimplementedCallError tells if the GRPC error is a gRPC error with code Unimplemented.
func isUnimplementedCallError(err error) bool {
if err == nil {
return false
}

s, ok := status.FromError(err)
if !ok {
return false
}
return (s.Code() == codes.Unimplemented)
}
104 changes: 104 additions & 0 deletions pkg/storage/stores/series/series_index_gateway_store_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
package series

import (
"context"
"log"
"net"
"testing"
"time"

"github.com/grafana/dskit/grpcclient"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"

"github.com/grafana/loki/pkg/storage/chunk/client/testutils"
"github.com/grafana/loki/pkg/storage/config"
"github.com/grafana/loki/pkg/storage/stores/series/index"
"github.com/grafana/loki/pkg/storage/stores/shipper/indexgateway/indexgatewaypb"
)

type fakeClient struct {
indexgatewaypb.IndexGatewayClient
}

func (fakeClient) GetChunkRef(ctx context.Context, in *indexgatewaypb.GetChunkRefRequest, opts ...grpc.CallOption) (*indexgatewaypb.GetChunkRefResponse, error) {
return &indexgatewaypb.GetChunkRefResponse{}, nil
}

func Test_IndexGatewayClient(t *testing.T) {
idx := IndexGatewayClientStore{
client: fakeClient{},
IndexStore: &IndexStore{
chunkBatchSize: 1,
},
}
_, err := idx.GetSeries(context.Background(), "foo", model.Earliest, model.Latest)
require.NoError(t, err)
}

func Test_IndexGatewayClient_Fallback(t *testing.T) {
lis, err := net.Listen("tcp", "localhost:0")
require.NoError(t, err)
s := grpc.NewServer()

// register fake grpc service with missing methods
desc := grpc.ServiceDesc{
ServiceName: "indexgatewaypb.IndexGateway",
HandlerType: (*indexgatewaypb.IndexGatewayServer)(nil),
Streams: []grpc.StreamDesc{
{
StreamName: "QueryIndex",
Handler: nil,
ServerStreams: true,
},
},
Metadata: "pkg/storage/stores/shipper/indexgateway/indexgatewaypb/gateway.proto",
}
s.RegisterService(&desc, nil)

go func() {
if err := s.Serve(lis); err != nil {
log.Fatalf("Failed to serve: %v", err)
}
}()
defer func() {
s.GracefulStop()
}()

cfg := grpcclient.Config{
MaxRecvMsgSize: 1024,
MaxSendMsgSize: 1024,
}

dialOpts, err := cfg.DialOption(nil, nil)
require.NoError(t, err)

conn, err := grpc.Dial(lis.Addr().String(), dialOpts...)
require.NoError(t, err)
defer conn.Close()
schemaCfg := config.SchemaConfig{
Configs: []config.PeriodConfig{
{From: config.DayTime{Time: model.Now().Add(-24 * time.Hour)}, Schema: "v12", RowShards: 16},
},
}
schema, err := index.CreateSchema(schemaCfg.Configs[0])
require.NoError(t, err)
testutils.ResetMockStorage()
tm, err := index.NewTableManager(index.TableManagerConfig{}, schemaCfg, 2*time.Hour, testutils.NewMockStorage(), nil, nil, nil)
require.NoError(t, err)
require.NoError(t, tm.SyncTables(context.Background()))
idx := NewIndexGatewayClientStore(
indexgatewaypb.NewIndexGatewayClient(conn),
&IndexStore{
chunkBatchSize: 1,
schema: schema,
schemaCfg: schemaCfg,
index: testutils.NewMockStorage(),
},
)

_, err = idx.GetSeries(context.Background(), "foo", model.Now(), model.Now().Add(1*time.Hour), labels.MustNewMatcher(labels.MatchEqual, "__name__", "logs"))
require.NoError(t, err)
}
6 changes: 5 additions & 1 deletion pkg/storage/stores/series/series_index_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,9 +142,13 @@ func (c *IndexStore) GetSeries(ctx context.Context, userID string, from, through
return nil, err
}

return c.chunksToSeries(ctx, chks, matchers)
}

func (c *IndexStore) chunksToSeries(ctx context.Context, in []logproto.ChunkRef, matchers []*labels.Matcher) ([]labels.Labels, error) {
// download one per series and merge
// group chunks by series
chunksBySeries, keys := filterChunkRefsByUniqueFingerprint(c.schemaCfg, chks)
chunksBySeries, keys := filterChunkRefsByUniqueFingerprint(c.schemaCfg, in)

results := make([]labels.Labels, 0, len(chunksBySeries))

Expand Down
Loading