Loki: Add a ring to the query scheduler to allow discovery via the ring as an alternative to DNS (#4424)

* Fork the frontend and scheduler so we can add support for discovery via the ring.

* Add a ring to the query scheduler which is then used by the frontend worker and querier workers to find the scheduler address as an alternative to using DNS

* update some of the forked code to use dskit grpcclient and grpcutil since #4312 was merged after the original fork was created.

* remove query scheduler dependency from querier

Signed-off-by: Trevor Whitney <trevorjwhitney@gmail.com>

* migrate logging package and make linter happy

Signed-off-by: Trevor Whitney <trevorjwhitney@gmail.com>

* add SafeReadRing for instances when scheduler is not enabled on same instance

* Doc changes from code review

A few doc fixes

Co-authored-by: Karen Miller <84039272+KMiller-Grafana@users.noreply.github.com>

* go mod tidy on go-kit dep

* update changelog

* remove ReadRing in favor of SafeReadRing

Co-authored-by: Trevor Whitney <trevorjwhitney@gmail.com>
Co-authored-by: Karen Miller <84039272+KMiller-Grafana@users.noreply.github.com>
3 people authored Oct 21, 2021
1 parent 2427fab commit 2b5f300
Showing 24 changed files with 3,309 additions and 23 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -5,6 +5,7 @@
* [4443](https://github.com/grafana/loki/pull/4443) **DylanGuedes**: Loki: Change how push API checks for contentType
* [4415](https://github.com/grafana/loki/pull/4415) **DylanGuedes**: Change default limits to common values
* [4473](https://github.com/grafana/loki/pull/4473) **trevorwhitney**: Config: add object storage configuration to common config
* [4425](https://github.com/grafana/loki/pull/4425) **trevorwhitney** and **slim-bean**: Add a ring for the query scheduler

# 2.3.0 (2021/08/06)

112 changes: 109 additions & 3 deletions docs/sources/configuration/_index.md
@@ -97,6 +97,10 @@ Pass the `-config.expand-env` flag at the command line to enable this way of set
# just the querier.
[querier: <querier_config>]

# The query_scheduler block configures the Loki query scheduler.
# When configured it separates the tenant query queues from the query-frontend
[query_scheduler: <query_scheduler_config>]

# The query_frontend_config configures the Loki query-frontend.
[frontend: <query_frontend_config>]

@@ -282,6 +286,106 @@ engine:
[max_look_back_period: <duration> | default = 30s]
```

## query_scheduler_config

The `query_scheduler_config` block configures the Loki query scheduler.

```yaml
# Maximum number of outstanding requests per tenant per query-scheduler.
# In-flight requests above this limit will fail with HTTP response status code
# 429.
# CLI flag: -query-scheduler.max-outstanding-requests-per-tenant
[max_outstanding_requests_per_tenant: <int> | default = 100]
# This configures the gRPC client used to report errors back to the
# query-frontend.
[grpc_client_config: <grpc_client_config>]
# Set to true to have the query schedulers create and place themselves in a ring.
# If no frontend_address or scheduler_address are present
# anywhere else in the configuration, Loki will toggle this value to true.
[use_scheduler_ring: <boolean> | default = false]
# The hash ring configuration. This option is required only if use_scheduler_ring is true
scheduler_ring:
# The key-value store used to share the hash ring across multiple instances.
kvstore:
# Backend storage to use for the ring. Supported values are: consul, etcd,
# inmemory, memberlist, multi.
# CLI flag: -scheduler.ring.store
[store: <string> | default = "memberlist"]
# The prefix for the keys in the store. Should end with a /.
# CLI flag: -scheduler.ring.prefix
[prefix: <string> | default = "schedulers/"]
# The consul_config configures the consul client.
# The CLI flags prefix for this block config is: scheduler.ring
[consul: <consul_config>]
# The etcd_config configures the etcd client.
# The CLI flags prefix for this block config is: scheduler.ring
[etcd: <etcd_config>]
multi:
# Primary backend storage used by multi-client.
# CLI flag: -scheduler.ring.multi.primary
[primary: <string> | default = ""]
# Secondary backend storage used by multi-client.
# CLI flag: -scheduler.ring.multi.secondary
[secondary: <string> | default = ""]
# Mirror writes to secondary store.
# CLI flag: -scheduler.ring.multi.mirror-enabled
[mirror_enabled: <boolean> | default = false]
# Timeout for storing value to secondary store.
# CLI flag: -scheduler.ring.multi.mirror-timeout
[mirror_timeout: <duration> | default = 2s]
# Interval between heartbeats sent to the ring. 0 = disabled.
# CLI flag: -scheduler.ring.heartbeat-period
[heartbeat_period: <duration> | default = 15s]
# The heartbeat timeout after which store gateways are considered unhealthy
# within the ring. 0 = never (timeout disabled). This option needs to be set both
# on the store-gateway and querier when running in microservices mode.
# CLI flag: -scheduler.ring.heartbeat-timeout
[heartbeat_timeout: <duration> | default = 1m]
# File path where tokens are stored. If empty, tokens are neither stored at
# shutdown nor restored at startup.
# CLI flag: -scheduler.ring.tokens-file-path
[tokens_file_path: <string> | default = ""]
# True to enable zone-awareness and replicate blocks across different
# availability zones.
# CLI flag: -scheduler.ring.zone-awareness-enabled
[zone_awareness_enabled: <boolean> | default = false]
# Name of network interface to read addresses from.
# CLI flag: -scheduler.ring.instance-interface-names
[instance_interface_names: <list of string> | default = [eth0 en0]]
# IP address to advertise in the ring.
# CLI flag: -scheduler.ring.instance-addr
[instance_addr: <string> | default = first from instance_interface_names]
# Port to advertise in the ring
# CLI flag: -scheduler.ring.instance-port
[instance_port: <int> | default = server.grpc-listen-port]
# Instance ID to register in the ring.
# CLI flag: -scheduler.ring.instance-id
[instance_id: <string> | default = os.Hostname()]
# The availability zone where this instance is running. Required if
# zone-awareness is enabled.
# CLI flag: -scheduler.ring.instance-availability-zone
[instance_availability_zone: <string> | default = ""]
```

## query_frontend_config

The query_frontend_config configures the Loki query-frontend.
@@ -315,8 +419,9 @@ The query_frontend_config configures the Loki query-frontend.
# How often to resolve the scheduler-address, in order to look for new
# query-scheduler instances.
# Also used to determine how often to poll the scheduler-ring for addresses if configured.
# CLI flag: -frontend.scheduler-dns-lookup-period
[scheduler_dns_lookup_period: <duration> | default = 10s]
[scheduler_dns_lookup_period: <duration> | default = 3s]
# Number of concurrent workers forwarding queries to single query-scheduler.
# CLI flag: -frontend.scheduler-worker-concurrency
@@ -776,9 +881,10 @@ The `frontend_worker_config` configures the worker - running within the Loki que
# CLI flag: -querier.worker-parallelism
[parallelism: <int> | default = 10]
# How often to query DNS.
# How often to query the frontend_address DNS to resolve frontend addresses.
# Also used to determine how often to poll the scheduler-ring for addresses if configured.
# CLI flag: -querier.dns-lookup-period
[dns_lookup_duration: <duration> | default = 10s]
[dns_lookup_duration: <duration> | default = 3s]
# The CLI flags prefix for this block config is: querier.frontend-client
[grpc_client_config: <grpc_client_config>]
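
The lookup-period options above now cover two discovery sources: DNS resolution and polling the scheduler ring. The following is a minimal Go sketch of what a periodic poll could look like, not the code from this commit. The names `reconcile`, `pollLoop`, `lookup`, and `onChange` are hypothetical; `lookup` stands in for either a DNS resolution or a ring read.

```go
package main

import (
	"context"
	"fmt"
	"sort"
	"time"
)

// reconcile compares the previously known addresses with the current lookup
// result and reports which addresses appeared and which disappeared.
// Hypothetical helper, not taken from the Loki code base.
func reconcile(known map[string]struct{}, current []string) (added, removed []string) {
	seen := make(map[string]struct{}, len(current))
	for _, addr := range current {
		if _, dup := seen[addr]; dup {
			continue // ignore duplicate entries in the lookup result
		}
		seen[addr] = struct{}{}
		if _, ok := known[addr]; !ok {
			added = append(added, addr)
		}
	}
	for addr := range known {
		if _, ok := seen[addr]; !ok {
			removed = append(removed, addr)
		}
	}
	sort.Strings(added)
	sort.Strings(removed)
	return added, removed
}

// pollLoop calls lookup once per period until ctx is cancelled and invokes
// onChange whenever the set of addresses differs from the previous poll.
func pollLoop(ctx context.Context, period time.Duration, lookup func() []string, onChange func(added, removed []string)) {
	known := map[string]struct{}{}
	ticker := time.NewTicker(period)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			current := lookup()
			added, removed := reconcile(known, current)
			if len(added)+len(removed) > 0 {
				onChange(added, removed)
			}
			known = make(map[string]struct{}, len(current))
			for _, addr := range current {
				known[addr] = struct{}{}
			}
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()

	// The address below is made up for illustration.
	lookup := func() []string { return []string{"10.0.0.1:9095"} }
	pollLoop(ctx, 10*time.Millisecond, lookup, func(added, removed []string) {
		fmt.Println("added:", added, "removed:", removed)
	})
}
```

A shorter period means new schedulers are noticed sooner, at the cost of more frequent lookups.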
2 changes: 1 addition & 1 deletion go.mod
@@ -31,6 +31,7 @@ require (
github.com/felixge/fgprof v0.9.1
github.com/fluent/fluent-bit-go v0.0.0-20190925192703-ea13c021720c
github.com/fsouza/fake-gcs-server v1.7.0
github.com/go-kit/kit v0.11.0 // indirect
github.com/go-kit/log v0.2.0
github.com/go-logfmt/logfmt v0.5.1
github.com/go-redis/redis/v8 v8.11.4
@@ -147,7 +148,6 @@ require (
github.com/edsrzf/mmap-go v1.0.0 // indirect
github.com/felixge/httpsnoop v1.0.1 // indirect
github.com/fsnotify/fsnotify v1.4.9 // indirect
github.com/go-kit/kit v0.11.0 // indirect
github.com/go-logr/logr v0.4.0 // indirect
github.com/go-openapi/analysis v0.20.0 // indirect
github.com/go-openapi/errors v0.20.0 // indirect
11 changes: 10 additions & 1 deletion pkg/loki/config_wrapper.go
@@ -73,14 +73,22 @@ func (c *ConfigWrapper) ApplyDynamicConfig() cfg.Source {
}
}

// If nobody has defined any frontend address or scheduler address
// we can default to using the query scheduler ring for scheduler discovery.
if r.Worker.FrontendAddress == "" &&
r.Worker.SchedulerAddress == "" &&
r.Frontend.FrontendV2.SchedulerAddress == "" {
r.QueryScheduler.UseSchedulerRing = true
}

applyMemberlistConfig(r)
applyStorageConfig(r, &defaults)

return nil
}
}

// applyMemberlistConfig will change the default ingester, distributor, and ruler ring configurations to use memberlist
// applyMemberlistConfig will change the default ingester, distributor, ruler, and query scheduler ring configurations to use memberlist
// if the -memberlist.join_members config is provided. The idea here is that if a user explicitly configured the
// memberlist configuration section, they probably want to be using memberlist for all their ring configurations.
// Since a user can still explicitly override a specific ring configuration (for example, use consul for the distributor),
@@ -90,6 +98,7 @@ func applyMemberlistConfig(r *ConfigWrapper) {
r.Ingester.LifecyclerConfig.RingConfig.KVStore.Store = memberlistStr
r.Distributor.DistributorRing.KVStore.Store = memberlistStr
r.Ruler.Ring.KVStore.Store = memberlistStr
r.QueryScheduler.SchedulerRing.KVStore.Store = memberlistStr
}
}
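
The defaulting rule added to `ApplyDynamicConfig` can be read in isolation: the scheduler ring is switched on only when none of the three address fields is set. Below is a small, self-contained Go sketch of that rule. The `discoveryConfig` struct and `applySchedulerRingDefault` function are hypothetical stand-ins that flatten the fields referenced in the diff; they are not types from the Loki code base.

```go
package main

import "fmt"

// discoveryConfig is a hypothetical, flattened stand-in for the fields that
// the diff inspects on the real config wrapper.
type discoveryConfig struct {
	WorkerFrontendAddress    string // corresponds to r.Worker.FrontendAddress
	WorkerSchedulerAddress   string // corresponds to r.Worker.SchedulerAddress
	FrontendSchedulerAddress string // corresponds to r.Frontend.FrontendV2.SchedulerAddress
	UseSchedulerRing         bool   // corresponds to r.QueryScheduler.UseSchedulerRing
}

// applySchedulerRingDefault enables ring-based discovery only when no
// frontend or scheduler address has been configured anywhere.
func applySchedulerRingDefault(c *discoveryConfig) {
	if c.WorkerFrontendAddress == "" &&
		c.WorkerSchedulerAddress == "" &&
		c.FrontendSchedulerAddress == "" {
		c.UseSchedulerRing = true
	}
}

func main() {
	// No addresses configured: the ring is enabled implicitly.
	implicit := discoveryConfig{}
	applySchedulerRingDefault(&implicit)
	fmt.Println(implicit.UseSchedulerRing) // true

	// An explicit scheduler address (made up here) leaves the setting untouched.
	explicit := discoveryConfig{WorkerSchedulerAddress: "scheduler.example:9095"}
	applySchedulerRingDefault(&explicit)
	fmt.Println(explicit.UseSchedulerRing) // false
}
```

Note that the rule only ever sets the flag to true. A configuration that sets an address and also sets `use_scheduler_ring: true` explicitly keeps its explicit value.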

9 changes: 5 additions & 4 deletions pkg/loki/loki.go
@@ -8,11 +8,9 @@ import (
"net/http"

cortex_tripper "github.com/cortexproject/cortex/pkg/querier/queryrange"
"github.com/cortexproject/cortex/pkg/querier/worker"
"github.com/cortexproject/cortex/pkg/ring"
cortex_ruler "github.com/cortexproject/cortex/pkg/ruler"
"github.com/cortexproject/cortex/pkg/ruler/rulestore"
"github.com/cortexproject/cortex/pkg/scheduler"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/fakeauth"
util_log "github.com/cortexproject/cortex/pkg/util/log"
@@ -38,8 +36,10 @@ import (
"github.com/grafana/loki/pkg/lokifrontend"
"github.com/grafana/loki/pkg/querier"
"github.com/grafana/loki/pkg/querier/queryrange"
"github.com/grafana/loki/pkg/querier/worker"
"github.com/grafana/loki/pkg/ruler"
"github.com/grafana/loki/pkg/runtime"
"github.com/grafana/loki/pkg/scheduler"
"github.com/grafana/loki/pkg/storage"
"github.com/grafana/loki/pkg/storage/chunk"
"github.com/grafana/loki/pkg/storage/stores/shipper/compactor"
@@ -224,6 +224,7 @@ type Loki struct {
MemberlistKV *memberlist.KVInitService
compactor *compactor.Compactor
QueryFrontEndTripperware cortex_tripper.Tripperware
queryScheduler *scheduler.Scheduler

HTTPAuthMiddleware middleware.Interface
}
@@ -435,13 +436,13 @@ func (t *Loki) setupModuleManager() error {
Querier: {Store, Ring, Server, IngesterQuerier, TenantConfigs},
QueryFrontendTripperware: {Server, Overrides, TenantConfigs},
QueryFrontend: {QueryFrontendTripperware},
QueryScheduler: {Server, Overrides},
QueryScheduler: {Server, Overrides, MemberlistKV},
Ruler: {Ring, Server, Store, RulerStorage, IngesterQuerier, Overrides, TenantConfigs},
TableManager: {Server},
Compactor: {Server, Overrides},
IndexGateway: {Server},
IngesterQuerier: {Ring},
All: {QueryFrontend, Querier, Ingester, Distributor, TableManager, Ruler},
All: {QueryScheduler, QueryFrontend, Querier, Ingester, Distributor, Ruler},
}

// Add IngesterQuerier as a dependency for store when target is either ingester or querier.
25 changes: 20 additions & 5 deletions pkg/loki/modules.go
@@ -12,13 +12,10 @@ import (

"github.com/NYTimes/gziphandler"
"github.com/cortexproject/cortex/pkg/cortex"
"github.com/cortexproject/cortex/pkg/frontend"
"github.com/cortexproject/cortex/pkg/frontend/transport"
"github.com/cortexproject/cortex/pkg/frontend/v1/frontendv1pb"
"github.com/cortexproject/cortex/pkg/frontend/v2/frontendv2pb"
"github.com/cortexproject/cortex/pkg/ring"
cortex_ruler "github.com/cortexproject/cortex/pkg/ruler"
"github.com/cortexproject/cortex/pkg/scheduler"
"github.com/cortexproject/cortex/pkg/scheduler/schedulerpb"
util_log "github.com/cortexproject/cortex/pkg/util/log"
"github.com/go-kit/log/level"
@@ -37,10 +34,13 @@ import (
"github.com/grafana/loki/pkg/ingester"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/lokifrontend/frontend"
"github.com/grafana/loki/pkg/lokifrontend/frontend/transport"
"github.com/grafana/loki/pkg/querier"
"github.com/grafana/loki/pkg/querier/queryrange"
"github.com/grafana/loki/pkg/ruler"
"github.com/grafana/loki/pkg/runtime"
"github.com/grafana/loki/pkg/scheduler"
loki_storage "github.com/grafana/loki/pkg/storage"
"github.com/grafana/loki/pkg/storage/chunk"
"github.com/grafana/loki/pkg/storage/chunk/cache"
@@ -210,6 +210,7 @@ func (t *Loki) initQuerier() (services.Service, error) {
QuerierWorkerConfig: &t.Cfg.Worker,
QueryFrontendEnabled: t.Cfg.isModuleEnabled(QueryFrontend),
QuerySchedulerEnabled: t.Cfg.isModuleEnabled(QueryScheduler),
SchedulerRing: t.queryScheduler.SafeReadRing(),
}

var queryHandlers = map[string]http.Handler{
@@ -414,12 +415,20 @@ func (t *Loki) initQueryFrontendTripperware() (_ services.Service, err error) {
func (t *Loki) initQueryFrontend() (_ services.Service, err error) {
level.Debug(util_log.Logger).Log("msg", "initializing query frontend", "config", fmt.Sprintf("%+v", t.Cfg.Frontend))

roundTripper, frontendV1, frontendV2, err := frontend.InitFrontend(frontend.CombinedFrontendConfig{
combinedCfg := frontend.CombinedFrontendConfig{
Handler: t.Cfg.Frontend.Handler,
FrontendV1: t.Cfg.Frontend.FrontendV1,
FrontendV2: t.Cfg.Frontend.FrontendV2,
DownstreamURL: t.Cfg.Frontend.DownstreamURL,
}, disabledShuffleShardingLimits{}, t.Cfg.Server.GRPCListenPort, util_log.Logger, prometheus.DefaultRegisterer)
}
roundTripper, frontendV1, frontendV2, err := frontend.InitFrontend(
combinedCfg,
t.queryScheduler.SafeReadRing(),
disabledShuffleShardingLimits{},
t.Cfg.Server.GRPCListenPort,
util_log.Logger,
prometheus.DefaultRegisterer)

if err != nil {
return nil, err
}
@@ -660,13 +669,19 @@ func (t *Loki) initIndexGateway() (services.Service, error) {
}

func (t *Loki) initQueryScheduler() (services.Service, error) {
// Set some config sections from other config sections in the config struct
t.Cfg.QueryScheduler.SchedulerRing.ListenPort = t.Cfg.Server.GRPCListenPort
t.Cfg.QueryScheduler.SchedulerRing.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV

s, err := scheduler.NewScheduler(t.Cfg.QueryScheduler, t.overrides, util_log.Logger, prometheus.DefaultRegisterer)
if err != nil {
return nil, err
}

schedulerpb.RegisterSchedulerForFrontendServer(t.Server.GRPC, s)
schedulerpb.RegisterSchedulerForQuerierServer(t.Server.GRPC, s)
t.Server.HTTP.Handle("/scheduler/ring", s)
t.queryScheduler = s
return s, nil
}
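
`initQuerier` and `initQueryFrontend` call `t.queryScheduler.SafeReadRing()` even on targets where the scheduler module never ran, so `t.queryScheduler` may be a nil pointer. The diff does not show the body of `SafeReadRing`. The Go sketch below illustrates one way such a nil-safe accessor can work, using a pointer receiver that checks for nil. The `ReadRing` interface, `staticRing` type, and `Scheduler` struct here are simplified, hypothetical stand-ins, not the real Loki or Cortex types.

```go
package main

import "fmt"

// ReadRing is a hypothetical, minimal stand-in for a read-only ring.
type ReadRing interface {
	Addresses() []string
}

// staticRing is a fixed list of addresses used only for this illustration.
type staticRing struct{ addrs []string }

func (r staticRing) Addresses() []string { return r.addrs }

// Scheduler is a hypothetical stand-in for the query scheduler.
type Scheduler struct {
	ring ReadRing
}

// SafeReadRing may be called on a nil *Scheduler. Go allows calling a method
// with a pointer receiver on a nil pointer as long as the method does not
// dereference it before checking.
func (s *Scheduler) SafeReadRing() ReadRing {
	if s == nil || s.ring == nil {
		return nil
	}
	return s.ring
}

func main() {
	// Scheduler module not enabled on this instance: the pointer is nil.
	var disabled *Scheduler
	fmt.Println(disabled.SafeReadRing() == nil) // true

	// Scheduler enabled with a ring (the address is made up).
	enabled := &Scheduler{ring: staticRing{addrs: []string{"10.0.0.1:9095"}}}
	fmt.Println(enabled.SafeReadRing().Addresses()) // [10.0.0.1:9095]
}
```

With this pattern, callers check the returned ring for nil and can fall back to DNS-based discovery, instead of guarding every call site against a nil scheduler.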

6 changes: 3 additions & 3 deletions pkg/lokifrontend/config.go
@@ -3,9 +3,9 @@ package lokifrontend
import (
"flag"

"github.com/cortexproject/cortex/pkg/frontend/transport"
v1 "github.com/cortexproject/cortex/pkg/frontend/v1"
v2 "github.com/cortexproject/cortex/pkg/frontend/v2"
"github.com/grafana/loki/pkg/lokifrontend/frontend/transport"
v1 "github.com/grafana/loki/pkg/lokifrontend/frontend/v1"
v2 "github.com/grafana/loki/pkg/lokifrontend/frontend/v2"
)

type Config struct {