Skip to content

Commit

Permalink
feat(dataobj): Ensure constant sharding for the dataobj querier (#16273)
Browse files Browse the repository at this point in the history
  • Loading branch information
cyriltovena authored Feb 14, 2025
1 parent 49a69b2 commit 78a141c
Show file tree
Hide file tree
Showing 6 changed files with 37 additions and 6 deletions.
4 changes: 4 additions & 0 deletions docs/sources/shared/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -821,6 +821,10 @@ dataobj:
# CLI flag: -dataobj-querier-from
[from: <daytime> | default = 1970-01-01]

# The number of shards to use for the dataobj querier.
# CLI flag: -dataobj-querier-shard-factor
[shard_factor: <int> | default = 32]

# The prefix to use for the storage bucket.
# CLI flag: -dataobj-storage-bucket-prefix
[storage_bucket_prefix: <string> | default = "dataobj/"]
Expand Down
15 changes: 13 additions & 2 deletions pkg/dataobj/querier/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/grafana/loki/v3/pkg/logql/syntax"
"github.com/grafana/loki/v3/pkg/querier"
"github.com/grafana/loki/v3/pkg/storage/chunk"
"github.com/grafana/loki/v3/pkg/storage/config"
storageconfig "github.com/grafana/loki/v3/pkg/storage/config"
"github.com/grafana/loki/v3/pkg/storage/stores/index/stats"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb/index"
Expand Down Expand Up @@ -60,13 +61,15 @@ var (
)

// Config holds the settings for the dataobj querier: whether it is enabled,
// the first day it should query from, and the number of shards it uses.
type Config struct {
Enabled bool `yaml:"enabled" doc:"description=Enable the dataobj querier."`
From storageconfig.DayTime `yaml:"from" doc:"description=The date of the first day of when the dataobj querier should start querying from. In YYYY-MM-DD format, for example: 2018-04-15."`
Enabled bool `yaml:"enabled" doc:"description=Enable the dataobj querier."`
From storageconfig.DayTime `yaml:"from" doc:"description=The date of the first day of when the dataobj querier should start querying from. In YYYY-MM-DD format, for example: 2018-04-15."`
ShardFactor int `yaml:"shard_factor" doc:"description=The number of shards to use for the dataobj querier."`
}

// RegisterFlags binds the dataobj querier settings to command-line flags on f.
//
// Three flags are registered: -dataobj-querier-enabled (default false),
// -dataobj-querier-from, and -dataobj-querier-shard-factor (default 32).
func (c *Config) RegisterFlags(f *flag.FlagSet) {
	// Default number of shards when the flag is not supplied.
	const defaultShardFactor = 32

	f.BoolVar(
		&c.Enabled,
		"dataobj-querier-enabled",
		false,
		"Enable the dataobj querier.",
	)
	f.Var(
		&c.From,
		"dataobj-querier-from",
		"The start time to query from.",
	)
	f.IntVar(
		&c.ShardFactor,
		"dataobj-querier-shard-factor",
		defaultShardFactor,
		"The number of shards to use for the dataobj querier.",
	)
}

func (c *Config) Validate() error {
Expand All @@ -76,6 +79,14 @@ func (c *Config) Validate() error {
return nil
}

// PeriodConfig builds a schema period entry for the dataobj querier.
//
// The returned period starts at c.From, uses schema "v13", and takes its
// RowShards from c.ShardFactor. ShardFactor is a signed int while RowShards is
// a uint32, so the value is clamped into the uint32 range instead of being
// converted blindly: a negative factor yields 0 and a factor larger than the
// uint32 maximum yields that maximum. A plain uint32(...) conversion would
// silently wrap such values into an unrelated shard count.
func (c *Config) PeriodConfig() config.PeriodConfig {
	// Largest value representable by the uint32 RowShards field.
	const maxRowShards = 1<<32 - 1

	var rowShards uint32
	switch {
	case c.ShardFactor <= 0:
		rowShards = 0
	case uint64(c.ShardFactor) > maxRowShards:
		// Only reachable where int is wider than 32 bits.
		rowShards = maxRowShards
	default:
		rowShards = uint32(c.ShardFactor)
	}

	return config.PeriodConfig{
		From:      c.From,
		RowShards: rowShards,
		Schema:    "v13",
	}
}

// Store implements querier.Store for querying data objects.
type Store struct {
bucket objstore.Bucket
Expand Down
13 changes: 12 additions & 1 deletion pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"net/http/httputil"
"net/url"
"os"
"sort"
"strconv"
"strings"
"time"
Expand Down Expand Up @@ -1056,13 +1057,23 @@ func (i ingesterQueryOptions) QueryIngestersWithin() time.Duration {
func (t *Loki) initQueryFrontendMiddleware() (_ services.Service, err error) {
level.Debug(util_log.Logger).Log("msg", "initializing query frontend tripperware")

schemas := t.Cfg.SchemaConfig
// Adjust schema config to use constant sharding for the timerange of dataobj querier.
if t.Cfg.DataObj.Querier.Enabled {
schemas = schemas.Clone()
schemas.Configs = append(schemas.Configs, t.Cfg.DataObj.Querier.PeriodConfig())
sort.Slice(schemas.Configs, func(i, j int) bool {
return schemas.Configs[i].From.UnixNano() < schemas.Configs[j].From.UnixNano()
})
}

middleware, stopper, err := queryrange.NewMiddleware(
t.Cfg.QueryRange,
t.Cfg.Querier.Engine,
ingesterQueryOptions{t.Cfg.Querier},
util_log.Logger,
t.Overrides,
t.Cfg.SchemaConfig,
schemas,
t.cacheGenerationLoader, t.Cfg.CompactorConfig.RetentionEnabled,
prometheus.DefaultRegisterer,
t.Cfg.MetricsNamespace,
Expand Down
2 changes: 1 addition & 1 deletion pkg/querier/queryrange/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -649,7 +649,7 @@ func WeightedParallelism(
return 0
}

func minMaxModelTime(a, b model.Time) (min, max model.Time) {
func minMaxModelTime(a, b model.Time) (model.Time, model.Time) {
if a.Before(b) {
return a, b
}
Expand Down
2 changes: 0 additions & 2 deletions pkg/querier/queryrange/roundtrip.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,6 @@ func NewMiddleware(

detectedFieldsTripperware, err := NewDetectedFieldsTripperware(
limits,
schema,
limitedTripperware,
logFilterTripperware,
)
Expand Down Expand Up @@ -1222,7 +1221,6 @@ func sharedIndexTripperware(
// NewDetectedFieldsTripperware creates a new frontend tripperware responsible for handling detected field requests, which are basically log filter requests with a bit more processing.
func NewDetectedFieldsTripperware(
limits Limits,
_ config.SchemaConfig,
limitedTripperware base.Middleware,
logTripperware base.Middleware,
) (base.Middleware, error) {
Expand Down
7 changes: 7 additions & 0 deletions pkg/storage/config/schema_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,13 @@ type SchemaConfig struct {
fileName string
}

// Clone returns a copy of cfg whose Configs slice is backed by a newly
// allocated array, so appending to or reordering the clone's Configs does not
// affect the receiver's slice. All other fields, and the PeriodConfig elements
// themselves, are copied by plain value assignment.
func (cfg *SchemaConfig) Clone() SchemaConfig {
	periods := make([]PeriodConfig, 0, len(cfg.Configs))
	periods = append(periods, cfg.Configs...)

	out := *cfg
	out.Configs = periods
	return out
}

// RegisterFlags adds the flags required to config this to the given FlagSet.
func (cfg *SchemaConfig) RegisterFlags(f *flag.FlagSet) {
f.StringVar(&cfg.fileName, "schema-config-file", "", "The path to the schema config file. The schema config is used only when running Cortex with the chunks storage.")
Expand Down

0 comments on commit 78a141c

Please sign in to comment.