Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Loki: fix handling of tail requests when using target all or read #4642

Merged
merged 4 commits into from
Nov 4, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions pkg/loki/loki.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ type Loki struct {
// set during initialization
ModuleManager *modules.Manager
serviceMap map[string]services.Service
deps map[string][]string

Server *server.Server
ring *ring.Ring
Expand Down Expand Up @@ -481,7 +482,24 @@ func (t *Loki) setupModuleManager() error {
}
}

t.deps = deps
t.ModuleManager = mm

return nil
}

func (t *Loki) isModuleEnabled(m string) bool {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we make this isModuleActive? We have another function with the same name on the Config type which behaves differently: https://github.com/grafana/loki/blob/main/pkg/loki/loki.go#L196-L198

Copy link
Member

@owen-d owen-d Nov 4, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideally this would be a recursive check so we could handle IsActive(buzz) == true when target=foo

foo: {bar}
bar: {bazz}
bazz: {buzz}

edit: we should also recontribute that upstream, but I'm fine coding it here for the moment

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Renamed and made it recursive

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just double checking: there's no chance of Loki having a cyclic dependency, right? Ex:

foo: {bar}
bar: {bazz}
bazz: {bar}

Because, if there is, I think it could end in an infinite loop (solution would be to track the visited modules and to not visit already visited nodes).

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

haha, i think this could totally happen.

for _, tg := range t.Cfg.Target {
if tg == m {
return true
}
if k, ok := t.deps[tg]; ok {
for _, dp := range k {
if dp == m {
return true
}
}
}
}
return false
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ohh, this is a much better check of dependencies, should we replace the function of the same name on the Config struct with this version and use that everywhere?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Worried about introducing bugs as we close in on the 2.4 release so I created #4644 to track us doing this in the future.

33 changes: 33 additions & 0 deletions pkg/loki/loki_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"testing"
"time"

"github.com/grafana/dskit/flagext"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -52,3 +54,34 @@ func TestFlagDefaults(t *testing.T) {
require.Equal(t, c.Server.GRPCServerPingWithoutStreamAllowed, true)
require.Contains(t, gotFlags[flagToCheck], "(default true)")
}

func TestLoki_isModuleEnabled(t1 *testing.T) {
tests := []struct {
name string
target flagext.StringSliceCSV
module string
want bool
}{
{name: "Target All includes Querier", target: flagext.StringSliceCSV{"all"}, module: Querier, want: true},
{name: "Target Querier does not include Distributor", target: flagext.StringSliceCSV{"querier"}, module: Distributor, want: false},
{name: "Target Read includes Query Frontend", target: flagext.StringSliceCSV{"read"}, module: QueryFrontend, want: true},
{name: "Target Querier does not include Query Frontend", target: flagext.StringSliceCSV{"querier"}, module: QueryFrontend, want: false},
{name: "Target Query Frontend does not include Querier", target: flagext.StringSliceCSV{"query-frontend"}, module: Querier, want: false},
{name: "Multi target includes querier", target: flagext.StringSliceCSV{"query-frontend", "query-scheduler", "querier"}, module: Querier, want: true},
{name: "Multi target does not include distributor", target: flagext.StringSliceCSV{"query-frontend", "query-scheduler", "querier"}, module: Distributor, want: false},
}
for _, tt := range tests {
t1.Run(tt.name, func(t1 *testing.T) {
t := &Loki{
Cfg: Config{
Target: tt.target,
},
}
err := t.setupModuleManager()
assert.NoError(t1, err)
if got := t.isModuleEnabled(tt.module); got != tt.want {
t1.Errorf("isModuleEnabled() = %v, want %v", got, tt.want)
}
})
}
}
32 changes: 25 additions & 7 deletions pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,17 +238,30 @@ func (t *Loki) initQuerier() (services.Service, error) {
"/loki/api/v1/label": http.HandlerFunc(t.Querier.LabelHandler),
"/loki/api/v1/labels": http.HandlerFunc(t.Querier.LabelHandler),
"/loki/api/v1/label/{name}/values": http.HandlerFunc(t.Querier.LabelHandler),
"/loki/api/v1/tail": http.HandlerFunc(t.Querier.TailHandler),
"/loki/api/v1/series": http.HandlerFunc(t.Querier.SeriesHandler),

"/api/prom/query": http.HandlerFunc(t.Querier.LogQueryHandler),
"/api/prom/label": http.HandlerFunc(t.Querier.LabelHandler),
"/api/prom/label/{name}/values": http.HandlerFunc(t.Querier.LabelHandler),
"/api/prom/tail": http.HandlerFunc(t.Querier.TailHandler),
"/api/prom/series": http.HandlerFunc(t.Querier.SeriesHandler),
}

// We always want to register tail routes externally, tail requests are different from normal queries, they
// are HTTP requests that get upgraded to websocket requests and need to be handled/kept open by the Queriers.
// The frontend has code to proxy these requests, however when running in the same processes
// (such as target=All or target=Read) we don't want the frontend to proxy and instead we want the Queriers
// to directly register these routes.
// In practice this means we always want the queriers to register the tail routes externally, when a querier
// is standalone ALL routes are registered externally, and when it's in the same process as a frontend,
// we disable the proxying of the tail routes in initQueryFrontend() and we still want these routes regiestered
// on the external router.
var alwaysExternalHandlers = map[string]http.Handler{
"/loki/api/v1/tail": http.HandlerFunc(t.Querier.TailHandler),
"/api/prom/tail": http.HandlerFunc(t.Querier.TailHandler),
}

return querier.InitWorkerService(
querierWorkerServiceConfig, queryHandlers, t.Server.HTTP, t.Server.HTTPServer.Handler, t.HTTPAuthMiddleware,
querierWorkerServiceConfig, queryHandlers, alwaysExternalHandlers, t.Server.HTTP, t.Server.HTTPServer.Handler, t.HTTPAuthMiddleware,
)
}

Expand Down Expand Up @@ -480,7 +493,8 @@ func (t *Loki) initQueryFrontend() (_ services.Service, err error) {
).Wrap(frontendHandler)

var defaultHandler http.Handler
if t.Cfg.Frontend.TailProxyURL != "" {
// If this process also acts as a Querier we don't do any proxying of tail requests
if t.Cfg.Frontend.TailProxyURL != "" && !t.isModuleEnabled(Querier) {
httpMiddleware := middleware.Merge(
t.HTTPAuthMiddleware,
queryrange.StatsHTTPMiddleware,
Expand Down Expand Up @@ -512,9 +526,13 @@ func (t *Loki) initQueryFrontend() (_ services.Service, err error) {
t.Server.HTTP.Path("/api/prom/label/{name}/values").Methods("GET", "POST").Handler(frontendHandler)
t.Server.HTTP.Path("/api/prom/series").Methods("GET", "POST").Handler(frontendHandler)

// defer tail endpoints to the default handler
t.Server.HTTP.Path("/loki/api/v1/tail").Methods("GET", "POST").Handler(defaultHandler)
t.Server.HTTP.Path("/api/prom/tail").Methods("GET", "POST").Handler(defaultHandler)
// Only register tailing requests if this process does not act as a Querier
// If this process is also a Querier the Querier will register the tail endpoints.
if !t.isModuleEnabled(Querier) {
// defer tail endpoints to the default handler
t.Server.HTTP.Path("/loki/api/v1/tail").Methods("GET", "POST").Handler(defaultHandler)
t.Server.HTTP.Path("/api/prom/tail").Methods("GET", "POST").Handler(defaultHandler)
}

if t.frontend == nil {
return services.NewIdleService(nil, func(_ error) error {
Expand Down
7 changes: 7 additions & 0 deletions pkg/querier/worker_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type WorkerServiceConfig struct {
func InitWorkerService(
cfg WorkerServiceConfig,
queryRoutesToHandlers map[string]http.Handler,
alwaysExternalRoutesToHandlers map[string]http.Handler,
externalRouter *mux.Router,
externalHandler http.Handler,
authMiddleware middleware.Interface,
Expand Down Expand Up @@ -121,6 +122,12 @@ func InitWorkerService(
//Querier worker's max concurrent requests must be the same as the querier setting
(*cfg.QuerierWorkerConfig).MaxConcurrentRequests = cfg.QuerierMaxConcurrent

// There are some routes which are always registered on the external router, add them now and
// wrap them with the httpMiddleware
for route, handler := range alwaysExternalRoutesToHandlers {
externalRouter.Path(route).Methods("GET", "POST").Handler(httpMiddleware.Wrap(handler))
}

//Return a querier worker pointed to the internal querier HTTP handler so there is not a conflict in routes between the querier
//and the query frontend
return querier_worker.NewQuerierWorker(
Expand Down