From 0f0dc81ee3912d1345f7ac86f37ce15211047e70 Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Thu, 6 Jan 2022 14:52:29 +0100 Subject: [PATCH 01/19] Fix deadlock in disconnecting querier Signed-off-by: Cyril Tovena --- .../frontend/transport/handler_test.go | 30 ++ pkg/lokifrontend/frontend/v1/frontend.go | 13 +- pkg/lokifrontend/frontend/v1/frontend_test.go | 303 +++++++++++++++++ pkg/lokifrontend/frontend/v1/queue_test.go | 171 ++++++++++ pkg/lokifrontend/frontend/v2/frontend_test.go | 274 ++++++++++++++++ pkg/querier/worker/frontend_processor_test.go | 74 +++++ pkg/querier/worker/worker_test.go | 108 +++++++ pkg/scheduler/queue/queue.go | 72 ++++- pkg/scheduler/queue/queue_test.go | 304 ++++++++++++++++++ pkg/scheduler/scheduler.go | 17 +- 10 files changed, 1331 insertions(+), 35 deletions(-) create mode 100644 pkg/lokifrontend/frontend/transport/handler_test.go create mode 100644 pkg/lokifrontend/frontend/v1/frontend_test.go create mode 100644 pkg/lokifrontend/frontend/v1/queue_test.go create mode 100644 pkg/lokifrontend/frontend/v2/frontend_test.go create mode 100644 pkg/querier/worker/frontend_processor_test.go create mode 100644 pkg/querier/worker/worker_test.go create mode 100644 pkg/scheduler/queue/queue_test.go diff --git a/pkg/lokifrontend/frontend/transport/handler_test.go b/pkg/lokifrontend/frontend/transport/handler_test.go new file mode 100644 index 0000000000000..136f1879933c1 --- /dev/null +++ b/pkg/lokifrontend/frontend/transport/handler_test.go @@ -0,0 +1,30 @@ +package transport + +import ( + "context" + "net/http" + "net/http/httptest" + "testing" + + "github.com/pkg/errors" + "github.com/stretchr/testify/require" + "github.com/weaveworks/common/httpgrpc" +) + +func TestWriteError(t *testing.T) { + for _, test := range []struct { + status int + err error + }{ + {http.StatusInternalServerError, errors.New("unknown")}, + {http.StatusGatewayTimeout, context.DeadlineExceeded}, + {StatusClientClosedRequest, context.Canceled}, + 
{http.StatusBadRequest, httpgrpc.Errorf(http.StatusBadRequest, "")}, + } { + t.Run(test.err.Error(), func(t *testing.T) { + w := httptest.NewRecorder() + writeError(w, test.err) + require.Equal(t, test.status, w.Result().StatusCode) + }) + } +} diff --git a/pkg/lokifrontend/frontend/v1/frontend.go b/pkg/lokifrontend/frontend/v1/frontend.go index 13619b8278e23..086bf971da5ac 100644 --- a/pkg/lokifrontend/frontend/v1/frontend.go +++ b/pkg/lokifrontend/frontend/v1/frontend.go @@ -25,9 +25,7 @@ import ( lokigrpc "github.com/grafana/loki/pkg/util/httpgrpc" ) -var ( - errTooManyRequest = httpgrpc.Errorf(http.StatusTooManyRequests, "too many outstanding requests") -) +var errTooManyRequest = httpgrpc.Errorf(http.StatusTooManyRequests, "too many outstanding requests") // Config for a Frontend. type Config struct { @@ -198,14 +196,6 @@ func (f *Frontend) Process(server frontendv1pb.Frontend_ProcessServer) error { f.requestQueue.RegisterQuerierConnection(querierID) defer f.requestQueue.UnregisterQuerierConnection(querierID) - // If the downstream request(from querier -> frontend) is cancelled, - // we need to ping the condition variable to unblock getNextRequestForQuerier. - // Ideally we'd have ctx aware condition variables... 
- go func() { - <-server.Context().Done() - f.requestQueue.QuerierDisconnecting() - }() - lastUserIndex := queue.FirstUser() for { @@ -302,7 +292,6 @@ func getQuerierID(server frontendv1pb.Frontend_ProcessServer) (string, error) { Url: "/invalid_request_sent_by_frontend", }, }) - if err != nil { return "", err } diff --git a/pkg/lokifrontend/frontend/v1/frontend_test.go b/pkg/lokifrontend/frontend/v1/frontend_test.go new file mode 100644 index 0000000000000..af08afab5adb9 --- /dev/null +++ b/pkg/lokifrontend/frontend/v1/frontend_test.go @@ -0,0 +1,303 @@ +package v1 + +import ( + "context" + "fmt" + "io/ioutil" + "net" + "net/http" + "strings" + "testing" + "time" + + "github.com/go-kit/log" + "github.com/gorilla/mux" + "github.com/grafana/dskit/flagext" + "github.com/grafana/dskit/services" + otgrpc "github.com/opentracing-contrib/go-grpc" + "github.com/opentracing-contrib/go-stdlib/nethttp" + "github.com/opentracing/opentracing-go" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/uber/jaeger-client-go" + "github.com/uber/jaeger-client-go/config" + httpgrpc_server "github.com/weaveworks/common/httpgrpc/server" + "github.com/weaveworks/common/middleware" + "github.com/weaveworks/common/user" + "go.uber.org/atomic" + "google.golang.org/grpc" + + "github.com/grafana/loki/pkg/lokifrontend/frontend/transport" + "github.com/grafana/loki/pkg/lokifrontend/frontend/v1/frontendv1pb" + querier_worker "github.com/grafana/loki/pkg/querier/worker" + "github.com/grafana/loki/pkg/scheduler/queue" +) + +const ( + query = "/api/v1/query_range?end=1536716898&query=sum%28container_memory_rss%29+by+%28namespace%29&start=1536673680&step=120" + responseBody = `{"status":"success","data":{"resultType":"Matrix","result":[{"metric":{"foo":"bar"},"values":[[1536673680,"137"],[1536673780,"137"]]}]}}` +) + +func TestFrontend(t *testing.T) { 
+ handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + _, err := w.Write([]byte("Hello World")) + require.NoError(t, err) + }) + test := func(addr string, _ *Frontend) { + req, err := http.NewRequest("GET", fmt.Sprintf("http://%s/", addr), nil) + require.NoError(t, err) + err = user.InjectOrgIDIntoHTTPRequest(user.InjectOrgID(context.Background(), "1"), req) + require.NoError(t, err) + + resp, err := http.DefaultClient.Do(req) + require.NoError(t, err) + require.Equal(t, 200, resp.StatusCode) + + body, err := ioutil.ReadAll(resp.Body) + require.NoError(t, err) + + assert.Equal(t, "Hello World", string(body)) + } + + testFrontend(t, defaultFrontendConfig(), handler, test, false, nil, nil) + testFrontend(t, defaultFrontendConfig(), handler, test, true, nil, nil) +} + +func TestFrontendPropagateTrace(t *testing.T) { + closer, err := config.Configuration{}.InitGlobalTracer("test") + require.NoError(t, err) + defer closer.Close() + + observedTraceID := make(chan string, 2) + + handler := middleware.Tracer{}.Wrap(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + sp := opentracing.SpanFromContext(r.Context()) + defer sp.Finish() + + traceID := fmt.Sprintf("%v", sp.Context().(jaeger.SpanContext).TraceID()) + observedTraceID <- traceID + + _, err = w.Write([]byte(responseBody)) + require.NoError(t, err) + })) + + test := func(addr string, _ *Frontend) { + sp, ctx := opentracing.StartSpanFromContext(context.Background(), "client") + defer sp.Finish() + traceID := fmt.Sprintf("%v", sp.Context().(jaeger.SpanContext).TraceID()) + + req, err := http.NewRequest("GET", fmt.Sprintf("http://%s/%s", addr, query), nil) + require.NoError(t, err) + req = req.WithContext(ctx) + err = user.InjectOrgIDIntoHTTPRequest(user.InjectOrgID(ctx, "1"), req) + require.NoError(t, err) + + req, tr := nethttp.TraceRequest(opentracing.GlobalTracer(), req) + defer tr.Finish() + + client := http.Client{ + Transport: &nethttp.Transport{}, + } + resp, err := 
client.Do(req) + require.NoError(t, err) + require.Equal(t, 200, resp.StatusCode) + + defer resp.Body.Close() + _, err = ioutil.ReadAll(resp.Body) + require.NoError(t, err) + + // Query should do one call. + assert.Equal(t, traceID, <-observedTraceID) + } + testFrontend(t, defaultFrontendConfig(), handler, test, false, nil, nil) + testFrontend(t, defaultFrontendConfig(), handler, test, true, nil, nil) +} + +func TestFrontendCheckReady(t *testing.T) { + for _, tt := range []struct { + name string + connectedClients int + msg string + readyForRequests bool + }{ + {"connected clients are ready", 3, "", true}, + {"no url, no clients is not ready", 0, "not ready: number of queriers connected to query-frontend is 0", false}, + } { + t.Run(tt.name, func(t *testing.T) { + f := &Frontend{ + log: log.NewNopLogger(), + requestQueue: queue.NewRequestQueue(5, 0, + prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}), + prometheus.NewCounterVec(prometheus.CounterOpts{}, []string{"user"}), + ), + } + for i := 0; i < tt.connectedClients; i++ { + f.requestQueue.RegisterQuerierConnection("test") + } + err := f.CheckReady(context.Background()) + errMsg := "" + + if err != nil { + errMsg = err.Error() + } + + require.Equal(t, tt.msg, errMsg) + }) + } +} + +// TestFrontendCancel ensures that when client requests are cancelled, +// the underlying query is correctly cancelled _and not retried_. 
+func TestFrontendCancel(t *testing.T) { + var tries atomic.Int32 + handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + <-r.Context().Done() + tries.Inc() + }) + test := func(addr string, _ *Frontend) { + req, err := http.NewRequest("GET", fmt.Sprintf("http://%s/", addr), nil) + require.NoError(t, err) + err = user.InjectOrgIDIntoHTTPRequest(user.InjectOrgID(context.Background(), "1"), req) + require.NoError(t, err) + + ctx, cancel := context.WithCancel(context.Background()) + req = req.WithContext(ctx) + + go func() { + time.Sleep(100 * time.Millisecond) + cancel() + }() + + _, err = http.DefaultClient.Do(req) + require.Error(t, err) + + time.Sleep(100 * time.Millisecond) + assert.Equal(t, int32(1), tries.Load()) + } + testFrontend(t, defaultFrontendConfig(), handler, test, false, nil, nil) + tries.Store(0) + testFrontend(t, defaultFrontendConfig(), handler, test, true, nil, nil) +} + +func TestFrontendMetricsCleanup(t *testing.T) { + handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + _, err := w.Write([]byte("Hello World")) + require.NoError(t, err) + }) + + for _, matchMaxConcurrency := range []bool{false, true} { + reg := prometheus.NewPedanticRegistry() + + test := func(addr string, fr *Frontend) { + req, err := http.NewRequest("GET", fmt.Sprintf("http://%s/", addr), nil) + require.NoError(t, err) + err = user.InjectOrgIDIntoHTTPRequest(user.InjectOrgID(context.Background(), "1"), req) + require.NoError(t, err) + + resp, err := http.DefaultClient.Do(req) + require.NoError(t, err) + require.Equal(t, 200, resp.StatusCode) + defer resp.Body.Close() + + body, err := ioutil.ReadAll(resp.Body) + require.NoError(t, err) + + assert.Equal(t, "Hello World", string(body)) + + require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(` + # HELP cortex_query_frontend_queue_length Number of queries in the queue. 
+ # TYPE cortex_query_frontend_queue_length gauge + cortex_query_frontend_queue_length{user="1"} 0 + `), "cortex_query_frontend_queue_length")) + + fr.cleanupInactiveUserMetrics("1") + + require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(` + # HELP cortex_query_frontend_queue_length Number of queries in the queue. + # TYPE cortex_query_frontend_queue_length gauge + `), "cortex_query_frontend_queue_length")) + } + + testFrontend(t, defaultFrontendConfig(), handler, test, matchMaxConcurrency, nil, reg) + } +} + +func testFrontend(t *testing.T, config Config, handler http.Handler, test func(addr string, frontend *Frontend), matchMaxConcurrency bool, l log.Logger, reg prometheus.Registerer) { + logger := log.NewNopLogger() + if l != nil { + logger = l + } + + var workerConfig querier_worker.Config + flagext.DefaultValues(&workerConfig) + workerConfig.Parallelism = 1 + workerConfig.MatchMaxConcurrency = matchMaxConcurrency + workerConfig.MaxConcurrentRequests = 1 + + // localhost:0 prevents firewall warnings on Mac OS X. + grpcListen, err := net.Listen("tcp", "localhost:0") + require.NoError(t, err) + workerConfig.FrontendAddress = grpcListen.Addr().String() + + httpListen, err := net.Listen("tcp", "localhost:0") + require.NoError(t, err) + + v1, err := New(config, limits{}, logger, reg) + require.NoError(t, err) + require.NotNil(t, v1) + require.NoError(t, services.StartAndAwaitRunning(context.Background(), v1)) + defer func() { + require.NoError(t, services.StopAndAwaitTerminated(context.Background(), v1)) + }() + + grpcServer := grpc.NewServer( + grpc.StreamInterceptor(otgrpc.OpenTracingStreamServerInterceptor(opentracing.GlobalTracer())), + ) + defer grpcServer.GracefulStop() + + frontendv1pb.RegisterFrontendServer(grpcServer, v1) + + // Default HTTP handler config. 
+ handlerCfg := transport.HandlerConfig{} + flagext.DefaultValues(&handlerCfg) + + rt := transport.AdaptGrpcRoundTripperToHTTPRoundTripper(v1) + r := mux.NewRouter() + r.PathPrefix("/").Handler(middleware.Merge( + middleware.AuthenticateUser, + middleware.Tracer{}, + ).Wrap(transport.NewHandler(handlerCfg, rt, logger, nil))) + + httpServer := http.Server{ + Handler: r, + } + defer httpServer.Shutdown(context.Background()) //nolint:errcheck + + go httpServer.Serve(httpListen) //nolint:errcheck + go grpcServer.Serve(grpcListen) //nolint:errcheck + + var worker services.Service + worker, err = querier_worker.NewQuerierWorker(workerConfig, nil, httpgrpc_server.NewServer(handler), logger, nil) + require.NoError(t, err) + require.NoError(t, services.StartAndAwaitRunning(context.Background(), worker)) + + test(httpListen.Addr().String(), v1) + + require.NoError(t, services.StopAndAwaitTerminated(context.Background(), worker)) +} + +func defaultFrontendConfig() Config { + config := Config{} + flagext.DefaultValues(&config) + return config +} + +type limits struct { + queriers int +} + +func (l limits) MaxQueriersPerUser(_ string) int { + return l.queriers +} diff --git a/pkg/lokifrontend/frontend/v1/queue_test.go b/pkg/lokifrontend/frontend/v1/queue_test.go new file mode 100644 index 0000000000000..b3b6fbcf9e3e8 --- /dev/null +++ b/pkg/lokifrontend/frontend/v1/queue_test.go @@ -0,0 +1,171 @@ +package v1 + +import ( + "context" + "fmt" + "strconv" + "strings" + "testing" + "time" + + "github.com/go-kit/log" + "github.com/grafana/dskit/flagext" + "github.com/grafana/dskit/services" + "github.com/pkg/errors" + "github.com/stretchr/testify/require" + "github.com/weaveworks/common/httpgrpc" + "github.com/weaveworks/common/user" + "google.golang.org/grpc/metadata" + + "github.com/grafana/loki/pkg/lokifrontend/frontend/v1/frontendv1pb" +) + +func setupFrontend(t *testing.T, config Config) (*Frontend, error) { + logger := log.NewNopLogger() + + frontend, err := New(config, 
limits{queriers: 3}, logger, nil) + require.NoError(t, err) + + t.Cleanup(func() { + require.NoError(t, services.StopAndAwaitTerminated(context.Background(), frontend)) + }) + return frontend, nil +} + +func testReq(ctx context.Context, reqID, user string) *request { + return &request{ + originalCtx: ctx, + err: make(chan error, 1), + request: &httpgrpc.HTTPRequest{ + // Good enough for testing. + Method: user, + Url: reqID, + }, + response: make(chan *httpgrpc.HTTPResponse, 1), + } +} + +func TestDequeuesExpiredRequests(t *testing.T) { + var config Config + flagext.DefaultValues(&config) + config.MaxOutstandingPerTenant = 10 + userID := "1" + + f, err := setupFrontend(t, config) + require.NoError(t, err) + + ctx := user.InjectOrgID(context.Background(), userID) + expired, cancel := context.WithCancel(ctx) + cancel() + + good := 0 + for i := 0; i < config.MaxOutstandingPerTenant; i++ { + var err error + if i%5 == 0 { + good++ + err = f.queueRequest(ctx, testReq(ctx, fmt.Sprintf("good-%d", i), userID)) + } else { + err = f.queueRequest(ctx, testReq(expired, fmt.Sprintf("expired-%d", i), userID)) + } + + require.Nil(t, err) + } + + // Calling Process will only return when client disconnects or context is finished. + // We use context timeout to stop Process call. + ctx2, cancel2 := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel2() + + m := &processServerMock{ctx: ctx2, querierID: "querier"} + err = f.Process(m) + require.EqualError(t, err, context.DeadlineExceeded.Error()) + + // Verify that only non-expired requests were forwarded to querier. 
+ for _, r := range m.requests { + require.True(t, strings.HasPrefix(r.Url, "good-"), r.Url) + } + require.Len(t, m.requests, good) +} + +func TestRoundRobinQueues(t *testing.T) { + var config Config + flagext.DefaultValues(&config) + + const ( + requests = 100 + tenants = 10 + ) + + config.MaxOutstandingPerTenant = requests + + f, err := setupFrontend(t, config) + require.NoError(t, err) + + for i := 0; i < requests; i++ { + userID := fmt.Sprint(i / tenants) + ctx := user.InjectOrgID(context.Background(), userID) + + err = f.queueRequest(ctx, testReq(ctx, fmt.Sprintf("%d", i), userID)) + require.NoError(t, err) + } + + // Calling Process will only return when client disconnects or context is finished. + // We use context timeout to stop Process call. + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + + m := &processServerMock{ctx: ctx, querierID: "querier"} + err = f.Process(m) + require.EqualError(t, err, context.DeadlineExceeded.Error()) + + require.Len(t, m.requests, requests) + for i, r := range m.requests { + intUserID, err := strconv.Atoi(r.Method) + require.NoError(t, err) + + require.Equal(t, i%tenants, intUserID) + } +} + +// This mock behaves as connected querier worker to frontend. It will remember each request +// that frontend sends, and reply with 200 HTTP status code. 
+type processServerMock struct { + ctx context.Context + querierID string + + response *frontendv1pb.ClientToFrontend + + requests []*httpgrpc.HTTPRequest +} + +func (p *processServerMock) Send(client *frontendv1pb.FrontendToClient) error { + switch { + case client.GetType() == frontendv1pb.GET_ID: + p.response = &frontendv1pb.ClientToFrontend{ClientID: p.querierID} + return nil + + case client.GetType() == frontendv1pb.HTTP_REQUEST: + p.requests = append(p.requests, client.HttpRequest) + p.response = &frontendv1pb.ClientToFrontend{HttpResponse: &httpgrpc.HTTPResponse{Code: 200}} + return nil + + default: + return errors.New("unknown message") + } +} + +func (p *processServerMock) Recv() (*frontendv1pb.ClientToFrontend, error) { + if p.response != nil { + m := p.response + p.response = nil + return m, nil + } + return nil, errors.New("no message") +} + +func (p *processServerMock) SetHeader(_ metadata.MD) error { return nil } +func (p *processServerMock) SendHeader(_ metadata.MD) error { return nil } +func (p *processServerMock) SetTrailer(md metadata.MD) {} +func (p *processServerMock) Context() context.Context { return p.ctx } +func (p *processServerMock) SendMsg(m interface{}) error { return nil } +func (p *processServerMock) RecvMsg(m interface{}) error { return nil } diff --git a/pkg/lokifrontend/frontend/v2/frontend_test.go b/pkg/lokifrontend/frontend/v2/frontend_test.go new file mode 100644 index 0000000000000..7a34fffde67ff --- /dev/null +++ b/pkg/lokifrontend/frontend/v2/frontend_test.go @@ -0,0 +1,274 @@ +package v2 + +import ( + "context" + "net" + "strconv" + "strings" + "sync" + "testing" + "time" + + "github.com/cortexproject/cortex/pkg/querier/stats" + "github.com/go-kit/log" + "github.com/grafana/dskit/flagext" + "github.com/grafana/dskit/services" + "github.com/grafana/loki/pkg/util/test" + "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/require" + "github.com/weaveworks/common/httpgrpc" + 
"github.com/weaveworks/common/user" + "go.uber.org/atomic" + "google.golang.org/grpc" + + "github.com/grafana/loki/pkg/lokifrontend/frontend/v2/frontendv2pb" + "github.com/grafana/loki/pkg/scheduler/schedulerpb" +) + +const testFrontendWorkerConcurrency = 5 + +func setupFrontend(t *testing.T, reg prometheus.Registerer, schedulerReplyFunc func(f *Frontend, msg *schedulerpb.FrontendToScheduler) *schedulerpb.SchedulerToFrontend) (*Frontend, *mockScheduler) { + l, err := net.Listen("tcp", "") + require.NoError(t, err) + + server := grpc.NewServer() + + h, p, err := net.SplitHostPort(l.Addr().String()) + require.NoError(t, err) + + grpcPort, err := strconv.Atoi(p) + require.NoError(t, err) + + cfg := Config{} + flagext.DefaultValues(&cfg) + cfg.SchedulerAddress = l.Addr().String() + cfg.WorkerConcurrency = testFrontendWorkerConcurrency + cfg.Addr = h + cfg.Port = grpcPort + + // logger := log.NewLogfmtLogger(os.Stdout) + logger := log.NewNopLogger() + f, err := NewFrontend(cfg, nil, logger, reg) + require.NoError(t, err) + + frontendv2pb.RegisterFrontendForQuerierServer(server, f) + + ms := newMockScheduler(t, f, schedulerReplyFunc) + schedulerpb.RegisterSchedulerForFrontendServer(server, ms) + + require.NoError(t, services.StartAndAwaitRunning(context.Background(), f)) + t.Cleanup(func() { + _ = services.StopAndAwaitTerminated(context.Background(), f) + }) + + go func() { + _ = server.Serve(l) + }() + + t.Cleanup(func() { + _ = l.Close() + }) + + // Wait for frontend to connect to scheduler. 
+ test.Poll(t, 1*time.Second, 1, func() interface{} { + ms.mu.Lock() + defer ms.mu.Unlock() + + return len(ms.frontendAddr) + }) + + return f, ms +} + +func sendResponseWithDelay(f *Frontend, delay time.Duration, userID string, queryID uint64, resp *httpgrpc.HTTPResponse) { + if delay > 0 { + time.Sleep(delay) + } + + ctx := user.InjectOrgID(context.Background(), userID) + _, _ = f.QueryResult(ctx, &frontendv2pb.QueryResultRequest{ + QueryID: queryID, + HttpResponse: resp, + Stats: &stats.Stats{}, + }) +} + +func TestFrontendBasicWorkflow(t *testing.T) { + const ( + body = "all fine here" + userID = "test" + ) + + f, _ := setupFrontend(t, nil, func(f *Frontend, msg *schedulerpb.FrontendToScheduler) *schedulerpb.SchedulerToFrontend { + // We cannot call QueryResult directly, as Frontend is not yet waiting for the response. + // It first needs to be told that enqueuing has succeeded. + go sendResponseWithDelay(f, 100*time.Millisecond, userID, msg.QueryID, &httpgrpc.HTTPResponse{ + Code: 200, + Body: []byte(body), + }) + + return &schedulerpb.SchedulerToFrontend{Status: schedulerpb.OK} + }) + + resp, err := f.RoundTripGRPC(user.InjectOrgID(context.Background(), userID), &httpgrpc.HTTPRequest{}) + require.NoError(t, err) + require.Equal(t, int32(200), resp.Code) + require.Equal(t, []byte(body), resp.Body) +} + +func TestFrontendRetryEnqueue(t *testing.T) { + // Frontend uses worker concurrency to compute number of retries. We use one less failure. 
+ failures := atomic.NewInt64(testFrontendWorkerConcurrency - 1) + const ( + body = "hello world" + userID = "test" + ) + + f, _ := setupFrontend(t, nil, func(f *Frontend, msg *schedulerpb.FrontendToScheduler) *schedulerpb.SchedulerToFrontend { + fail := failures.Dec() + if fail >= 0 { + return &schedulerpb.SchedulerToFrontend{Status: schedulerpb.SHUTTING_DOWN} + } + + go sendResponseWithDelay(f, 100*time.Millisecond, userID, msg.QueryID, &httpgrpc.HTTPResponse{ + Code: 200, + Body: []byte(body), + }) + + return &schedulerpb.SchedulerToFrontend{Status: schedulerpb.OK} + }) + _, err := f.RoundTripGRPC(user.InjectOrgID(context.Background(), userID), &httpgrpc.HTTPRequest{}) + require.NoError(t, err) +} + +func TestFrontendEnqueueFailure(t *testing.T) { + f, _ := setupFrontend(t, nil, func(f *Frontend, msg *schedulerpb.FrontendToScheduler) *schedulerpb.SchedulerToFrontend { + return &schedulerpb.SchedulerToFrontend{Status: schedulerpb.SHUTTING_DOWN} + }) + + _, err := f.RoundTripGRPC(user.InjectOrgID(context.Background(), "test"), &httpgrpc.HTTPRequest{}) + require.Error(t, err) + require.True(t, strings.Contains(err.Error(), "failed to enqueue request")) +} + +func TestFrontendCancellation(t *testing.T) { + f, ms := setupFrontend(t, nil, nil) + + ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond) + defer cancel() + + resp, err := f.RoundTripGRPC(user.InjectOrgID(ctx, "test"), &httpgrpc.HTTPRequest{}) + require.EqualError(t, err, context.DeadlineExceeded.Error()) + require.Nil(t, resp) + + // We wait a bit to make sure scheduler receives the cancellation request. 
+ test.Poll(t, time.Second, 2, func() interface{} { + ms.mu.Lock() + defer ms.mu.Unlock() + + return len(ms.msgs) + }) + + ms.checkWithLock(func() { + require.Equal(t, 2, len(ms.msgs)) + require.True(t, ms.msgs[0].Type == schedulerpb.ENQUEUE) + require.True(t, ms.msgs[1].Type == schedulerpb.CANCEL) + require.True(t, ms.msgs[0].QueryID == ms.msgs[1].QueryID) + }) +} + +func TestFrontendFailedCancellation(t *testing.T) { + f, ms := setupFrontend(t, nil, nil) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + go func() { + time.Sleep(100 * time.Millisecond) + + // stop scheduler workers + addr := "" + f.schedulerWorkers.mu.Lock() + for k := range f.schedulerWorkers.workers { + addr = k + break + } + f.schedulerWorkers.mu.Unlock() + + f.schedulerWorkers.AddressRemoved(addr) + + // Wait for worker goroutines to stop. + time.Sleep(100 * time.Millisecond) + + // Cancel request. Frontend will try to send cancellation to scheduler, but that will fail (not visible to user). + // Everything else should still work fine. 
+ cancel() + }() + + // send request + resp, err := f.RoundTripGRPC(user.InjectOrgID(ctx, "test"), &httpgrpc.HTTPRequest{}) + require.EqualError(t, err, context.Canceled.Error()) + require.Nil(t, resp) + + ms.checkWithLock(func() { + require.Equal(t, 1, len(ms.msgs)) + }) +} + +type mockScheduler struct { + t *testing.T + f *Frontend + + replyFunc func(f *Frontend, msg *schedulerpb.FrontendToScheduler) *schedulerpb.SchedulerToFrontend + + mu sync.Mutex + frontendAddr map[string]int + msgs []*schedulerpb.FrontendToScheduler +} + +func newMockScheduler(t *testing.T, f *Frontend, replyFunc func(f *Frontend, msg *schedulerpb.FrontendToScheduler) *schedulerpb.SchedulerToFrontend) *mockScheduler { + return &mockScheduler{t: t, f: f, frontendAddr: map[string]int{}, replyFunc: replyFunc} +} + +func (m *mockScheduler) checkWithLock(fn func()) { + m.mu.Lock() + defer m.mu.Unlock() + + fn() +} + +func (m *mockScheduler) FrontendLoop(frontend schedulerpb.SchedulerForFrontend_FrontendLoopServer) error { + init, err := frontend.Recv() + if err != nil { + return err + } + + m.mu.Lock() + m.frontendAddr[init.FrontendAddress]++ + m.mu.Unlock() + + // Ack INIT from frontend. 
+ if err := frontend.Send(&schedulerpb.SchedulerToFrontend{Status: schedulerpb.OK}); err != nil { + return err + } + + for { + msg, err := frontend.Recv() + if err != nil { + return err + } + + m.mu.Lock() + m.msgs = append(m.msgs, msg) + m.mu.Unlock() + + reply := &schedulerpb.SchedulerToFrontend{Status: schedulerpb.OK} + if m.replyFunc != nil { + reply = m.replyFunc(m.f, msg) + } + + if err := frontend.Send(reply); err != nil { + return err + } + } +} diff --git a/pkg/querier/worker/frontend_processor_test.go b/pkg/querier/worker/frontend_processor_test.go new file mode 100644 index 0000000000000..68981586c9fd3 --- /dev/null +++ b/pkg/querier/worker/frontend_processor_test.go @@ -0,0 +1,74 @@ +package worker + +import ( + "context" + "testing" + "time" + + "github.com/go-kit/log" + "github.com/grafana/loki/pkg/util/test" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/atomic" + "google.golang.org/grpc" +) + +func TestRecvFailDoesntCancelProcess(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // We use random port here, hopefully without any gRPC server. + cc, err := grpc.DialContext(ctx, "localhost:999", grpc.WithInsecure()) + require.NoError(t, err) + + cfg := Config{} + mgr := newFrontendProcessor(cfg, nil, log.NewNopLogger()) + running := atomic.NewBool(false) + go func() { + running.Store(true) + defer running.Store(false) + + mgr.processQueriesOnSingleStream(ctx, cc, "test:12345") + }() + + test.Poll(t, time.Second, true, func() interface{} { + return running.Load() + }) + + // Wait a bit, and verify that processQueriesOnSingleStream is still running, and hasn't stopped + // just because it cannot contact frontend. 
+ time.Sleep(100 * time.Millisecond) + assert.Equal(t, true, running.Load()) + + cancel() + test.Poll(t, time.Second, false, func() interface{} { + return running.Load() + }) +} + +func TestContextCancelStopsProcess(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // We use random port here, hopefully without any gRPC server. + cc, err := grpc.DialContext(ctx, "localhost:999", grpc.WithInsecure()) + require.NoError(t, err) + + pm := newProcessorManager(ctx, &mockProcessor{}, cc, "test") + pm.concurrency(1) + + test.Poll(t, time.Second, 1, func() interface{} { + return int(pm.currentProcessors.Load()) + }) + + cancel() + + test.Poll(t, time.Second, 0, func() interface{} { + return int(pm.currentProcessors.Load()) + }) + + pm.stop() + test.Poll(t, time.Second, 0, func() interface{} { + return int(pm.currentProcessors.Load()) + }) +} diff --git a/pkg/querier/worker/worker_test.go b/pkg/querier/worker/worker_test.go new file mode 100644 index 0000000000000..c835b3f1836c5 --- /dev/null +++ b/pkg/querier/worker/worker_test.go @@ -0,0 +1,108 @@ +package worker + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/go-kit/log" + "github.com/grafana/dskit/services" + "github.com/grafana/loki/pkg/util/test" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/grpc" +) + +func TestResetConcurrency(t *testing.T) { + tests := []struct { + name string + parallelism int + maxConcurrent int + numTargets int + expectedConcurrency int + }{ + { + name: "Test create at least one processor per target", + parallelism: 0, + maxConcurrent: 0, + numTargets: 2, + expectedConcurrency: 2, + }, + { + name: "Test parallelism per target", + parallelism: 4, + maxConcurrent: 0, + numTargets: 2, + expectedConcurrency: 8, + }, + { + name: "Test Total Parallelism with a remainder", + parallelism: 1, + maxConcurrent: 7, + numTargets: 4, + expectedConcurrency: 7, + }, + { + name: "Test Total 
Parallelism dividing evenly", + parallelism: 1, + maxConcurrent: 6, + numTargets: 2, + expectedConcurrency: 6, + }, + { + name: "Test Total Parallelism at least one worker per target", + parallelism: 1, + maxConcurrent: 3, + numTargets: 6, + expectedConcurrency: 6, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + cfg := Config{ + Parallelism: tt.parallelism, + MatchMaxConcurrency: tt.maxConcurrent > 0, + MaxConcurrentRequests: tt.maxConcurrent, + } + + w, err := newQuerierWorkerWithProcessor(cfg, log.NewNopLogger(), &mockProcessor{}, "", nil, nil) + require.NoError(t, err) + require.NoError(t, services.StartAndAwaitRunning(context.Background(), w)) + + for i := 0; i < tt.numTargets; i++ { + // gRPC connections are virtual... they don't actually try to connect until they are needed. + // This allows us to use dummy ports, and not get any errors. + w.AddressAdded(fmt.Sprintf("127.0.0.1:%d", i)) + } + + test.Poll(t, 250*time.Millisecond, tt.expectedConcurrency, func() interface{} { + return getConcurrentProcessors(w) + }) + + require.NoError(t, services.StopAndAwaitTerminated(context.Background(), w)) + assert.Equal(t, 0, getConcurrentProcessors(w)) + }) + } +} + +func getConcurrentProcessors(w *querierWorker) int { + result := 0 + w.mu.Lock() + defer w.mu.Unlock() + + for _, mgr := range w.managers { + result += int(mgr.currentProcessors.Load()) + } + + return result +} + +type mockProcessor struct{} + +func (m mockProcessor) processQueriesOnSingleStream(ctx context.Context, _ *grpc.ClientConn, _ string) { + <-ctx.Done() +} + +func (m mockProcessor) notifyShutdown(_ context.Context, _ *grpc.ClientConn, _ string) {} diff --git a/pkg/scheduler/queue/queue.go b/pkg/scheduler/queue/queue.go index db14f7241a31f..4d611dadb5c24 100644 --- a/pkg/scheduler/queue/queue.go +++ b/pkg/scheduler/queue/queue.go @@ -52,7 +52,7 @@ type RequestQueue struct { connectedQuerierWorkers *atomic.Int32 mtx sync.Mutex - cond *sync.Cond // Notified when request 
is enqueued or dequeued, or querier is disconnected. + cond contextCond // Notified when request is enqueued or dequeued, or querier is disconnected. queues *queues stopped bool @@ -68,7 +68,7 @@ func NewRequestQueue(maxOutstandingPerTenant int, forgetDelay time.Duration, que discardedRequests: discardedRequests, } - q.cond = sync.NewCond(&q.mtx) + q.cond = contextCond{Cond: sync.NewCond(&q.mtx)} q.Service = services.NewTimerService(forgetCheckPeriod, nil, q.forgetDisconnectedQueriers, q.stopping).WithName("request queue") return q @@ -121,7 +121,7 @@ FindQueue: // We need to wait if there are no users, or no pending requests for given querier. for (q.queues.len() == 0 || querierWait) && ctx.Err() == nil && !q.stopped { querierWait = false - q.cond.Wait() + q.cond.Wait(ctx) } if q.stopped { @@ -179,7 +179,7 @@ func (q *RequestQueue) stopping(_ error) error { defer q.mtx.Unlock() for q.queues.len() > 0 && q.connectedQuerierWorkers.Load() > 0 { - q.cond.Wait() + q.cond.Wait(context.Background()) } // Only stop after dispatching enqueued requests. @@ -213,11 +213,65 @@ func (q *RequestQueue) NotifyQuerierShutdown(querierID string) { q.queues.notifyQuerierShutdown(querierID) } -// When querier is waiting for next request, this unblocks the method. -func (q *RequestQueue) QuerierDisconnecting() { - q.cond.Broadcast() -} - func (q *RequestQueue) GetConnectedQuerierWorkersMetric() float64 { return float64(q.connectedQuerierWorkers.Load()) } + +// contextCond is a *sync.Cond with Wait() method overridden to support context-based waiting. +type contextCond struct { + *sync.Cond + + // testHookBeforeWaiting is called before calling Cond.Wait() if it's not nil. + // Yes, it's ugly, but the http package settled jurisprudence: + // https://github.com/golang/go/blob/6178d25fc0b28724b1b5aec2b1b74fc06d9294c7/src/net/http/client.go#L596-L601 + testHookBeforeWaiting func() +} + +// Wait does c.cond.Wait() but will also return if the context provided is done. 
+// All the documentation of sync.Cond.Wait() applies, but it's especially important to remember that the mutex of
+// the cond should be held while Wait() is called (and mutex will be held once it returns)
+func (c contextCond) Wait(ctx context.Context) {
+ // "condWait" goroutine does q.cond.Wait() and signals through condWait channel.
+ condWait := make(chan struct{})
+ go func() {
+ if c.testHookBeforeWaiting != nil {
+ c.testHookBeforeWaiting()
+ }
+ c.Cond.Wait()
+ close(condWait)
+ }()
+
+ // "waiting" goroutine: signals that the condWait goroutine has started waiting.
+ // Notice that a closed waiting channel implies that the goroutine above has started waiting
+ // (because it has unlocked the mutex), but the other way is not true:
+ // - condWait may have unlocked and is waiting, but someone else locked the mutex faster than us:
+ // in this case that caller will eventually unlock, and we'll be able to enter here.
+ // - condWait called Wait(), unlocked, received a broadcast and locked again faster than we were able to lock here:
+ // in this case condWait channel will be closed, and this goroutine will be waiting until we unlock.
+ waiting := make(chan struct{})
+ go func() {
+ c.L.Lock()
+ close(waiting)
+ c.L.Unlock()
+ }()
+
+ select {
+ case <-condWait:
+ // We don't know whether the waiting goroutine is done or not, but we don't care:
+ // it will be done once nobody is fighting for the mutex anymore.
+ case <-ctx.Done():
+ // In order to avoid leaking the condWait goroutine, we can send a broadcast.
+ // Before sending the broadcast we need to make sure that condWait goroutine is already waiting (or has already waited).
+ select {
+ case <-condWait:
+ // No need to broadcast as q.cond.Wait() has returned already.
+ return
+ case <-waiting:
+ // q.cond.Wait() might be still waiting (or maybe not!), so we'll poke it just in case. 
+ c.Broadcast() + } + + // Make sure we are not waiting anymore, we need to do that before returning as the caller will need to unlock the mutex. + <-condWait + } +} diff --git a/pkg/scheduler/queue/queue_test.go b/pkg/scheduler/queue/queue_test.go new file mode 100644 index 0000000000000..9b50b350ddf1d --- /dev/null +++ b/pkg/scheduler/queue/queue_test.go @@ -0,0 +1,304 @@ +package queue + +import ( + "context" + "fmt" + "strconv" + "sync" + "testing" + "time" + + "github.com/grafana/dskit/services" + "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func BenchmarkGetNextRequest(b *testing.B) { + const maxOutstandingPerTenant = 2 + const numTenants = 50 + const queriers = 5 + + queues := make([]*RequestQueue, 0, b.N) + + for n := 0; n < b.N; n++ { + queue := NewRequestQueue(maxOutstandingPerTenant, 0, + prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}), + prometheus.NewCounterVec(prometheus.CounterOpts{}, []string{"user"}), + ) + queues = append(queues, queue) + + for ix := 0; ix < queriers; ix++ { + queue.RegisterQuerierConnection(fmt.Sprintf("querier-%d", ix)) + } + + for i := 0; i < maxOutstandingPerTenant; i++ { + for j := 0; j < numTenants; j++ { + userID := strconv.Itoa(j) + + err := queue.EnqueueRequest(userID, "request", 0, nil) + if err != nil { + b.Fatal(err) + } + } + } + } + + ctx := context.Background() + b.ResetTimer() + + for i := 0; i < b.N; i++ { + idx := FirstUser() + for j := 0; j < maxOutstandingPerTenant*numTenants; j++ { + querier := "" + b: + // Find querier with at least one request to avoid blocking in getNextRequestForQuerier. 
+ for _, q := range queues[i].queues.userQueues { + for qid := range q.queriers { + querier = qid + break b + } + } + + _, nidx, err := queues[i].GetNextRequestForQuerier(ctx, idx, querier) + if err != nil { + b.Fatal(err) + } + idx = nidx + } + } +} + +func BenchmarkQueueRequest(b *testing.B) { + const maxOutstandingPerTenant = 2 + const numTenants = 50 + const queriers = 5 + + queues := make([]*RequestQueue, 0, b.N) + users := make([]string, 0, numTenants) + requests := make([]string, 0, numTenants) + + for n := 0; n < b.N; n++ { + q := NewRequestQueue(maxOutstandingPerTenant, 0, + prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}), + prometheus.NewCounterVec(prometheus.CounterOpts{}, []string{"user"}), + ) + + for ix := 0; ix < queriers; ix++ { + q.RegisterQuerierConnection(fmt.Sprintf("querier-%d", ix)) + } + + queues = append(queues, q) + + for j := 0; j < numTenants; j++ { + requests = append(requests, fmt.Sprintf("%d-%d", n, j)) + users = append(users, strconv.Itoa(j)) + } + } + + b.ResetTimer() + for n := 0; n < b.N; n++ { + for i := 0; i < maxOutstandingPerTenant; i++ { + for j := 0; j < numTenants; j++ { + err := queues[n].EnqueueRequest(users[j], requests[j], 0, nil) + if err != nil { + b.Fatal(err) + } + } + } + } +} + +func TestRequestQueue_GetNextRequestForQuerier_ShouldGetRequestAfterReshardingBecauseQuerierHasBeenForgotten(t *testing.T) { + const forgetDelay = 3 * time.Second + + queue := NewRequestQueue(1, forgetDelay, + prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}), + prometheus.NewCounterVec(prometheus.CounterOpts{}, []string{"user"})) + + // Start the queue service. + ctx := context.Background() + require.NoError(t, services.StartAndAwaitRunning(ctx, queue)) + t.Cleanup(func() { + require.NoError(t, services.StopAndAwaitTerminated(ctx, queue)) + }) + + // Two queriers connect. + queue.RegisterQuerierConnection("querier-1") + queue.RegisterQuerierConnection("querier-2") + + // Querier-2 waits for a new request. 
+ querier2wg := sync.WaitGroup{}
+ querier2wg.Add(1)
+ go func() {
+ defer querier2wg.Done()
+ _, _, err := queue.GetNextRequestForQuerier(ctx, FirstUser(), "querier-2")
+ require.NoError(t, err)
+ }()
+
+ // Querier-1 crashes (no graceful shutdown notification).
+ queue.UnregisterQuerierConnection("querier-1")
+
+ // Enqueue a request from a user which would be assigned to querier-1.
+ // NOTE: "user-1" hash falls in the querier-1 shard.
+ require.NoError(t, queue.EnqueueRequest("user-1", "request", 1, nil))
+
+ startTime := time.Now()
+ querier2wg.Wait()
+ waitTime := time.Since(startTime)
+
+ // We expect that querier-2 got the request only after querier-1 forget delay has passed.
+ assert.GreaterOrEqual(t, waitTime.Milliseconds(), forgetDelay.Milliseconds())
+}
+
+func TestContextCond(t *testing.T) {
+ t.Run("wait until broadcast", func(t *testing.T) {
+ t.Parallel()
+ mtx := &sync.Mutex{}
+ cond := contextCond{Cond: sync.NewCond(mtx)}
+
+ doneWaiting := make(chan struct{})
+
+ mtx.Lock()
+ go func() {
+ cond.Wait(context.Background())
+ mtx.Unlock()
+ close(doneWaiting)
+ }()
+
+ assertChanNotReceived(t, doneWaiting, 100*time.Millisecond, "cond.Wait returned, but it should not because we did not broadcast yet")
+
+ cond.Broadcast()
+ assertChanReceived(t, doneWaiting, 250*time.Millisecond, "cond.Wait did not return after broadcast")
+ })
+
+ t.Run("wait until context deadline", func(t *testing.T) {
+ t.Parallel()
+ mtx := &sync.Mutex{}
+ cond := contextCond{Cond: sync.NewCond(mtx)}
+ doneWaiting := make(chan struct{})
+
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ mtx.Lock()
+ go func() {
+ cond.Wait(ctx)
+ mtx.Unlock()
+ close(doneWaiting)
+ }()
+
+ assertChanNotReceived(t, doneWaiting, 100*time.Millisecond, "cond.Wait returned, but it should not because we did not broadcast yet and didn't cancel the context")
+
+ cancel()
+ assertChanReceived(t, doneWaiting, 250*time.Millisecond, "cond.Wait did not return after cancelling 
the context")
+ })
+
+ t.Run("wait on already canceled context", func(t *testing.T) {
+ // This test represents the racy real world scenario,
+ // we don't know whether it's going to wait before the broadcast triggered by the context cancellation.
+ t.Parallel()
+ mtx := &sync.Mutex{}
+ cond := contextCond{Cond: sync.NewCond(mtx)}
+ doneWaiting := make(chan struct{})
+
+ alreadyCanceledContext, cancel := context.WithCancel(context.Background())
+ cancel()
+
+ mtx.Lock()
+ go func() {
+ cond.Wait(alreadyCanceledContext)
+ mtx.Unlock()
+ close(doneWaiting)
+ }()
+
+ assertChanReceived(t, doneWaiting, 250*time.Millisecond, "cond.Wait did not return after cancelling the context")
+ })
+
+ t.Run("wait on already canceled context, but it takes a while to wait", func(t *testing.T) {
+ t.Parallel()
+ mtx := &sync.Mutex{}
+ cond := contextCond{
+ Cond: sync.NewCond(mtx),
+ testHookBeforeWaiting: func() {
+ // This makes the waiting goroutine so slow that our Wait(ctx) will need to broadcast once it sees it waiting. 
+ time.Sleep(250 * time.Millisecond) + }, + } + doneWaiting := make(chan struct{}) + + alreadyCanceledContext, cancel := context.WithCancel(context.Background()) + cancel() + + mtx.Lock() + go func() { + cond.Wait(alreadyCanceledContext) + mtx.Unlock() + close(doneWaiting) + }() + + assertChanReceived(t, doneWaiting, time.Second, "cond.Wait did not return after 500ms") + }) + + t.Run("lots of goroutines waiting at the same time, none of them misses it's broadcast from cancel", func(t *testing.T) { + t.Parallel() + mtx := &sync.Mutex{} + cond := contextCond{ + Cond: sync.NewCond(mtx), + testHookBeforeWaiting: func() { + // Wait just a little bit to create every goroutine + time.Sleep(time.Millisecond) + }, + } + const goroutines = 100 + + doneWaiting := make(chan struct{}, goroutines) + release := make(chan struct{}) + + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + for i := 0; i < goroutines; i++ { + go func() { + <-release + + mtx.Lock() + cond.Wait(ctx) + mtx.Unlock() + + doneWaiting <- struct{}{} + }() + } + go func() { + <-release + cancel() + }() + + close(release) + + assert.Eventually(t, func() bool { + return len(doneWaiting) == goroutines + }, time.Second, 10*time.Millisecond) + }) +} + +func assertChanReceived(t *testing.T, c chan struct{}, timeout time.Duration, msg string, args ...interface{}) { + t.Helper() + + select { + case <-c: + case <-time.After(timeout): + t.Fatalf(msg, args...) + } +} + +func assertChanNotReceived(t *testing.T, c chan struct{}, wait time.Duration, msg string, args ...interface{}) { + t.Helper() + + select { + case <-c: + t.Fatalf(msg, args...) + case <-time.After(wait): + // OK! 
+ } +} diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 5f20b3e500bdd..86e5de1a26797 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -41,9 +41,7 @@ import ( lokihttpreq "github.com/grafana/loki/pkg/util/httpreq" ) -var ( - errSchedulerIsNotRunning = errors.New("scheduler is not running") -) +var errSchedulerIsNotRunning = errors.New("scheduler is not running") const ( // ringAutoForgetUnhealthyPeriods is how many consecutive timeout periods an unhealthy instance @@ -444,14 +442,6 @@ func (s *Scheduler) QuerierLoop(querier schedulerpb.SchedulerForQuerier_QuerierL s.requestQueue.RegisterQuerierConnection(querierID) defer s.requestQueue.UnregisterQuerierConnection(querierID) - // If the downstream connection to querier is cancelled, - // we need to ping the condition variable to unblock getNextRequestForQuerier. - // Ideally we'd have ctx aware condition variables... - go func() { - <-querier.Context().Done() - s.requestQueue.QuerierDisconnecting() - }() - lastUserIndex := queue.FirstUser() // In stopping state scheduler is not accepting new queries, but still dispatching queries in the queues. 
@@ -554,7 +544,8 @@ func (s *Scheduler) forwardRequestToQuerier(querier schedulerpb.SchedulerForQuer func (s *Scheduler) forwardErrorToFrontend(ctx context.Context, req *schedulerRequest, requestErr error) { opts, err := s.cfg.GRPCClientConfig.DialOption([]grpc.UnaryClientInterceptor{ otgrpc.OpenTracingClientInterceptor(opentracing.GlobalTracer()), - middleware.ClientUserHeaderInterceptor}, + middleware.ClientUserHeaderInterceptor, + }, nil) if err != nil { level.Warn(s.log).Log("msg", "failed to create gRPC options for the connection to frontend to report error", "frontend", req.frontendAddress, "err", err, "requestErr", requestErr) @@ -667,7 +658,6 @@ func (s *Scheduler) running(ctx context.Context) error { } func (s *Scheduler) setRunState(isInSet bool) { - if isInSet { if s.shouldRun.CAS(false, true) { // Value was swapped, meaning this was a state change from stopped to running. @@ -728,7 +718,6 @@ func SafeReadRing(s *Scheduler) ring.ReadRing { } return s.ring - } func (s *Scheduler) OnRingInstanceRegister(_ *ring.BasicLifecycler, ringDesc ring.Desc, instanceExists bool, instanceID string, instanceDesc ring.InstanceDesc) (ring.InstanceState, ring.Tokens) { From 4c41d3964d6409eb11a31f76e3b534901a97351d Mon Sep 17 00:00:00 2001 From: Kaviraj Date: Thu, 6 Jan 2022 16:00:59 +0100 Subject: [PATCH 02/19] todo Signed-off-by: Kaviraj --- pkg/scheduler/scheduler.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 202955d5cb81f..d708caf1a5ec4 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -301,6 +301,7 @@ func (s *Scheduler) FrontendLoop(frontend schedulerpb.SchedulerForFrontend_Front } case schedulerpb.CANCEL: + level.Info(s.log).Log("msg", "inside frontend loop, msg type is cancel", msg.QueryID) s.cancelRequestAndRemoveFromPending(frontendAddress, msg.QueryID) resp = &schedulerpb.SchedulerToFrontend{Status: schedulerpb.OK} @@ -416,6 +417,7 @@ func (s 
*Scheduler) enqueueRequest(frontendContext context.Context, frontendAddr // This method doesn't do removal from the queue. func (s *Scheduler) cancelRequestAndRemoveFromPending(frontendAddr string, queryID uint64) { + level.Info(s.log).Log("msg", "cancelling request and remove from pending", "queryID", queryID) s.pendingRequestsMu.Lock() defer s.pendingRequestsMu.Unlock() @@ -500,7 +502,10 @@ func (s *Scheduler) NotifyQuerierShutdown(_ context.Context, req *schedulerpb.No func (s *Scheduler) forwardRequestToQuerier(querier schedulerpb.SchedulerForQuerier_QuerierLoopServer, req *schedulerRequest) error { // Make sure to cancel request at the end to cleanup resources. - defer s.cancelRequestAndRemoveFromPending(req.frontendAddress, req.queryID) + level.Info(s.log).Log("msg", "forwarding request to the querier", "queryID", req.queryID) + defer func() { + s.cancelRequestAndRemoveFromPending(req.frontendAddress, req.queryID) + }() // Handle the stream sending & receiving on a goroutine so we can // monitoring the contexts in a select and cancel things appropriately. From e28892cf51cbf71128a3b4ed8075524c80babb7d Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Fri, 7 Jan 2022 09:03:16 +0100 Subject: [PATCH 03/19] Add more logs to queriers cancellation Signed-off-by: Cyril Tovena --- pkg/querier/http.go | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/pkg/querier/http.go b/pkg/querier/http.go index 2a59cc37e2c5d..81c18e04e4520 100644 --- a/pkg/querier/http.go +++ b/pkg/querier/http.go @@ -7,6 +7,7 @@ import ( "github.com/cortexproject/cortex/pkg/tenant" util_log "github.com/cortexproject/cortex/pkg/util/log" + "github.com/cortexproject/cortex/pkg/util/spanlogger" "github.com/go-kit/log/level" "github.com/gorilla/websocket" "github.com/prometheus/prometheus/model/labels" @@ -33,6 +34,15 @@ type QueryResponse struct { // RangeQueryHandler is a http.HandlerFunc for range queries. 
func (q *Querier) RangeQueryHandler(w http.ResponseWriter, r *http.Request) { + log := spanlogger.FromContext(r.Context()) + level.Info(log.Logger).Log("msg", "range query started") + defer func() { + level.Info(log.Logger).Log("msg", "range query ended") + }() + go func() { + <-r.Context().Done() + level.Info(log.Logger).Log("msg", "range query done", "err", r.Context().Err()) + }() // Enforce the query timeout while querying backends ctx, cancel := context.WithDeadline(r.Context(), time.Now().Add(q.cfg.QueryTimeout)) defer cancel() From 61abcbddb89be1d1f14a2dd76f06d53e55bcfc79 Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Fri, 7 Jan 2022 09:30:24 +0100 Subject: [PATCH 04/19] new span Signed-off-by: Cyril Tovena --- pkg/querier/http.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/pkg/querier/http.go b/pkg/querier/http.go index 81c18e04e4520..511f74ba181cd 100644 --- a/pkg/querier/http.go +++ b/pkg/querier/http.go @@ -34,17 +34,18 @@ type QueryResponse struct { // RangeQueryHandler is a http.HandlerFunc for range queries. 
func (q *Querier) RangeQueryHandler(w http.ResponseWriter, r *http.Request) { - log := spanlogger.FromContext(r.Context()) + log, ctx := spanlogger.New(r.Context(), "RangeQueryHandler") + defer log.Finish() level.Info(log.Logger).Log("msg", "range query started") defer func() { level.Info(log.Logger).Log("msg", "range query ended") }() go func() { <-r.Context().Done() - level.Info(log.Logger).Log("msg", "range query done", "err", r.Context().Err()) + level.Info(log.Logger).Log("msg", "range query done", "err", ctx.Err()) }() // Enforce the query timeout while querying backends - ctx, cancel := context.WithDeadline(r.Context(), time.Now().Add(q.cfg.QueryTimeout)) + ctx, cancel := context.WithDeadline(ctx, time.Now().Add(q.cfg.QueryTimeout)) defer cancel() request, err := loghttp.ParseRangeQuery(r) From f5352bbeeb929229049deb444ec8038f92b6e551 Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Fri, 7 Jan 2022 09:31:40 +0100 Subject: [PATCH 05/19] Fixe log Signed-off-by: Cyril Tovena --- pkg/querier/http.go | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/pkg/querier/http.go b/pkg/querier/http.go index 511f74ba181cd..3537fa5b47de4 100644 --- a/pkg/querier/http.go +++ b/pkg/querier/http.go @@ -34,18 +34,17 @@ type QueryResponse struct { // RangeQueryHandler is a http.HandlerFunc for range queries. 
func (q *Querier) RangeQueryHandler(w http.ResponseWriter, r *http.Request) { - log, ctx := spanlogger.New(r.Context(), "RangeQueryHandler") - defer log.Finish() - level.Info(log.Logger).Log("msg", "range query started") + log := spanlogger.FromContext(r.Context()) + level.Info(log).Log("msg", "range query started") defer func() { - level.Info(log.Logger).Log("msg", "range query ended") + level.Info(log).Log("msg", "range query ended") }() go func() { <-r.Context().Done() - level.Info(log.Logger).Log("msg", "range query done", "err", ctx.Err()) + level.Info(log).Log("msg", "range query done", "err", r.Context().Err()) }() // Enforce the query timeout while querying backends - ctx, cancel := context.WithDeadline(ctx, time.Now().Add(q.cfg.QueryTimeout)) + ctx, cancel := context.WithDeadline(r.Context(), time.Now().Add(q.cfg.QueryTimeout)) defer cancel() request, err := loghttp.ParseRangeQuery(r) From d1da6df32cd825a2c420aedcd97de6171a1361de Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Fri, 7 Jan 2022 11:12:25 +0100 Subject: [PATCH 06/19] Exit earlier for batch iterator Signed-off-by: Cyril Tovena --- pkg/scheduler/scheduler.go | 9 +++------ pkg/storage/batch.go | 2 +- 2 files changed, 4 insertions(+), 7 deletions(-) diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index d708caf1a5ec4..f3cd5d4efeefd 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -37,9 +37,7 @@ import ( lokigrpc "github.com/grafana/loki/pkg/util/httpgrpc" ) -var ( - errSchedulerIsNotRunning = errors.New("scheduler is not running") -) +var errSchedulerIsNotRunning = errors.New("scheduler is not running") const ( // ringAutoForgetUnhealthyPeriods is how many consecutive timeout periods an unhealthy instance @@ -548,7 +546,8 @@ func (s *Scheduler) forwardRequestToQuerier(querier schedulerpb.SchedulerForQuer func (s *Scheduler) forwardErrorToFrontend(ctx context.Context, req *schedulerRequest, requestErr error) { opts, err := 
s.cfg.GRPCClientConfig.DialOption([]grpc.UnaryClientInterceptor{ otgrpc.OpenTracingClientInterceptor(opentracing.GlobalTracer()), - middleware.ClientUserHeaderInterceptor}, + middleware.ClientUserHeaderInterceptor, + }, nil) if err != nil { level.Warn(s.log).Log("msg", "failed to create gRPC options for the connection to frontend to report error", "frontend", req.frontendAddress, "err", err, "requestErr", requestErr) @@ -661,7 +660,6 @@ func (s *Scheduler) running(ctx context.Context) error { } func (s *Scheduler) setRunState(isInSet bool) { - if isInSet { if s.shouldRun.CAS(false, true) { // Value was swapped, meaning this was a state change from stopped to running. @@ -722,7 +720,6 @@ func SafeReadRing(s *Scheduler) ring.ReadRing { } return s.ring - } func (s *Scheduler) OnRingInstanceRegister(_ *ring.BasicLifecycler, ringDesc ring.Desc, instanceExists bool, instanceID string, instanceDesc ring.InstanceDesc) (ring.InstanceState, ring.Tokens) { diff --git a/pkg/storage/batch.go b/pkg/storage/batch.go index fbd39e9a880ec..47d8dca41ef4a 100644 --- a/pkg/storage/batch.go +++ b/pkg/storage/batch.go @@ -499,7 +499,7 @@ func (it *sampleBatchIterator) Sample() logproto.Sample { func (it *sampleBatchIterator) Next() bool { // for loop to avoid recursion - for { + for it.ctx.Err() == nil { if it.curr != nil && it.curr.Next() { return true } From f559d0b0dc9c86ef0ddbaea586502f71fe5c3484 Mon Sep 17 00:00:00 2001 From: Kaviraj Date: Fri, 7 Jan 2022 11:14:27 +0100 Subject: [PATCH 07/19] Add missing return Signed-off-by: Kaviraj --- pkg/storage/batch.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/storage/batch.go b/pkg/storage/batch.go index 47d8dca41ef4a..863f288f1e927 100644 --- a/pkg/storage/batch.go +++ b/pkg/storage/batch.go @@ -522,6 +522,7 @@ func (it *sampleBatchIterator) Next() bool { return false } } + return false } // newChunksIterator creates an iterator over a set of lazychunks. 
From 49d4d731dd952f9400c3fb602614bac5c542628c Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Fri, 7 Jan 2022 12:09:02 +0100 Subject: [PATCH 08/19] Add store context cancellation Signed-off-by: Cyril Tovena --- pkg/storage/chunk/chunk_store_utils.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pkg/storage/chunk/chunk_store_utils.go b/pkg/storage/chunk/chunk_store_utils.go index a86edd33493fa..2be308fc267dc 100644 --- a/pkg/storage/chunk/chunk_store_utils.go +++ b/pkg/storage/chunk/chunk_store_utils.go @@ -143,6 +143,9 @@ func (c *Fetcher) worker() { // FetchChunks fetches a set of chunks from cache and store. Note that the keys passed in must be // lexicographically sorted, while the returned chunks are not in the same order as the passed in chunks. func (c *Fetcher) FetchChunks(ctx context.Context, chunks []Chunk, keys []string) ([]Chunk, error) { + if ctx.Err() != nil { + return nil, ctx.Err() + } log, ctx := spanlogger.New(ctx, "ChunkStore.FetchChunks") defer log.Span.Finish() From 499325fd007c1e177431e4c4c55cbfd5f538934a Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Fri, 7 Jan 2022 15:17:44 +0100 Subject: [PATCH 09/19] Fixes a possible cancellation issue Signed-off-by: Cyril Tovena --- pkg/querier/queryrange/limits.go | 36 ++++++++++++++++---------------- 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/pkg/querier/queryrange/limits.go b/pkg/querier/queryrange/limits.go index f97c391191bd4..8a00780a98bee 100644 --- a/pkg/querier/queryrange/limits.go +++ b/pkg/querier/queryrange/limits.go @@ -257,16 +257,16 @@ func newWork(ctx context.Context, req queryrange.Request) work { } func (rt limitedRoundTripper) RoundTrip(r *http.Request) (*http.Response, error) { - var wg sync.WaitGroup - intermediate := make(chan work) + var ( + wg sync.WaitGroup + intermediate = make(chan work) + ctx, cancel = context.WithCancel(r.Context()) + ) defer func() { + cancel() wg.Wait() - close(intermediate) }() - ctx, cancel := context.WithCancel(r.Context()) 
- defer cancel() - // Do not forward any request header. request, err := rt.codec.DecodeRequest(ctx, r, nil) if err != nil { @@ -284,29 +284,29 @@ func (rt limitedRoundTripper) RoundTrip(r *http.Request) (*http.Response, error) parallelism := rt.limits.MaxQueryParallelism(userid) for i := 0; i < parallelism; i++ { + wg.Add(1) go func() { - for w := range intermediate { - resp, err := rt.do(w.ctx, w.req) + defer wg.Done() + for { select { - case w.result <- result{response: resp, err: err}: - case <-w.ctx.Done(): - w.result <- result{err: w.ctx.Err()} + case w := <-intermediate: + resp, err := rt.do(w.ctx, w.req) + w.result <- result{response: resp, err: err} + case <-ctx.Done(): + return } - } }() } response, err := rt.middleware.Wrap( queryrange.HandlerFunc(func(ctx context.Context, r queryrange.Request) (queryrange.Response, error) { - wg.Add(1) - defer wg.Done() - - if ctx.Err() != nil { + w := newWork(ctx, r) + select { + case intermediate <- w: + case <-ctx.Done(): return nil, ctx.Err() } - w := newWork(ctx, r) - intermediate <- w select { case response := <-w.result: return response.response, response.err From f7c599813cbf84a2a33da55272d451d9dcb56ac8 Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Fri, 7 Jan 2022 15:36:23 +0100 Subject: [PATCH 10/19] Rmove code to find issues Signed-off-by: Cyril Tovena --- pkg/querier/queryrange/roundtrip.go | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/pkg/querier/queryrange/roundtrip.go b/pkg/querier/queryrange/roundtrip.go index 9cab48feff534..e01e8b0c6b256 100644 --- a/pkg/querier/queryrange/roundtrip.go +++ b/pkg/querier/queryrange/roundtrip.go @@ -375,11 +375,11 @@ func NewMetricTripperware( ) } - queryRangeMiddleware = append( - queryRangeMiddleware, - queryrange.InstrumentMiddleware("split_by_interval", instrumentMetrics), - SplitByIntervalMiddleware(limits, codec, splitMetricByTime, splitByMetrics), - ) + // queryRangeMiddleware = append( + // queryRangeMiddleware, + // 
queryrange.InstrumentMiddleware("split_by_interval", instrumentMetrics), + // SplitByIntervalMiddleware(limits, codec, splitMetricByTime, splitByMetrics), + // ) var c cache.Cache if cfg.CacheResults { @@ -420,11 +420,11 @@ func NewMetricTripperware( } if cfg.MaxRetries > 0 { - queryRangeMiddleware = append( - queryRangeMiddleware, - queryrange.InstrumentMiddleware("retry", instrumentMetrics), - queryrange.NewRetryMiddleware(log, cfg.MaxRetries, retryMiddlewareMetrics), - ) + // queryRangeMiddleware = append( + // queryRangeMiddleware, + // queryrange.InstrumentMiddleware("retry", instrumentMetrics), + // queryrange.NewRetryMiddleware(log, cfg.MaxRetries, retryMiddlewareMetrics), + // ) } return func(next http.RoundTripper) http.RoundTripper { From f66d78839bd9ce08ede13cf639d2b28bd328a972 Mon Sep 17 00:00:00 2001 From: Kaviraj Date: Fri, 7 Jan 2022 16:17:15 +0100 Subject: [PATCH 11/19] Add splitMiddleware back to handler Signed-off-by: Kaviraj --- pkg/querier/queryrange/roundtrip.go | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/pkg/querier/queryrange/roundtrip.go b/pkg/querier/queryrange/roundtrip.go index e01e8b0c6b256..9cab48feff534 100644 --- a/pkg/querier/queryrange/roundtrip.go +++ b/pkg/querier/queryrange/roundtrip.go @@ -375,11 +375,11 @@ func NewMetricTripperware( ) } - // queryRangeMiddleware = append( - // queryRangeMiddleware, - // queryrange.InstrumentMiddleware("split_by_interval", instrumentMetrics), - // SplitByIntervalMiddleware(limits, codec, splitMetricByTime, splitByMetrics), - // ) + queryRangeMiddleware = append( + queryRangeMiddleware, + queryrange.InstrumentMiddleware("split_by_interval", instrumentMetrics), + SplitByIntervalMiddleware(limits, codec, splitMetricByTime, splitByMetrics), + ) var c cache.Cache if cfg.CacheResults { @@ -420,11 +420,11 @@ func NewMetricTripperware( } if cfg.MaxRetries > 0 { - // queryRangeMiddleware = append( - // queryRangeMiddleware, - // 
queryrange.InstrumentMiddleware("retry", instrumentMetrics), - // queryrange.NewRetryMiddleware(log, cfg.MaxRetries, retryMiddlewareMetrics), - // ) + queryRangeMiddleware = append( + queryRangeMiddleware, + queryrange.InstrumentMiddleware("retry", instrumentMetrics), + queryrange.NewRetryMiddleware(log, cfg.MaxRetries, retryMiddlewareMetrics), + ) } return func(next http.RoundTripper) http.RoundTripper { From 538ab07dc8286006ddc345063dbff0ec643bda3d Mon Sep 17 00:00:00 2001 From: Kaviraj Date: Fri, 7 Jan 2022 17:14:10 +0100 Subject: [PATCH 12/19] Remove query split Signed-off-by: Kaviraj --- pkg/querier/queryrange/roundtrip.go | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/pkg/querier/queryrange/roundtrip.go b/pkg/querier/queryrange/roundtrip.go index 9cab48feff534..e01e8b0c6b256 100644 --- a/pkg/querier/queryrange/roundtrip.go +++ b/pkg/querier/queryrange/roundtrip.go @@ -375,11 +375,11 @@ func NewMetricTripperware( ) } - queryRangeMiddleware = append( - queryRangeMiddleware, - queryrange.InstrumentMiddleware("split_by_interval", instrumentMetrics), - SplitByIntervalMiddleware(limits, codec, splitMetricByTime, splitByMetrics), - ) + // queryRangeMiddleware = append( + // queryRangeMiddleware, + // queryrange.InstrumentMiddleware("split_by_interval", instrumentMetrics), + // SplitByIntervalMiddleware(limits, codec, splitMetricByTime, splitByMetrics), + // ) var c cache.Cache if cfg.CacheResults { @@ -420,11 +420,11 @@ func NewMetricTripperware( } if cfg.MaxRetries > 0 { - queryRangeMiddleware = append( - queryRangeMiddleware, - queryrange.InstrumentMiddleware("retry", instrumentMetrics), - queryrange.NewRetryMiddleware(log, cfg.MaxRetries, retryMiddlewareMetrics), - ) + // queryRangeMiddleware = append( + // queryRangeMiddleware, + // queryrange.InstrumentMiddleware("retry", instrumentMetrics), + // queryrange.NewRetryMiddleware(log, cfg.MaxRetries, retryMiddlewareMetrics), + // ) } return func(next http.RoundTripper) 
http.RoundTripper { From 7ed4d02fac82a705b78d468156b8b9425cfa8776 Mon Sep 17 00:00:00 2001 From: Kaviraj Date: Sat, 8 Jan 2022 16:57:08 +0100 Subject: [PATCH 13/19] Remove split middleware and context cancel check for chunks --- pkg/querier/queryrange/roundtrip.go | 20 +++++++++---------- pkg/storage/chunk/objectclient/client.go | 3 +++ .../chunk/util/parallel_chunk_fetch.go | 4 ++++ 3 files changed, 17 insertions(+), 10 deletions(-) diff --git a/pkg/querier/queryrange/roundtrip.go b/pkg/querier/queryrange/roundtrip.go index e01e8b0c6b256..9cab48feff534 100644 --- a/pkg/querier/queryrange/roundtrip.go +++ b/pkg/querier/queryrange/roundtrip.go @@ -375,11 +375,11 @@ func NewMetricTripperware( ) } - // queryRangeMiddleware = append( - // queryRangeMiddleware, - // queryrange.InstrumentMiddleware("split_by_interval", instrumentMetrics), - // SplitByIntervalMiddleware(limits, codec, splitMetricByTime, splitByMetrics), - // ) + queryRangeMiddleware = append( + queryRangeMiddleware, + queryrange.InstrumentMiddleware("split_by_interval", instrumentMetrics), + SplitByIntervalMiddleware(limits, codec, splitMetricByTime, splitByMetrics), + ) var c cache.Cache if cfg.CacheResults { @@ -420,11 +420,11 @@ func NewMetricTripperware( } if cfg.MaxRetries > 0 { - // queryRangeMiddleware = append( - // queryRangeMiddleware, - // queryrange.InstrumentMiddleware("retry", instrumentMetrics), - // queryrange.NewRetryMiddleware(log, cfg.MaxRetries, retryMiddlewareMetrics), - // ) + queryRangeMiddleware = append( + queryRangeMiddleware, + queryrange.InstrumentMiddleware("retry", instrumentMetrics), + queryrange.NewRetryMiddleware(log, cfg.MaxRetries, retryMiddlewareMetrics), + ) } return func(next http.RoundTripper) http.RoundTripper { diff --git a/pkg/storage/chunk/objectclient/client.go b/pkg/storage/chunk/objectclient/client.go index 0f4711f9a0692..23c74ce7e3447 100644 --- a/pkg/storage/chunk/objectclient/client.go +++ b/pkg/storage/chunk/objectclient/client.go @@ -102,6 +102,9 @@ func 
(o *Client) GetChunks(ctx context.Context, chunks []chunk.Chunk) ([]chunk.C } func (o *Client) getChunk(ctx context.Context, decodeContext *chunk.DecodeContext, c chunk.Chunk) (chunk.Chunk, error) { + if ctx.Err() != nil { + return chunk.Chunk{}, ctx.Err() + } key := o.schema.ExternalKey(c) if o.keyEncoder != nil { key = o.keyEncoder(key) diff --git a/pkg/storage/chunk/util/parallel_chunk_fetch.go b/pkg/storage/chunk/util/parallel_chunk_fetch.go index e2def27e7254d..66a2ea91435d9 100644 --- a/pkg/storage/chunk/util/parallel_chunk_fetch.go +++ b/pkg/storage/chunk/util/parallel_chunk_fetch.go @@ -23,6 +23,10 @@ func GetParallelChunks(ctx context.Context, maxParallel int, chunks []chunk.Chun defer log.Finish() log.LogFields(otlog.Int("requested", len(chunks))) + if ctx.Err() != nil { + return nil, ctx.Err() + } + queuedChunks := make(chan chunk.Chunk) go func() { From 87d80d0f5e62cebb869fe801c114388cbf73dc8f Mon Sep 17 00:00:00 2001 From: Kaviraj Date: Sat, 8 Jan 2022 17:19:45 +0100 Subject: [PATCH 14/19] Handle context cancel properly on `getChunk()` via select. 
Signed-off-by: Kaviraj --- pkg/storage/chunk/objectclient/client.go | 58 +++++++++++++++--------- 1 file changed, 37 insertions(+), 21 deletions(-) diff --git a/pkg/storage/chunk/objectclient/client.go b/pkg/storage/chunk/objectclient/client.go index 23c74ce7e3447..d5ee16103a898 100644 --- a/pkg/storage/chunk/objectclient/client.go +++ b/pkg/storage/chunk/objectclient/client.go @@ -102,33 +102,49 @@ func (o *Client) GetChunks(ctx context.Context, chunks []chunk.Chunk) ([]chunk.C } func (o *Client) getChunk(ctx context.Context, decodeContext *chunk.DecodeContext, c chunk.Chunk) (chunk.Chunk, error) { - if ctx.Err() != nil { - return chunk.Chunk{}, ctx.Err() - } - key := o.schema.ExternalKey(c) - if o.keyEncoder != nil { - key = o.keyEncoder(key) - } + var ( + errChan = make(chan error) + chunkChan = make(chan chunk.Chunk) + ) - readCloser, size, err := o.store.GetObject(ctx, key) - if err != nil { - return chunk.Chunk{}, errors.WithStack(err) - } + fetchChunk := func() { + key := o.schema.ExternalKey(c) + if o.keyEncoder != nil { + key = o.keyEncoder(key) + } - defer readCloser.Close() + readCloser, size, err := o.store.GetObject(ctx, key) + if err != nil { + errChan <- errors.WithStack(err) + } + + defer readCloser.Close() + + // adds bytes.MinRead to avoid allocations when the size is known. + // This is because ReadFrom reads bytes.MinRead by bytes.MinRead. + buf := bytes.NewBuffer(make([]byte, 0, size+bytes.MinRead)) + _, err = buf.ReadFrom(readCloser) + if err != nil { + errChan <- errors.WithStack(err) + } - // adds bytes.MinRead to avoid allocations when the size is known. - // This is because ReadFrom reads bytes.MinRead by bytes.MinRead. 
- buf := bytes.NewBuffer(make([]byte, 0, size+bytes.MinRead)) - _, err = buf.ReadFrom(readCloser) - if err != nil { - return chunk.Chunk{}, errors.WithStack(err) + if err := c.Decode(decodeContext, buf.Bytes()); err != nil { + errChan <- errors.WithStack(err) + } + chunkChan <- c } - if err := c.Decode(decodeContext, buf.Bytes()); err != nil { - return chunk.Chunk{}, errors.WithStack(err) + go fetchChunk() + + select { + case err := <-errChan: + return chunk.Chunk{}, err + case c := <-chunkChan: + return c, nil + case <-ctx.Done(): + return chunk.Chunk{}, ctx.Err() } - return c, nil + } // GetChunks retrieves the specified chunks from the configured backend From faae173b78d26b81601c2f64f5617dc2350f4f94 Mon Sep 17 00:00:00 2001 From: Kaviraj Date: Sun, 9 Jan 2022 21:42:27 +0100 Subject: [PATCH 15/19] Use context in getChunk without starting new goroutine Signed-off-by: Kaviraj --- pkg/storage/chunk/objectclient/client.go | 58 ++++++++++-------------- 1 file changed, 23 insertions(+), 35 deletions(-) diff --git a/pkg/storage/chunk/objectclient/client.go b/pkg/storage/chunk/objectclient/client.go index d5ee16103a898..e96f5b48accec 100644 --- a/pkg/storage/chunk/objectclient/client.go +++ b/pkg/storage/chunk/objectclient/client.go @@ -102,49 +102,37 @@ func (o *Client) GetChunks(ctx context.Context, chunks []chunk.Chunk) ([]chunk.C } func (o *Client) getChunk(ctx context.Context, decodeContext *chunk.DecodeContext, c chunk.Chunk) (chunk.Chunk, error) { - var ( - errChan = make(chan error) - chunkChan = make(chan chunk.Chunk) - ) - - fetchChunk := func() { - key := o.schema.ExternalKey(c) - if o.keyEncoder != nil { - key = o.keyEncoder(key) - } - readCloser, size, err := o.store.GetObject(ctx, key) - if err != nil { - errChan <- errors.WithStack(err) - } - - defer readCloser.Close() + select { + default: + case <-ctx.Done(): + return chunk.Chunk{}, ctx.Err() + } - // adds bytes.MinRead to avoid allocations when the size is known. 
- // This is because ReadFrom reads bytes.MinRead by bytes.MinRead. - buf := bytes.NewBuffer(make([]byte, 0, size+bytes.MinRead)) - _, err = buf.ReadFrom(readCloser) - if err != nil { - errChan <- errors.WithStack(err) - } + key := o.schema.ExternalKey(c) + if o.keyEncoder != nil { + key = o.keyEncoder(key) + } - if err := c.Decode(decodeContext, buf.Bytes()); err != nil { - errChan <- errors.WithStack(err) - } - chunkChan <- c + readCloser, size, err := o.store.GetObject(ctx, key) + if err != nil { + return chunk.Chunk{}, errors.WithStack(err) } - go fetchChunk() + defer readCloser.Close() - select { - case err := <-errChan: - return chunk.Chunk{}, err - case c := <-chunkChan: - return c, nil - case <-ctx.Done(): - return chunk.Chunk{}, ctx.Err() + // adds bytes.MinRead to avoid allocations when the size is known. + // This is because ReadFrom reads bytes.MinRead by bytes.MinRead. + buf := bytes.NewBuffer(make([]byte, 0, size+bytes.MinRead)) + _, err = buf.ReadFrom(readCloser) + if err != nil { + return chunk.Chunk{}, errors.WithStack(err) } + if err := c.Decode(decodeContext, buf.Bytes()); err != nil { + return chunk.Chunk{}, errors.WithStack(err) + } + return c, nil } // GetChunks retrieves the specified chunks from the configured backend From fd5befa69bc4398e83e174f039f470b5f15c8db6 Mon Sep 17 00:00:00 2001 From: Kaviraj Date: Sun, 9 Jan 2022 22:48:24 +0100 Subject: [PATCH 16/19] Just normal ctx.Err() check instead of using Done channel Signed-off-by: Kaviraj --- pkg/storage/chunk/objectclient/client.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/pkg/storage/chunk/objectclient/client.go b/pkg/storage/chunk/objectclient/client.go index e96f5b48accec..7aac07319c530 100644 --- a/pkg/storage/chunk/objectclient/client.go +++ b/pkg/storage/chunk/objectclient/client.go @@ -103,9 +103,7 @@ func (o *Client) GetChunks(ctx context.Context, chunks []chunk.Chunk) ([]chunk.C func (o *Client) getChunk(ctx context.Context, decodeContext 
*chunk.DecodeContext, c chunk.Chunk) (chunk.Chunk, error) { - select { - default: - case <-ctx.Done(): + if ctx.Err() != nil { return chunk.Chunk{}, ctx.Err() } From 78db2fd20c63c418ab3f2d9640b9c3083fc3fbab Mon Sep 17 00:00:00 2001 From: Kaviraj Date: Mon, 10 Jan 2022 16:04:53 +0100 Subject: [PATCH 17/19] Remove debug logs Signed-off-by: Kaviraj --- pkg/scheduler/scheduler.go | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 74e987340536d..86e5de1a26797 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -303,7 +303,6 @@ func (s *Scheduler) FrontendLoop(frontend schedulerpb.SchedulerForFrontend_Front } case schedulerpb.CANCEL: - level.Info(s.log).Log("msg", "inside frontend loop, msg type is cancel", msg.QueryID) s.cancelRequestAndRemoveFromPending(frontendAddress, msg.QueryID) resp = &schedulerpb.SchedulerToFrontend{Status: schedulerpb.OK} @@ -419,7 +418,6 @@ func (s *Scheduler) enqueueRequest(frontendContext context.Context, frontendAddr // This method doesn't do removal from the queue. func (s *Scheduler) cancelRequestAndRemoveFromPending(frontendAddr string, queryID uint64) { - level.Info(s.log).Log("msg", "cancelling request and remove from pending", "queryID", queryID) s.pendingRequestsMu.Lock() defer s.pendingRequestsMu.Unlock() @@ -503,10 +501,7 @@ func (s *Scheduler) NotifyQuerierShutdown(_ context.Context, req *schedulerpb.No func (s *Scheduler) forwardRequestToQuerier(querier schedulerpb.SchedulerForQuerier_QuerierLoopServer, req *schedulerRequest) error { // Make sure to cancel request at the end to cleanup resources. 
- level.Info(s.log).Log("msg", "forwarding request to the querier", "queryID", req.queryID) - defer func() { - s.cancelRequestAndRemoveFromPending(req.frontendAddress, req.queryID) - }() + defer s.cancelRequestAndRemoveFromPending(req.frontendAddress, req.queryID) // Handle the stream sending & receiving on a goroutine so we can // monitoring the contexts in a select and cancel things appropriately. From 7ab8056d469d82f9a7473850818757f0cb491231 Mon Sep 17 00:00:00 2001 From: Kaviraj Date: Mon, 10 Jan 2022 17:46:22 +0100 Subject: [PATCH 18/19] Update pkg/querier/http.go Co-authored-by: Cyril Tovena --- pkg/querier/http.go | 9 --------- 1 file changed, 9 deletions(-) diff --git a/pkg/querier/http.go b/pkg/querier/http.go index 8121b9ed7e895..1d173d6c28866 100644 --- a/pkg/querier/http.go +++ b/pkg/querier/http.go @@ -35,15 +35,6 @@ type QueryResponse struct { // RangeQueryHandler is a http.HandlerFunc for range queries. func (q *Querier) RangeQueryHandler(w http.ResponseWriter, r *http.Request) { - log := spanlogger.FromContext(r.Context()) - level.Info(log).Log("msg", "range query started") - defer func() { - level.Info(log).Log("msg", "range query ended") - }() - go func() { - <-r.Context().Done() - level.Info(log).Log("msg", "range query done", "err", r.Context().Err()) - }() // Enforce the query timeout while querying backends ctx, cancel := context.WithDeadline(r.Context(), time.Now().Add(q.cfg.QueryTimeout)) defer cancel() From 592c7b9436aaebee1effa80894c27aa2cbb305d0 Mon Sep 17 00:00:00 2001 From: Kaviraj Date: Mon, 10 Jan 2022 19:27:44 +0100 Subject: [PATCH 19/19] Remove unused imports --- pkg/querier/http.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/querier/http.go b/pkg/querier/http.go index 1d173d6c28866..483f65b8c4841 100644 --- a/pkg/querier/http.go +++ b/pkg/querier/http.go @@ -6,7 +6,6 @@ import ( "time" util_log "github.com/cortexproject/cortex/pkg/util/log" - "github.com/cortexproject/cortex/pkg/util/spanlogger" 
"github.com/go-kit/log/level" "github.com/gorilla/websocket" "github.com/prometheus/prometheus/model/labels"