diff --git a/CHANGELOG.md b/CHANGELOG.md index c6631fa6bb508..ec6db7cc15d5b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ * [4443](https://github.com/grafana/loki/pull/4443) **DylanGuedes**: Loki: Change how push API checks for contentType * [4415](https://github.com/grafana/loki/pull/4415) **DylanGuedes**: Change default limits to common values * [4473](https://github.com/grafana/loki/pull/4473) **trevorwhitney**: Config: add object storage configuration to common config +* [4425](https://github.com/grafana/loki/pull/4425) **trevorwhitney** and **slim-bean**: Add a ring for the query scheduler # 2.3.0 (2021/08/06) diff --git a/docs/sources/configuration/_index.md b/docs/sources/configuration/_index.md index 963452faa93bc..68fb5bc29040a 100644 --- a/docs/sources/configuration/_index.md +++ b/docs/sources/configuration/_index.md @@ -97,6 +97,10 @@ Pass the `-config.expand-env` flag at the command line to enable this way of set # just the querier. [querier: ] +# The query_scheduler block configures the Loki query scheduler. +# When configured it separates the tenant query queues from the query-frontend +[query_scheduler: ] + # The query_frontend_config configures the Loki query-frontend. [frontend: ] @@ -282,6 +286,106 @@ engine: [max_look_back_period: | default = 30s] ``` +## query_scheduler_config + +The `query_scheduler_config` block configures the Loki query scheduler. + +```yaml +# Maximum number of outstanding requests per tenant per query-scheduler. +# In-flight requests above this limit will fail with HTTP response status code +# 429. +# CLI flag: -query-scheduler.max-outstanding-requests-per-tenant +[max_outstanding_requests_per_tenant: | default = 100] + +# This configures the gRPC client used to report errors back to the +# query-frontend. +[grpc_client_config: ] + +# Set to true to have the query schedulers create and place themselves in a ring. +# If no frontend_address or scheduler_address are present +# anywhere else in the configuration, Loki will toggle this value to true. +[use_scheduler_ring: | default = false] + +# The hash ring configuration. This option is required only if use_scheduler_ring is true +scheduler_ring: + # The key-value store used to share the hash ring across multiple instances. + kvstore: + # Backend storage to use for the ring. Supported values are: consul, etcd, + # inmemory, memberlist, multi. + # CLI flag: -scheduler.ring.store + [store: | default = "memberlist"] + + # The prefix for the keys in the store. Should end with a /. + # CLI flag: -scheduler.ring.prefix + [prefix: | default = "schedulers/"] + + # The consul_config configures the consul client. + # The CLI flags prefix for this block config is: scheduler.ring + [consul: ] + + # The etcd_config configures the etcd client. + # The CLI flags prefix for this block config is: scheduler.ring + [etcd: ] + + multi: + # Primary backend storage used by multi-client. + # CLI flag: -scheduler.ring.multi.primary + [primary: | default = ""] + + # Secondary backend storage used by multi-client. + # CLI flag: -scheduler.ring.multi.secondary + [secondary: | default = ""] + + # Mirror writes to secondary store. + # CLI flag: -scheduler.ring.multi.mirror-enabled + [mirror_enabled: | default = false] + + # Timeout for storing value to secondary store. + # CLI flag: -scheduler.ring.multi.mirror-timeout + [mirror_timeout: | default = 2s] + + # Interval between heartbeats sent to the ring. 0 = disabled. + # CLI flag: -scheduler.ring.heartbeat-period + [heartbeat_period: | default = 15s] + + # The heartbeat timeout after which store gateways are considered unhealthy + # within the ring. 0 = never (timeout disabled). This option needs be set both + # on the store-gateway and querier when running in microservices mode. + # CLI flag: -scheduler.ring.heartbeat-timeout + [heartbeat_timeout: | default = 1m] + + # File path where tokens are stored. If empty, tokens are neither stored at + # shutdown nor restored at startup. + # CLI flag: -scheduler.ring.tokens-file-path + [tokens_file_path: | default = ""] + + # True to enable zone-awareness and replicate blocks across different + # availability zones. + # CLI flag: -scheduler.ring.zone-awareness-enabled + [zone_awareness_enabled: | default = false] + + # Name of network interface to read addresses from. + # CLI flag: -scheduler.ring.instance-interface-names + [instance_interface_names: | default = [eth0 en0]] + + # IP address to advertise in the ring. + # CLI flag: -scheduler.ring.instance-addr + [instance_addr: | default = first from instance_interface_names] + + # Port to advertise in the ring + # CLI flag: -scheduler.ring.instance-port + [instance_port: | default = server.grpc-listen-port] + + # Instance ID to register in the ring. + # CLI flag: -scheduler.ring.instance-id + [instance_id: | default = os.Hostname()] + + # The availability zone where this instance is running. Required if + # zone-awareness is enabled. + # CLI flag: -scheduler.ring.instance-availability-zone + [instance_availability_zone: | default = ""] +``` + ## query_frontend_config The query_frontend_config configures the Loki query-frontend. @@ -315,8 +419,9 @@ The query_frontend_config configures the Loki query-frontend. # How often to resolve the scheduler-address, in order to look for new # query-scheduler instances. +# Also used to determine how often to poll the scheduler-ring for addresses if configured. # CLI flag: -frontend.scheduler-dns-lookup-period -[scheduler_dns_lookup_period: | default = 10s] +[scheduler_dns_lookup_period: | default = 3s] # Number of concurrent workers forwarding queries to single query-scheduler. # CLI flag: -frontend.scheduler-worker-concurrency @@ -776,9 +881,10 @@ The `frontend_worker_config` configures the worker - running within the Loki que # CLI flag: -querier.worker-parallelism [parallelism: | default = 10] -# How often to query DNS. +# How often to query the frontend_address DNS to resolve frontend addresses. +# Also used to determine how often to poll the scheduler-ring for addresses if configured. # CLI flag: -querier.dns-lookup-period -[dns_lookup_duration: | default = 10s] +[dns_lookup_duration: | default = 3s] # The CLI flags prefix for this block config is: querier.frontend-client [grpc_client_config: ] diff --git a/go.mod b/go.mod index eacc1a7d63970..9568996b4eb5d 100644 --- a/go.mod +++ b/go.mod @@ -31,6 +31,7 @@ require ( github.com/felixge/fgprof v0.9.1 github.com/fluent/fluent-bit-go v0.0.0-20190925192703-ea13c021720c github.com/fsouza/fake-gcs-server v1.7.0 + github.com/go-kit/kit v0.11.0 // indirect github.com/go-kit/log v0.2.0 github.com/go-logfmt/logfmt v0.5.1 github.com/go-redis/redis/v8 v8.11.4 @@ -147,7 +148,6 @@ require ( github.com/edsrzf/mmap-go v1.0.0 // indirect github.com/felixge/httpsnoop v1.0.1 // indirect github.com/fsnotify/fsnotify v1.4.9 // indirect - github.com/go-kit/kit v0.11.0 // indirect github.com/go-logr/logr v0.4.0 // indirect github.com/go-openapi/analysis v0.20.0 // indirect github.com/go-openapi/errors v0.20.0 // indirect diff --git a/pkg/loki/config_wrapper.go b/pkg/loki/config_wrapper.go index ecacd94ce8a52..14a088871043b 100644 --- a/pkg/loki/config_wrapper.go +++ b/pkg/loki/config_wrapper.go @@ -73,6 +73,14 @@ func (c *ConfigWrapper) ApplyDynamicConfig() cfg.Source { } } + // If nobody has defined any frontend address or scheduler address + // we can default to using the query scheduler ring for scheduler discovery. + if r.Worker.FrontendAddress == "" && + r.Worker.SchedulerAddress == "" && + r.Frontend.FrontendV2.SchedulerAddress == "" { + r.QueryScheduler.UseSchedulerRing = true + } + applyMemberlistConfig(r) applyStorageConfig(r, &defaults) @@ -80,7 +88,7 @@ func (c *ConfigWrapper) ApplyDynamicConfig() cfg.Source { } } -// applyMemberlistConfig will change the default ingester, distributor, and ruler ring configurations to use memberlist +// applyMemberlistConfig will change the default ingester, distributor, ruler, and query scheduler ring configurations to use memberlist // if the -memberlist.join_members config is provided. The idea here is that if a user explicitly configured the // memberlist configuration section, they probably want to be using memberlist for all their ring configurations. // Since a user can still explicitly override a specific ring configuration (for example, use consul for the distributor), @@ -90,6 +98,7 @@ func applyMemberlistConfig(r *ConfigWrapper) { r.Ingester.LifecyclerConfig.RingConfig.KVStore.Store = memberlistStr r.Distributor.DistributorRing.KVStore.Store = memberlistStr r.Ruler.Ring.KVStore.Store = memberlistStr + r.QueryScheduler.SchedulerRing.KVStore.Store = memberlistStr } } diff --git a/pkg/loki/loki.go b/pkg/loki/loki.go index 192792aa422d6..d19ab4c55751c 100644 --- a/pkg/loki/loki.go +++ b/pkg/loki/loki.go @@ -8,11 +8,9 @@ import ( "net/http" cortex_tripper "github.com/cortexproject/cortex/pkg/querier/queryrange" - "github.com/cortexproject/cortex/pkg/querier/worker" "github.com/cortexproject/cortex/pkg/ring" cortex_ruler "github.com/cortexproject/cortex/pkg/ruler" "github.com/cortexproject/cortex/pkg/ruler/rulestore" - "github.com/cortexproject/cortex/pkg/scheduler" "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/fakeauth" util_log "github.com/cortexproject/cortex/pkg/util/log" @@ -38,8 +36,10 @@ import ( "github.com/grafana/loki/pkg/lokifrontend" "github.com/grafana/loki/pkg/querier" "github.com/grafana/loki/pkg/querier/queryrange" + "github.com/grafana/loki/pkg/querier/worker" "github.com/grafana/loki/pkg/ruler" "github.com/grafana/loki/pkg/runtime" + "github.com/grafana/loki/pkg/scheduler" "github.com/grafana/loki/pkg/storage" "github.com/grafana/loki/pkg/storage/chunk" "github.com/grafana/loki/pkg/storage/stores/shipper/compactor" @@ -224,6 +224,7 @@ type Loki struct { MemberlistKV *memberlist.KVInitService compactor *compactor.Compactor QueryFrontEndTripperware cortex_tripper.Tripperware + queryScheduler *scheduler.Scheduler HTTPAuthMiddleware middleware.Interface } @@ -435,13 +436,13 @@ func (t *Loki) setupModuleManager() error { Querier: {Store, Ring, Server, IngesterQuerier, TenantConfigs}, QueryFrontendTripperware: {Server, Overrides, TenantConfigs}, QueryFrontend: {QueryFrontendTripperware}, - QueryScheduler: {Server, Overrides}, + QueryScheduler: {Server, Overrides, MemberlistKV}, Ruler: {Ring, Server, Store, RulerStorage, IngesterQuerier, Overrides, TenantConfigs}, TableManager: {Server}, Compactor: {Server, Overrides}, IndexGateway: {Server}, IngesterQuerier: {Ring}, - All: {QueryFrontend, Querier, Ingester, Distributor, TableManager, Ruler}, + All: {QueryScheduler, QueryFrontend, Querier, Ingester, Distributor, Ruler}, } // Add IngesterQuerier as a dependency for store when target is either ingester or querier. diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index edd46c0524d32..d898dbe1ccb75 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -12,13 +12,10 @@ import ( "github.com/NYTimes/gziphandler" "github.com/cortexproject/cortex/pkg/cortex" - "github.com/cortexproject/cortex/pkg/frontend" - "github.com/cortexproject/cortex/pkg/frontend/transport" "github.com/cortexproject/cortex/pkg/frontend/v1/frontendv1pb" "github.com/cortexproject/cortex/pkg/frontend/v2/frontendv2pb" "github.com/cortexproject/cortex/pkg/ring" cortex_ruler "github.com/cortexproject/cortex/pkg/ruler" - "github.com/cortexproject/cortex/pkg/scheduler" "github.com/cortexproject/cortex/pkg/scheduler/schedulerpb" util_log "github.com/cortexproject/cortex/pkg/util/log" "github.com/go-kit/log/level" @@ -37,10 +34,13 @@ import ( "github.com/grafana/loki/pkg/ingester" "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logql" + "github.com/grafana/loki/pkg/lokifrontend/frontend" + "github.com/grafana/loki/pkg/lokifrontend/frontend/transport" "github.com/grafana/loki/pkg/querier" "github.com/grafana/loki/pkg/querier/queryrange" "github.com/grafana/loki/pkg/ruler" "github.com/grafana/loki/pkg/runtime" + "github.com/grafana/loki/pkg/scheduler" loki_storage "github.com/grafana/loki/pkg/storage" "github.com/grafana/loki/pkg/storage/chunk" "github.com/grafana/loki/pkg/storage/chunk/cache" @@ -210,6 +210,7 @@ func (t *Loki) initQuerier() (services.Service, error) { QuerierWorkerConfig: &t.Cfg.Worker, QueryFrontendEnabled: t.Cfg.isModuleEnabled(QueryFrontend), QuerySchedulerEnabled: t.Cfg.isModuleEnabled(QueryScheduler), + SchedulerRing: t.queryScheduler.SafeReadRing(), } var queryHandlers = map[string]http.Handler{ @@ -414,12 +415,20 @@ func (t *Loki) initQueryFrontendTripperware() (_ services.Service, err error) { func (t *Loki) initQueryFrontend() (_ services.Service, err error) { level.Debug(util_log.Logger).Log("msg", "initializing query frontend", "config", fmt.Sprintf("%+v", t.Cfg.Frontend)) - roundTripper, frontendV1, frontendV2, err := frontend.InitFrontend(frontend.CombinedFrontendConfig{ + combinedCfg := frontend.CombinedFrontendConfig{ Handler: t.Cfg.Frontend.Handler, FrontendV1: t.Cfg.Frontend.FrontendV1, FrontendV2: t.Cfg.Frontend.FrontendV2, DownstreamURL: t.Cfg.Frontend.DownstreamURL, - }, disabledShuffleShardingLimits{}, t.Cfg.Server.GRPCListenPort, util_log.Logger, prometheus.DefaultRegisterer) + } + roundTripper, frontendV1, frontendV2, err := frontend.InitFrontend( + combinedCfg, + t.queryScheduler.SafeReadRing(), + disabledShuffleShardingLimits{}, + t.Cfg.Server.GRPCListenPort, + util_log.Logger, + prometheus.DefaultRegisterer) + if err != nil { return nil, err } @@ -660,6 +669,10 @@ func (t *Loki) initIndexGateway() (services.Service, error) { } func (t *Loki) initQueryScheduler() (services.Service, error) { + // Set some config sections from other config sections in the config struct + t.Cfg.QueryScheduler.SchedulerRing.ListenPort = t.Cfg.Server.GRPCListenPort + t.Cfg.QueryScheduler.SchedulerRing.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV + s, err := scheduler.NewScheduler(t.Cfg.QueryScheduler, t.overrides, util_log.Logger, prometheus.DefaultRegisterer) if err != nil { return nil, err @@ -667,6 +680,8 @@ func (t *Loki) initQueryScheduler() (services.Service, error) { schedulerpb.RegisterSchedulerForFrontendServer(t.Server.GRPC, s) schedulerpb.RegisterSchedulerForQuerierServer(t.Server.GRPC, s) + t.Server.HTTP.Handle("/scheduler/ring", s) + t.queryScheduler = s return s, nil } diff --git a/pkg/lokifrontend/config.go b/pkg/lokifrontend/config.go index 78becdfc0a970..f4764ff6b4153 100644 --- a/pkg/lokifrontend/config.go +++ b/pkg/lokifrontend/config.go @@ -3,9 +3,9 @@ package lokifrontend import ( "flag" - "github.com/cortexproject/cortex/pkg/frontend/transport" - v1 "github.com/cortexproject/cortex/pkg/frontend/v1" - v2 "github.com/cortexproject/cortex/pkg/frontend/v2" + "github.com/grafana/loki/pkg/lokifrontend/frontend/transport" + v1 "github.com/grafana/loki/pkg/lokifrontend/frontend/v1" + v2 "github.com/grafana/loki/pkg/lokifrontend/frontend/v2" ) type Config struct { diff --git a/pkg/lokifrontend/frontend/config.go b/pkg/lokifrontend/frontend/config.go new file mode 100644 index 0000000000000..74c4548750e84 --- /dev/null +++ b/pkg/lokifrontend/frontend/config.go @@ -0,0 +1,75 @@ +package frontend + +import ( + "flag" + "net/http" + + "github.com/cortexproject/cortex/pkg/ring" + "github.com/cortexproject/cortex/pkg/util" + "github.com/go-kit/log" + "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + + "github.com/grafana/loki/pkg/lokifrontend/frontend/transport" + v1 "github.com/grafana/loki/pkg/lokifrontend/frontend/v1" + v2 "github.com/grafana/loki/pkg/lokifrontend/frontend/v2" +) + +// This struct combines several configuration options together to preserve backwards compatibility. +type CombinedFrontendConfig struct { + Handler transport.HandlerConfig `yaml:",inline"` + FrontendV1 v1.Config `yaml:",inline"` + FrontendV2 v2.Config `yaml:",inline"` + + DownstreamURL string `yaml:"downstream_url"` +} + +func (cfg *CombinedFrontendConfig) RegisterFlags(f *flag.FlagSet) { + cfg.Handler.RegisterFlags(f) + cfg.FrontendV1.RegisterFlags(f) + cfg.FrontendV2.RegisterFlags(f) + + f.StringVar(&cfg.DownstreamURL, "frontend.downstream-url", "", "URL of downstream Prometheus.") +} + +// InitFrontend initializes frontend (either V1 -- without scheduler, or V2 -- with scheduler) or no frontend at +// all if downstream Prometheus URL is used instead. +// +// Returned RoundTripper can be wrapped in more round-tripper middlewares, and then eventually registered +// into HTTP server using the Handler from this package. Returned RoundTripper is always non-nil +// (if there are no errors), and it uses the returned frontend (if any). +func InitFrontend(cfg CombinedFrontendConfig, ring ring.ReadRing, limits v1.Limits, grpcListenPort int, log log.Logger, reg prometheus.Registerer) (http.RoundTripper, *v1.Frontend, *v2.Frontend, error) { + switch { + case cfg.DownstreamURL != "": + // If the user has specified a downstream Prometheus, then we should use that. + rt, err := NewDownstreamRoundTripper(cfg.DownstreamURL, http.DefaultTransport) + return rt, nil, nil, err + case ring != nil: + fallthrough + case cfg.FrontendV2.SchedulerAddress != "": + // If query-scheduler address is configured, use Frontend. + if cfg.FrontendV2.Addr == "" { + addr, err := util.GetFirstAddressOf(cfg.FrontendV2.InfNames) + if err != nil { + return nil, nil, nil, errors.Wrap(err, "failed to get frontend address") + } + + cfg.FrontendV2.Addr = addr + } + + if cfg.FrontendV2.Port == 0 { + cfg.FrontendV2.Port = grpcListenPort + } + + fr, err := v2.NewFrontend(cfg.FrontendV2, ring, log, reg) + return transport.AdaptGrpcRoundTripperToHTTPRoundTripper(fr), nil, fr, err + + default: + // No scheduler = use original frontend. + fr, err := v1.New(cfg.FrontendV1, limits, log, reg) + if err != nil { + return nil, nil, nil, err + } + return transport.AdaptGrpcRoundTripperToHTTPRoundTripper(fr), fr, nil, nil + } +} diff --git a/pkg/lokifrontend/frontend/downstream_roundtripper.go b/pkg/lokifrontend/frontend/downstream_roundtripper.go new file mode 100644 index 0000000000000..d52ced81938ab --- /dev/null +++ b/pkg/lokifrontend/frontend/downstream_roundtripper.go @@ -0,0 +1,41 @@ +package frontend + +import ( + "net/http" + "net/url" + "path" + + "github.com/opentracing/opentracing-go" +) + +// RoundTripper that forwards requests to downstream URL. +type downstreamRoundTripper struct { + downstreamURL *url.URL + transport http.RoundTripper +} + +func NewDownstreamRoundTripper(downstreamURL string, transport http.RoundTripper) (http.RoundTripper, error) { + u, err := url.Parse(downstreamURL) + if err != nil { + return nil, err + } + + return &downstreamRoundTripper{downstreamURL: u, transport: transport}, nil +} + +func (d downstreamRoundTripper) RoundTrip(r *http.Request) (*http.Response, error) { + tracer, span := opentracing.GlobalTracer(), opentracing.SpanFromContext(r.Context()) + if tracer != nil && span != nil { + carrier := opentracing.HTTPHeadersCarrier(r.Header) + err := tracer.Inject(span.Context(), opentracing.HTTPHeaders, carrier) + if err != nil { + return nil, err + } + } + + r.URL.Scheme = d.downstreamURL.Scheme + r.URL.Host = d.downstreamURL.Host + r.URL.Path = path.Join(d.downstreamURL.Path, r.URL.Path) + r.Host = "" + return d.transport.RoundTrip(r) +} diff --git a/pkg/lokifrontend/frontend/transport/handler.go b/pkg/lokifrontend/frontend/transport/handler.go new file mode 100644 index 0000000000000..b6689802108bd --- /dev/null +++ b/pkg/lokifrontend/frontend/transport/handler.go @@ -0,0 +1,254 @@ +package transport + +import ( + "bytes" + "context" + "flag" + "fmt" + "io" + "io/ioutil" + "net/http" + "net/url" + "strconv" + "strings" + "time" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/weaveworks/common/httpgrpc" + "github.com/weaveworks/common/httpgrpc/server" + + querier_stats "github.com/cortexproject/cortex/pkg/querier/stats" + "github.com/cortexproject/cortex/pkg/tenant" + "github.com/cortexproject/cortex/pkg/util" + util_log "github.com/cortexproject/cortex/pkg/util/log" +) + +const ( + // StatusClientClosedRequest is the status code for when a client request cancellation of an http request + StatusClientClosedRequest = 499 + ServiceTimingHeaderName = "Server-Timing" +) + +var ( + errCanceled = httpgrpc.Errorf(StatusClientClosedRequest, context.Canceled.Error()) + errDeadlineExceeded = httpgrpc.Errorf(http.StatusGatewayTimeout, context.DeadlineExceeded.Error()) + errRequestEntityTooLarge = httpgrpc.Errorf(http.StatusRequestEntityTooLarge, "http: request body too large") +) + +// Config for a Handler. +type HandlerConfig struct { + LogQueriesLongerThan time.Duration `yaml:"log_queries_longer_than"` + MaxBodySize int64 `yaml:"max_body_size"` + QueryStatsEnabled bool `yaml:"query_stats_enabled"` +} + +func (cfg *HandlerConfig) RegisterFlags(f *flag.FlagSet) { + f.DurationVar(&cfg.LogQueriesLongerThan, "frontend.log-queries-longer-than", 0, "Log queries that are slower than the specified duration. Set to 0 to disable. Set to < 0 to enable on all queries.") + f.Int64Var(&cfg.MaxBodySize, "frontend.max-body-size", 10*1024*1024, "Max body size for downstream prometheus.") + f.BoolVar(&cfg.QueryStatsEnabled, "frontend.query-stats-enabled", false, "True to enable query statistics tracking. When enabled, a message with some statistics is logged for every query.") +} + +// Handler accepts queries and forwards them to RoundTripper. It can log slow queries, +// but all other logic is inside the RoundTripper. +type Handler struct { + cfg HandlerConfig + log log.Logger + roundTripper http.RoundTripper + + // Metrics. + querySeconds *prometheus.CounterVec + querySeries *prometheus.CounterVec + queryBytes *prometheus.CounterVec + activeUsers *util.ActiveUsersCleanupService +} + +// NewHandler creates a new frontend handler. +func NewHandler(cfg HandlerConfig, roundTripper http.RoundTripper, log log.Logger, reg prometheus.Registerer) http.Handler { + h := &Handler{ + cfg: cfg, + log: log, + roundTripper: roundTripper, + } + + if cfg.QueryStatsEnabled { + h.querySeconds = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + Name: "cortex_query_seconds_total", + Help: "Total amount of wall clock time spend processing queries.", + }, []string{"user"}) + + h.querySeries = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + Name: "cortex_query_fetched_series_total", + Help: "Number of series fetched to execute a query.", + }, []string{"user"}) + + h.queryBytes = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + Name: "cortex_query_fetched_chunks_bytes_total", + Help: "Size of all chunks fetched to execute a query in bytes.", + }, []string{"user"}) + + h.activeUsers = util.NewActiveUsersCleanupWithDefaultValues(func(user string) { + h.querySeconds.DeleteLabelValues(user) + h.querySeries.DeleteLabelValues(user) + h.queryBytes.DeleteLabelValues(user) + }) + // If cleaner stops or fail, we will simply not clean the metrics for inactive users. + _ = h.activeUsers.StartAsync(context.Background()) + } + + return h +} + +func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + var ( + stats *querier_stats.Stats + queryString url.Values + ) + + // Initialise the stats in the context and make sure it's propagated + // down the request chain. + if f.cfg.QueryStatsEnabled { + var ctx context.Context + stats, ctx = querier_stats.ContextWithEmptyStats(r.Context()) + r = r.WithContext(ctx) + } + + defer func() { + _ = r.Body.Close() + }() + + // Buffer the body for later use to track slow queries. + var buf bytes.Buffer + r.Body = http.MaxBytesReader(w, r.Body, f.cfg.MaxBodySize) + r.Body = ioutil.NopCloser(io.TeeReader(r.Body, &buf)) + + startTime := time.Now() + resp, err := f.roundTripper.RoundTrip(r) + queryResponseTime := time.Since(startTime) + + if err != nil { + writeError(w, err) + return + } + + hs := w.Header() + for h, vs := range resp.Header { + hs[h] = vs + } + + if f.cfg.QueryStatsEnabled { + writeServiceTimingHeader(queryResponseTime, hs, stats) + } + + w.WriteHeader(resp.StatusCode) + // we don't check for copy error as there is no much we can do at this point + _, _ = io.Copy(w, resp.Body) + + // Check whether we should parse the query string. + shouldReportSlowQuery := f.cfg.LogQueriesLongerThan > 0 && queryResponseTime > f.cfg.LogQueriesLongerThan + if shouldReportSlowQuery || f.cfg.QueryStatsEnabled { + queryString = f.parseRequestQueryString(r, buf) + } + + if shouldReportSlowQuery { + f.reportSlowQuery(r, queryString, queryResponseTime) + } + if f.cfg.QueryStatsEnabled { + f.reportQueryStats(r, queryString, queryResponseTime, stats) + } +} + +// reportSlowQuery reports slow queries. +func (f *Handler) reportSlowQuery(r *http.Request, queryString url.Values, queryResponseTime time.Duration) { + logMessage := append([]interface{}{ + "msg", "slow query detected", + "method", r.Method, + "host", r.Host, + "path", r.URL.Path, + "time_taken", queryResponseTime.String(), + }, formatQueryString(queryString)...) + + level.Info(util_log.WithContext(r.Context(), f.log)).Log(logMessage...) +} + +func (f *Handler) reportQueryStats(r *http.Request, queryString url.Values, queryResponseTime time.Duration, stats *querier_stats.Stats) { + tenantIDs, err := tenant.TenantIDs(r.Context()) + if err != nil { + return + } + userID := tenant.JoinTenantIDs(tenantIDs) + wallTime := stats.LoadWallTime() + numSeries := stats.LoadFetchedSeries() + numBytes := stats.LoadFetchedChunkBytes() + + // Track stats. + f.querySeconds.WithLabelValues(userID).Add(wallTime.Seconds()) + f.querySeries.WithLabelValues(userID).Add(float64(numSeries)) + f.queryBytes.WithLabelValues(userID).Add(float64(numBytes)) + f.activeUsers.UpdateUserTimestamp(userID, time.Now()) + + // Log stats. + logMessage := append([]interface{}{ + "msg", "query stats", + "component", "query-frontend", + "method", r.Method, + "path", r.URL.Path, + "response_time", queryResponseTime, + "query_wall_time_seconds", wallTime.Seconds(), + "fetched_series_count", numSeries, + "fetched_chunks_bytes", numBytes, + }, formatQueryString(queryString)...) + + level.Info(util_log.WithContext(r.Context(), f.log)).Log(logMessage...) +} + +func (f *Handler) parseRequestQueryString(r *http.Request, bodyBuf bytes.Buffer) url.Values { + // Use previously buffered body. + r.Body = ioutil.NopCloser(&bodyBuf) + + // Ensure the form has been parsed so all the parameters are present + err := r.ParseForm() + if err != nil { + level.Warn(util_log.WithContext(r.Context(), f.log)).Log("msg", "unable to parse request form", "err", err) + return nil + } + + return r.Form +} + +func formatQueryString(queryString url.Values) (fields []interface{}) { + for k, v := range queryString { + fields = append(fields, fmt.Sprintf("param_%s", k), strings.Join(v, ",")) + } + return fields +} + +func writeError(w http.ResponseWriter, err error) { + switch err { + case context.Canceled: + err = errCanceled + case context.DeadlineExceeded: + err = errDeadlineExceeded + default: + if util.IsRequestBodyTooLarge(err) { + err = errRequestEntityTooLarge + } + } + server.WriteError(w, err) +} + +func writeServiceTimingHeader(queryResponseTime time.Duration, headers http.Header, stats *querier_stats.Stats) { + if stats != nil { + parts := make([]string, 0) + parts = append(parts, statsValue("querier_wall_time", stats.LoadWallTime())) + parts = append(parts, statsValue("response_time", queryResponseTime)) + headers.Set(ServiceTimingHeaderName, strings.Join(parts, ", ")) + } +} + +func statsValue(name string, d time.Duration) string { + durationInMs := strconv.FormatFloat(float64(d)/float64(time.Millisecond), 'f', -1, 64) + return name + ";dur=" + durationInMs +} diff --git a/pkg/lokifrontend/frontend/transport/roundtripper.go b/pkg/lokifrontend/frontend/transport/roundtripper.go new file mode 100644 index 0000000000000..d9ba57ccba438 --- /dev/null +++ b/pkg/lokifrontend/frontend/transport/roundtripper.go @@ -0,0 +1,58 @@ +package transport + +import ( + "bytes" + "context" + "io" + "io/ioutil" + "net/http" + + "github.com/weaveworks/common/httpgrpc" + "github.com/weaveworks/common/httpgrpc/server" +) + +// GrpcRoundTripper is similar to http.RoundTripper, but works with HTTP requests converted to protobuf messages. +type GrpcRoundTripper interface { + RoundTripGRPC(context.Context, *httpgrpc.HTTPRequest) (*httpgrpc.HTTPResponse, error) +} + +func AdaptGrpcRoundTripperToHTTPRoundTripper(r GrpcRoundTripper) http.RoundTripper { + return &grpcRoundTripperAdapter{roundTripper: r} +} + +// This adapter wraps GrpcRoundTripper and converted it into http.RoundTripper +type grpcRoundTripperAdapter struct { + roundTripper GrpcRoundTripper +} + +type buffer struct { + buff []byte + io.ReadCloser +} + +func (b *buffer) Bytes() []byte { + return b.buff +} + +func (a *grpcRoundTripperAdapter) RoundTrip(r *http.Request) (*http.Response, error) { + req, err := server.HTTPRequest(r) + if err != nil { + return nil, err + } + + resp, err := a.roundTripper.RoundTripGRPC(r.Context(), req) + if err != nil { + return nil, err + } + + httpResp := &http.Response{ + StatusCode: int(resp.Code), + Body: &buffer{buff: resp.Body, ReadCloser: ioutil.NopCloser(bytes.NewReader(resp.Body))}, + Header: http.Header{}, + ContentLength: int64(len(resp.Body)), + } + for _, h := range resp.Headers { + httpResp.Header[h.Key] = h.Values + } + return httpResp, nil +} diff --git a/pkg/lokifrontend/frontend/v1/frontend.go b/pkg/lokifrontend/frontend/v1/frontend.go new file mode 100644 index 0000000000000..3c4decd9f1859 --- /dev/null +++ b/pkg/lokifrontend/frontend/v1/frontend.go @@ -0,0 +1,353 @@ +package v1 + +import ( + "context" + "flag" + "fmt" + "net/http" + "time" + + "github.com/cortexproject/cortex/pkg/frontend/v1/frontendv1pb" + "github.com/cortexproject/cortex/pkg/querier/stats" + "github.com/cortexproject/cortex/pkg/scheduler/queue" + "github.com/cortexproject/cortex/pkg/tenant" + "github.com/cortexproject/cortex/pkg/util" + "github.com/cortexproject/cortex/pkg/util/validation" + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/grafana/dskit/services" + "github.com/opentracing/opentracing-go" + "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/weaveworks/common/httpgrpc" + + lokigrpc "github.com/grafana/loki/pkg/util/httpgrpc" +) + +var ( + errTooManyRequest = httpgrpc.Errorf(http.StatusTooManyRequests, "too many outstanding requests") +) + +// Config for a Frontend. +type Config struct { + MaxOutstandingPerTenant int `yaml:"max_outstanding_per_tenant"` + QuerierForgetDelay time.Duration `yaml:"querier_forget_delay"` +} + +// RegisterFlags adds the flags required to config this to the given FlagSet. +func (cfg *Config) RegisterFlags(f *flag.FlagSet) { + f.IntVar(&cfg.MaxOutstandingPerTenant, "querier.max-outstanding-requests-per-tenant", 100, "Maximum number of outstanding requests per tenant per frontend; requests beyond this error with HTTP 429.") + f.DurationVar(&cfg.QuerierForgetDelay, "query-frontend.querier-forget-delay", 0, "If a querier disconnects without sending notification about graceful shutdown, the query-frontend will keep the querier in the tenant's shard until the forget delay has passed. This feature is useful to reduce the blast radius when shuffle-sharding is enabled.") +} + +type Limits interface { + // Returns max queriers to use per tenant, or 0 if shuffle sharding is disabled. + MaxQueriersPerUser(user string) int +} + +// Frontend queues HTTP requests, dispatches them to backends, and handles retries +// for requests which failed. +type Frontend struct { + services.Service + + cfg Config + log log.Logger + limits Limits + + requestQueue *queue.RequestQueue + activeUsers *util.ActiveUsersCleanupService + + // Subservices manager. + subservices *services.Manager + subservicesWatcher *services.FailureWatcher + + // Metrics. + queueLength *prometheus.GaugeVec + discardedRequests *prometheus.CounterVec + numClients prometheus.GaugeFunc + queueDuration prometheus.Histogram +} + +type request struct { + enqueueTime time.Time + queueSpan opentracing.Span + originalCtx context.Context + + request *httpgrpc.HTTPRequest + err chan error + response chan *httpgrpc.HTTPResponse +} + +// New creates a new frontend. Frontend implements service, and must be started and stopped. +func New(cfg Config, limits Limits, log log.Logger, registerer prometheus.Registerer) (*Frontend, error) { + f := &Frontend{ + cfg: cfg, + log: log, + limits: limits, + queueLength: promauto.With(registerer).NewGaugeVec(prometheus.GaugeOpts{ + Name: "cortex_query_frontend_queue_length", + Help: "Number of queries in the queue.", + }, []string{"user"}), + discardedRequests: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{ + Name: "cortex_query_frontend_discarded_requests_total", + Help: "Total number of query requests discarded.", + }, []string{"user"}), + queueDuration: promauto.With(registerer).NewHistogram(prometheus.HistogramOpts{ + Name: "cortex_query_frontend_queue_duration_seconds", + Help: "Time spend by requests queued.", + Buckets: prometheus.DefBuckets, + }), + } + + f.requestQueue = queue.NewRequestQueue(cfg.MaxOutstandingPerTenant, cfg.QuerierForgetDelay, f.queueLength, f.discardedRequests) + f.activeUsers = util.NewActiveUsersCleanupWithDefaultValues(f.cleanupInactiveUserMetrics) + + var err error + f.subservices, err = services.NewManager(f.requestQueue, f.activeUsers) + if err != nil { + return nil, err + } + + f.numClients = promauto.With(registerer).NewGaugeFunc(prometheus.GaugeOpts{ + Name: "cortex_query_frontend_connected_clients", + Help: "Number of worker clients currently connected to the frontend.", + }, f.requestQueue.GetConnectedQuerierWorkersMetric) + + f.Service = services.NewBasicService(f.starting, f.running, f.stopping) + return f, nil +} + +func (f *Frontend) starting(ctx context.Context) error { + f.subservicesWatcher.WatchManager(f.subservices) + + if err := services.StartManagerAndAwaitHealthy(ctx, f.subservices); err != nil { + return errors.Wrap(err, "unable to start frontend subservices") + } + + return nil +} + +func (f *Frontend) running(ctx context.Context) error { + for { + select { + case <-ctx.Done(): + return nil + case err := <-f.subservicesWatcher.Chan(): + return errors.Wrap(err, "frontend subservice failed") + } + } +} + +func (f *Frontend) stopping(_ error) error { + // This will also stop the requests queue, which stop accepting new requests and errors out any pending requests. + return services.StopManagerAndAwaitStopped(context.Background(), f.subservices) +} + +func (f *Frontend) cleanupInactiveUserMetrics(user string) { + f.queueLength.DeleteLabelValues(user) + f.discardedRequests.DeleteLabelValues(user) +} + +// RoundTripGRPC round trips a proto (instead of a HTTP request). +func (f *Frontend) RoundTripGRPC(ctx context.Context, req *httpgrpc.HTTPRequest) (*httpgrpc.HTTPResponse, error) { + // Propagate trace context in gRPC too - this will be ignored if using HTTP. + tracer, span := opentracing.GlobalTracer(), opentracing.SpanFromContext(ctx) + if tracer != nil && span != nil { + carrier := (*lokigrpc.HeadersCarrier)(req) + err := tracer.Inject(span.Context(), opentracing.HTTPHeaders, carrier) + if err != nil { + return nil, err + } + } + + request := request{ + request: req, + originalCtx: ctx, + + // Buffer of 1 to ensure response can be written by the server side + // of the Process stream, even if this goroutine goes away due to + // client context cancellation. + err: make(chan error, 1), + response: make(chan *httpgrpc.HTTPResponse, 1), + } + + if err := f.queueRequest(ctx, &request); err != nil { + return nil, err + } + + select { + case <-ctx.Done(): + return nil, ctx.Err() + + case resp := <-request.response: + return resp, nil + + case err := <-request.err: + return nil, err + } +} + +// Process allows backends to pull requests from the frontend. +func (f *Frontend) Process(server frontendv1pb.Frontend_ProcessServer) error { + querierID, err := getQuerierID(server) + if err != nil { + return err + } + + f.requestQueue.RegisterQuerierConnection(querierID) + defer f.requestQueue.UnregisterQuerierConnection(querierID) + + // If the downstream request(from querier -> frontend) is cancelled, + // we need to ping the condition variable to unblock getNextRequestForQuerier. + // Ideally we'd have ctx aware condition variables... + go func() { + <-server.Context().Done() + f.requestQueue.QuerierDisconnecting() + }() + + lastUserIndex := queue.FirstUser() + + for { + reqWrapper, idx, err := f.requestQueue.GetNextRequestForQuerier(server.Context(), lastUserIndex, querierID) + if err != nil { + return err + } + lastUserIndex = idx + + req := reqWrapper.(*request) + + f.queueDuration.Observe(time.Since(req.enqueueTime).Seconds()) + req.queueSpan.Finish() + + /* + We want to dequeue the next unexpired request from the chosen tenant queue. + The chance of choosing a particular tenant for dequeueing is (1/active_tenants). + This is problematic under load, especially with other middleware enabled such as + querier.split-by-interval, where one request may fan out into many. + If expired requests aren't exhausted before checking another tenant, it would take + n_active_tenants * n_expired_requests_at_front_of_queue requests being processed + before an active request was handled for the tenant in question. + If this tenant meanwhile continued to queue requests, + it's possible that it's own queue would perpetually contain only expired requests. + */ + if req.originalCtx.Err() != nil { + lastUserIndex = lastUserIndex.ReuseLastUser() + continue + } + + // Handle the stream sending & receiving on a goroutine so we can + // monitoring the contexts in a select and cancel things appropriately. + resps := make(chan *frontendv1pb.ClientToFrontend, 1) + errs := make(chan error, 1) + go func() { + err = server.Send(&frontendv1pb.FrontendToClient{ + Type: frontendv1pb.HTTP_REQUEST, + HttpRequest: req.request, + StatsEnabled: stats.IsEnabled(req.originalCtx), + }) + if err != nil { + errs <- err + return + } + + resp, err := server.Recv() + if err != nil { + errs <- err + return + } + + resps <- resp + }() + + select { + // If the upstream request is cancelled, we need to cancel the + // downstream req. Only way we can do that is to close the stream. + // The worker client is expecting this semantics. + case <-req.originalCtx.Done(): + return req.originalCtx.Err() + + // Is there was an error handling this request due to network IO, + // then error out this upstream request _and_ stream. + case err := <-errs: + req.err <- err + return err + + // Happy path: merge the stats and propagate the response. + case resp := <-resps: + if stats.ShouldTrackHTTPGRPCResponse(resp.HttpResponse) { + stats := stats.FromContext(req.originalCtx) + stats.Merge(resp.Stats) // Safe if stats is nil. + } + + req.response <- resp.HttpResponse + } + } +} + +func (f *Frontend) NotifyClientShutdown(_ context.Context, req *frontendv1pb.NotifyClientShutdownRequest) (*frontendv1pb.NotifyClientShutdownResponse, error) { + level.Info(f.log).Log("msg", "received shutdown notification from querier", "querier", req.GetClientID()) + f.requestQueue.NotifyQuerierShutdown(req.GetClientID()) + + return &frontendv1pb.NotifyClientShutdownResponse{}, nil +} + +func getQuerierID(server frontendv1pb.Frontend_ProcessServer) (string, error) { + err := server.Send(&frontendv1pb.FrontendToClient{ + Type: frontendv1pb.GET_ID, + // Old queriers don't support GET_ID, and will try to use the request. + // To avoid confusing them, include dummy request. + HttpRequest: &httpgrpc.HTTPRequest{ + Method: "GET", + Url: "/invalid_request_sent_by_frontend", + }, + }) + + if err != nil { + return "", err + } + + resp, err := server.Recv() + + // Old queriers will return empty string, which is fine. All old queriers will be + // treated as single querier with lot of connections. + // (Note: if resp is nil, GetClientID() returns "") + return resp.GetClientID(), err +} + +func (f *Frontend) queueRequest(ctx context.Context, req *request) error { + tenantIDs, err := tenant.TenantIDs(ctx) + if err != nil { + return err + } + + now := time.Now() + req.enqueueTime = now + req.queueSpan, _ = opentracing.StartSpanFromContext(ctx, "queued") + + // aggregate the max queriers limit in the case of a multi tenant query + maxQueriers := validation.SmallestPositiveNonZeroIntPerTenant(tenantIDs, f.limits.MaxQueriersPerUser) + + joinedTenantID := tenant.JoinTenantIDs(tenantIDs) + f.activeUsers.UpdateUserTimestamp(joinedTenantID, now) + + err = f.requestQueue.EnqueueRequest(joinedTenantID, req, maxQueriers, nil) + if err == queue.ErrTooManyRequests { + return errTooManyRequest + } + return err +} + +// CheckReady determines if the query frontend is ready. Function parameters/return +// chosen to match the same method in the ingester +func (f *Frontend) CheckReady(_ context.Context) error { + // if we have more than one querier connected we will consider ourselves ready + connectedClients := f.requestQueue.GetConnectedQuerierWorkersMetric() + if connectedClients > 0 { + return nil + } + + msg := fmt.Sprintf("not ready: number of queriers connected to query-frontend is %d", int64(connectedClients)) + level.Info(f.log).Log("msg", msg) + return errors.New(msg) +} diff --git a/pkg/lokifrontend/frontend/v2/frontend.go b/pkg/lokifrontend/frontend/v2/frontend.go new file mode 100644 index 0000000000000..6f05b7088f21f --- /dev/null +++ b/pkg/lokifrontend/frontend/v2/frontend.go @@ -0,0 +1,320 @@ +package v2 + +import ( + "context" + "flag" + "fmt" + "math/rand" + "net/http" + "sync" + "time" + + "github.com/cortexproject/cortex/pkg/frontend/v2/frontendv2pb" + "github.com/cortexproject/cortex/pkg/querier/stats" + "github.com/cortexproject/cortex/pkg/ring" + "github.com/cortexproject/cortex/pkg/tenant" + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/grafana/dskit/flagext" + "github.com/grafana/dskit/grpcclient" + "github.com/grafana/dskit/services" + "github.com/opentracing/opentracing-go" + "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/weaveworks/common/httpgrpc" + "go.uber.org/atomic" + + lokigrpc "github.com/grafana/loki/pkg/util/httpgrpc" +) + +// Config for a Frontend. +type Config struct { + SchedulerAddress string `yaml:"scheduler_address"` + DNSLookupPeriod time.Duration `yaml:"scheduler_dns_lookup_period"` + WorkerConcurrency int `yaml:"scheduler_worker_concurrency"` + GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config"` + + // Used to find local IP address, that is sent to scheduler and querier-worker. + InfNames []string `yaml:"instance_interface_names"` + + // If set, address is not computed from interfaces. + Addr string `yaml:"address" doc:"hidden"` + Port int `doc:"hidden"` +} + +func (cfg *Config) RegisterFlags(f *flag.FlagSet) { + f.StringVar(&cfg.SchedulerAddress, "frontend.scheduler-address", "", "DNS hostname used for finding query-schedulers.") + f.DurationVar(&cfg.DNSLookupPeriod, "frontend.scheduler-dns-lookup-period", 10*time.Second, "How often to resolve the scheduler-address, in order to look for new query-scheduler instances. Also used to determine how often to poll the scheduler-ring for addresses if the scheduler-ring is configured.") + f.IntVar(&cfg.WorkerConcurrency, "frontend.scheduler-worker-concurrency", 5, "Number of concurrent workers forwarding queries to single query-scheduler.") + + cfg.InfNames = []string{"eth0", "en0"} + f.Var((*flagext.StringSlice)(&cfg.InfNames), "frontend.instance-interface-names", "Name of network interface to read address from. This address is sent to query-scheduler and querier, which uses it to send the query response back to query-frontend.") + f.StringVar(&cfg.Addr, "frontend.instance-addr", "", "IP address to advertise to querier (via scheduler) (resolved via interfaces by default).") + f.IntVar(&cfg.Port, "frontend.instance-port", 0, "Port to advertise to querier (via scheduler) (defaults to server.grpc-listen-port).") + + cfg.GRPCClientConfig.RegisterFlagsWithPrefix("frontend.grpc-client-config", f) +} + +// Frontend implements GrpcRoundTripper. It queues HTTP requests, +// dispatches them to backends via gRPC, and handles retries for requests which failed. +type Frontend struct { + services.Service + + cfg Config + log log.Logger + + lastQueryID atomic.Uint64 + + // frontend workers will read from this channel, and send request to scheduler. + requestsCh chan *frontendRequest + + schedulerWorkers *frontendSchedulerWorkers + requests *requestsInProgress +} + +type frontendRequest struct { + queryID uint64 + request *httpgrpc.HTTPRequest + userID string + statsEnabled bool + + cancel context.CancelFunc + + enqueue chan enqueueResult + response chan *frontendv2pb.QueryResultRequest +} + +type enqueueStatus int + +const ( + // Sent to scheduler successfully, and frontend should wait for response now. + waitForResponse enqueueStatus = iota + + // Failed to forward request to scheduler, frontend will try again. + failed +) + +type enqueueResult struct { + status enqueueStatus + + cancelCh chan<- uint64 // Channel that can be used for request cancellation. If nil, cancellation is not possible. +} + +// NewFrontend creates a new frontend. +func NewFrontend(cfg Config, ring ring.ReadRing, log log.Logger, reg prometheus.Registerer) (*Frontend, error) { + requestsCh := make(chan *frontendRequest) + + schedulerWorkers, err := newFrontendSchedulerWorkers(cfg, fmt.Sprintf("%s:%d", cfg.Addr, cfg.Port), ring, requestsCh, log) + if err != nil { + return nil, err + } + + f := &Frontend{ + cfg: cfg, + log: log, + requestsCh: requestsCh, + schedulerWorkers: schedulerWorkers, + requests: newRequestsInProgress(), + } + // Randomize to avoid getting responses from queries sent before restart, which could lead to mixing results + // between different queries. Note that frontend verifies the user, so it cannot leak results between tenants. + // This isn't perfect, but better than nothing. + f.lastQueryID.Store(rand.Uint64()) + + promauto.With(reg).NewGaugeFunc(prometheus.GaugeOpts{ + Name: "cortex_query_frontend_queries_in_progress", + Help: "Number of queries in progress handled by this frontend.", + }, func() float64 { + return float64(f.requests.count()) + }) + + promauto.With(reg).NewGaugeFunc(prometheus.GaugeOpts{ + Name: "cortex_query_frontend_connected_schedulers", + Help: "Number of schedulers this frontend is connected to.", + }, func() float64 { + return float64(f.schedulerWorkers.getWorkersCount()) + }) + + f.Service = services.NewIdleService(f.starting, f.stopping) + return f, nil +} + +func (f *Frontend) starting(ctx context.Context) error { + return errors.Wrap(services.StartAndAwaitRunning(ctx, f.schedulerWorkers), "failed to start frontend scheduler workers") +} + +func (f *Frontend) stopping(_ error) error { + return errors.Wrap(services.StopAndAwaitTerminated(context.Background(), f.schedulerWorkers), "failed to stop frontend scheduler workers") +} + +// RoundTripGRPC round trips a proto (instead of a HTTP request). +func (f *Frontend) RoundTripGRPC(ctx context.Context, req *httpgrpc.HTTPRequest) (*httpgrpc.HTTPResponse, error) { + if s := f.State(); s != services.Running { + return nil, fmt.Errorf("frontend not running: %v", s) + } + + tenantIDs, err := tenant.TenantIDs(ctx) + if err != nil { + return nil, err + } + userID := tenant.JoinTenantIDs(tenantIDs) + + // Propagate trace context in gRPC too - this will be ignored if using HTTP. + tracer, span := opentracing.GlobalTracer(), opentracing.SpanFromContext(ctx) + if tracer != nil && span != nil { + carrier := (*lokigrpc.HeadersCarrier)(req) + if err := tracer.Inject(span.Context(), opentracing.HTTPHeaders, carrier); err != nil { + return nil, err + } + } + + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + freq := &frontendRequest{ + queryID: f.lastQueryID.Inc(), + request: req, + userID: userID, + statsEnabled: stats.IsEnabled(ctx), + + cancel: cancel, + + // Buffer of 1 to ensure response or error can be written to the channel + // even if this goroutine goes away due to client context cancellation. + enqueue: make(chan enqueueResult, 1), + response: make(chan *frontendv2pb.QueryResultRequest, 1), + } + + f.requests.put(freq) + defer f.requests.delete(freq.queryID) + + retries := f.cfg.WorkerConcurrency + 1 // To make sure we hit at least two different schedulers. + +enqueueAgain: + select { + case <-ctx.Done(): + return nil, ctx.Err() + + case f.requestsCh <- freq: + // Enqueued, let's wait for response. + } + + var cancelCh chan<- uint64 + + select { + case <-ctx.Done(): + return nil, ctx.Err() + + case enqRes := <-freq.enqueue: + if enqRes.status == waitForResponse { + cancelCh = enqRes.cancelCh + break // go wait for response. + } else if enqRes.status == failed { + retries-- + if retries > 0 { + goto enqueueAgain + } + } + + return nil, httpgrpc.Errorf(http.StatusInternalServerError, "failed to enqueue request") + } + + select { + case <-ctx.Done(): + if cancelCh != nil { + select { + case cancelCh <- freq.queryID: + // cancellation sent. + default: + // failed to cancel, ignore. + } + } + return nil, ctx.Err() + + case resp := <-freq.response: + if stats.ShouldTrackHTTPGRPCResponse(resp.HttpResponse) { + stats := stats.FromContext(ctx) + stats.Merge(resp.Stats) // Safe if stats is nil. + } + + return resp.HttpResponse, nil + } +} + +func (f *Frontend) QueryResult(ctx context.Context, qrReq *frontendv2pb.QueryResultRequest) (*frontendv2pb.QueryResultResponse, error) { + tenantIDs, err := tenant.TenantIDs(ctx) + if err != nil { + return nil, err + } + userID := tenant.JoinTenantIDs(tenantIDs) + + req := f.requests.get(qrReq.QueryID) + // It is possible that some old response belonging to different user was received, if frontend has restarted. + // To avoid leaking query results between users, we verify the user here. + // To avoid mixing results from different queries, we randomize queryID counter on start. + if req != nil && req.userID == userID { + select { + case req.response <- qrReq: + // Should always be possible, unless QueryResult is called multiple times with the same queryID. + default: + level.Warn(f.log).Log("msg", "failed to write query result to the response channel", "queryID", qrReq.QueryID, "user", userID) + } + } + + return &frontendv2pb.QueryResultResponse{}, nil +} + +// CheckReady determines if the query frontend is ready. Function parameters/return +// chosen to match the same method in the ingester +func (f *Frontend) CheckReady(_ context.Context) error { + workers := f.schedulerWorkers.getWorkersCount() + + // If frontend is connected to at least one scheduler, we are ready. + if workers > 0 { + return nil + } + + msg := fmt.Sprintf("not ready: number of schedulers this worker is connected to is %d", workers) + level.Info(f.log).Log("msg", msg) + return errors.New(msg) +} + +type requestsInProgress struct { + mu sync.Mutex + requests map[uint64]*frontendRequest +} + +func newRequestsInProgress() *requestsInProgress { + return &requestsInProgress{ + requests: map[uint64]*frontendRequest{}, + } +} + +func (r *requestsInProgress) count() int { + r.mu.Lock() + defer r.mu.Unlock() + + return len(r.requests) +} + +func (r *requestsInProgress) put(req *frontendRequest) { + r.mu.Lock() + defer r.mu.Unlock() + + r.requests[req.queryID] = req +} + +func (r *requestsInProgress) delete(queryID uint64) { + r.mu.Lock() + defer r.mu.Unlock() + + delete(r.requests, queryID) +} + +func (r *requestsInProgress) get(queryID uint64) *frontendRequest { + r.mu.Lock() + defer r.mu.Unlock() + + return r.requests[queryID] +} diff --git a/pkg/lokifrontend/frontend/v2/frontend_scheduler_worker.go b/pkg/lokifrontend/frontend/v2/frontend_scheduler_worker.go new file mode 100644 index 0000000000000..bf95eb0c27f7c --- /dev/null +++ b/pkg/lokifrontend/frontend/v2/frontend_scheduler_worker.go @@ -0,0 +1,342 @@ +package v2 + +import ( + "context" + "net/http" + "sync" + "time" + + "github.com/cortexproject/cortex/pkg/frontend/v2/frontendv2pb" + "github.com/cortexproject/cortex/pkg/ring" + "github.com/cortexproject/cortex/pkg/scheduler/schedulerpb" + "github.com/cortexproject/cortex/pkg/util" + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/grafana/dskit/backoff" + "github.com/grafana/dskit/services" + "github.com/pkg/errors" + "github.com/weaveworks/common/httpgrpc" + "google.golang.org/grpc" + + lokiutil "github.com/grafana/loki/pkg/util" +) + +type frontendSchedulerWorkers struct { + services.Service + + cfg Config + logger log.Logger + frontendAddress string + + // Channel with requests that should be forwarded to the scheduler. + requestsCh <-chan *frontendRequest + + watcher services.Service + + mu sync.Mutex + // Set to nil when stop is called... no more workers are created afterwards. + workers map[string]*frontendSchedulerWorker +} + +func newFrontendSchedulerWorkers(cfg Config, frontendAddress string, ring ring.ReadRing, requestsCh <-chan *frontendRequest, logger log.Logger) (*frontendSchedulerWorkers, error) { + f := &frontendSchedulerWorkers{ + cfg: cfg, + logger: logger, + frontendAddress: frontendAddress, + requestsCh: requestsCh, + workers: map[string]*frontendSchedulerWorker{}, + } + + switch { + case ring != nil: + // Use the scheduler ring and RingWatcher to find schedulers. + w, err := lokiutil.NewRingWatcher(log.With(logger, "component", "frontend-scheduler-worker"), ring, cfg.DNSLookupPeriod, f) + if err != nil { + return nil, err + } + f.watcher = w + default: + // If there is no ring config fallback on using DNS for the frontend scheduler worker to find the schedulers. + w, err := util.NewDNSWatcher(cfg.SchedulerAddress, cfg.DNSLookupPeriod, f) + if err != nil { + return nil, err + } + f.watcher = w + } + + f.Service = services.NewIdleService(f.starting, f.stopping) + return f, nil +} + +func (f *frontendSchedulerWorkers) starting(ctx context.Context) error { + return services.StartAndAwaitRunning(ctx, f.watcher) +} + +func (f *frontendSchedulerWorkers) stopping(_ error) error { + err := services.StopAndAwaitTerminated(context.Background(), f.watcher) + + f.mu.Lock() + defer f.mu.Unlock() + + for _, w := range f.workers { + w.stop() + } + f.workers = nil + + return err +} + +func (f *frontendSchedulerWorkers) AddressAdded(address string) { + f.mu.Lock() + ws := f.workers + w := f.workers[address] + f.mu.Unlock() + + // Already stopped or we already have worker for this address. + if ws == nil || w != nil { + return + } + + level.Info(f.logger).Log("msg", "adding connection to scheduler", "addr", address) + conn, err := f.connectToScheduler(context.Background(), address) + if err != nil { + level.Error(f.logger).Log("msg", "error connecting to scheduler", "addr", address, "err", err) + return + } + + // No worker for this address yet, start a new one. + w = newFrontendSchedulerWorker(conn, address, f.frontendAddress, f.requestsCh, f.cfg.WorkerConcurrency, f.logger) + + f.mu.Lock() + defer f.mu.Unlock() + + // Can be nil if stopping has been called already. + if f.workers != nil { + f.workers[address] = w + w.start() + } +} + +func (f *frontendSchedulerWorkers) AddressRemoved(address string) { + level.Info(f.logger).Log("msg", "removing connection to scheduler", "addr", address) + + f.mu.Lock() + // This works fine if f.workers is nil already. + w := f.workers[address] + delete(f.workers, address) + f.mu.Unlock() + + if w != nil { + w.stop() + } +} + +// Get number of workers. +func (f *frontendSchedulerWorkers) getWorkersCount() int { + f.mu.Lock() + defer f.mu.Unlock() + + return len(f.workers) +} + +func (f *frontendSchedulerWorkers) connectToScheduler(ctx context.Context, address string) (*grpc.ClientConn, error) { + // Because we only use single long-running method, it doesn't make sense to inject user ID, send over tracing or add metrics. + opts, err := f.cfg.GRPCClientConfig.DialOption(nil, nil) + if err != nil { + return nil, err + } + + conn, err := grpc.DialContext(ctx, address, opts...) + if err != nil { + return nil, err + } + return conn, nil +} + +// Worker managing single gRPC connection to Scheduler. Each worker starts multiple goroutines for forwarding +// requests and cancellations to scheduler. +type frontendSchedulerWorker struct { + log log.Logger + + conn *grpc.ClientConn + concurrency int + schedulerAddr string + frontendAddr string + + // Context and cancellation used by individual goroutines. + ctx context.Context + cancel context.CancelFunc + wg sync.WaitGroup + + // Shared between all frontend workers. + requestCh <-chan *frontendRequest + + // Cancellation requests for this scheduler are received via this channel. It is passed to frontend after + // query has been enqueued to scheduler. + cancelCh chan uint64 +} + +func newFrontendSchedulerWorker(conn *grpc.ClientConn, schedulerAddr string, frontendAddr string, requestCh <-chan *frontendRequest, concurrency int, log log.Logger) *frontendSchedulerWorker { + w := &frontendSchedulerWorker{ + log: log, + conn: conn, + concurrency: concurrency, + schedulerAddr: schedulerAddr, + frontendAddr: frontendAddr, + requestCh: requestCh, + cancelCh: make(chan uint64), + } + w.ctx, w.cancel = context.WithCancel(context.Background()) + + return w +} + +func (w *frontendSchedulerWorker) start() { + client := schedulerpb.NewSchedulerForFrontendClient(w.conn) + for i := 0; i < w.concurrency; i++ { + w.wg.Add(1) + go func() { + defer w.wg.Done() + w.runOne(w.ctx, client) + }() + } +} + +func (w *frontendSchedulerWorker) stop() { + w.cancel() + w.wg.Wait() + if err := w.conn.Close(); err != nil { + level.Error(w.log).Log("msg", "error while closing connection to scheduler", "err", err) + } +} + +func (w *frontendSchedulerWorker) runOne(ctx context.Context, client schedulerpb.SchedulerForFrontendClient) { + backoffConfig := backoff.Config{ + MinBackoff: 500 * time.Millisecond, + MaxBackoff: 5 * time.Second, + } + + backoff := backoff.New(ctx, backoffConfig) + for backoff.Ongoing() { + loop, loopErr := client.FrontendLoop(ctx) + if loopErr != nil { + level.Error(w.log).Log("msg", "error contacting scheduler", "err", loopErr, "addr", w.schedulerAddr) + backoff.Wait() + continue + } + + loopErr = w.schedulerLoop(loop) + if closeErr := loop.CloseSend(); closeErr != nil { + level.Debug(w.log).Log("msg", "failed to close frontend loop", "err", loopErr, "addr", w.schedulerAddr) + } + + if loopErr != nil { + level.Error(w.log).Log("msg", "error sending requests to scheduler", "err", loopErr, "addr", w.schedulerAddr) + backoff.Wait() + continue + } + + backoff.Reset() + } +} + +func (w *frontendSchedulerWorker) schedulerLoop(loop schedulerpb.SchedulerForFrontend_FrontendLoopClient) error { + if err := loop.Send(&schedulerpb.FrontendToScheduler{ + Type: schedulerpb.INIT, + FrontendAddress: w.frontendAddr, + }); err != nil { + return err + } + + if resp, err := loop.Recv(); err != nil || resp.Status != schedulerpb.OK { + if err != nil { + return err + } + return errors.Errorf("unexpected status received for init: %v", resp.Status) + } + + ctx := loop.Context() + + for { + select { + case <-ctx.Done(): + // No need to report error if our internal context is canceled. This can happen during shutdown, + // or when scheduler is no longer resolvable. (It would be nice if this context reported "done" also when + // connection scheduler stops the call, but that doesn't seem to be the case). + // + // Reporting error here would delay reopening the stream (if the worker context is not done yet). + level.Debug(w.log).Log("msg", "stream context finished", "err", ctx.Err()) + return nil + + case req := <-w.requestCh: + err := loop.Send(&schedulerpb.FrontendToScheduler{ + Type: schedulerpb.ENQUEUE, + QueryID: req.queryID, + UserID: req.userID, + HttpRequest: req.request, + FrontendAddress: w.frontendAddr, + StatsEnabled: req.statsEnabled, + }) + + if err != nil { + req.enqueue <- enqueueResult{status: failed} + return err + } + + resp, err := loop.Recv() + if err != nil { + req.enqueue <- enqueueResult{status: failed} + return err + } + + switch resp.Status { + case schedulerpb.OK: + req.enqueue <- enqueueResult{status: waitForResponse, cancelCh: w.cancelCh} + // Response will come from querier. + + case schedulerpb.SHUTTING_DOWN: + // Scheduler is shutting down, report failure to enqueue and stop this loop. + req.enqueue <- enqueueResult{status: failed} + return errors.New("scheduler is shutting down") + + case schedulerpb.ERROR: + req.enqueue <- enqueueResult{status: waitForResponse} + req.response <- &frontendv2pb.QueryResultRequest{ + HttpResponse: &httpgrpc.HTTPResponse{ + Code: http.StatusInternalServerError, + Body: []byte(err.Error()), + }, + } + + case schedulerpb.TOO_MANY_REQUESTS_PER_TENANT: + req.enqueue <- enqueueResult{status: waitForResponse} + req.response <- &frontendv2pb.QueryResultRequest{ + HttpResponse: &httpgrpc.HTTPResponse{ + Code: http.StatusTooManyRequests, + Body: []byte("too many outstanding requests"), + }, + } + } + + case reqID := <-w.cancelCh: + err := loop.Send(&schedulerpb.FrontendToScheduler{ + Type: schedulerpb.CANCEL, + QueryID: reqID, + }) + + if err != nil { + return err + } + + resp, err := loop.Recv() + if err != nil { + return err + } + + // Scheduler may be shutting down, report that. + if resp.Status != schedulerpb.OK { + return errors.Errorf("unexpected status received for cancellation: %v", resp.Status) + } + } + } +} diff --git a/pkg/querier/worker/frontend_processor.go b/pkg/querier/worker/frontend_processor.go new file mode 100644 index 0000000000000..25100661dd5dd --- /dev/null +++ b/pkg/querier/worker/frontend_processor.go @@ -0,0 +1,147 @@ +package worker + +import ( + "context" + "fmt" + "net/http" + "time" + + "github.com/cortexproject/cortex/pkg/frontend/v1/frontendv1pb" + "github.com/cortexproject/cortex/pkg/querier/stats" + querier_stats "github.com/cortexproject/cortex/pkg/querier/stats" + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/grafana/dskit/backoff" + "github.com/weaveworks/common/httpgrpc" + "google.golang.org/grpc" +) + +var ( + processorBackoffConfig = backoff.Config{ + MinBackoff: 500 * time.Millisecond, + MaxBackoff: 5 * time.Second, + } +) + +func newFrontendProcessor(cfg Config, handler RequestHandler, log log.Logger) processor { + return &frontendProcessor{ + log: log, + handler: handler, + maxMessageSize: cfg.GRPCClientConfig.MaxSendMsgSize, + querierID: cfg.QuerierID, + } +} + +// Handles incoming queries from frontend. +type frontendProcessor struct { + handler RequestHandler + maxMessageSize int + querierID string + + log log.Logger +} + +// notifyShutdown implements processor. +func (fp *frontendProcessor) notifyShutdown(ctx context.Context, conn *grpc.ClientConn, address string) { + client := frontendv1pb.NewFrontendClient(conn) + + req := &frontendv1pb.NotifyClientShutdownRequest{ClientID: fp.querierID} + if _, err := client.NotifyClientShutdown(ctx, req); err != nil { + // Since we're shutting down there's nothing we can do except logging it. + level.Warn(fp.log).Log("msg", "failed to notify querier shutdown to query-frontend", "address", address, "err", err) + } +} + +// runOne loops, trying to establish a stream to the frontend to begin request processing. +func (fp *frontendProcessor) processQueriesOnSingleStream(ctx context.Context, conn *grpc.ClientConn, address string) { + client := frontendv1pb.NewFrontendClient(conn) + + backoff := backoff.New(ctx, processorBackoffConfig) + for backoff.Ongoing() { + c, err := client.Process(ctx) + if err != nil { + level.Error(fp.log).Log("msg", "error contacting frontend", "address", address, "err", err) + backoff.Wait() + continue + } + + if err := fp.process(c); err != nil { + level.Error(fp.log).Log("msg", "error processing requests", "address", address, "err", err) + backoff.Wait() + continue + } + + backoff.Reset() + } +} + +// process loops processing requests on an established stream. +func (fp *frontendProcessor) process(c frontendv1pb.Frontend_ProcessClient) error { + // Build a child context so we can cancel a query when the stream is closed. + ctx, cancel := context.WithCancel(c.Context()) + defer cancel() + + for { + request, err := c.Recv() + if err != nil { + return err + } + + switch request.Type { + case frontendv1pb.HTTP_REQUEST: + // Handle the request on a "background" goroutine, so we go back to + // blocking on c.Recv(). This allows us to detect the stream closing + // and cancel the query. We don't actually handle queries in parallel + // here, as we're running in lock step with the server - each Recv is + // paired with a Send. + go fp.runRequest(ctx, request.HttpRequest, request.StatsEnabled, func(response *httpgrpc.HTTPResponse, stats *stats.Stats) error { + return c.Send(&frontendv1pb.ClientToFrontend{ + HttpResponse: response, + Stats: stats, + }) + }) + + case frontendv1pb.GET_ID: + err := c.Send(&frontendv1pb.ClientToFrontend{ClientID: fp.querierID}) + if err != nil { + return err + } + + default: + return fmt.Errorf("unknown request type: %v", request.Type) + } + } +} + +func (fp *frontendProcessor) runRequest(ctx context.Context, request *httpgrpc.HTTPRequest, statsEnabled bool, sendHTTPResponse func(response *httpgrpc.HTTPResponse, stats *stats.Stats) error) { + var stats *querier_stats.Stats + if statsEnabled { + stats, ctx = querier_stats.ContextWithEmptyStats(ctx) + } + + response, err := fp.handler.Handle(ctx, request) + if err != nil { + var ok bool + response, ok = httpgrpc.HTTPResponseFromError(err) + if !ok { + response = &httpgrpc.HTTPResponse{ + Code: http.StatusInternalServerError, + Body: []byte(err.Error()), + } + } + } + + // Ensure responses that are too big are not retried. + if len(response.Body) >= fp.maxMessageSize { + errMsg := fmt.Sprintf("response larger than the max (%d vs %d)", len(response.Body), fp.maxMessageSize) + response = &httpgrpc.HTTPResponse{ + Code: http.StatusRequestEntityTooLarge, + Body: []byte(errMsg), + } + level.Error(fp.log).Log("msg", "error processing query", "err", errMsg) + } + + if err := sendHTTPResponse(response, stats); err != nil { + level.Error(fp.log).Log("msg", "error processing requests", "err", err) + } +} diff --git a/pkg/querier/worker/processor_manager.go b/pkg/querier/worker/processor_manager.go new file mode 100644 index 0000000000000..5d675c88a6576 --- /dev/null +++ b/pkg/querier/worker/processor_manager.go @@ -0,0 +1,86 @@ +package worker + +import ( + "context" + "sync" + "time" + + "go.uber.org/atomic" + "google.golang.org/grpc" +) + +const ( + notifyShutdownTimeout = 5 * time.Second +) + +// Manages processor goroutines for single grpc connection. +type processorManager struct { + p processor + conn *grpc.ClientConn + address string + + // Main context to control all goroutines. + ctx context.Context + wg sync.WaitGroup + + // Cancel functions for individual goroutines. + cancelsMu sync.Mutex + cancels []context.CancelFunc + + currentProcessors *atomic.Int32 +} + +func newProcessorManager(ctx context.Context, p processor, conn *grpc.ClientConn, address string) *processorManager { + return &processorManager{ + p: p, + ctx: ctx, + conn: conn, + address: address, + currentProcessors: atomic.NewInt32(0), + } +} + +func (pm *processorManager) stop() { + // Notify the remote query-frontend or query-scheduler we're shutting down. + // We use a new context to make sure it's not cancelled. + notifyCtx, cancel := context.WithTimeout(context.Background(), notifyShutdownTimeout) + defer cancel() + pm.p.notifyShutdown(notifyCtx, pm.conn, pm.address) + + // Stop all goroutines. + pm.concurrency(0) + + // Wait until they finish. + pm.wg.Wait() + + _ = pm.conn.Close() +} + +func (pm *processorManager) concurrency(n int) { + pm.cancelsMu.Lock() + defer pm.cancelsMu.Unlock() + + if n < 0 { + n = 0 + } + + for len(pm.cancels) < n { + ctx, cancel := context.WithCancel(pm.ctx) + pm.cancels = append(pm.cancels, cancel) + + pm.wg.Add(1) + go func() { + defer pm.wg.Done() + + pm.currentProcessors.Inc() + defer pm.currentProcessors.Dec() + + pm.p.processQueriesOnSingleStream(ctx, pm.conn, pm.address) + }() + } + + for len(pm.cancels) > n { + pm.cancels[0]() + pm.cancels = pm.cancels[1:] + } +} diff --git a/pkg/querier/worker/scheduler_processor.go b/pkg/querier/worker/scheduler_processor.go new file mode 100644 index 0000000000000..d1efe87de135f --- /dev/null +++ b/pkg/querier/worker/scheduler_processor.go @@ -0,0 +1,234 @@ +package worker + +import ( + "context" + "fmt" + "net/http" + "strings" + "time" + + "github.com/cortexproject/cortex/pkg/frontend/v2/frontendv2pb" + querier_stats "github.com/cortexproject/cortex/pkg/querier/stats" + "github.com/cortexproject/cortex/pkg/ring/client" + "github.com/cortexproject/cortex/pkg/scheduler/schedulerpb" + util_log "github.com/cortexproject/cortex/pkg/util/log" + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/grafana/dskit/backoff" + "github.com/grafana/dskit/grpcclient" + dskit_middleware "github.com/grafana/dskit/middleware" + "github.com/grafana/dskit/services" + otgrpc "github.com/opentracing-contrib/go-grpc" + "github.com/opentracing/opentracing-go" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/weaveworks/common/httpgrpc" + "github.com/weaveworks/common/middleware" + "github.com/weaveworks/common/user" + "google.golang.org/grpc" + "google.golang.org/grpc/health/grpc_health_v1" + + lokigrpc "github.com/grafana/loki/pkg/util/httpgrpc" +) + +func newSchedulerProcessor(cfg Config, handler RequestHandler, log log.Logger, reg prometheus.Registerer) (*schedulerProcessor, []services.Service) { + p := &schedulerProcessor{ + log: log, + handler: handler, + maxMessageSize: cfg.GRPCClientConfig.MaxSendMsgSize, + querierID: cfg.QuerierID, + grpcConfig: cfg.GRPCClientConfig, + + frontendClientRequestDuration: promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{ + Name: "cortex_querier_query_frontend_request_duration_seconds", + Help: "Time spend doing requests to frontend.", + Buckets: prometheus.ExponentialBuckets(0.001, 4, 6), + }, []string{"operation", "status_code"}), + } + + frontendClientsGauge := promauto.With(reg).NewGauge(prometheus.GaugeOpts{ + Name: "cortex_querier_query_frontend_clients", + Help: "The current number of clients connected to query-frontend.", + }) + + poolConfig := client.PoolConfig{ + CheckInterval: 5 * time.Second, + HealthCheckEnabled: true, + HealthCheckTimeout: 1 * time.Second, + } + + p.frontendPool = client.NewPool("frontend", poolConfig, nil, p.createFrontendClient, frontendClientsGauge, log) + return p, []services.Service{p.frontendPool} +} + +// Handles incoming queries from query-scheduler. +type schedulerProcessor struct { + log log.Logger + handler RequestHandler + grpcConfig grpcclient.Config + maxMessageSize int + querierID string + + frontendPool *client.Pool + frontendClientRequestDuration *prometheus.HistogramVec +} + +// notifyShutdown implements processor. +func (sp *schedulerProcessor) notifyShutdown(ctx context.Context, conn *grpc.ClientConn, address string) { + client := schedulerpb.NewSchedulerForQuerierClient(conn) + + req := &schedulerpb.NotifyQuerierShutdownRequest{QuerierID: sp.querierID} + if _, err := client.NotifyQuerierShutdown(ctx, req); err != nil { + // Since we're shutting down there's nothing we can do except logging it. + level.Warn(sp.log).Log("msg", "failed to notify querier shutdown to query-scheduler", "address", address, "err", err) + } +} + +func (sp *schedulerProcessor) processQueriesOnSingleStream(ctx context.Context, conn *grpc.ClientConn, address string) { + schedulerClient := schedulerpb.NewSchedulerForQuerierClient(conn) + + backoff := backoff.New(ctx, processorBackoffConfig) + for backoff.Ongoing() { + c, err := schedulerClient.QuerierLoop(ctx) + if err == nil { + err = c.Send(&schedulerpb.QuerierToScheduler{QuerierID: sp.querierID}) + } + + if err != nil { + level.Error(sp.log).Log("msg", "error contacting scheduler", "err", err, "addr", address) + backoff.Wait() + continue + } + + if err := sp.querierLoop(c, address); err != nil { + // E.Welch I don't know how to do this any better but context cancelations seem common, + // likely because of an underlying connection being close, + // they are noisy and I don't think they communicate anything useful. + if !strings.Contains(err.Error(), "context canceled") { + level.Error(sp.log).Log("msg", "error processing requests from scheduler", "err", err, "addr", address) + } + backoff.Wait() + continue + } + + backoff.Reset() + } +} + +// process loops processing requests on an established stream. +func (sp *schedulerProcessor) querierLoop(c schedulerpb.SchedulerForQuerier_QuerierLoopClient, address string) error { + // Build a child context so we can cancel a query when the stream is closed. + ctx, cancel := context.WithCancel(c.Context()) + defer cancel() + + for { + request, err := c.Recv() + if err != nil { + return err + } + + // Handle the request on a "background" goroutine, so we go back to + // blocking on c.Recv(). This allows us to detect the stream closing + // and cancel the query. We don't actually handle queries in parallel + // here, as we're running in lock step with the server - each Recv is + // paired with a Send. + go func() { + // We need to inject user into context for sending response back. + ctx := user.InjectOrgID(ctx, request.UserID) + + tracer := opentracing.GlobalTracer() + // Ignore errors here. If we cannot get parent span, we just don't create new one. + parentSpanContext, _ := lokigrpc.GetParentSpanForRequest(tracer, request.HttpRequest) + if parentSpanContext != nil { + queueSpan, spanCtx := opentracing.StartSpanFromContextWithTracer(ctx, tracer, "querier_processor_runRequest", opentracing.ChildOf(parentSpanContext)) + defer queueSpan.Finish() + + ctx = spanCtx + } + logger := util_log.WithContext(ctx, sp.log) + + sp.runRequest(ctx, logger, request.QueryID, request.FrontendAddress, request.StatsEnabled, request.HttpRequest) + + // Report back to scheduler that processing of the query has finished. + if err := c.Send(&schedulerpb.QuerierToScheduler{}); err != nil { + level.Error(logger).Log("msg", "error notifying scheduler about finished query", "err", err, "addr", address) + } + }() + } +} + +func (sp *schedulerProcessor) runRequest(ctx context.Context, logger log.Logger, queryID uint64, frontendAddress string, statsEnabled bool, request *httpgrpc.HTTPRequest) { + var stats *querier_stats.Stats + if statsEnabled { + stats, ctx = querier_stats.ContextWithEmptyStats(ctx) + } + + response, err := sp.handler.Handle(ctx, request) + if err != nil { + var ok bool + response, ok = httpgrpc.HTTPResponseFromError(err) + if !ok { + response = &httpgrpc.HTTPResponse{ + Code: http.StatusInternalServerError, + Body: []byte(err.Error()), + } + } + } + + // Ensure responses that are too big are not retried. + if len(response.Body) >= sp.maxMessageSize { + level.Error(logger).Log("msg", "response larger than max message size", "size", len(response.Body), "maxMessageSize", sp.maxMessageSize) + + errMsg := fmt.Sprintf("response larger than the max message size (%d vs %d)", len(response.Body), sp.maxMessageSize) + response = &httpgrpc.HTTPResponse{ + Code: http.StatusRequestEntityTooLarge, + Body: []byte(errMsg), + } + } + + c, err := sp.frontendPool.GetClientFor(frontendAddress) + if err == nil { + // Response is empty and uninteresting. + _, err = c.(frontendv2pb.FrontendForQuerierClient).QueryResult(ctx, &frontendv2pb.QueryResultRequest{ + QueryID: queryID, + HttpResponse: response, + Stats: stats, + }) + } + if err != nil { + level.Error(logger).Log("msg", "error notifying frontend about finished query", "err", err, "frontend", frontendAddress) + } +} + +func (sp *schedulerProcessor) createFrontendClient(addr string) (client.PoolClient, error) { + opts, err := sp.grpcConfig.DialOption([]grpc.UnaryClientInterceptor{ + otgrpc.OpenTracingClientInterceptor(opentracing.GlobalTracer()), + middleware.ClientUserHeaderInterceptor, + dskit_middleware.PrometheusGRPCUnaryInstrumentation(sp.frontendClientRequestDuration), + }, nil) + + if err != nil { + return nil, err + } + + conn, err := grpc.Dial(addr, opts...) + if err != nil { + return nil, err + } + + return &frontendClient{ + FrontendForQuerierClient: frontendv2pb.NewFrontendForQuerierClient(conn), + HealthClient: grpc_health_v1.NewHealthClient(conn), + conn: conn, + }, nil +} + +type frontendClient struct { + frontendv2pb.FrontendForQuerierClient + grpc_health_v1.HealthClient + conn *grpc.ClientConn +} + +func (fc *frontendClient) Close() error { + return fc.conn.Close() +} diff --git a/pkg/querier/worker/worker.go b/pkg/querier/worker/worker.go new file mode 100644 index 0000000000000..0caedd892b389 --- /dev/null +++ b/pkg/querier/worker/worker.go @@ -0,0 +1,284 @@ +package worker + +import ( + "context" + "flag" + "os" + "sync" + "time" + + "github.com/cortexproject/cortex/pkg/ring" + "github.com/cortexproject/cortex/pkg/util" + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/grafana/dskit/grpcclient" + "github.com/grafana/dskit/services" + "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + "github.com/weaveworks/common/httpgrpc" + "google.golang.org/grpc" + + lokiutil "github.com/grafana/loki/pkg/util" +) + +type Config struct { + FrontendAddress string `yaml:"frontend_address"` + SchedulerAddress string `yaml:"scheduler_address"` + DNSLookupPeriod time.Duration `yaml:"dns_lookup_duration"` + + Parallelism int `yaml:"parallelism"` + MatchMaxConcurrency bool `yaml:"match_max_concurrent"` + MaxConcurrentRequests int `yaml:"-"` // Must be same as passed to PromQL Engine. + + QuerierID string `yaml:"id"` + + GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config"` +} + +func (cfg *Config) RegisterFlags(f *flag.FlagSet) { + f.StringVar(&cfg.SchedulerAddress, "querier.scheduler-address", "", "Hostname (and port) of scheduler that querier will periodically resolve, connect to and receive queries from. Only one of -querier.frontend-address or -querier.scheduler-address can be set. If neither is set, queries are only received via HTTP endpoint.") + f.StringVar(&cfg.FrontendAddress, "querier.frontend-address", "", "Address of query frontend service, in host:port format. If -querier.scheduler-address is set as well, querier will use scheduler instead. Only one of -querier.frontend-address or -querier.scheduler-address can be set. If neither is set, queries are only received via HTTP endpoint.") + + f.DurationVar(&cfg.DNSLookupPeriod, "querier.dns-lookup-period", 3*time.Second, "How often to query DNS for query-frontend or query-scheduler address. Also used to determine how often to poll the scheduler-ring for addresses if the scheduler-ring is configured.") + + f.IntVar(&cfg.Parallelism, "querier.worker-parallelism", 10, "Number of simultaneous queries to process per query-frontend or query-scheduler.") + f.BoolVar(&cfg.MatchMaxConcurrency, "querier.worker-match-max-concurrent", false, "Force worker concurrency to match the -querier.max-concurrent option. Overrides querier.worker-parallelism.") + f.StringVar(&cfg.QuerierID, "querier.id", "", "Querier ID, sent to frontend service to identify requests from the same querier. Defaults to hostname.") + + cfg.GRPCClientConfig.RegisterFlagsWithPrefix("querier.frontend-client", f) +} + +func (cfg *Config) Validate(log log.Logger) error { + if cfg.FrontendAddress != "" && cfg.SchedulerAddress != "" { + return errors.New("frontend address and scheduler address are mutually exclusive, please use only one") + } + return cfg.GRPCClientConfig.Validate(log) +} + +// Handler for HTTP requests wrapped in protobuf messages. +type RequestHandler interface { + Handle(context.Context, *httpgrpc.HTTPRequest) (*httpgrpc.HTTPResponse, error) +} + +// Single processor handles all streaming operations to query-frontend or query-scheduler to fetch queries +// and process them. +type processor interface { + // Each invocation of processQueriesOnSingleStream starts new streaming operation to query-frontend + // or query-scheduler to fetch queries and execute them. + // + // This method must react on context being finished, and stop when that happens. + // + // processorManager (not processor) is responsible for starting as many goroutines as needed for each connection. + processQueriesOnSingleStream(ctx context.Context, conn *grpc.ClientConn, address string) + + // notifyShutdown notifies the remote query-frontend or query-scheduler that the querier is + // shutting down. + notifyShutdown(ctx context.Context, conn *grpc.ClientConn, address string) +} + +type querierWorker struct { + *services.BasicService + + cfg Config + logger log.Logger + + processor processor + + subservices *services.Manager + + mu sync.Mutex + // Set to nil when stop is called... no more managers are created afterwards. + managers map[string]*processorManager +} + +func NewQuerierWorker(cfg Config, rng ring.ReadRing, handler RequestHandler, logger log.Logger, reg prometheus.Registerer) (services.Service, error) { + if cfg.QuerierID == "" { + hostname, err := os.Hostname() + if err != nil { + return nil, errors.Wrap(err, "failed to get hostname for configuring querier ID") + } + cfg.QuerierID = hostname + } + + var processor processor + var servs []services.Service + var address string + + switch { + case rng != nil: + level.Info(logger).Log("msg", "Starting querier worker using query-scheduler and scheduler ring for addresses") + processor, servs = newSchedulerProcessor(cfg, handler, logger, reg) + case cfg.SchedulerAddress != "": + level.Info(logger).Log("msg", "Starting querier worker connected to query-scheduler", "scheduler", cfg.SchedulerAddress) + + address = cfg.SchedulerAddress + processor, servs = newSchedulerProcessor(cfg, handler, logger, reg) + + case cfg.FrontendAddress != "": + level.Info(logger).Log("msg", "Starting querier worker connected to query-frontend", "frontend", cfg.FrontendAddress) + + address = cfg.FrontendAddress + processor = newFrontendProcessor(cfg, handler, logger) + default: + return nil, errors.New("unable to start the querier worker, need to configure one of frontend_address, scheduler_address, or a ring config in the query_scheduler config block") + } + + return newQuerierWorkerWithProcessor(cfg, logger, processor, address, rng, servs) +} + +func newQuerierWorkerWithProcessor(cfg Config, logger log.Logger, processor processor, address string, ring ring.ReadRing, servs []services.Service) (*querierWorker, error) { + f := &querierWorker{ + cfg: cfg, + logger: logger, + managers: map[string]*processorManager{}, + processor: processor, + } + + // Empty address is only used in tests, where individual targets are added manually. + if address != "" { + w, err := util.NewDNSWatcher(address, cfg.DNSLookupPeriod, f) + if err != nil { + return nil, err + } + + servs = append(servs, w) + } + + if ring != nil { + w, err := lokiutil.NewRingWatcher(log.With(logger, "component", "querier-scheduler-worker"), ring, cfg.DNSLookupPeriod, f) + if err != nil { + return nil, err + } + servs = append(servs, w) + } + + if len(servs) > 0 { + subservices, err := services.NewManager(servs...) + if err != nil { + return nil, errors.Wrap(err, "querier worker subservices") + } + + f.subservices = subservices + } + + f.BasicService = services.NewIdleService(f.starting, f.stopping) + return f, nil +} + +func (w *querierWorker) starting(ctx context.Context) error { + if w.subservices == nil { + return nil + } + return services.StartManagerAndAwaitHealthy(ctx, w.subservices) +} + +func (w *querierWorker) stopping(_ error) error { + // Stop all goroutines fetching queries. Note that in Stopping state, + // worker no longer creates new managers in AddressAdded method. + w.mu.Lock() + for _, m := range w.managers { + m.stop() + } + w.mu.Unlock() + + if w.subservices == nil { + return nil + } + + // Stop DNS watcher and services used by processor. + return services.StopManagerAndAwaitStopped(context.Background(), w.subservices) +} + +func (w *querierWorker) AddressAdded(address string) { + ctx := w.ServiceContext() + if ctx == nil || ctx.Err() != nil { + return + } + + w.mu.Lock() + defer w.mu.Unlock() + + if m := w.managers[address]; m != nil { + return + } + + level.Info(w.logger).Log("msg", "adding connection", "addr", address) + conn, err := w.connect(context.Background(), address) + if err != nil { + level.Error(w.logger).Log("msg", "error connecting", "addr", address, "err", err) + return + } + + w.managers[address] = newProcessorManager(ctx, w.processor, conn, address) + // Called with lock. + w.resetConcurrency() +} + +func (w *querierWorker) AddressRemoved(address string) { + level.Info(w.logger).Log("msg", "removing connection", "addr", address) + + w.mu.Lock() + p := w.managers[address] + delete(w.managers, address) + // Called with lock. + w.resetConcurrency() + w.mu.Unlock() + + if p != nil { + p.stop() + } +} + +// Must be called with lock. +func (w *querierWorker) resetConcurrency() { + totalConcurrency := 0 + index := 0 + + for _, m := range w.managers { + concurrency := 0 + + if w.cfg.MatchMaxConcurrency { + concurrency = w.cfg.MaxConcurrentRequests / len(w.managers) + + // If max concurrency does not evenly divide into our frontends a subset will be chosen + // to receive an extra connection. Frontend addresses were shuffled above so this will be a + // random selection of frontends. + if index < w.cfg.MaxConcurrentRequests%len(w.managers) { + level.Warn(w.logger).Log("msg", "max concurrency is not evenly divisible across targets, adding an extra connection", "addr", m.address) + concurrency++ + } + } else { + concurrency = w.cfg.Parallelism + } + + // If concurrency is 0 then MaxConcurrentRequests is less than the total number of + // frontends/schedulers. In order to prevent accidentally starving a frontend or scheduler we are just going to + // always connect once to every target. This is dangerous b/c we may start exceeding PromQL + // max concurrency. + if concurrency == 0 { + concurrency = 1 + } + + totalConcurrency += concurrency + m.concurrency(concurrency) + index++ + } + + if totalConcurrency > w.cfg.MaxConcurrentRequests { + level.Warn(w.logger).Log("msg", "total worker concurrency is greater than promql max concurrency. Queries may be queued in the querier which reduces QOS") + } +} + +func (w *querierWorker) connect(ctx context.Context, address string) (*grpc.ClientConn, error) { + // Because we only use single long-running method, it doesn't make sense to inject user ID, send over tracing or add metrics. + opts, err := w.cfg.GRPCClientConfig.DialOption(nil, nil) + if err != nil { + return nil, err + } + + conn, err := grpc.DialContext(ctx, address, opts...) + if err != nil { + return nil, err + } + return conn, nil +} diff --git a/pkg/querier/worker_service.go b/pkg/querier/worker_service.go index 9203a459e67ba..4fff395ee122b 100644 --- a/pkg/querier/worker_service.go +++ b/pkg/querier/worker_service.go @@ -4,7 +4,7 @@ import ( "fmt" "net/http" - querier_worker "github.com/cortexproject/cortex/pkg/querier/worker" + "github.com/cortexproject/cortex/pkg/ring" util_log "github.com/cortexproject/cortex/pkg/util/log" "github.com/go-kit/log/level" "github.com/gorilla/mux" @@ -15,6 +15,7 @@ import ( httpgrpc_server "github.com/weaveworks/common/httpgrpc/server" "github.com/weaveworks/common/middleware" + querier_worker "github.com/grafana/loki/pkg/querier/worker" serverutil "github.com/grafana/loki/pkg/util/server" ) @@ -25,6 +26,7 @@ type WorkerServiceConfig struct { QuerierWorkerConfig *querier_worker.Config QueryFrontendEnabled bool QuerySchedulerEnabled bool + SchedulerRing ring.ReadRing } // InitWorkerService takes a config object, a map of routes to handlers, an external http router and external @@ -78,12 +80,16 @@ func InitWorkerService( // If a frontend or scheduler address has been configured, return a querier worker service that uses // the external Loki Server HTTP server, which has now has the internal handler's routes registered with it return querier_worker.NewQuerierWorker( - *(cfg.QuerierWorkerConfig), httpgrpc_server.NewServer(externalHandler), util_log.Logger, prometheus.DefaultRegisterer) + *(cfg.QuerierWorkerConfig), + cfg.SchedulerRing, + httpgrpc_server.NewServer(externalHandler), + util_log.Logger, + prometheus.DefaultRegisterer) } - // Since we must be running a querier with either a frontend and/or scheduler at this point, if no frontend or scheduler address + // Since we must be running a querier with either a frontend and/or scheduler at this point, if no scheduler ring, frontend, or scheduler address // is configured, Loki will default to using the frontend on localhost on it's own GRPC listening port. - if (*cfg.QuerierWorkerConfig).FrontendAddress == "" && (*cfg.QuerierWorkerConfig).SchedulerAddress == "" { + if cfg.SchedulerRing == nil && (*cfg.QuerierWorkerConfig).FrontendAddress == "" && (*cfg.QuerierWorkerConfig).SchedulerAddress == "" { address := fmt.Sprintf("127.0.0.1:%d", cfg.GrpcListenPort) level.Warn(util_log.Logger).Log( "msg", "Worker address is empty, attempting automatic worker configuration. If queries are unresponsive consider configuring the worker explicitly.", @@ -117,7 +123,11 @@ func InitWorkerService( //Return a querier worker pointed to the internal querier HTTP handler so there is not a conflict in routes between the querier //and the query frontend return querier_worker.NewQuerierWorker( - *(cfg.QuerierWorkerConfig), httpgrpc_server.NewServer(internalHandler), util_log.Logger, prometheus.DefaultRegisterer) + *(cfg.QuerierWorkerConfig), + cfg.SchedulerRing, + httpgrpc_server.NewServer(internalHandler), + util_log.Logger, + prometheus.DefaultRegisterer) } func registerRoutesExternally(routes []string, externalRouter *mux.Router, internalHandler http.Handler, authMiddleware middleware.Interface) { diff --git a/pkg/querier/worker_service_test.go b/pkg/querier/worker_service_test.go index f11411d2f374d..39abb3c30840b 100644 --- a/pkg/querier/worker_service_test.go +++ b/pkg/querier/worker_service_test.go @@ -5,12 +5,13 @@ import ( "net/http/httptest" "testing" - querier_worker "github.com/cortexproject/cortex/pkg/querier/worker" "github.com/gorilla/mux" "github.com/grafana/dskit/services" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/weaveworks/common/middleware" + + querier_worker "github.com/grafana/loki/pkg/querier/worker" ) func Test_InitQuerierService(t *testing.T) { diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go new file mode 100644 index 0000000000000..fca4e1655ec10 --- /dev/null +++ b/pkg/scheduler/scheduler.go @@ -0,0 +1,673 @@ +package scheduler + +import ( + "context" + "flag" + "io" + "net/http" + "sync" + "time" + + "github.com/cortexproject/cortex/pkg/frontend/v2/frontendv2pb" + "github.com/cortexproject/cortex/pkg/ring" + "github.com/cortexproject/cortex/pkg/scheduler/queue" + "github.com/cortexproject/cortex/pkg/scheduler/schedulerpb" + "github.com/cortexproject/cortex/pkg/tenant" + "github.com/cortexproject/cortex/pkg/util" + "github.com/cortexproject/cortex/pkg/util/validation" + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/grafana/dskit/grpcclient" + "github.com/grafana/dskit/kv" + "github.com/grafana/dskit/services" + otgrpc "github.com/opentracing-contrib/go-grpc" + "github.com/opentracing/opentracing-go" + "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/weaveworks/common/httpgrpc" + "github.com/weaveworks/common/middleware" + "github.com/weaveworks/common/user" + "google.golang.org/grpc" + + lokigrpc "github.com/grafana/loki/pkg/util/httpgrpc" +) + +var ( + errSchedulerIsNotRunning = errors.New("scheduler is not running") +) + +const ( + // ringAutoForgetUnhealthyPeriods is how many consecutive timeout periods an unhealthy instance + // in the ring will be automatically removed. + ringAutoForgetUnhealthyPeriods = 10 +) + +// Scheduler is responsible for queueing and dispatching queries to Queriers. +type Scheduler struct { + services.Service + + cfg Config + log log.Logger + + limits Limits + + connectedFrontendsMu sync.Mutex + connectedFrontends map[string]*connectedFrontend + + requestQueue *queue.RequestQueue + activeUsers *util.ActiveUsersCleanupService + + pendingRequestsMu sync.Mutex + pendingRequests map[requestKey]*schedulerRequest // Request is kept in this map even after being dispatched to querier. It can still be canceled at that time. + + // Subservices manager. + subservices *services.Manager + subservicesWatcher *services.FailureWatcher + + // Metrics. + queueLength *prometheus.GaugeVec + discardedRequests *prometheus.CounterVec + connectedQuerierClients prometheus.GaugeFunc + connectedFrontendClients prometheus.GaugeFunc + queueDuration prometheus.Histogram + + // Ring used for finding schedulers + ringLifecycler *ring.BasicLifecycler + ring *ring.Ring +} + +type requestKey struct { + frontendAddr string + queryID uint64 +} + +type connectedFrontend struct { + connections int + + // This context is used for running all queries from the same frontend. + // When last frontend connection is closed, context is canceled. + ctx context.Context + cancel context.CancelFunc +} + +type Config struct { + MaxOutstandingPerTenant int `yaml:"max_outstanding_requests_per_tenant"` + QuerierForgetDelay time.Duration `yaml:"-"` + GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config" doc:"description=This configures the gRPC client used to report errors back to the query-frontend."` + // Schedulers ring + UseSchedulerRing bool `yaml:"use_scheduler_ring"` + SchedulerRing RingConfig `yaml:"scheduler_ring,omitempty"` +} + +func (cfg *Config) RegisterFlags(f *flag.FlagSet) { + f.IntVar(&cfg.MaxOutstandingPerTenant, "query-scheduler.max-outstanding-requests-per-tenant", 100, "Maximum number of outstanding requests per tenant per query scheduler. In-flight requests above this limit will fail with HTTP response status code 429.") + // Loki doesn't have query shuffle sharding yet for which this config is intended + // use the default value of 0 until someday when this config may be needed. + cfg.QuerierForgetDelay = 0 + cfg.GRPCClientConfig.RegisterFlagsWithPrefix("query-scheduler.grpc-client-config", f) + f.BoolVar(&cfg.UseSchedulerRing, "query-scheduler.use-scheduler-ring", false, "Set to true to have the query scheduler create a ring and the frontend and frontend_worker use this ring to get the addresses of the query schedulers. If frontend_address and scheduler_address are not present in the config this value will be toggle by Loki to true") + cfg.SchedulerRing.RegisterFlags(f) +} + +// NewScheduler creates a new Scheduler. +func NewScheduler(cfg Config, limits Limits, log log.Logger, registerer prometheus.Registerer) (*Scheduler, error) { + s := &Scheduler{ + cfg: cfg, + log: log, + limits: limits, + + pendingRequests: map[requestKey]*schedulerRequest{}, + connectedFrontends: map[string]*connectedFrontend{}, + } + + s.queueLength = promauto.With(registerer).NewGaugeVec(prometheus.GaugeOpts{ + Name: "cortex_query_scheduler_queue_length", + Help: "Number of queries in the queue.", + }, []string{"user"}) + + s.discardedRequests = promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{ + Name: "cortex_query_scheduler_discarded_requests_total", + Help: "Total number of query requests discarded.", + }, []string{"user"}) + s.requestQueue = queue.NewRequestQueue(cfg.MaxOutstandingPerTenant, cfg.QuerierForgetDelay, s.queueLength, s.discardedRequests) + + s.queueDuration = promauto.With(registerer).NewHistogram(prometheus.HistogramOpts{ + Name: "cortex_query_scheduler_queue_duration_seconds", + Help: "Time spend by requests in queue before getting picked up by a querier.", + Buckets: prometheus.DefBuckets, + }) + s.connectedQuerierClients = promauto.With(registerer).NewGaugeFunc(prometheus.GaugeOpts{ + Name: "cortex_query_scheduler_connected_querier_clients", + Help: "Number of querier worker clients currently connected to the query-scheduler.", + }, s.requestQueue.GetConnectedQuerierWorkersMetric) + s.connectedFrontendClients = promauto.With(registerer).NewGaugeFunc(prometheus.GaugeOpts{ + Name: "cortex_query_scheduler_connected_frontend_clients", + Help: "Number of query-frontend worker clients currently connected to the query-scheduler.", + }, s.getConnectedFrontendClientsMetric) + + s.activeUsers = util.NewActiveUsersCleanupWithDefaultValues(s.cleanupMetricsForInactiveUser) + + svcs := []services.Service{s.requestQueue, s.activeUsers} + + if cfg.UseSchedulerRing { + ringStore, err := kv.NewClient( + cfg.SchedulerRing.KVStore, + ring.GetCodec(), + kv.RegistererWithKVName(prometheus.WrapRegistererWithPrefix("loki_", registerer), "scheduler"), + log, + ) + if err != nil { + return nil, errors.Wrap(err, "create KV store client") + } + lifecyclerCfg, err := cfg.SchedulerRing.ToLifecyclerConfig() + if err != nil { + return nil, errors.Wrap(err, "invalid ring lifecycler config") + } + + // Define lifecycler delegates in reverse order (last to be called defined first because they're + // chained via "next delegate"). + delegate := ring.BasicLifecyclerDelegate(s) + delegate = ring.NewLeaveOnStoppingDelegate(delegate, log) + delegate = ring.NewTokensPersistencyDelegate(cfg.SchedulerRing.TokensFilePath, ring.JOINING, delegate, log) + delegate = ring.NewAutoForgetDelegate(ringAutoForgetUnhealthyPeriods*cfg.SchedulerRing.HeartbeatTimeout, delegate, log) + + s.ringLifecycler, err = ring.NewBasicLifecycler(lifecyclerCfg, RingNameForServer, RingKey, ringStore, delegate, log, registerer) + if err != nil { + return nil, errors.Wrap(err, "create ring lifecycler") + } + + ringCfg := cfg.SchedulerRing.ToRingConfig() + s.ring, err = ring.NewWithStoreClientAndStrategy(ringCfg, RingNameForServer, RingKey, ringStore, ring.NewIgnoreUnhealthyInstancesReplicationStrategy()) + if err != nil { + return nil, errors.Wrap(err, "create ring client") + } + + if registerer != nil { + registerer.MustRegister(s.ring) + } + svcs = append(svcs, s.ringLifecycler, s.ring) + } + + var err error + s.subservices, err = services.NewManager(svcs...) + if err != nil { + return nil, err + } + + s.Service = services.NewBasicService(s.starting, s.running, s.stopping) + return s, nil +} + +// Limits needed for the Query Scheduler - interface used for decoupling. +type Limits interface { + // MaxQueriersPerUser returns max queriers to use per tenant, or 0 if shuffle sharding is disabled. + MaxQueriersPerUser(user string) int +} + +type schedulerRequest struct { + frontendAddress string + userID string + queryID uint64 + request *httpgrpc.HTTPRequest + statsEnabled bool + + enqueueTime time.Time + + ctx context.Context + ctxCancel context.CancelFunc + queueSpan opentracing.Span + + // This is only used for testing. + parentSpanContext opentracing.SpanContext +} + +// FrontendLoop handles connection from frontend. +func (s *Scheduler) FrontendLoop(frontend schedulerpb.SchedulerForFrontend_FrontendLoopServer) error { + frontendAddress, frontendCtx, err := s.frontendConnected(frontend) + if err != nil { + return err + } + defer s.frontendDisconnected(frontendAddress) + + // Response to INIT. If scheduler is not running, we skip for-loop, send SHUTTING_DOWN and exit this method. + if s.State() == services.Running { + if err := frontend.Send(&schedulerpb.SchedulerToFrontend{Status: schedulerpb.OK}); err != nil { + return err + } + } + + // We stop accepting new queries in Stopping state. By returning quickly, we disconnect frontends, which in turns + // cancels all their queries. + for s.State() == services.Running { + msg, err := frontend.Recv() + if err != nil { + // No need to report this as error, it is expected when query-frontend performs SendClose() (as frontendSchedulerWorker does). + if err == io.EOF { + return nil + } + return err + } + + if s.State() != services.Running { + break // break out of the loop, and send SHUTTING_DOWN message. + } + + var resp *schedulerpb.SchedulerToFrontend + + switch msg.GetType() { + case schedulerpb.ENQUEUE: + err = s.enqueueRequest(frontendCtx, frontendAddress, msg) + switch { + case err == nil: + resp = &schedulerpb.SchedulerToFrontend{Status: schedulerpb.OK} + case err == queue.ErrTooManyRequests: + resp = &schedulerpb.SchedulerToFrontend{Status: schedulerpb.TOO_MANY_REQUESTS_PER_TENANT} + default: + resp = &schedulerpb.SchedulerToFrontend{Status: schedulerpb.ERROR, Error: err.Error()} + } + + case schedulerpb.CANCEL: + s.cancelRequestAndRemoveFromPending(frontendAddress, msg.QueryID) + resp = &schedulerpb.SchedulerToFrontend{Status: schedulerpb.OK} + + default: + level.Error(s.log).Log("msg", "unknown request type from frontend", "addr", frontendAddress, "type", msg.GetType()) + return errors.New("unknown request type") + } + + err = frontend.Send(resp) + // Failure to send response results in ending this connection. + if err != nil { + return err + } + } + + // Report shutdown back to frontend, so that it can retry with different scheduler. Also stop the frontend loop. + return frontend.Send(&schedulerpb.SchedulerToFrontend{Status: schedulerpb.SHUTTING_DOWN}) +} + +func (s *Scheduler) frontendConnected(frontend schedulerpb.SchedulerForFrontend_FrontendLoopServer) (string, context.Context, error) { + msg, err := frontend.Recv() + if err != nil { + return "", nil, err + } + if msg.Type != schedulerpb.INIT || msg.FrontendAddress == "" { + return "", nil, errors.New("no frontend address") + } + + level.Debug(s.log).Log("msg", "frontend connected", "address", msg.FrontendAddress) + + s.connectedFrontendsMu.Lock() + defer s.connectedFrontendsMu.Unlock() + + cf := s.connectedFrontends[msg.FrontendAddress] + if cf == nil { + cf = &connectedFrontend{ + connections: 0, + } + cf.ctx, cf.cancel = context.WithCancel(context.Background()) + s.connectedFrontends[msg.FrontendAddress] = cf + } + + cf.connections++ + return msg.FrontendAddress, cf.ctx, nil +} + +func (s *Scheduler) frontendDisconnected(frontendAddress string) { + s.connectedFrontendsMu.Lock() + defer s.connectedFrontendsMu.Unlock() + + level.Debug(s.log).Log("msg", "frontend disconnected", "address", frontendAddress) + + cf := s.connectedFrontends[frontendAddress] + cf.connections-- + if cf.connections == 0 { + delete(s.connectedFrontends, frontendAddress) + cf.cancel() + } +} + +func (s *Scheduler) enqueueRequest(frontendContext context.Context, frontendAddr string, msg *schedulerpb.FrontendToScheduler) error { + // Create new context for this request, to support cancellation. + ctx, cancel := context.WithCancel(frontendContext) + shouldCancel := true + defer func() { + if shouldCancel { + cancel() + } + }() + + // Extract tracing information from headers in HTTP request. FrontendContext doesn't have the correct tracing + // information, since that is a long-running request. + tracer := opentracing.GlobalTracer() + parentSpanContext, err := lokigrpc.GetParentSpanForRequest(tracer, msg.HttpRequest) + if err != nil { + return err + } + + userID := msg.GetUserID() + + req := &schedulerRequest{ + frontendAddress: frontendAddr, + userID: msg.UserID, + queryID: msg.QueryID, + request: msg.HttpRequest, + statsEnabled: msg.StatsEnabled, + } + + now := time.Now() + + req.parentSpanContext = parentSpanContext + req.queueSpan, req.ctx = opentracing.StartSpanFromContextWithTracer(ctx, tracer, "queued", opentracing.ChildOf(parentSpanContext)) + req.enqueueTime = now + req.ctxCancel = cancel + + // aggregate the max queriers limit in the case of a multi tenant query + tenantIDs, err := tenant.TenantIDsFromOrgID(userID) + if err != nil { + return err + } + maxQueriers := validation.SmallestPositiveNonZeroIntPerTenant(tenantIDs, s.limits.MaxQueriersPerUser) + + s.activeUsers.UpdateUserTimestamp(userID, now) + return s.requestQueue.EnqueueRequest(userID, req, maxQueriers, func() { + shouldCancel = false + + s.pendingRequestsMu.Lock() + defer s.pendingRequestsMu.Unlock() + s.pendingRequests[requestKey{frontendAddr: frontendAddr, queryID: msg.QueryID}] = req + }) +} + +// This method doesn't do removal from the queue. +func (s *Scheduler) cancelRequestAndRemoveFromPending(frontendAddr string, queryID uint64) { + s.pendingRequestsMu.Lock() + defer s.pendingRequestsMu.Unlock() + + key := requestKey{frontendAddr: frontendAddr, queryID: queryID} + req := s.pendingRequests[key] + if req != nil { + req.ctxCancel() + } + delete(s.pendingRequests, key) +} + +// QuerierLoop is started by querier to receive queries from scheduler. +func (s *Scheduler) QuerierLoop(querier schedulerpb.SchedulerForQuerier_QuerierLoopServer) error { + resp, err := querier.Recv() + if err != nil { + return err + } + + querierID := resp.GetQuerierID() + level.Debug(s.log).Log("msg", "querier connected", "querier", querierID) + + s.requestQueue.RegisterQuerierConnection(querierID) + defer s.requestQueue.UnregisterQuerierConnection(querierID) + + // If the downstream connection to querier is cancelled, + // we need to ping the condition variable to unblock getNextRequestForQuerier. + // Ideally we'd have ctx aware condition variables... + go func() { + <-querier.Context().Done() + s.requestQueue.QuerierDisconnecting() + }() + + lastUserIndex := queue.FirstUser() + + // In stopping state scheduler is not accepting new queries, but still dispatching queries in the queues. + for s.isRunningOrStopping() { + req, idx, err := s.requestQueue.GetNextRequestForQuerier(querier.Context(), lastUserIndex, querierID) + if err != nil { + return err + } + lastUserIndex = idx + + r := req.(*schedulerRequest) + + s.queueDuration.Observe(time.Since(r.enqueueTime).Seconds()) + r.queueSpan.Finish() + + /* + We want to dequeue the next unexpired request from the chosen tenant queue. + The chance of choosing a particular tenant for dequeueing is (1/active_tenants). + This is problematic under load, especially with other middleware enabled such as + querier.split-by-interval, where one request may fan out into many. + If expired requests aren't exhausted before checking another tenant, it would take + n_active_tenants * n_expired_requests_at_front_of_queue requests being processed + before an active request was handled for the tenant in question. + If this tenant meanwhile continued to queue requests, + it's possible that it's own queue would perpetually contain only expired requests. + */ + + if r.ctx.Err() != nil { + // Remove from pending requests. + s.cancelRequestAndRemoveFromPending(r.frontendAddress, r.queryID) + + lastUserIndex = lastUserIndex.ReuseLastUser() + continue + } + + if err := s.forwardRequestToQuerier(querier, r); err != nil { + return err + } + } + + return errSchedulerIsNotRunning +} + +func (s *Scheduler) NotifyQuerierShutdown(_ context.Context, req *schedulerpb.NotifyQuerierShutdownRequest) (*schedulerpb.NotifyQuerierShutdownResponse, error) { + level.Debug(s.log).Log("msg", "received shutdown notification from querier", "querier", req.GetQuerierID()) + s.requestQueue.NotifyQuerierShutdown(req.GetQuerierID()) + + return &schedulerpb.NotifyQuerierShutdownResponse{}, nil +} + +func (s *Scheduler) forwardRequestToQuerier(querier schedulerpb.SchedulerForQuerier_QuerierLoopServer, req *schedulerRequest) error { + // Make sure to cancel request at the end to cleanup resources. + defer s.cancelRequestAndRemoveFromPending(req.frontendAddress, req.queryID) + + // Handle the stream sending & receiving on a goroutine so we can + // monitoring the contexts in a select and cancel things appropriately. + errCh := make(chan error, 1) + go func() { + err := querier.Send(&schedulerpb.SchedulerToQuerier{ + UserID: req.userID, + QueryID: req.queryID, + FrontendAddress: req.frontendAddress, + HttpRequest: req.request, + StatsEnabled: req.statsEnabled, + }) + if err != nil { + errCh <- err + return + } + + _, err = querier.Recv() + errCh <- err + }() + + select { + case <-req.ctx.Done(): + // If the upstream request is cancelled (eg. frontend issued CANCEL or closed connection), + // we need to cancel the downstream req. Only way we can do that is to close the stream (by returning error here). + // Querier is expecting this semantics. + return req.ctx.Err() + + case err := <-errCh: + // Is there was an error handling this request due to network IO, + // then error out this upstream request _and_ stream. + + if err != nil { + s.forwardErrorToFrontend(req.ctx, req, err) + } + return err + } +} + +func (s *Scheduler) forwardErrorToFrontend(ctx context.Context, req *schedulerRequest, requestErr error) { + opts, err := s.cfg.GRPCClientConfig.DialOption([]grpc.UnaryClientInterceptor{ + otgrpc.OpenTracingClientInterceptor(opentracing.GlobalTracer()), + middleware.ClientUserHeaderInterceptor}, + nil) + if err != nil { + level.Warn(s.log).Log("msg", "failed to create gRPC options for the connection to frontend to report error", "frontend", req.frontendAddress, "err", err, "requestErr", requestErr) + return + } + + conn, err := grpc.DialContext(ctx, req.frontendAddress, opts...) + if err != nil { + level.Warn(s.log).Log("msg", "failed to create gRPC connection to frontend to report error", "frontend", req.frontendAddress, "err", err, "requestErr", requestErr) + return + } + + defer func() { + _ = conn.Close() + }() + + client := frontendv2pb.NewFrontendForQuerierClient(conn) + + userCtx := user.InjectOrgID(ctx, req.userID) + _, err = client.QueryResult(userCtx, &frontendv2pb.QueryResultRequest{ + QueryID: req.queryID, + HttpResponse: &httpgrpc.HTTPResponse{ + Code: http.StatusInternalServerError, + Body: []byte(requestErr.Error()), + }, + }) + + if err != nil { + level.Warn(s.log).Log("msg", "failed to forward error to frontend", "frontend", req.frontendAddress, "err", err, "requestErr", requestErr) + return + } +} + +func (s *Scheduler) isRunningOrStopping() bool { + st := s.State() + return st == services.Running || st == services.Stopping +} + +func (s *Scheduler) starting(ctx context.Context) (err error) { + // In case this function will return error we want to unregister the instance + // from the ring. We do it ensuring dependencies are gracefully stopped if they + // were already started. + defer func() { + if err == nil || s.subservices == nil { + return + } + + if stopErr := services.StopManagerAndAwaitStopped(context.Background(), s.subservices); stopErr != nil { + level.Error(s.log).Log("msg", "failed to gracefully stop scheduler dependencies", "err", stopErr) + } + }() + + s.subservicesWatcher.WatchManager(s.subservices) + + if err := services.StartManagerAndAwaitHealthy(ctx, s.subservices); err != nil { + return errors.Wrap(err, "unable to start scheduler subservices") + } + + if s.cfg.UseSchedulerRing { + // The BasicLifecycler does not automatically move state to ACTIVE such that any additional work that + // someone wants to do can be done before becoming ACTIVE. For the query scheduler we don't currently + // have any additional work so we can become ACTIVE right away. + + // Wait until the ring client detected this instance in the JOINING state to + // make sure that when we'll run the initial sync we already know the tokens + // assigned to this instance. + level.Info(s.log).Log("msg", "waiting until scheduler is JOINING in the ring") + if err := ring.WaitInstanceState(ctx, s.ring, s.ringLifecycler.GetInstanceID(), ring.JOINING); err != nil { + return err + } + level.Info(s.log).Log("msg", "scheduler is JOINING in the ring") + + // Change ring state to ACTIVE + if err = s.ringLifecycler.ChangeState(ctx, ring.ACTIVE); err != nil { + return errors.Wrapf(err, "switch instance to %s in the ring", ring.ACTIVE) + } + + // Wait until the ring client detected this instance in the ACTIVE state to + // make sure that when we'll run the loop it won't be detected as a ring + // topology change. + level.Info(s.log).Log("msg", "waiting until scheduler is ACTIVE in the ring") + if err := ring.WaitInstanceState(ctx, s.ring, s.ringLifecycler.GetInstanceID(), ring.ACTIVE); err != nil { + return err + } + level.Info(s.log).Log("msg", "scheduler is ACTIVE in the ring") + } + + return nil +} + +func (s *Scheduler) running(ctx context.Context) error { + for { + select { + case <-ctx.Done(): + return nil + case err := <-s.subservicesWatcher.Chan(): + return errors.Wrap(err, "scheduler subservice failed") + } + } +} + +// Close the Scheduler. +func (s *Scheduler) stopping(_ error) error { + // This will also stop the requests queue, which stop accepting new requests and errors out any pending requests. + return services.StopManagerAndAwaitStopped(context.Background(), s.subservices) +} + +func (s *Scheduler) cleanupMetricsForInactiveUser(user string) { + s.queueLength.DeleteLabelValues(user) + s.discardedRequests.DeleteLabelValues(user) +} + +func (s *Scheduler) getConnectedFrontendClientsMetric() float64 { + s.connectedFrontendsMu.Lock() + defer s.connectedFrontendsMu.Unlock() + + count := 0 + for _, workers := range s.connectedFrontends { + count += workers.connections + } + + return float64(count) +} + +// SafeReadRing does a nil check on the Scheduler before attempting to return it's ring +// this is necessary as many callers of this function will only have a valid Scheduler +// reference if the QueryScheduler target has been specified, which is not guaranteed +func (s *Scheduler) SafeReadRing() ring.ReadRing { + if s == nil || s.ring == nil || !s.cfg.UseSchedulerRing { + return nil + } + + return s.ring +} + +func (s *Scheduler) OnRingInstanceRegister(_ *ring.BasicLifecycler, ringDesc ring.Desc, instanceExists bool, instanceID string, instanceDesc ring.InstanceDesc) (ring.InstanceState, ring.Tokens) { + // When we initialize the scheduler instance in the ring we want to start from + // a clean situation, so whatever is the state we set it JOINING, while we keep existing + // tokens (if any) or the ones loaded from file. + var tokens []uint32 + if instanceExists { + tokens = instanceDesc.GetTokens() + } + + takenTokens := ringDesc.GetTokens() + newTokens := ring.GenerateTokens(RingNumTokens-len(tokens), takenTokens) + + // Tokens sorting will be enforced by the parent caller. + tokens = append(tokens, newTokens...) + + return ring.JOINING, tokens +} + +func (s *Scheduler) OnRingInstanceTokens(_ *ring.BasicLifecycler, _ ring.Tokens) {} +func (s *Scheduler) OnRingInstanceStopping(_ *ring.BasicLifecycler) {} +func (s *Scheduler) OnRingInstanceHeartbeat(_ *ring.BasicLifecycler, _ *ring.Desc, _ *ring.InstanceDesc) { +} + +func (s *Scheduler) ServeHTTP(w http.ResponseWriter, req *http.Request) { + s.ring.ServeHTTP(w, req) +} diff --git a/pkg/scheduler/scheduler_ring.go b/pkg/scheduler/scheduler_ring.go new file mode 100644 index 0000000000000..8f41a6138d463 --- /dev/null +++ b/pkg/scheduler/scheduler_ring.go @@ -0,0 +1,110 @@ +package scheduler + +import ( + "flag" + "fmt" + "os" + "time" + + "github.com/go-kit/log/level" + "github.com/grafana/dskit/flagext" + "github.com/grafana/dskit/kv" + + "github.com/cortexproject/cortex/pkg/ring" + util_log "github.com/cortexproject/cortex/pkg/util/log" +) + +const ( + // RingKey is the key under which we store the store gateways ring in the KVStore. + RingKey = "scheduler" + + // RingNameForServer is the name of the ring used by the store gateway server. + RingNameForServer = "scheduler" + + // RingNameForClient is the name of the ring used by the store gateway client (we need + // a different name to avoid clashing Prometheus metrics when running in single-binary). + RingNameForClient = "scheduler-client" + + // We use a safe default instead of exposing to config option to the user + // in order to simplify the config. + RingNumTokens = 512 +) + +// RingConfig masks the ring lifecycler config which contains +// many options not really required by the distributors ring. This config +// is used to strip down the config to the minimum, and avoid confusion +// to the user. +type RingConfig struct { + KVStore kv.Config `yaml:"kvstore"` + HeartbeatPeriod time.Duration `yaml:"heartbeat_period"` + HeartbeatTimeout time.Duration `yaml:"heartbeat_timeout"` + TokensFilePath string `yaml:"tokens_file_path"` + ZoneAwarenessEnabled bool `yaml:"zone_awareness_enabled"` + + // Instance details + InstanceID string `yaml:"instance_id"` + InstanceInterfaceNames []string `yaml:"instance_interface_names"` + InstancePort int `yaml:"instance_port"` + InstanceAddr string `yaml:"instance_addr"` + InstanceZone string `yaml:"instance_availability_zone"` + + // Injected internally + ListenPort int `yaml:"-"` + + ObservePeriod time.Duration `yaml:"-"` +} + +// RegisterFlags adds the flags required to config this to the given FlagSet +func (cfg *RingConfig) RegisterFlags(f *flag.FlagSet) { + hostname, err := os.Hostname() + if err != nil { + level.Error(util_log.Logger).Log("msg", "failed to get hostname", "err", err) + os.Exit(1) + } + + // Ring flags + cfg.KVStore.RegisterFlagsWithPrefix("scheduler.ring.", "schedulers/", f) + f.DurationVar(&cfg.HeartbeatPeriod, "scheduler.ring.heartbeat-period", 15*time.Second, "Period at which to heartbeat to the ring. 0 = disabled.") + f.DurationVar(&cfg.HeartbeatTimeout, "scheduler.ring.heartbeat-timeout", time.Minute, "The heartbeat timeout after which schedulers are considered unhealthy within the ring. 0 = never (timeout disabled).") + f.StringVar(&cfg.TokensFilePath, "scheduler.ring.tokens-file-path", "", "File path where tokens are stored. If empty, tokens are not stored at shutdown and restored at startup.") + f.BoolVar(&cfg.ZoneAwarenessEnabled, "scheduler.ring.zone-awareness-enabled", false, "True to enable zone-awareness and replicate blocks across different availability zones.") + + // Instance flags + cfg.InstanceInterfaceNames = []string{"eth0", "en0"} + f.Var((*flagext.StringSlice)(&cfg.InstanceInterfaceNames), "scheduler.ring.instance-interface-names", "Name of network interface to read address from.") + f.StringVar(&cfg.InstanceAddr, "scheduler.ring.instance-addr", "", "IP address to advertise in the ring.") + f.IntVar(&cfg.InstancePort, "scheduler.ring.instance-port", 0, "Port to advertise in the ring (defaults to server.grpc-listen-port).") + f.StringVar(&cfg.InstanceID, "scheduler.ring.instance-id", hostname, "Instance ID to register in the ring.") + f.StringVar(&cfg.InstanceZone, "scheduler.ring.instance-availability-zone", "", "The availability zone where this instance is running. Required if zone-awareness is enabled.") +} + +// ToLifecyclerConfig returns a LifecyclerConfig based on the scheduler ring config. +func (cfg *RingConfig) ToLifecyclerConfig() (ring.BasicLifecyclerConfig, error) { + instanceAddr, err := ring.GetInstanceAddr(cfg.InstanceAddr, cfg.InstanceInterfaceNames) + if err != nil { + return ring.BasicLifecyclerConfig{}, err + } + + instancePort := ring.GetInstancePort(cfg.InstancePort, cfg.ListenPort) + + return ring.BasicLifecyclerConfig{ + ID: cfg.InstanceID, + Addr: fmt.Sprintf("%s:%d", instanceAddr, instancePort), + Zone: cfg.InstanceZone, + HeartbeatPeriod: cfg.HeartbeatPeriod, + TokensObservePeriod: 0, + NumTokens: RingNumTokens, + }, nil +} + +func (cfg *RingConfig) ToRingConfig() ring.Config { + rc := ring.Config{} + flagext.DefaultValues(&rc) + + rc.KVStore = cfg.KVStore + rc.HeartbeatTimeout = cfg.HeartbeatTimeout + rc.ZoneAwarenessEnabled = cfg.ZoneAwarenessEnabled + rc.ReplicationFactor = 2 + + return rc +} diff --git a/pkg/util/httpgrpc/carrier.go b/pkg/util/httpgrpc/carrier.go new file mode 100644 index 0000000000000..cff2cd3920740 --- /dev/null +++ b/pkg/util/httpgrpc/carrier.go @@ -0,0 +1,40 @@ +package httpgrpc + +import ( + "github.com/opentracing/opentracing-go" + weaveworks_httpgrpc "github.com/weaveworks/common/httpgrpc" +) + +// Used to transfer trace information from/to HTTP request. +type HeadersCarrier weaveworks_httpgrpc.HTTPRequest + +func (c *HeadersCarrier) Set(key, val string) { + c.Headers = append(c.Headers, &weaveworks_httpgrpc.Header{ + Key: key, + Values: []string{val}, + }) +} + +func (c *HeadersCarrier) ForeachKey(handler func(key, val string) error) error { + for _, h := range c.Headers { + for _, v := range h.Values { + if err := handler(h.Key, v); err != nil { + return err + } + } + } + return nil +} + +func GetParentSpanForRequest(tracer opentracing.Tracer, req *weaveworks_httpgrpc.HTTPRequest) (opentracing.SpanContext, error) { + if tracer == nil { + return nil, nil + } + + carrier := (*HeadersCarrier)(req) + extracted, err := tracer.Extract(opentracing.HTTPHeaders, carrier) + if err == opentracing.ErrSpanContextNotFound { + err = nil + } + return extracted, err +} diff --git a/pkg/util/ring_watcher.go b/pkg/util/ring_watcher.go new file mode 100644 index 0000000000000..ee9779ab6ad15 --- /dev/null +++ b/pkg/util/ring_watcher.go @@ -0,0 +1,126 @@ +package util + +import ( + "context" + "fmt" + "time" + + "github.com/cortexproject/cortex/pkg/ring" + "github.com/cortexproject/cortex/pkg/util" + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/grafana/dskit/services" +) + +const ( + RingKeyOfLeader = 0 +) + +type ringWatcher struct { + log log.Logger + ring ring.ReadRing + notifications util.DNSNotifications + lookupPeriod time.Duration + addresses []string +} + +// NewRingWatcher creates a new Ring watcher and returns a service that is wrapping it. +func NewRingWatcher(log log.Logger, ring ring.ReadRing, lookupPeriod time.Duration, notifications util.DNSNotifications) (services.Service, error) { + w := &ringWatcher{ + log: log, + ring: ring, + notifications: notifications, + lookupPeriod: lookupPeriod, + } + return services.NewBasicService(nil, w.watchLoop, nil), nil +} + +// watchLoop watches for changes in DNS and sends notifications. +func (w *ringWatcher) watchLoop(servCtx context.Context) error { + + syncTicker := time.NewTicker(w.lookupPeriod) + defer syncTicker.Stop() + + for { + select { + case <-servCtx.Done(): + return nil + case <-syncTicker.C: + w.lookupAddresses() + } + } +} + +func (w *ringWatcher) lookupAddresses() { + addrs, err := w.getAddresses() + if err != nil { + level.Error(w.log).Log("msg", "error getting addresses from ring", "err", err) + } + + if len(addrs) == 0 { + return + } + toAdd := make([]string, 0, len(addrs)) + for i, newAddr := range addrs { + alreadyExists := false + for _, currAddr := range w.addresses { + if currAddr == newAddr { + alreadyExists = true + } + } + if !alreadyExists { + toAdd = append(toAdd, addrs[i]) + } + } + toRemove := make([]string, 0, len(w.addresses)) + for i, existingAddr := range w.addresses { + stillExists := false + for _, newAddr := range addrs { + if newAddr == existingAddr { + stillExists = true + } + } + if !stillExists { + toRemove = append(toRemove, w.addresses[i]) + } + } + + for _, ta := range toAdd { + level.Debug(w.log).Log("msg", fmt.Sprintf("adding connection to scheduler at address: %s", ta)) + w.notifications.AddressAdded(ta) + } + + for _, tr := range toRemove { + level.Debug(w.log).Log("msg", fmt.Sprintf("removing connection to scheduler at address: %s", tr)) + w.notifications.AddressRemoved(tr) + } + + w.addresses = addrs + +} + +func (w *ringWatcher) getAddresses() ([]string, error) { + var addrs []string + + // If there are less than 2 existing addresses, odds are we are running just a single instance + // so just get the first healthy address and use it. If the call returns to continue on to + // check for the actual replicaset instances + if len(w.addresses) < 2 { + rs, err := w.ring.GetAllHealthy(ring.WriteNoExtend) + if err != nil { + return nil, err + } + addrs = rs.GetAddresses() + if len(addrs) == 1 { + return addrs, nil + } + } + + bufDescs, bufHosts, bufZones := ring.MakeBuffersForGet() + rs, err := w.ring.Get(RingKeyOfLeader, ring.WriteNoExtend, bufDescs, bufHosts, bufZones) + if err != nil { + return nil, err + } + + return rs.GetAddresses(), nil +}