Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tsdb index gateway #6158

Merged
merged 5 commits into from
May 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 21 additions & 44 deletions pkg/storage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/grafana/loki/pkg/storage/chunk/fetcher"
"github.com/grafana/loki/pkg/storage/config"
"github.com/grafana/loki/pkg/storage/stores"
"github.com/grafana/loki/pkg/storage/stores/indexshipper"
"github.com/grafana/loki/pkg/storage/stores/series"
"github.com/grafana/loki/pkg/storage/stores/series/index"
"github.com/grafana/loki/pkg/storage/stores/shipper"
Expand All @@ -32,7 +31,6 @@ import (
"github.com/grafana/loki/pkg/usagestats"
"github.com/grafana/loki/pkg/util"
"github.com/grafana/loki/pkg/util/deletion"
util_log "github.com/grafana/loki/pkg/util/log"
)

var (
Expand Down Expand Up @@ -200,58 +198,37 @@ func shouldUseBoltDBIndexGatewayClient(cfg Config) bool {
return true
}

func (s *store) storeForPeriod(p config.PeriodConfig, chunkClient client.Client, f *fetcher.Fetcher) (stores.ChunkWriter, stores.Index, func(), error) {
func (s *store) storeForPeriod(p config.PeriodConfig, chunkClient client.Client, f *fetcher.Fetcher) (stores.ChunkWriter, series.IndexStore, func(), error) {
indexClientReg := prometheus.WrapRegistererWith(
prometheus.Labels{"component": "index-store-" + p.From.String()}, s.registerer)

if p.IndexType == config.TSDBType {
var (
nodeName = s.cfg.TSDBShipperConfig.IngesterName
dir = s.cfg.TSDBShipperConfig.ActiveIndexDirectory
)
tsdbMetrics := tsdb.NewMetrics(indexClientReg)
objectClient, err := NewObjectClient(s.cfg.TSDBShipperConfig.SharedStoreType, s.cfg, s.clientMetrics)
if err != nil {
return nil, nil, nil, err
}

shpr, err := indexshipper.NewIndexShipper(
s.cfg.TSDBShipperConfig,
objectClient,
s.limits,
tsdb.OpenShippableTSDB,
)
// ToDo(Sandeep): Avoid initializing writer when in read only mode
writer, idx, err := tsdb.NewStore(s.cfg.TSDBShipperConfig, p, f, objectClient, s.limits, indexClientReg)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see tsdb.NewStore implemented yet, is that right? I'm guessing we'll just copy this over to the new function (we can also add a check on whether to enable the head manager as well).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I missed pushing the file. I have pushed it yesterday.

if err != nil {
return nil, nil, nil, err
}
tsdbManager := tsdb.NewTSDBManager(
nodeName,
dir,
shpr,
p.IndexTables.Period,
util_log.Logger,
tsdbMetrics,
)
// TODO(owen-d): Only need HeadManager
// on the ingester. Otherwise, the TSDBManager is sufficient
headManager := tsdb.NewHeadManager(
util_log.Logger,
dir,
tsdbMetrics,
tsdbManager,
)
if err := headManager.Start(); err != nil {
return nil, nil, nil, err
}
idx := tsdb.NewIndexClient(headManager, p)
writer := tsdb.NewChunkWriter(f, p, headManager)

// TODO(owen-d): add TSDB index-gateway support
// ToDo(Sandeep): Refactor code to not use boltdb-shipper index gateway client config
if shouldUseBoltDBIndexGatewayClient(s.cfg) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It may be better to have this function accept on the tsdb-shipper config as an argument instead.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should all be cleaned up once I get to migrate boltdb-shipper to index-shipper.

// inject the index-gateway client into the index store
gw, err := shipper.NewGatewayClient(s.cfg.BoltDBShipperConfig.IndexGatewayClientConfig, indexClientReg, s.logger)
if err != nil {
return nil, nil, nil, err
}
idx = series.NewIndexGatewayClientStore(gw, idx)
}

return writer, idx,
func() {
chunkClient.Stop()
f.Stop()
chunkClient.Stop()
objectClient.Stop()
}, nil
}

Expand All @@ -269,22 +246,22 @@ func (s *store) storeForPeriod(p config.PeriodConfig, chunkClient client.Client,
}

var (
writer stores.ChunkWriter = series.NewWriter(f, s.schemaCfg, idx, schema, s.writeDedupeCache, s.storeCfg.DisableIndexDeduplication)
seriesdIndex *series.IndexStore = series.NewIndexStore(s.schemaCfg, schema, idx, f, s.cfg.MaxChunkBatchSize)
index stores.Index = seriesdIndex
writer stores.ChunkWriter = series.NewWriter(f, s.schemaCfg, idx, schema, s.writeDedupeCache, s.storeCfg.DisableIndexDeduplication)
indexStore = series.NewIndexStore(s.schemaCfg, schema, idx, f, s.cfg.MaxChunkBatchSize)
)

if shouldUseBoltDBIndexGatewayClient(s.cfg) {
// (Sandeep): Disable IndexGatewayClientStore for stores other than tsdb until we are ready to enable it again
/*if shouldUseBoltDBIndexGatewayClient(s.cfg) {
// inject the index-gateway client into the index store
gw, err := shipper.NewGatewayClient(s.cfg.BoltDBShipperConfig.IndexGatewayClientConfig, indexClientReg, s.logger)
if err != nil {
return nil, nil, nil, err
}
index = series.NewIndexGatewayClientStore(gw, seriesdIndex)
}
indexStore = series.NewIndexGatewayClientStore(gw, indexStore)
}*/

return writer,
index,
indexStore,
func() {
chunkClient.Stop()
f.Stop()
Expand Down
3 changes: 2 additions & 1 deletion pkg/storage/stores/composite_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/grafana/loki/pkg/storage/chunk"
"github.com/grafana/loki/pkg/storage/chunk/fetcher"
"github.com/grafana/loki/pkg/storage/stores/series"
"github.com/grafana/loki/pkg/util"
)

Expand Down Expand Up @@ -44,7 +45,7 @@ func NewCompositeStore(limits StoreLimits) *CompositeStore {
return &CompositeStore{compositeStore{}, limits}
}

func (c *CompositeStore) AddStore(start model.Time, fetcher *fetcher.Fetcher, index Index, writer ChunkWriter, stop func()) {
func (c *CompositeStore) AddStore(start model.Time, fetcher *fetcher.Fetcher, index series.IndexStore, writer ChunkWriter, stop func()) {
c.stores = append(c.stores, compositeStoreEntry{
start: start,
Store: &storeEntry{
Expand Down
15 changes: 2 additions & 13 deletions pkg/storage/stores/composite_store_entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ import (
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"

"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/storage/chunk"
"github.com/grafana/loki/pkg/storage/chunk/fetcher"
"github.com/grafana/loki/pkg/storage/errors"
"github.com/grafana/loki/pkg/storage/stores/series"
util_log "github.com/grafana/loki/pkg/util/log"
"github.com/grafana/loki/pkg/util/spanlogger"
"github.com/grafana/loki/pkg/util/validation"
Expand All @@ -25,17 +25,6 @@ type StoreLimits interface {
MaxQueryLength(userID string) time.Duration
}

type Index interface {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did we move this into series.IndexStore?

Copy link
Contributor Author

@sandeepsukhani sandeepsukhani May 18, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wanted to pass an interface to series.NewIndexGatewayClientStore without which it was not possible to pass tsdb index client. I had to move it to series.IndexStore to avoid circular dependency.

GetChunkRefs(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([]logproto.ChunkRef, error)
GetSeries(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([]labels.Labels, error)
LabelValuesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string, labelName string, matchers ...*labels.Matcher) ([]string, error)
LabelNamesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string) ([]string, error)
// SetChunkFilterer sets a chunk filter to be used when retrieving chunks.
// This is only used for GetSeries implementation.
// Todo we might want to pass it as a parameter to GetSeries instead.
SetChunkFilterer(chunkFilter chunk.RequestChunkFilterer)
}

type ChunkWriter interface {
Put(ctx context.Context, chunks []chunk.Chunk) error
PutOne(ctx context.Context, from, through model.Time, chunk chunk.Chunk) error
Expand All @@ -50,7 +39,7 @@ type storeEntry struct {
limits StoreLimits
stop func()
fetcher *fetcher.Fetcher
index Index
index series.IndexStore
ChunkWriter
}

Expand Down
90 changes: 84 additions & 6 deletions pkg/storage/stores/series/series_index_gateway_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,45 +3,123 @@ package series
import (
"context"

"github.com/gogo/status"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"

"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql/syntax"
"github.com/grafana/loki/pkg/storage/stores/shipper/indexgateway/indexgatewaypb"
)

type IndexGatewayClientStore struct {
client IndexGatewayClient
*IndexStore
IndexStore
}

type IndexGatewayClient interface {
GetChunkRef(ctx context.Context, in *indexgatewaypb.GetChunkRefRequest, opts ...grpc.CallOption) (*indexgatewaypb.GetChunkRefResponse, error)
GetSeries(ctx context.Context, in *indexgatewaypb.GetSeriesRequest, opts ...grpc.CallOption) (*indexgatewaypb.GetSeriesResponse, error)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

<3 Adding this here, thanks.

LabelNamesForMetricName(ctx context.Context, in *indexgatewaypb.LabelNamesForMetricNameRequest, opts ...grpc.CallOption) (*indexgatewaypb.LabelResponse, error)
LabelValuesForMetricName(ctx context.Context, in *indexgatewaypb.LabelValuesForMetricNameRequest, opts ...grpc.CallOption) (*indexgatewaypb.LabelResponse, error)
}

func NewIndexGatewayClientStore(client IndexGatewayClient, index *IndexStore) *IndexGatewayClientStore {
func NewIndexGatewayClientStore(client IndexGatewayClient, index IndexStore) IndexStore {
return &IndexGatewayClientStore{
client: client,
IndexStore: index,
}
}

func (c *IndexGatewayClientStore) GetChunkRefs(ctx context.Context, userID string, from, through model.Time, allMatchers ...*labels.Matcher) ([]logproto.ChunkRef, error) {
return c.IndexStore.GetChunkRefs(ctx, userID, from, through, allMatchers...)
response, err := c.client.GetChunkRef(ctx, &indexgatewaypb.GetChunkRefRequest{
From: from,
Through: through,
Matchers: (&syntax.MatchersExpr{Mts: allMatchers}).String(),
})
if err != nil {
if isUnimplementedCallError(err) {
// Handle communication with older index gateways gracefully, by falling back to the index store calls.
return c.IndexStore.GetChunkRefs(ctx, userID, from, through, allMatchers...)
}
return nil, err
}
result := make([]logproto.ChunkRef, len(response.Refs))
for i, ref := range response.Refs {
result[i] = *ref
}

return result, nil
}

func (c *IndexGatewayClientStore) GetSeries(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([]labels.Labels, error) {
return c.IndexStore.GetSeries(ctx, userID, from, through, matchers...)
resp, err := c.client.GetSeries(ctx, &indexgatewaypb.GetSeriesRequest{
From: from,
Through: through,
Matchers: (&syntax.MatchersExpr{Mts: matchers}).String(),
})
if err != nil {
if isUnimplementedCallError(err) {
// Handle communication with older index gateways gracefully, by falling back to the index store calls.
return c.IndexStore.GetSeries(ctx, userID, from, through, matchers...)
}
return nil, err
}

result := make([]labels.Labels, len(resp.Series))
for i, s := range resp.Series {
result[i] = logproto.FromLabelAdaptersToLabels(s.Labels)
}

return result, nil
}

// LabelNamesForMetricName retrieves all label names for a metric name.
func (c *IndexGatewayClientStore) LabelNamesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string) ([]string, error) {
return c.IndexStore.LabelNamesForMetricName(ctx, userID, from, through, metricName)
resp, err := c.client.LabelNamesForMetricName(ctx, &indexgatewaypb.LabelNamesForMetricNameRequest{
MetricName: metricName,
From: from,
Through: through,
})
if isUnimplementedCallError(err) {
// Handle communication with older index gateways gracefully, by falling back to the index store calls.
return c.IndexStore.LabelNamesForMetricName(ctx, userID, from, through, metricName)
}
if err != nil {
return nil, err
}
return resp.Values, nil
}

func (c *IndexGatewayClientStore) LabelValuesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string, labelName string, matchers ...*labels.Matcher) ([]string, error) {
return c.IndexStore.LabelValuesForMetricName(ctx, userID, from, through, metricName, labelName)
resp, err := c.client.LabelValuesForMetricName(ctx, &indexgatewaypb.LabelValuesForMetricNameRequest{
MetricName: metricName,
LabelName: labelName,
From: from,
Through: through,
Matchers: (&syntax.MatchersExpr{Mts: matchers}).String(),
})
if isUnimplementedCallError(err) {
// Handle communication with older index gateways gracefully, by falling back to the index store calls.
return c.IndexStore.LabelValuesForMetricName(ctx, userID, from, through, metricName, labelName, matchers...)
}
if err != nil {
return nil, err
}
return resp.Values, nil
}

// isUnimplementedCallError tells if the GRPC error is a gRPC error with code Unimplemented.
func isUnimplementedCallError(err error) bool {
if err == nil {
return false
}

s, ok := status.FromError(err)
if !ok {
return false
}
return (s.Code() == codes.Unimplemented)
}
25 changes: 7 additions & 18 deletions pkg/storage/stores/series/series_index_gateway_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,29 +27,18 @@ func (fakeClient) GetChunkRef(ctx context.Context, in *indexgatewaypb.GetChunkRe
return &indexgatewaypb.GetChunkRefResponse{}, nil
}

func Test_IndexGatewayClient(t *testing.T) {
schemaCfg := config.SchemaConfig{
Configs: []config.PeriodConfig{
{From: config.DayTime{Time: model.Now().Add(-24 * time.Hour)}, Schema: "v12", RowShards: 16},
},
}
schema, err := index.CreateSchema(schemaCfg.Configs[0])
require.NoError(t, err)
testutils.ResetMockStorage()
tm, err := index.NewTableManager(index.TableManagerConfig{}, schemaCfg, 2*time.Hour, testutils.NewMockStorage(), nil, nil, nil)
require.NoError(t, err)
require.NoError(t, tm.SyncTables(context.Background()))
func (fakeClient) GetSeries(ctx context.Context, in *indexgatewaypb.GetSeriesRequest, opts ...grpc.CallOption) (*indexgatewaypb.GetSeriesResponse, error) {
return &indexgatewaypb.GetSeriesResponse{}, nil
}

func Test_IndexGatewayClient(t *testing.T) {
idx := IndexGatewayClientStore{
client: fakeClient{},
IndexStore: &IndexStore{
IndexStore: &indexStore{
chunkBatchSize: 1,
schema: schema,
schemaCfg: schemaCfg,
index: testutils.NewMockStorage(),
},
}
_, err = idx.GetSeries(context.Background(), "foo", model.Now(), model.Now().Add(1*time.Hour), labels.MustNewMatcher(labels.MatchEqual, "__name__", "logs"))
_, err := idx.GetSeries(context.Background(), "foo", model.Earliest, model.Latest)
require.NoError(t, err)
}

Expand Down Expand Up @@ -106,7 +95,7 @@ func Test_IndexGatewayClient_Fallback(t *testing.T) {
require.NoError(t, tm.SyncTables(context.Background()))
idx := NewIndexGatewayClientStore(
indexgatewaypb.NewIndexGatewayClient(conn),
&IndexStore{
&indexStore{
chunkBatchSize: 1,
schema: schema,
schemaCfg: schemaCfg,
Expand Down
Loading