From 5122df0ea76e73b6578d7cd20204381138d16b67 Mon Sep 17 00:00:00 2001 From: Edward Welch Date: Thu, 4 Nov 2021 08:00:32 -0400 Subject: [PATCH 1/4] better handling of tail requests when the frontend and querier are running in the same process --- pkg/loki/modules.go | 32 +++++++++++++++++++++++++------- pkg/querier/worker_service.go | 7 +++++++ 2 files changed, 32 insertions(+), 7 deletions(-) diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index e3b37cfd759c1..c7806a15f89b6 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -238,17 +238,30 @@ func (t *Loki) initQuerier() (services.Service, error) { "/loki/api/v1/label": http.HandlerFunc(t.Querier.LabelHandler), "/loki/api/v1/labels": http.HandlerFunc(t.Querier.LabelHandler), "/loki/api/v1/label/{name}/values": http.HandlerFunc(t.Querier.LabelHandler), - "/loki/api/v1/tail": http.HandlerFunc(t.Querier.TailHandler), "/loki/api/v1/series": http.HandlerFunc(t.Querier.SeriesHandler), "/api/prom/query": http.HandlerFunc(t.Querier.LogQueryHandler), "/api/prom/label": http.HandlerFunc(t.Querier.LabelHandler), "/api/prom/label/{name}/values": http.HandlerFunc(t.Querier.LabelHandler), - "/api/prom/tail": http.HandlerFunc(t.Querier.TailHandler), "/api/prom/series": http.HandlerFunc(t.Querier.SeriesHandler), } + + // We always want to register tail routes externally, tail requests are different from normal queries, they + // are HTTP requests that get upgraded to websocket requests and need to be handled/kept open by the Queriers. + // The frontend has code to proxy these requests, however when running in the same processes + // (such as target=All or target=Read) we don't want the frontend to proxy and instead we want the Queriers + // to directly register these routes. + // In practice this means we always want the queriers to register the tail routes externally, when a querier + // is standalone ALL routes are registered externally, and when it's in the same process as a frontend, + // we disable the proxying of the tail routes in initQueryFrontend() and we still want these routes regiestered + // on the external router. + var alwaysExternalHandlers = map[string]http.Handler{ + "/loki/api/v1/tail": http.HandlerFunc(t.Querier.TailHandler), + "/api/prom/tail": http.HandlerFunc(t.Querier.TailHandler), + } + return querier.InitWorkerService( - querierWorkerServiceConfig, queryHandlers, t.Server.HTTP, t.Server.HTTPServer.Handler, t.HTTPAuthMiddleware, + querierWorkerServiceConfig, queryHandlers, alwaysExternalHandlers, t.Server.HTTP, t.Server.HTTPServer.Handler, t.HTTPAuthMiddleware, ) } @@ -480,7 +493,8 @@ func (t *Loki) initQueryFrontend() (_ services.Service, err error) { ).Wrap(frontendHandler) var defaultHandler http.Handler - if t.Cfg.Frontend.TailProxyURL != "" { + // If this process also acts as a Querier we don't do any proxying of tail requests + if t.Cfg.Frontend.TailProxyURL != "" && !t.ModuleManager.IsModuleRegistered(Querier) { httpMiddleware := middleware.Merge( t.HTTPAuthMiddleware, queryrange.StatsHTTPMiddleware, @@ -512,9 +526,13 @@ func (t *Loki) initQueryFrontend() (_ services.Service, err error) { t.Server.HTTP.Path("/api/prom/label/{name}/values").Methods("GET", "POST").Handler(frontendHandler) t.Server.HTTP.Path("/api/prom/series").Methods("GET", "POST").Handler(frontendHandler) - // defer tail endpoints to the default handler - t.Server.HTTP.Path("/loki/api/v1/tail").Methods("GET", "POST").Handler(defaultHandler) - t.Server.HTTP.Path("/api/prom/tail").Methods("GET", "POST").Handler(defaultHandler) + // Only register tailing requests if this process does not act as a Querier + // If this process is also a Querier the Querier will register the tail endpoints. + if !t.ModuleManager.IsModuleRegistered(Querier) { + // defer tail endpoints to the default handler + t.Server.HTTP.Path("/loki/api/v1/tail").Methods("GET", "POST").Handler(defaultHandler) + t.Server.HTTP.Path("/api/prom/tail").Methods("GET", "POST").Handler(defaultHandler) + } if t.frontend == nil { return services.NewIdleService(nil, func(_ error) error { diff --git a/pkg/querier/worker_service.go b/pkg/querier/worker_service.go index 9ce2ed1cc5479..ee345d0e49d76 100644 --- a/pkg/querier/worker_service.go +++ b/pkg/querier/worker_service.go @@ -46,6 +46,7 @@ type WorkerServiceConfig struct { func InitWorkerService( cfg WorkerServiceConfig, queryRoutesToHandlers map[string]http.Handler, + alwaysExternalRoutesToHandlers map[string]http.Handler, externalRouter *mux.Router, externalHandler http.Handler, authMiddleware middleware.Interface, @@ -121,6 +122,12 @@ func InitWorkerService( //Querier worker's max concurrent requests must be the same as the querier setting (*cfg.QuerierWorkerConfig).MaxConcurrentRequests = cfg.QuerierMaxConcurrent + // There are some routes which are always registered on the external router, add them now and + // wrap them with the httpMiddleware + for route, handler := range alwaysExternalRoutesToHandlers { + externalRouter.Path(route).Methods("GET", "POST").Handler(httpMiddleware.Wrap(handler)) + } + //Return a querier worker pointed to the internal querier HTTP handler so there is not a conflict in routes between the querier //and the query frontend return querier_worker.NewQuerierWorker( From bcc6938f597f7af8bb74868fc2f83c297cbf4b2c Mon Sep 17 00:00:00 2001 From: Edward Welch Date: Thu, 4 Nov 2021 11:09:49 -0400 Subject: [PATCH 2/4] adding function to determine if a module is enable based on the target supplied to Loki --- pkg/loki/loki.go | 18 ++++++++++++++++++ pkg/loki/loki_test.go | 33 +++++++++++++++++++++++++++++++++ pkg/loki/modules.go | 4 ++-- 3 files changed, 53 insertions(+), 2 deletions(-) diff --git a/pkg/loki/loki.go b/pkg/loki/loki.go index 559a17f51d7e4..c06b307308081 100644 --- a/pkg/loki/loki.go +++ b/pkg/loki/loki.go @@ -209,6 +209,7 @@ type Loki struct { // set during initialization ModuleManager *modules.Manager serviceMap map[string]services.Service + deps map[string][]string Server *server.Server ring *ring.Ring @@ -481,7 +482,24 @@ func (t *Loki) setupModuleManager() error { } } + t.deps = deps t.ModuleManager = mm return nil } + +func (t *Loki) isModuleEnabled(m string) bool { + for _, tg := range t.Cfg.Target { + if tg == m { + return true + } + if k, ok := t.deps[tg]; ok { + for _, dp := range k { + if dp == m { + return true + } + } + } + } + return false +} diff --git a/pkg/loki/loki_test.go b/pkg/loki/loki_test.go index 19d45d4827743..64706ac838b3c 100644 --- a/pkg/loki/loki_test.go +++ b/pkg/loki/loki_test.go @@ -8,6 +8,8 @@ import ( "testing" "time" + "github.com/grafana/dskit/flagext" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -52,3 +54,34 @@ func TestFlagDefaults(t *testing.T) { require.Equal(t, c.Server.GRPCServerPingWithoutStreamAllowed, true) require.Contains(t, gotFlags[flagToCheck], "(default true)") } + +func TestLoki_isModuleEnabled(t1 *testing.T) { + tests := []struct { + name string + target flagext.StringSliceCSV + module string + want bool + }{ + {name: "Target All includes Querier", target: flagext.StringSliceCSV{"all"}, module: Querier, want: true}, + {name: "Target Querier does not include Distributor", target: flagext.StringSliceCSV{"querier"}, module: Distributor, want: false}, + {name: "Target Read includes Query Frontend", target: flagext.StringSliceCSV{"read"}, module: QueryFrontend, want: true}, + {name: "Target Querier does not include Query Frontend", target: flagext.StringSliceCSV{"querier"}, module: QueryFrontend, want: false}, + {name: "Target Query Frontend does not include Querier", target: flagext.StringSliceCSV{"query-frontend"}, module: Querier, want: false}, + {name: "Multi target includes querier", target: flagext.StringSliceCSV{"query-frontend", "query-scheduler", "querier"}, module: Querier, want: true}, + {name: "Multi target does not include distributor", target: flagext.StringSliceCSV{"query-frontend", "query-scheduler", "querier"}, module: Distributor, want: false}, + } + for _, tt := range tests { + t1.Run(tt.name, func(t1 *testing.T) { + t := &Loki{ + Cfg: Config{ + Target: tt.target, + }, + } + err := t.setupModuleManager() + assert.NoError(t1, err) + if got := t.isModuleEnabled(tt.module); got != tt.want { + t1.Errorf("isModuleEnabled() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index c7806a15f89b6..a42b67e9064b3 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -494,7 +494,7 @@ func (t *Loki) initQueryFrontend() (_ services.Service, err error) { var defaultHandler http.Handler // If this process also acts as a Querier we don't do any proxying of tail requests - if t.Cfg.Frontend.TailProxyURL != "" && !t.ModuleManager.IsModuleRegistered(Querier) { + if t.Cfg.Frontend.TailProxyURL != "" && !t.isModuleEnabled(Querier) { httpMiddleware := middleware.Merge( t.HTTPAuthMiddleware, queryrange.StatsHTTPMiddleware, @@ -528,7 +528,7 @@ func (t *Loki) initQueryFrontend() (_ services.Service, err error) { // Only register tailing requests if this process does not act as a Querier // If this process is also a Querier the Querier will register the tail endpoints. - if !t.ModuleManager.IsModuleRegistered(Querier) { + if !t.isModuleEnabled(Querier) { // defer tail endpoints to the default handler t.Server.HTTP.Path("/loki/api/v1/tail").Methods("GET", "POST").Handler(defaultHandler) t.Server.HTTP.Path("/api/prom/tail").Methods("GET", "POST").Handler(defaultHandler) From eacd9c47a276948543088ac050f2f1e6c8d5f5a2 Mon Sep 17 00:00:00 2001 From: Edward Welch Date: Thu, 4 Nov 2021 11:37:14 -0400 Subject: [PATCH 3/4] fix tests and incorrect registering of tail routes --- pkg/querier/worker_service.go | 53 +++++++++++++----------------- pkg/querier/worker_service_test.go | 22 +++++++++++++ 2 files changed, 45 insertions(+), 30 deletions(-) diff --git a/pkg/querier/worker_service.go b/pkg/querier/worker_service.go index ee345d0e49d76..996c8e8b786bc 100644 --- a/pkg/querier/worker_service.go +++ b/pkg/querier/worker_service.go @@ -52,11 +52,29 @@ func InitWorkerService( authMiddleware middleware.Interface, ) (serve services.Service, err error) { + // Create a couple Middlewares used to handle panics, perform auth, and parse Form's in http request + internalMiddleware := middleware.Merge( + serverutil.RecoveryHTTPMiddleware, + authMiddleware, + serverutil.NewPrepopulateMiddleware(), + ) + // External middleware also needs to set JSON content type headers + externalMiddleware := middleware.Merge( + internalMiddleware, + serverutil.ResponseJSONMiddleware(), + ) + internalRouter := mux.NewRouter() for route, handler := range queryRoutesToHandlers { internalRouter.Path(route).Methods("GET", "POST").Handler(handler) } + // There are some routes which are always registered on the external router, add them now and + // wrap them with the externalMiddleware + for route, handler := range alwaysExternalRoutesToHandlers { + externalRouter.Path(route).Methods("GET", "POST").Handler(externalMiddleware.Wrap(handler)) + } + // If the querier is running standalone without the query-frontend or query-scheduler, we must register the internal // HTTP handler externally (as it's the only handler that needs to register on querier routes) and provide the // external Loki Server HTTP handler to the frontend worker to ensure requests it processes use the default @@ -71,7 +89,10 @@ func InitWorkerService( idx++ } - registerRoutesExternally(routes, externalRouter, internalRouter, authMiddleware) + // Register routes externally + for _, route := range routes { + externalRouter.Path(route).Methods("GET", "POST").Handler(externalMiddleware.Wrap(internalRouter)) + } //If no frontend or scheduler address has been configured, then there is no place for the //querier worker to request work from, so no need to start a worker service @@ -108,26 +129,11 @@ func InitWorkerService( return "internalQuerier" })) - // If queries are processed using the external HTTP Server, we need wrap the internal querier with - // HTTP router with middleware to parse the tenant ID from the HTTP header and inject it into the - // request context, as well as make sure any x-www-url-formencoded params are correctly parsed - httpMiddleware := middleware.Merge( - serverutil.RecoveryHTTPMiddleware, - authMiddleware, - serverutil.NewPrepopulateMiddleware(), - ) - - internalHandler = httpMiddleware.Wrap(internalHandler) + internalHandler = internalMiddleware.Wrap(internalHandler) //Querier worker's max concurrent requests must be the same as the querier setting (*cfg.QuerierWorkerConfig).MaxConcurrentRequests = cfg.QuerierMaxConcurrent - // There are some routes which are always registered on the external router, add them now and - // wrap them with the httpMiddleware - for route, handler := range alwaysExternalRoutesToHandlers { - externalRouter.Path(route).Methods("GET", "POST").Handler(httpMiddleware.Wrap(handler)) - } - //Return a querier worker pointed to the internal querier HTTP handler so there is not a conflict in routes between the querier //and the query frontend return querier_worker.NewQuerierWorker( @@ -138,19 +144,6 @@ func InitWorkerService( prometheus.DefaultRegisterer) } -func registerRoutesExternally(routes []string, externalRouter *mux.Router, internalHandler http.Handler, authMiddleware middleware.Interface) { - httpMiddleware := middleware.Merge( - serverutil.RecoveryHTTPMiddleware, - authMiddleware, - serverutil.NewPrepopulateMiddleware(), - serverutil.ResponseJSONMiddleware(), - ) - - for _, route := range routes { - externalRouter.Path(route).Methods("GET", "POST").Handler(httpMiddleware.Wrap(internalHandler)) - } -} - func querierRunningStandalone(cfg WorkerServiceConfig) bool { runningStandalone := !cfg.QueryFrontendEnabled && !cfg.QuerySchedulerEnabled && !cfg.ReadEnabled && !cfg.AllEnabled level.Debug(util_log.Logger).Log( diff --git a/pkg/querier/worker_service_test.go b/pkg/querier/worker_service_test.go index 39abb3c30840b..3eadfc82e1e9a 100644 --- a/pkg/querier/worker_service_test.go +++ b/pkg/querier/worker_service_test.go @@ -22,6 +22,13 @@ func Test_InitQuerierService(t *testing.T) { }), } + var alwaysExternalHandlers = map[string]http.Handler{ + "/loki/api/v1/tail": http.HandlerFunc(func(res http.ResponseWriter, req *http.Request) { + _, err := res.Write([]byte("test tail handler")) + require.NoError(t, err) + }), + } + testContext := func(config WorkerServiceConfig, authMiddleware middleware.Interface) (*mux.Router, services.Service) { externalRouter := mux.NewRouter() @@ -32,6 +39,7 @@ func Test_InitQuerierService(t *testing.T) { querierWorkerService, err := InitWorkerService( config, mockQueryHandlers, + alwaysExternalHandlers, externalRouter, http.HandlerFunc(externalRouter.ServeHTTP), authMiddleware, @@ -57,6 +65,13 @@ func Test_InitQuerierService(t *testing.T) { externalRouter.ServeHTTP(recorder, request) assert.Equal(t, 200, recorder.Code) assert.Equal(t, "test handler", recorder.Body.String()) + + // Tail endpoints always external + recorder = httptest.NewRecorder() + request = httptest.NewRequest("GET", "/loki/api/v1/tail", nil) + externalRouter.ServeHTTP(recorder, request) + assert.Equal(t, 200, recorder.Code) + assert.Equal(t, "test tail handler", recorder.Body.String()) }) t.Run("wrap external handler with auth middleware", func(t *testing.T) { @@ -187,6 +202,13 @@ func Test_InitQuerierService(t *testing.T) { request := httptest.NewRequest("GET", "/loki/api/v1/query", nil) externalRouter.ServeHTTP(recorder, request) assert.Equal(t, 404, recorder.Code) + + // Tail endpoints always external + recorder = httptest.NewRecorder() + request = httptest.NewRequest("GET", "/loki/api/v1/tail", nil) + externalRouter.ServeHTTP(recorder, request) + assert.Equal(t, 200, recorder.Code) + assert.Equal(t, "test tail handler", recorder.Body.String()) } }) From aa65db762f861bd549aa2a702fa074cffae07198 Mon Sep 17 00:00:00 2001 From: Edward Welch Date: Thu, 4 Nov 2021 11:59:26 -0400 Subject: [PATCH 4/4] renamed function and made check for active recursive --- pkg/loki/loki.go | 26 ++++++++++++++++++-------- pkg/loki/loki_test.go | 5 +++-- pkg/loki/modules.go | 4 ++-- 3 files changed, 23 insertions(+), 12 deletions(-) diff --git a/pkg/loki/loki.go b/pkg/loki/loki.go index c06b307308081..0acc3d2f708a2 100644 --- a/pkg/loki/loki.go +++ b/pkg/loki/loki.go @@ -488,16 +488,26 @@ func (t *Loki) setupModuleManager() error { return nil } -func (t *Loki) isModuleEnabled(m string) bool { - for _, tg := range t.Cfg.Target { - if tg == m { +func (t *Loki) isModuleActive(m string) bool { + for _, target := range t.Cfg.Target { + if target == m { return true } - if k, ok := t.deps[tg]; ok { - for _, dp := range k { - if dp == m { - return true - } + if t.recursiveIsModuleActive(target, m) { + return true + } + } + return false +} + +func (t *Loki) recursiveIsModuleActive(target, m string) bool { + if targetDeps, ok := t.deps[target]; ok { + for _, dep := range targetDeps { + if dep == m { + return true + } + if t.recursiveIsModuleActive(dep, m) { + return true } } } diff --git a/pkg/loki/loki_test.go b/pkg/loki/loki_test.go index 64706ac838b3c..e02fb184c4843 100644 --- a/pkg/loki/loki_test.go +++ b/pkg/loki/loki_test.go @@ -69,6 +69,7 @@ func TestLoki_isModuleEnabled(t1 *testing.T) { {name: "Target Query Frontend does not include Querier", target: flagext.StringSliceCSV{"query-frontend"}, module: Querier, want: false}, {name: "Multi target includes querier", target: flagext.StringSliceCSV{"query-frontend", "query-scheduler", "querier"}, module: Querier, want: true}, {name: "Multi target does not include distributor", target: flagext.StringSliceCSV{"query-frontend", "query-scheduler", "querier"}, module: Distributor, want: false}, + {name: "Test recursive dep, Ingester -> TenantConfigs -> RuntimeConfig", target: flagext.StringSliceCSV{"ingester"}, module: RuntimeConfig, want: true}, } for _, tt := range tests { t1.Run(tt.name, func(t1 *testing.T) { @@ -79,8 +80,8 @@ func TestLoki_isModuleEnabled(t1 *testing.T) { } err := t.setupModuleManager() assert.NoError(t1, err) - if got := t.isModuleEnabled(tt.module); got != tt.want { - t1.Errorf("isModuleEnabled() = %v, want %v", got, tt.want) + if got := t.isModuleActive(tt.module); got != tt.want { + t1.Errorf("isModuleActive() = %v, want %v", got, tt.want) } }) } diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index a42b67e9064b3..0f062be9c074d 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -494,7 +494,7 @@ func (t *Loki) initQueryFrontend() (_ services.Service, err error) { var defaultHandler http.Handler // If this process also acts as a Querier we don't do any proxying of tail requests - if t.Cfg.Frontend.TailProxyURL != "" && !t.isModuleEnabled(Querier) { + if t.Cfg.Frontend.TailProxyURL != "" && !t.isModuleActive(Querier) { httpMiddleware := middleware.Merge( t.HTTPAuthMiddleware, queryrange.StatsHTTPMiddleware, @@ -528,7 +528,7 @@ func (t *Loki) initQueryFrontend() (_ services.Service, err error) { // Only register tailing requests if this process does not act as a Querier // If this process is also a Querier the Querier will register the tail endpoints. - if !t.isModuleEnabled(Querier) { + if !t.isModuleActive(Querier) { // defer tail endpoints to the default handler t.Server.HTTP.Path("/loki/api/v1/tail").Methods("GET", "POST").Handler(defaultHandler) t.Server.HTTP.Path("/api/prom/tail").Methods("GET", "POST").Handler(defaultHandler)