From 3054ef8fdb6e0d83e86bfb800538010a8b9be455 Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Mon, 27 Jan 2025 11:24:20 +0100 Subject: [PATCH] chore(querier): Refactor the store and querier interface. --- .../deletion/delete_requests_store.go | 1 - pkg/loki/modules.go | 10 +- pkg/querier/deletion/delete.go | 44 +++ pkg/querier/http.go | 139 +-------- pkg/querier/http_test.go | 29 +- pkg/querier/ingester_querier_test.go | 39 ++- pkg/querier/limits/definitions.go | 2 +- pkg/querier/limits/validation.go | 63 +++++ pkg/querier/limits/validation_test.go | 53 ++++ pkg/querier/pattern/querier.go | 11 + pkg/querier/querier.go | 267 +++--------------- pkg/querier/querier_mock_test.go | 93 ++---- pkg/querier/querier_test.go | 204 +------------ pkg/querier/tail/http.go | 147 ++++++++++ pkg/querier/tail/http_test.go | 48 ++++ pkg/querier/{ => tail}/metrics.go | 8 +- pkg/querier/tail/querier.go | 171 +++++++++++ pkg/querier/tail/querier_test.go | 181 ++++++++++++ pkg/querier/{ => tail}/tail.go | 6 +- pkg/querier/tail/tail_mock_test.go | 129 +++++++++ pkg/querier/{ => tail}/tail_test.go | 79 +++--- pkg/querier/tail_mock_test.go | 10 - pkg/querier/testutil/iterator.go | 38 +++ pkg/querier/testutil/limits.go | 56 ++++ 24 files changed, 1103 insertions(+), 725 deletions(-) create mode 100644 pkg/querier/deletion/delete.go create mode 100644 pkg/querier/limits/validation.go create mode 100644 pkg/querier/limits/validation_test.go create mode 100644 pkg/querier/pattern/querier.go create mode 100644 pkg/querier/tail/http.go create mode 100644 pkg/querier/tail/http_test.go rename pkg/querier/{ => tail}/metrics.go (85%) create mode 100644 pkg/querier/tail/querier.go create mode 100644 pkg/querier/tail/querier_test.go rename pkg/querier/{ => tail}/tail.go (99%) create mode 100644 pkg/querier/tail/tail_mock_test.go rename pkg/querier/{ => tail}/tail_test.go (86%) delete mode 100644 pkg/querier/tail_mock_test.go create mode 100644 pkg/querier/testutil/iterator.go create mode 100644 pkg/querier/testutil/limits.go diff --git a/pkg/compactor/deletion/delete_requests_store.go b/pkg/compactor/deletion/delete_requests_store.go index e351a694b0f19..51357cce6ae1d 100644 --- a/pkg/compactor/deletion/delete_requests_store.go +++ b/pkg/compactor/deletion/delete_requests_store.go @@ -233,7 +233,6 @@ func (ds *deleteRequestsStore) GetCacheGenerationNumber(ctx context.Context, use } return false }) - if err != nil { return "", err } diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index 58d3e77a74c4b..b39ed82ea7e47 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -67,6 +67,7 @@ import ( "github.com/grafana/loki/v3/pkg/querier" "github.com/grafana/loki/v3/pkg/querier/queryrange" "github.com/grafana/loki/v3/pkg/querier/queryrange/queryrangebase" + "github.com/grafana/loki/v3/pkg/querier/tail" "github.com/grafana/loki/v3/pkg/ruler" base_ruler "github.com/grafana/loki/v3/pkg/ruler/base" "github.com/grafana/loki/v3/pkg/runtime" @@ -405,7 +406,7 @@ func (t *Loki) initQuerier() (services.Service, error) { return nil, err } - t.Querier, err = querier.New(t.Cfg.Querier, t.Store, t.ingesterQuerier, t.Overrides, deleteStore, prometheus.DefaultRegisterer, logger) + t.Querier, err = querier.New(t.Cfg.Querier, t.Store, t.ingesterQuerier, t.Overrides, deleteStore, logger) if err != nil { return nil, err } @@ -555,8 +556,9 @@ func (t *Loki) initQuerier() (services.Service, error) { // is standalone ALL routes are registered externally, and when it's in the same process as a frontend, // we disable the proxying of the tail 
routes in initQueryFrontend() and we still want these routes registered
	// on the external router.
-	t.Server.HTTP.Path("/loki/api/v1/tail").Methods("GET", "POST").Handler(httpMiddleware.Wrap(http.HandlerFunc(t.querierAPI.TailHandler)))
-	t.Server.HTTP.Path("/api/prom/tail").Methods("GET", "POST").Handler(httpMiddleware.Wrap(http.HandlerFunc(t.querierAPI.TailHandler)))
+	tailQuerier := tail.NewQuerier(t.ingesterQuerier, t.Querier, deleteStore, t.Overrides, t.Cfg.Querier.TailMaxDuration, tail.NewTailMetrics(prometheus.DefaultRegisterer), log.With(util_log.Logger, "component", "tail-querier"))
+	t.Server.HTTP.Path("/loki/api/v1/tail").Methods("GET", "POST").Handler(httpMiddleware.Wrap(http.HandlerFunc(tailQuerier.TailHandler)))
+	t.Server.HTTP.Path("/api/prom/tail").Methods("GET", "POST").Handler(httpMiddleware.Wrap(http.HandlerFunc(tailQuerier.TailHandler)))

	internalMiddlewares := []queryrangebase.Middleware{
		serverutil.RecoveryMiddleware,
@@ -1936,7 +1938,7 @@ func (t *Loki) deleteRequestsClient(clientType string, limits limiter.CombinedLi
}

func (t *Loki) createRulerQueryEngine(logger log.Logger, deleteStore deletion.DeleteRequestsClient) (eng *logql.Engine, err error) {
-	q, err := querier.New(t.Cfg.Querier, t.Store, t.ingesterQuerier, t.Overrides, deleteStore, nil, logger)
+	q, err := querier.New(t.Cfg.Querier, t.Store, t.ingesterQuerier, t.Overrides, deleteStore, logger)
	if err != nil {
		return nil, fmt.Errorf("could not create querier: %w", err)
	}
diff --git a/pkg/querier/deletion/delete.go b/pkg/querier/deletion/delete.go
new file mode 100644
index 0000000000000..e380addb484a0
--- /dev/null
+++ b/pkg/querier/deletion/delete.go
@@ -0,0 +1,44 @@
+package deletion
+
+import (
+	"context"
+	"time"
+
+	"github.com/grafana/dskit/tenant"
+
+	"github.com/grafana/loki/v3/pkg/compactor/deletion"
+	"github.com/grafana/loki/v3/pkg/logproto"
+)
+
+type DeleteGetter interface {
+	GetAllDeleteRequestsForUser(ctx context.Context, userID string) ([]deletion.DeleteRequest, error)
+}
+
+// DeletesForUserQuery returns the deletes for a user (taken from request context) within a given time range.
+func DeletesForUserQuery(ctx context.Context, startT, endT time.Time, g DeleteGetter) ([]*logproto.Delete, error) { + userID, err := tenant.TenantID(ctx) + if err != nil { + return nil, err + } + + d, err := g.GetAllDeleteRequestsForUser(ctx, userID) + if err != nil { + return nil, err + } + + start := startT.UnixNano() + end := endT.UnixNano() + + var deletes []*logproto.Delete + for _, del := range d { + if del.StartTime.UnixNano() <= end && del.EndTime.UnixNano() >= start { + deletes = append(deletes, &logproto.Delete{ + Selector: del.Query, + Start: del.StartTime.UnixNano(), + End: del.EndTime.UnixNano(), + }) + } + } + + return deletes, nil +} diff --git a/pkg/querier/http.go b/pkg/querier/http.go index fba09327184f0..064e28136475c 100644 --- a/pkg/querier/http.go +++ b/pkg/querier/http.go @@ -8,7 +8,6 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" - "github.com/gorilla/websocket" "github.com/grafana/dskit/httpgrpc" "github.com/grafana/dskit/middleware" "github.com/opentracing/opentracing-go" @@ -19,27 +18,21 @@ import ( "github.com/grafana/dskit/tenant" "github.com/grafana/loki/v3/pkg/loghttp" - loghttp_legacy "github.com/grafana/loki/v3/pkg/loghttp/legacy" "github.com/grafana/loki/v3/pkg/logproto" "github.com/grafana/loki/v3/pkg/logql" "github.com/grafana/loki/v3/pkg/logql/syntax" "github.com/grafana/loki/v3/pkg/logqlmodel" "github.com/grafana/loki/v3/pkg/logqlmodel/stats" + querier_limits "github.com/grafana/loki/v3/pkg/querier/limits" "github.com/grafana/loki/v3/pkg/querier/queryrange" index_stats "github.com/grafana/loki/v3/pkg/storage/stores/index/stats" "github.com/grafana/loki/v3/pkg/util/httpreq" util_log "github.com/grafana/loki/v3/pkg/util/log" - "github.com/grafana/loki/v3/pkg/util/marshal" - marshal_legacy "github.com/grafana/loki/v3/pkg/util/marshal/legacy" serverutil "github.com/grafana/loki/v3/pkg/util/server" "github.com/grafana/loki/v3/pkg/util/spanlogger" util_validation "github.com/grafana/loki/v3/pkg/util/validation" ) -const ( - wsPingPeriod = 1 * time.Second -) - type QueryResponse struct { ResultType parser.ValueType `json:"resultType"` Result parser.Value `json:"result"` @@ -53,12 +46,12 @@ type Engine interface { type QuerierAPI struct { querier Querier cfg Config - limits Limits + limits querier_limits.Limits engine Engine } // NewQuerierAPI returns an instance of the QuerierAPI. -func NewQuerierAPI(cfg Config, querier Querier, limits Limits, logger log.Logger) *QuerierAPI { +func NewQuerierAPI(cfg Config, querier Querier, limits querier_limits.Limits, logger log.Logger) *QuerierAPI { engine := logql.NewEngine(cfg.Engine, querier, limits, logger) return &QuerierAPI{ cfg: cfg, @@ -128,129 +121,6 @@ func (q *QuerierAPI) LabelHandler(ctx context.Context, req *logproto.LabelReques return resp, err } -// TailHandler is a http.HandlerFunc for handling tail queries. 
-func (q *QuerierAPI) TailHandler(w http.ResponseWriter, r *http.Request) { - upgrader := websocket.Upgrader{ - CheckOrigin: func(_ *http.Request) bool { return true }, - } - logger := util_log.WithContext(r.Context(), util_log.Logger) - - req, err := loghttp.ParseTailQuery(r) - if err != nil { - serverutil.WriteError(httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error()), w) - return - } - - tenantID, err := tenant.TenantID(r.Context()) - if err != nil { - level.Warn(logger).Log("msg", "error getting tenant id", "err", err) - serverutil.WriteError(httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error()), w) - return - } - - encodingFlags := httpreq.ExtractEncodingFlags(r) - version := loghttp.GetVersion(r.RequestURI) - - conn, err := upgrader.Upgrade(w, r, nil) - if err != nil { - level.Error(logger).Log("msg", "Error in upgrading websocket", "err", err) - return - } - - level.Info(logger).Log("msg", "starting to tail logs", "tenant", tenantID, "selectors", req.Query) - - defer func() { - level.Info(logger).Log("msg", "ended tailing logs", "tenant", tenantID, "selectors", req.Query) - }() - - defer func() { - if err := conn.Close(); err != nil { - level.Error(logger).Log("msg", "Error closing websocket", "err", err) - } - }() - - tailer, err := q.querier.Tail(r.Context(), req, encodingFlags.Has(httpreq.FlagCategorizeLabels)) - if err != nil { - if err := conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseInternalServerErr, err.Error())); err != nil { - level.Error(logger).Log("msg", "Error connecting to ingesters for tailing", "err", err) - } - return - } - defer func() { - if err := tailer.close(); err != nil { - level.Error(logger).Log("msg", "Error closing Tailer", "err", err) - } - }() - - ticker := time.NewTicker(wsPingPeriod) - defer ticker.Stop() - - connWriter := marshal.NewWebsocketJSONWriter(conn) - - var response *loghttp_legacy.TailResponse - responseChan := tailer.getResponseChan() - closeErrChan := tailer.getCloseErrorChan() - - doneChan := make(chan struct{}) - go func() { - for { - _, _, err := conn.ReadMessage() - if err != nil { - if closeErr, ok := err.(*websocket.CloseError); ok { - if closeErr.Code == websocket.CloseNormalClosure { - break - } - level.Error(logger).Log("msg", "Error from client", "err", err) - break - } else if tailer.stopped.Load() { - return - } - - level.Error(logger).Log("msg", "Unexpected error from client", "err", err) - break - } - } - doneChan <- struct{}{} - }() - - for { - select { - case response = <-responseChan: - var err error - if version == loghttp.VersionV1 { - err = marshal.WriteTailResponseJSON(*response, connWriter, encodingFlags) - } else { - err = marshal_legacy.WriteTailResponseJSON(*response, conn) - } - if err != nil { - level.Error(logger).Log("msg", "Error writing to websocket", "err", err) - if err := conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseInternalServerErr, err.Error())); err != nil { - level.Error(logger).Log("msg", "Error writing close message to websocket", "err", err) - } - return - } - - case err := <-closeErrChan: - level.Error(logger).Log("msg", "Error from iterator", "err", err) - if err := conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseInternalServerErr, err.Error())); err != nil { - level.Error(logger).Log("msg", "Error writing close message to websocket", "err", err) - } - return - case <-ticker.C: - // This is to periodically check whether connection is active, useful to clean up dead 
connections when there are no entries to send - if err := conn.WriteMessage(websocket.PingMessage, nil); err != nil { - level.Error(logger).Log("msg", "Error writing ping message to websocket", "err", err) - if err := conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseInternalServerErr, err.Error())); err != nil { - level.Error(logger).Log("msg", "Error writing close message to websocket", "err", err) - } - return - } - case <-doneChan: - return - } - } -} - // SeriesHandler returns the list of time series that match a certain label set. // See https://prometheus.io/docs/prometheus/latest/querying/api/#finding-series-by-label-matchers func (q *QuerierAPI) SeriesHandler(ctx context.Context, req *logproto.SeriesRequest) (*logproto.SeriesResponse, stats.Result, error) { @@ -420,7 +290,6 @@ func (q *QuerierAPI) validateMaxEntriesLimits(ctx context.Context, expr syntax.E // DetectedLabelsHandler returns a response for detected labels func (q *QuerierAPI) DetectedLabelsHandler(ctx context.Context, req *logproto.DetectedLabelsRequest) (*logproto.DetectedLabelsResponse, error) { resp, err := q.querier.DetectedLabels(ctx, req) - if err != nil { return nil, err } @@ -430,7 +299,7 @@ func (q *QuerierAPI) DetectedLabelsHandler(ctx context.Context, req *logproto.De // WrapQuerySpanAndTimeout applies a context deadline and a span logger to a query call. // // The timeout is based on the per-tenant query timeout configuration. -func WrapQuerySpanAndTimeout(call string, limits Limits) middleware.Interface { +func WrapQuerySpanAndTimeout(call string, limits querier_limits.Limits) middleware.Interface { return middleware.Func(func(next http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { sp, ctx := opentracing.StartSpanFromContext(req.Context(), call) diff --git a/pkg/querier/http_test.go b/pkg/querier/http_test.go index d568b7b9934b4..709e5a71c2ae5 100644 --- a/pkg/querier/http_test.go +++ b/pkg/querier/http_test.go @@ -51,33 +51,6 @@ func TestInstantQueryHandler(t *testing.T) { }) } -func TestTailHandler(t *testing.T) { - defaultLimits := defaultLimitsTestConfig() - limits, err := validation.NewOverrides(defaultLimits, nil) - require.NoError(t, err) - - api := NewQuerierAPI(mockQuerierConfig(), nil, limits, log.NewNopLogger()) - - req, err := http.NewRequest("GET", `/`, nil) - require.NoError(t, err) - q := req.URL.Query() - q.Add("query", `{app="loki"}`) - req.URL.RawQuery = q.Encode() - err = req.ParseForm() - require.NoError(t, err) - - ctx := user.InjectOrgID(req.Context(), "1|2") - req = req.WithContext(ctx) - require.NoError(t, err) - - rr := httptest.NewRecorder() - handler := http.HandlerFunc(api.TailHandler) - - handler.ServeHTTP(rr, req) - require.Equal(t, http.StatusBadRequest, rr.Code) - require.Equal(t, "multiple org IDs present", rr.Body.String()) -} - type slowConnectionSimulator struct { sleepFor time.Duration deadline time.Duration @@ -88,7 +61,6 @@ func (s *slowConnectionSimulator) ServeHTTP(_ http.ResponseWriter, r *http.Reque ctx := r.Context() if err := ctx.Err(); err != nil { panic(fmt.Sprintf("context already errored: %s", err)) - } time.Sleep(s.sleepFor) @@ -221,6 +193,7 @@ func TestSeriesHandler(t *testing.T) { require.JSONEq(t, expected, res.Body.String()) }) } + func TestVolumeHandler(t *testing.T) { ret := &logproto.VolumeResponse{ Volumes: []logproto.Volume{ diff --git a/pkg/querier/ingester_querier_test.go b/pkg/querier/ingester_querier_test.go index 268191bd17a72..e0a427267ad39 100644 --- 
a/pkg/querier/ingester_querier_test.go +++ b/pkg/querier/ingester_querier_test.go @@ -11,8 +11,8 @@ import ( "github.com/grafana/dskit/ring/client" "github.com/grafana/dskit/user" "go.uber.org/atomic" - "google.golang.org/grpc/codes" + grpc_metadata "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" "github.com/grafana/dskit/ring" @@ -163,7 +163,7 @@ func TestIngesterQuerier_earlyExitOnQuorum(t *testing.T) { _, err := ingesterQuerier.Tail(context.Background(), new(logproto.TailRequest)) return err }, - retVal: newTailClientMock(), + retVal: &mockQuerierTailClient{}, }, } @@ -542,7 +542,7 @@ func TestQuerier_tailDisconnectedIngesters(t *testing.T) { // For this test's purpose, whenever a new ingester client needs to // be created, the factory will always return the same mock instance ingesterClient := newQuerierClientMock() - ingesterClient.On("Tail", mock.Anything, &req, mock.Anything).Return(newTailClientMock(), nil) + ingesterClient.On("Tail", mock.Anything, &req, mock.Anything).Return(&mockQuerierTailClient{}, nil) ingesterQuerier, err := newTestIngesterQuerier(newReadRingMock(testData.ringIngesters, 0), ingesterClient) require.NoError(t, err) @@ -679,3 +679,36 @@ func newTestPartitionIngesterQuerier(clientFactory client.PoolFactory, instanceR log.NewNopLogger(), ) } + +var _ logproto.Querier_TailClient = &mockQuerierTailClient{} + +// mockQuerierTailClient implements logproto.Querier_TailClient interface +type mockQuerierTailClient struct{} + +func (c *mockQuerierTailClient) Recv() (*logproto.TailResponse, error) { + return nil, nil +} + +func (c *mockQuerierTailClient) Header() (grpc_metadata.MD, error) { + return nil, nil +} + +func (c *mockQuerierTailClient) Trailer() grpc_metadata.MD { + return nil +} + +func (c *mockQuerierTailClient) CloseSend() error { + return nil +} + +func (c *mockQuerierTailClient) Context() context.Context { + return context.Background() +} + +func (c *mockQuerierTailClient) SendMsg(_ interface{}) error { + return nil +} + +func (c *mockQuerierTailClient) RecvMsg(_ interface{}) error { + return nil +} diff --git a/pkg/querier/limits/definitions.go b/pkg/querier/limits/definitions.go index dec518a7fc7da..c85538969dbaf 100644 --- a/pkg/querier/limits/definitions.go +++ b/pkg/querier/limits/definitions.go @@ -1,4 +1,4 @@ -package limists +package limits import ( "context" diff --git a/pkg/querier/limits/validation.go b/pkg/querier/limits/validation.go new file mode 100644 index 0000000000000..eafc4ffd58858 --- /dev/null +++ b/pkg/querier/limits/validation.go @@ -0,0 +1,63 @@ +package limits + +import ( + "context" + "net/http" + "time" + + "github.com/go-kit/log/level" + "github.com/grafana/dskit/httpgrpc" + "github.com/grafana/dskit/tenant" + "github.com/prometheus/common/model" + + "github.com/grafana/loki/v3/pkg/logql" + "github.com/grafana/loki/v3/pkg/util/spanlogger" + util_validation "github.com/grafana/loki/v3/pkg/util/validation" +) + +var nowFunc = func() time.Time { return time.Now() } + +func ValidateQueryRequest(ctx context.Context, req logql.QueryParams, limits Limits) (time.Time, time.Time, error) { + userID, err := tenant.TenantID(ctx) + if err != nil { + return time.Time{}, time.Time{}, err + } + + selector, err := req.LogSelector() + if err != nil { + return time.Time{}, time.Time{}, err + } + matchers := selector.Matchers() + + maxStreamMatchersPerQuery := limits.MaxStreamsMatchersPerQuery(ctx, userID) + if len(matchers) > maxStreamMatchersPerQuery { + return time.Time{}, time.Time{}, httpgrpc.Errorf(http.StatusBadRequest, + 
"max streams matchers per query exceeded, matchers-count > limit (%d > %d)", len(matchers), maxStreamMatchersPerQuery) + } + + return ValidateQueryTimeRangeLimits(ctx, userID, limits, req.GetStart(), req.GetEnd()) +} + +func ValidateQueryTimeRangeLimits(ctx context.Context, userID string, limits TimeRangeLimits, from, through time.Time) (time.Time, time.Time, error) { + now := nowFunc() + // Clamp the time range based on the max query lookback. + maxQueryLookback := limits.MaxQueryLookback(ctx, userID) + if maxQueryLookback > 0 && from.Before(now.Add(-maxQueryLookback)) { + origStartTime := from + from = now.Add(-maxQueryLookback) + + level.Debug(spanlogger.FromContext(ctx)).Log( + "msg", "the start time of the query has been manipulated because of the 'max query lookback' setting", + "original", origStartTime, + "updated", from) + + } + maxQueryLength := limits.MaxQueryLength(ctx, userID) + if maxQueryLength > 0 && (through).Sub(from) > maxQueryLength { + return time.Time{}, time.Time{}, httpgrpc.Errorf(http.StatusBadRequest, util_validation.ErrQueryTooLong, (through).Sub(from), model.Duration(maxQueryLength)) + } + if through.Before(from) { + return time.Time{}, time.Time{}, httpgrpc.Errorf(http.StatusBadRequest, util_validation.ErrQueryTooOld, model.Duration(maxQueryLookback)) + } + return from, through, nil +} diff --git a/pkg/querier/limits/validation_test.go b/pkg/querier/limits/validation_test.go new file mode 100644 index 0000000000000..6f8668d85645a --- /dev/null +++ b/pkg/querier/limits/validation_test.go @@ -0,0 +1,53 @@ +package limits + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +type fakeTimeLimits struct { + maxQueryLookback time.Duration + maxQueryLength time.Duration +} + +func (f fakeTimeLimits) MaxQueryLookback(_ context.Context, _ string) time.Duration { + return f.maxQueryLookback +} + +func (f fakeTimeLimits) MaxQueryLength(_ context.Context, _ string) time.Duration { + return f.maxQueryLength +} + +func Test_validateQueryTimeRangeLimits(t *testing.T) { + now := time.Now() + nowFunc = func() time.Time { return now } + tests := []struct { + name string + limits TimeRangeLimits + from time.Time + through time.Time + wantFrom time.Time + wantThrough time.Time + wantErr bool + }{ + {"no change", fakeTimeLimits{1000 * time.Hour, 1000 * time.Hour}, now, now.Add(24 * time.Hour), now, now.Add(24 * time.Hour), false}, + {"clamped to 24h", fakeTimeLimits{24 * time.Hour, 1000 * time.Hour}, now.Add(-48 * time.Hour), now, now.Add(-24 * time.Hour), now, false}, + {"end before start", fakeTimeLimits{}, now, now.Add(-48 * time.Hour), time.Time{}, time.Time{}, true}, + {"query too long", fakeTimeLimits{maxQueryLength: 24 * time.Hour}, now.Add(-48 * time.Hour), now, time.Time{}, time.Time{}, true}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + from, through, err := ValidateQueryTimeRangeLimits(context.Background(), "foo", tt.limits, tt.from, tt.through) + if tt.wantErr { + require.NotNil(t, err) + } else { + require.Nil(t, err) + } + require.Equal(t, tt.wantFrom, from, "wanted (%s) got (%s)", tt.wantFrom, from) + require.Equal(t, tt.wantThrough, through) + }) + } +} diff --git a/pkg/querier/pattern/querier.go b/pkg/querier/pattern/querier.go new file mode 100644 index 0000000000000..34bd9f7e86e19 --- /dev/null +++ b/pkg/querier/pattern/querier.go @@ -0,0 +1,11 @@ +package pattern + +import ( + "context" + + "github.com/grafana/loki/v3/pkg/logproto" +) + +type PatterQuerier interface { + Patterns(ctx 
context.Context, req *logproto.QueryPatternsRequest) (*logproto.QueryPatternsResponse, error) +} diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index 3f03d1e037aad..abc04d3782a59 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -18,13 +18,11 @@ import ( "github.com/grafana/dskit/tenant" "github.com/opentracing/opentracing-go" "github.com/pkg/errors" - "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" "golang.org/x/sync/errgroup" "google.golang.org/grpc/health/grpc_health_v1" - "github.com/grafana/loki/v3/pkg/compactor/deletion" "github.com/grafana/loki/v3/pkg/indexgateway" "github.com/grafana/loki/v3/pkg/iter" "github.com/grafana/loki/v3/pkg/loghttp" @@ -33,29 +31,20 @@ import ( logql_log "github.com/grafana/loki/v3/pkg/logql/log" "github.com/grafana/loki/v3/pkg/logql/syntax" "github.com/grafana/loki/v3/pkg/logqlmodel" + "github.com/grafana/loki/v3/pkg/querier/deletion" querier_limits "github.com/grafana/loki/v3/pkg/querier/limits" + "github.com/grafana/loki/v3/pkg/querier/pattern" "github.com/grafana/loki/v3/pkg/querier/plan" - "github.com/grafana/loki/v3/pkg/storage" - "github.com/grafana/loki/v3/pkg/storage/stores/index" + "github.com/grafana/loki/v3/pkg/storage/chunk" "github.com/grafana/loki/v3/pkg/storage/stores/index/seriesvolume" "github.com/grafana/loki/v3/pkg/storage/stores/index/stats" listutil "github.com/grafana/loki/v3/pkg/util" "github.com/grafana/loki/v3/pkg/util/httpreq" "github.com/grafana/loki/v3/pkg/util/spanlogger" - util_validation "github.com/grafana/loki/v3/pkg/util/validation" "github.com/grafana/loki/pkg/push" ) -const ( - // How long the Tailer should wait - once there are no entries to read from ingesters - - // before checking if a new entry is available (to avoid spinning the CPU in a continuous - // check loop) - tailerWaitEntryThrottle = time.Second / 2 -) - -var nowFunc = func() time.Time { return time.Now() } - type interval struct { start, end time.Time } @@ -102,52 +91,56 @@ type Querier interface { logql.Querier Label(ctx context.Context, req *logproto.LabelRequest) (*logproto.LabelResponse, error) Series(ctx context.Context, req *logproto.SeriesRequest) (*logproto.SeriesResponse, error) - Tail(ctx context.Context, req *logproto.TailRequest, categorizedLabels bool) (*Tailer, error) IndexStats(ctx context.Context, req *loghttp.RangeQuery) (*stats.Stats, error) IndexShards(ctx context.Context, req *loghttp.RangeQuery, targetBytesPerShard uint64) (*logproto.ShardsResponse, error) Volume(ctx context.Context, req *logproto.VolumeRequest) (*logproto.VolumeResponse, error) DetectedFields(ctx context.Context, req *logproto.DetectedFieldsRequest) (*logproto.DetectedFieldsResponse, error) Patterns(ctx context.Context, req *logproto.QueryPatternsRequest) (*logproto.QueryPatternsResponse, error) DetectedLabels(ctx context.Context, req *logproto.DetectedLabelsRequest) (*logproto.DetectedLabelsResponse, error) - WithPatternQuerier(patternQuerier PatterQuerier) + WithPatternQuerier(patternQuerier pattern.PatterQuerier) } -type Limits querier_limits.Limits - // Store is the store interface we need on the querier. 
type Store interface { - storage.SelectStore - index.BaseReader - index.StatsReader + SelectSamples(ctx context.Context, req logql.SelectSampleParams) (iter.SampleIterator, error) + SelectLogs(ctx context.Context, req logql.SelectLogParams) (iter.EntryIterator, error) + SelectSeries(ctx context.Context, req logql.SelectLogParams) ([]logproto.SeriesIdentifier, error) + LabelValuesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string, labelName string, matchers ...*labels.Matcher) ([]string, error) + LabelNamesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string, matchers ...*labels.Matcher) ([]string, error) + Stats(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) (*stats.Stats, error) + Volume(ctx context.Context, userID string, from, through model.Time, limit int32, targetLabels []string, aggregateBy string, matchers ...*labels.Matcher) (*logproto.VolumeResponse, error) + GetShards( + ctx context.Context, + userID string, + from, through model.Time, + targetBytesPerShard uint64, + predicate chunk.Predicate, + ) (*logproto.ShardsResponse, error) } // SingleTenantQuerier handles single tenant queries. type SingleTenantQuerier struct { cfg Config store Store - limits Limits + limits querier_limits.Limits ingesterQuerier *IngesterQuerier - patternQuerier PatterQuerier - deleteGetter deleteGetter - metrics *Metrics + patternQuerier pattern.PatterQuerier + deleteGetter deletion.DeleteGetter logger log.Logger } -type deleteGetter interface { - GetAllDeleteRequestsForUser(ctx context.Context, userID string) ([]deletion.DeleteRequest, error) -} - // New makes a new Querier. -func New(cfg Config, store Store, ingesterQuerier *IngesterQuerier, limits Limits, d deleteGetter, r prometheus.Registerer, logger log.Logger) (*SingleTenantQuerier, error) { - return &SingleTenantQuerier{ +func New(cfg Config, store Store, ingesterQuerier *IngesterQuerier, limits querier_limits.Limits, d deletion.DeleteGetter, logger log.Logger) (*SingleTenantQuerier, error) { + q := &SingleTenantQuerier{ cfg: cfg, store: store, ingesterQuerier: ingesterQuerier, limits: limits, deleteGetter: d, - metrics: NewMetrics(r), logger: logger, - }, nil + } + + return q, nil } // Select Implements logql.Querier which select logs via matchers and regex filters. 
@@ -156,12 +149,12 @@ func (q *SingleTenantQuerier) SelectLogs(ctx context.Context, params logql.Selec // This is used to track which ingesters were used in the query and reuse the same ingesters for consecutive queries ctx = NewPartitionContext(ctx) var err error - params.Start, params.End, err = q.validateQueryRequest(ctx, params) + params.Start, params.End, err = querier_limits.ValidateQueryRequest(ctx, params, q.limits) if err != nil { return nil, err } - params.QueryRequest.Deletes, err = q.deletesForUser(ctx, params.Start, params.End) + params.QueryRequest.Deletes, err = deletion.DeletesForUserQuery(ctx, params.Start, params.End, q.deleteGetter) if err != nil { level.Error(spanlogger.FromContext(ctx)).Log("msg", "failed loading deletes for user", "err", err) } @@ -218,12 +211,12 @@ func (q *SingleTenantQuerier) SelectSamples(ctx context.Context, params logql.Se // This is used to track which ingesters were used in the query and reuse the same ingesters for consecutive queries ctx = NewPartitionContext(ctx) var err error - params.Start, params.End, err = q.validateQueryRequest(ctx, params) + params.Start, params.End, err = querier_limits.ValidateQueryRequest(ctx, params, q.limits) if err != nil { return nil, err } - params.SampleQueryRequest.Deletes, err = q.deletesForUser(ctx, params.Start, params.End) + params.SampleQueryRequest.Deletes, err = deletion.DeletesForUserQuery(ctx, params.Start, params.End, q.deleteGetter) if err != nil { level.Error(spanlogger.FromContext(ctx)).Log("msg", "failed loading deletes for user", "err", err) } @@ -263,34 +256,6 @@ func (q *SingleTenantQuerier) SelectSamples(ctx context.Context, params logql.Se return iter.NewMergeSampleIterator(ctx, iters), nil } -func (q *SingleTenantQuerier) deletesForUser(ctx context.Context, startT, endT time.Time) ([]*logproto.Delete, error) { - userID, err := tenant.TenantID(ctx) - if err != nil { - return nil, err - } - - d, err := q.deleteGetter.GetAllDeleteRequestsForUser(ctx, userID) - if err != nil { - return nil, err - } - - start := startT.UnixNano() - end := endT.UnixNano() - - var deletes []*logproto.Delete - for _, del := range d { - if del.StartTime.UnixNano() <= end && del.EndTime.UnixNano() >= start { - deletes = append(deletes, &logproto.Delete{ - Selector: del.Query, - Start: del.StartTime.UnixNano(), - End: del.EndTime.UnixNano(), - }) - } - } - - return deletes, nil -} - func (q *SingleTenantQuerier) isWithinIngesterMaxLookbackPeriod(maxLookback time.Duration, queryEnd time.Time) bool { // if no lookback limits are configured, always consider this within the range of the lookback period if maxLookback <= 0 { @@ -399,7 +364,7 @@ func (q *SingleTenantQuerier) Label(ctx context.Context, req *logproto.LabelRequ return nil, err } - if *req.Start, *req.End, err = validateQueryTimeRangeLimits(ctx, userID, q.limits, *req.Start, *req.End); err != nil { + if *req.Start, *req.End, err = querier_limits.ValidateQueryTimeRangeLimits(ctx, userID, q.limits, *req.Start, *req.End); err != nil { return nil, err } @@ -466,86 +431,6 @@ func (*SingleTenantQuerier) Check(_ context.Context, _ *grpc_health_v1.HealthChe return &grpc_health_v1.HealthCheckResponse{Status: grpc_health_v1.HealthCheckResponse_SERVING}, nil } -// Tail keeps getting matching logs from all ingesters for given query -func (q *SingleTenantQuerier) Tail(ctx context.Context, req *logproto.TailRequest, categorizedLabels bool) (*Tailer, error) { - err := q.checkTailRequestLimit(ctx) - if err != nil { - return nil, err - } - - if req.Plan == nil { - parsed, err 
:= syntax.ParseExpr(req.Query) - if err != nil { - return nil, err - } - req.Plan = &plan.QueryPlan{ - AST: parsed, - } - } - - deletes, err := q.deletesForUser(ctx, req.Start, time.Now()) - if err != nil { - level.Error(spanlogger.FromContext(ctx)).Log("msg", "failed loading deletes for user", "err", err) - } - - histReq := logql.SelectLogParams{ - QueryRequest: &logproto.QueryRequest{ - Selector: req.Query, - Start: req.Start, - End: time.Now(), - Limit: req.Limit, - Direction: logproto.BACKWARD, - Deletes: deletes, - Plan: req.Plan, - }, - } - - histReq.Start, histReq.End, err = q.validateQueryRequest(ctx, histReq) - if err != nil { - return nil, err - } - - // Enforce the query timeout except when tailing, otherwise the tailing - // will be terminated once the query timeout is reached - tailCtx := ctx - tenantID, err := tenant.TenantID(tailCtx) - if err != nil { - return nil, errors.Wrap(err, "failed to load tenant") - } - queryTimeout := q.limits.QueryTimeout(tailCtx, tenantID) - queryCtx, cancelQuery := context.WithDeadline(ctx, time.Now().Add(queryTimeout)) - defer cancelQuery() - - tailClients, err := q.ingesterQuerier.Tail(tailCtx, req) - if err != nil { - return nil, err - } - - histIterators, err := q.SelectLogs(queryCtx, histReq) - if err != nil { - return nil, err - } - - reversedIterator, err := iter.NewReversedIter(histIterators, req.Limit, true) - if err != nil { - return nil, err - } - - return newTailer( - time.Duration(req.DelayFor)*time.Second, - tailClients, - reversedIterator, - func(connectedIngestersAddr []string) (map[string]logproto.Querier_TailClient, error) { - return q.ingesterQuerier.TailDisconnectedIngesters(tailCtx, req, connectedIngestersAddr) - }, - q.cfg.TailMaxDuration, - tailerWaitEntryThrottle, - categorizedLabels, - q.metrics, - q.logger, - ), nil -} - // Series fetches any matching series for a list of matcher sets func (q *SingleTenantQuerier) Series(ctx context.Context, req *logproto.SeriesRequest) (*logproto.SeriesResponse, error) { userID, err := tenant.TenantID(ctx) @@ -553,7 +438,7 @@ func (q *SingleTenantQuerier) Series(ctx context.Context, req *logproto.SeriesRe return nil, err } - if req.Start, req.End, err = validateQueryTimeRangeLimits(ctx, userID, q.limits, req.Start, req.End); err != nil { + if req.Start, req.End, err = querier_limits.ValidateQueryTimeRangeLimits(ctx, userID, q.limits, req.Start, req.End); err != nil { return nil, err } @@ -695,89 +580,13 @@ func (q *SingleTenantQuerier) seriesForMatcher(ctx context.Context, from, throug return ids, nil } -func (q *SingleTenantQuerier) validateQueryRequest(ctx context.Context, req logql.QueryParams) (time.Time, time.Time, error) { - userID, err := tenant.TenantID(ctx) - if err != nil { - return time.Time{}, time.Time{}, err - } - - selector, err := req.LogSelector() - if err != nil { - return time.Time{}, time.Time{}, err - } - matchers := selector.Matchers() - - maxStreamMatchersPerQuery := q.limits.MaxStreamsMatchersPerQuery(ctx, userID) - if len(matchers) > maxStreamMatchersPerQuery { - return time.Time{}, time.Time{}, httpgrpc.Errorf(http.StatusBadRequest, - "max streams matchers per query exceeded, matchers-count > limit (%d > %d)", len(matchers), maxStreamMatchersPerQuery) - } - - return validateQueryTimeRangeLimits(ctx, userID, q.limits, req.GetStart(), req.GetEnd()) -} - -type TimeRangeLimits querier_limits.TimeRangeLimits - -func validateQueryTimeRangeLimits(ctx context.Context, userID string, limits TimeRangeLimits, from, through time.Time) (time.Time, time.Time, error) { - now 
:= nowFunc() - // Clamp the time range based on the max query lookback. - maxQueryLookback := limits.MaxQueryLookback(ctx, userID) - if maxQueryLookback > 0 && from.Before(now.Add(-maxQueryLookback)) { - origStartTime := from - from = now.Add(-maxQueryLookback) - - level.Debug(spanlogger.FromContext(ctx)).Log( - "msg", "the start time of the query has been manipulated because of the 'max query lookback' setting", - "original", origStartTime, - "updated", from) - - } - maxQueryLength := limits.MaxQueryLength(ctx, userID) - if maxQueryLength > 0 && (through).Sub(from) > maxQueryLength { - return time.Time{}, time.Time{}, httpgrpc.Errorf(http.StatusBadRequest, util_validation.ErrQueryTooLong, (through).Sub(from), model.Duration(maxQueryLength)) - } - if through.Before(from) { - return time.Time{}, time.Time{}, httpgrpc.Errorf(http.StatusBadRequest, util_validation.ErrQueryTooOld, model.Duration(maxQueryLookback)) - } - return from, through, nil -} - -func (q *SingleTenantQuerier) checkTailRequestLimit(ctx context.Context) error { - userID, err := tenant.TenantID(ctx) - if err != nil { - return err - } - - responses, err := q.ingesterQuerier.TailersCount(ctx) - // We are only checking active ingesters, and any error returned stops checking other ingesters - // so return that error here as well. - if err != nil { - return err - } - - var maxCnt uint32 - maxCnt = 0 - for _, resp := range responses { - if resp > maxCnt { - maxCnt = resp - } - } - l := uint32(q.limits.MaxConcurrentTailRequests(ctx, userID)) - if maxCnt >= l { - return httpgrpc.Errorf(http.StatusBadRequest, - "max concurrent tail requests limit exceeded, count > limit (%d > %d)", maxCnt+1, l) - } - - return nil -} - func (q *SingleTenantQuerier) IndexStats(ctx context.Context, req *loghttp.RangeQuery) (*stats.Stats, error) { userID, err := tenant.TenantID(ctx) if err != nil { return nil, err } - start, end, err := validateQueryTimeRangeLimits(ctx, userID, q.limits, req.Start, req.End) + start, end, err := querier_limits.ValidateQueryTimeRangeLimits(ctx, userID, q.limits, req.Start, req.End) if err != nil { return nil, err } @@ -811,7 +620,7 @@ func (q *SingleTenantQuerier) IndexShards( return nil, err } - start, end, err := validateQueryTimeRangeLimits(ctx, userID, q.limits, req.Start, req.End) + start, end, err := querier_limits.ValidateQueryTimeRangeLimits(ctx, userID, q.limits, req.Start, req.End) if err != nil { return nil, err } @@ -939,7 +748,7 @@ func (q *SingleTenantQuerier) DetectedLabels(ctx context.Context, req *logproto. 
defer cancel() g, ctx := errgroup.WithContext(ctx) - if req.Start, req.End, err = validateQueryTimeRangeLimits(ctx, userID, q.limits, req.Start, req.End); err != nil { + if req.Start, req.End, err = querier_limits.ValidateQueryTimeRangeLimits(ctx, userID, q.limits, req.Start, req.End); err != nil { return nil, err } ingesterQueryInterval, storeQueryInterval := q.buildQueryIntervals(req.Start, req.End) @@ -1048,11 +857,7 @@ func countLabelsAndCardinality(storeLabelsMap map[string][]string, ingesterLabel return detectedLabels } -type PatterQuerier interface { - Patterns(ctx context.Context, req *logproto.QueryPatternsRequest) (*logproto.QueryPatternsResponse, error) -} - -func (q *SingleTenantQuerier) WithPatternQuerier(pq PatterQuerier) { +func (q *SingleTenantQuerier) WithPatternQuerier(pq pattern.PatterQuerier) { q.patternQuerier = pq } diff --git a/pkg/querier/querier_mock_test.go b/pkg/querier/querier_mock_test.go index 0fd9b421de000..350ce6fba5ee5 100644 --- a/pkg/querier/querier_mock_test.go +++ b/pkg/querier/querier_mock_test.go @@ -7,13 +7,8 @@ import ( "math" "time" - "github.com/grafana/loki/v3/pkg/logql/log" - "github.com/grafana/loki/v3/pkg/logql/syntax" - "github.com/grafana/loki/pkg/push" - "github.com/grafana/loki/v3/pkg/loghttp" - "github.com/grafana/dskit/grpcclient" "github.com/grafana/dskit/ring" ring_client "github.com/grafana/dskit/ring/client" @@ -24,14 +19,18 @@ import ( "google.golang.org/grpc/health/grpc_health_v1" grpc_metadata "google.golang.org/grpc/metadata" - logql_log "github.com/grafana/loki/v3/pkg/logql/log" - + "github.com/grafana/loki/v3/pkg/compactor/deletion" "github.com/grafana/loki/v3/pkg/distributor/clientpool" "github.com/grafana/loki/v3/pkg/ingester/client" "github.com/grafana/loki/v3/pkg/iter" + "github.com/grafana/loki/v3/pkg/loghttp" "github.com/grafana/loki/v3/pkg/logproto" "github.com/grafana/loki/v3/pkg/logql" + "github.com/grafana/loki/v3/pkg/logql/log" + logql_log "github.com/grafana/loki/v3/pkg/logql/log" + "github.com/grafana/loki/v3/pkg/logql/syntax" "github.com/grafana/loki/v3/pkg/logqlmodel" + "github.com/grafana/loki/v3/pkg/querier/pattern" "github.com/grafana/loki/v3/pkg/storage/chunk" "github.com/grafana/loki/v3/pkg/storage/chunk/fetcher" "github.com/grafana/loki/v3/pkg/storage/config" @@ -157,7 +156,7 @@ func (f mockIngesterClientFactory) newIngesterClientMockFactory(c *querierClient // newIngesterClientMockFactory creates a factory function always returning // the input querierClientMock -func newIngesterClientMockFactory(c *querierClientMock) ring_client.PoolFactory { +func newIngesterClientMockFactory(c ring_client.PoolClient) ring_client.PoolFactory { return ring_client.PoolAddrFunc(func(_ string) (ring_client.PoolClient, error) { return c, nil }) @@ -264,62 +263,6 @@ func (c *querySampleClientMock) Context() context.Context { return context.Background() } -// tailClientMock is mockable version of Querier_TailClient -type tailClientMock struct { - util.ExtendedMock - logproto.Querier_TailClient - recvTrigger chan time.Time -} - -func newTailClientMock() *tailClientMock { - return &tailClientMock{ - recvTrigger: make(chan time.Time, 10), - } -} - -func (c *tailClientMock) Recv() (*logproto.TailResponse, error) { - args := c.Called() - return args.Get(0).(*logproto.TailResponse), args.Error(1) -} - -func (c *tailClientMock) Header() (grpc_metadata.MD, error) { - return nil, nil -} - -func (c *tailClientMock) Trailer() grpc_metadata.MD { - return nil -} - -func (c *tailClientMock) CloseSend() error { - return nil -} - -func (c 
*tailClientMock) Context() context.Context { - return context.Background() -} - -func (c *tailClientMock) SendMsg(_ interface{}) error { - return nil -} - -func (c *tailClientMock) RecvMsg(_ interface{}) error { - return nil -} - -func (c *tailClientMock) mockRecvWithTrigger(response *logproto.TailResponse) *tailClientMock { - c.On("Recv").WaitUntil(c.recvTrigger).Return(response, nil) - - return c -} - -// triggerRecv triggers the Recv() mock to return from the next invocation -// or from the current invocation if was already called and waiting for the -// trigger. This method works if and only if the Recv() has been mocked with -// mockRecvWithTrigger(). -func (c *tailClientMock) triggerRecv() { - c.recvTrigger <- time.Now() -} - // storeMock is a mockable version of Loki's storage, used in querier unit tests // to control the behaviour of the store without really hitting any storage backend type storeMock struct { @@ -655,8 +598,8 @@ func mockLogfmtStreamWithLabels(_ int, quantity int, lbls string) logproto.Strea streamLabels = labels.EmptyLabels() } - lblBuilder := logql_log.NewBaseLabelsBuilder().ForLabels(streamLabels, streamLabels.Hash()) - logFmtParser := logql_log.NewLogfmtParser(false, false) + lblBuilder := log.NewBaseLabelsBuilder().ForLabels(streamLabels, streamLabels.Hash()) + logFmtParser := log.NewLogfmtParser(false, false) // used for detected fields queries which are always BACKWARD for i := quantity; i > 0; i-- { @@ -765,9 +708,9 @@ func (q *querierMock) Series(ctx context.Context, req *logproto.SeriesRequest) ( return args.Get(0).(func() *logproto.SeriesResponse)(), args.Error(1) } -func (q *querierMock) Tail(_ context.Context, _ *logproto.TailRequest, _ bool) (*Tailer, error) { - return nil, errors.New("querierMock.Tail() has not been mocked") -} +// func (q *querierMock) Tail(_ context.Context, _ *logproto.TailRequest, _ bool) (*Tailer, error) { +// return nil, errors.New("querierMock.Tail() has not been mocked") +// } func (q *querierMock) IndexStats(_ context.Context, _ *loghttp.RangeQuery) (*stats.Stats, error) { return nil, nil @@ -833,7 +776,7 @@ func (q *querierMock) DetectedLabels(ctx context.Context, req *logproto.Detected return resp.(*logproto.DetectedLabelsResponse), err } -func (q *querierMock) WithPatternQuerier(_ PatterQuerier) {} +func (q *querierMock) WithPatternQuerier(_ pattern.PatterQuerier) {} type engineMock struct { util.ExtendedMock @@ -870,3 +813,13 @@ func (tl mockTenantLimits) TenantLimits(userID string) *validation.Limits { func (tl mockTenantLimits) AllByUserID() map[string]*validation.Limits { return tl } + +type mockDeleteGettter struct { + user string + results []deletion.DeleteRequest +} + +func (d *mockDeleteGettter) GetAllDeleteRequestsForUser(_ context.Context, userID string) ([]deletion.DeleteRequest, error) { + d.user = userID + return d.results, nil +} diff --git a/pkg/querier/querier_test.go b/pkg/querier/querier_test.go index 41265e00df59f..971028f58d378 100644 --- a/pkg/querier/querier_test.go +++ b/pkg/querier/querier_test.go @@ -87,69 +87,6 @@ func TestQuerier_Label_QueryTimeoutConfigFlag(t *testing.T) { store.AssertExpectations(t) } -func TestQuerier_Tail_QueryTimeoutConfigFlag(t *testing.T) { - request := logproto.TailRequest{ - Query: `{type="test"}`, - DelayFor: 0, - Limit: 10, - Start: time.Now(), - Plan: &plan.QueryPlan{ - AST: syntax.MustParseExpr(`{type="test"}`), - }, - } - - store := newStoreMock() - store.On("SelectLogs", mock.Anything, mock.Anything).Return(mockStreamIterator(1, 2), nil) - - queryClient := 
newQueryClientMock() - queryClient.On("Recv").Return(mockQueryResponse([]logproto.Stream{mockStream(1, 2)}), nil) - - tailClient := newTailClientMock() - tailClient.On("Recv").Return(mockTailResponse(mockStream(1, 2)), nil) - - ingesterClient := newQuerierClientMock() - ingesterClient.On("Query", mock.Anything, mock.Anything, mock.Anything).Return(queryClient, nil) - ingesterClient.On("Tail", mock.Anything, &request, mock.Anything).Return(tailClient, nil) - ingesterClient.On("TailersCount", mock.Anything, mock.Anything, mock.Anything).Return(&logproto.TailersCountResponse{}, nil) - - limitsCfg := defaultLimitsTestConfig() - limitsCfg.QueryTimeout = model.Duration(queryTimeout) - limits, err := validation.NewOverrides(limitsCfg, nil) - require.NoError(t, err) - - q, err := newQuerier( - mockQuerierConfig(), - mockIngesterClientConfig(), - newIngesterClientMockFactory(ingesterClient), - mockReadRingWithOneActiveIngester(), - &mockDeleteGettter{}, - store, limits) - require.NoError(t, err) - - ctx := user.InjectOrgID(context.Background(), "test") - _, err = q.Tail(ctx, &request, false) - require.NoError(t, err) - - calls := ingesterClient.GetMockedCallsByMethod("Query") - assert.Equal(t, 1, len(calls)) - deadline, ok := calls[0].Arguments.Get(0).(context.Context).Deadline() - assert.True(t, ok) - assert.WithinDuration(t, deadline, time.Now().Add(queryTimeout), 1*time.Second) - - calls = ingesterClient.GetMockedCallsByMethod("Tail") - assert.Equal(t, 1, len(calls)) - _, ok = calls[0].Arguments.Get(0).(context.Context).Deadline() - assert.False(t, ok) - - calls = store.GetMockedCallsByMethod("SelectLogs") - assert.Equal(t, 1, len(calls)) - deadline, ok = calls[0].Arguments.Get(0).(context.Context).Deadline() - assert.True(t, ok) - assert.WithinDuration(t, deadline, time.Now().Add(queryTimeout), 1*time.Second) - - store.AssertExpectations(t) -} - func mockQuerierConfig() Config { return Config{ TailMaxDuration: 1 * time.Minute, @@ -483,91 +420,6 @@ func TestQuerier_IngesterMaxQueryLookback(t *testing.T) { } } -func TestQuerier_concurrentTailLimits(t *testing.T) { - request := logproto.TailRequest{ - Query: "{type=\"test\"}", - DelayFor: 0, - Limit: 10, - Start: time.Now(), - Plan: &plan.QueryPlan{ - AST: syntax.MustParseExpr("{type=\"test\"}"), - }, - } - - t.Parallel() - - tests := map[string]struct { - ringIngesters []ring.InstanceDesc - expectedError error - tailersCount uint32 - }{ - "empty ring": { - ringIngesters: []ring.InstanceDesc{}, - expectedError: httpgrpc.Errorf(http.StatusInternalServerError, "no active ingester found"), - }, - "ring containing one pending ingester": { - ringIngesters: []ring.InstanceDesc{mockInstanceDesc("1.1.1.1", ring.PENDING)}, - expectedError: httpgrpc.Errorf(http.StatusInternalServerError, "no active ingester found"), - }, - "ring containing one active ingester and 0 active tailers": { - ringIngesters: []ring.InstanceDesc{mockInstanceDesc("1.1.1.1", ring.ACTIVE)}, - }, - "ring containing one active ingester and 1 active tailer": { - ringIngesters: []ring.InstanceDesc{mockInstanceDesc("1.1.1.1", ring.ACTIVE)}, - tailersCount: 1, - }, - "ring containing one pending and active ingester with 1 active tailer": { - ringIngesters: []ring.InstanceDesc{mockInstanceDesc("1.1.1.1", ring.PENDING), mockInstanceDesc("2.2.2.2", ring.ACTIVE)}, - tailersCount: 1, - }, - "ring containing one active ingester and max active tailers": { - ringIngesters: []ring.InstanceDesc{mockInstanceDesc("1.1.1.1", ring.ACTIVE)}, - expectedError: httpgrpc.Errorf(http.StatusBadRequest, - "max 
concurrent tail requests limit exceeded, count > limit (%d > %d)", 6, 5), - tailersCount: 5, - }, - } - - for testName, testData := range tests { - t.Run(testName, func(t *testing.T) { - // For this test's purpose, whenever a new ingester client needs to - // be created, the factory will always return the same mock instance - store := newStoreMock() - store.On("SelectLogs", mock.Anything, mock.Anything).Return(mockStreamIterator(1, 2), nil) - - queryClient := newQueryClientMock() - queryClient.On("Recv").Return(mockQueryResponse([]logproto.Stream{mockStream(1, 2)}), nil) - - tailClient := newTailClientMock() - tailClient.On("Recv").Return(mockTailResponse(mockStream(1, 2)), nil) - - ingesterClient := newQuerierClientMock() - ingesterClient.On("Query", mock.Anything, mock.Anything, mock.Anything).Return(queryClient, nil) - ingesterClient.On("Tail", mock.Anything, &request, mock.Anything).Return(tailClient, nil) - ingesterClient.On("TailersCount", mock.Anything, mock.Anything, mock.Anything).Return(&logproto.TailersCountResponse{Count: testData.tailersCount}, nil) - - defaultLimits := defaultLimitsTestConfig() - defaultLimits.MaxConcurrentTailRequests = 5 - - limits, err := validation.NewOverrides(defaultLimits, nil) - require.NoError(t, err) - - q, err := newQuerier( - mockQuerierConfig(), - mockIngesterClientConfig(), - newIngesterClientMockFactory(ingesterClient), - newReadRingMock(testData.ringIngesters, 0), - &mockDeleteGettter{}, - store, limits) - require.NoError(t, err) - - ctx := user.InjectOrgID(context.Background(), "test") - _, err = q.Tail(ctx, &request, false) - assert.Equal(t, testData.expectedError, err) - }) - } -} - func TestQuerier_buildQueryIntervals(t *testing.T) { // For simplicity it is always assumed that ingesterQueryStoreMaxLookback and queryIngestersWithin both would be set upto 11 hours so // overlappingQuery has range of last 11 hours while nonOverlappingQuery has range older than last 11 hours. 
@@ -1181,50 +1033,6 @@ func setupIngesterQuerierMocks(conf Config, limits *validation.Overrides) (*quer return ingesterClient, store, querier, nil } -type fakeTimeLimits struct { - maxQueryLookback time.Duration - maxQueryLength time.Duration -} - -func (f fakeTimeLimits) MaxQueryLookback(_ context.Context, _ string) time.Duration { - return f.maxQueryLookback -} - -func (f fakeTimeLimits) MaxQueryLength(_ context.Context, _ string) time.Duration { - return f.maxQueryLength -} - -func Test_validateQueryTimeRangeLimits(t *testing.T) { - now := time.Now() - nowFunc = func() time.Time { return now } - tests := []struct { - name string - limits TimeRangeLimits - from time.Time - through time.Time - wantFrom time.Time - wantThrough time.Time - wantErr bool - }{ - {"no change", fakeTimeLimits{1000 * time.Hour, 1000 * time.Hour}, now, now.Add(24 * time.Hour), now, now.Add(24 * time.Hour), false}, - {"clamped to 24h", fakeTimeLimits{24 * time.Hour, 1000 * time.Hour}, now.Add(-48 * time.Hour), now, now.Add(-24 * time.Hour), now, false}, - {"end before start", fakeTimeLimits{}, now, now.Add(-48 * time.Hour), time.Time{}, time.Time{}, true}, - {"query too long", fakeTimeLimits{maxQueryLength: 24 * time.Hour}, now.Add(-48 * time.Hour), now, time.Time{}, time.Time{}, true}, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - from, through, err := validateQueryTimeRangeLimits(context.Background(), "foo", tt.limits, tt.from, tt.through) - if tt.wantErr { - require.NotNil(t, err) - } else { - require.Nil(t, err) - } - require.Equal(t, tt.wantFrom, from, "wanted (%s) got (%s)", tt.wantFrom, from) - require.Equal(t, tt.wantThrough, through) - }) - } -} - func TestQuerier_SelectLogWithDeletes(t *testing.T) { store := newStoreMock() store.On("SelectLogs", mock.Anything, mock.Anything).Return(mockStreamIterator(1, 2), nil) @@ -1365,17 +1173,7 @@ func newQuerier(cfg Config, clientCfg client.Config, clientFactory ring_client.P return nil, err } - return New(cfg, store, iq, limits, dg, nil, log.NewNopLogger()) -} - -type mockDeleteGettter struct { - user string - results []deletion.DeleteRequest -} - -func (d *mockDeleteGettter) GetAllDeleteRequestsForUser(_ context.Context, userID string) ([]deletion.DeleteRequest, error) { - d.user = userID - return d.results, nil + return New(cfg, store, iq, limits, dg, log.NewNopLogger()) } func TestQuerier_DetectedLabels(t *testing.T) { diff --git a/pkg/querier/tail/http.go b/pkg/querier/tail/http.go new file mode 100644 index 0000000000000..54c18e3240ea7 --- /dev/null +++ b/pkg/querier/tail/http.go @@ -0,0 +1,147 @@ +package tail + +import ( + "net/http" + "time" + + "github.com/go-kit/log/level" + "github.com/gorilla/websocket" + "github.com/grafana/dskit/httpgrpc" + + "github.com/grafana/dskit/tenant" + + "github.com/grafana/loki/v3/pkg/loghttp" + loghttp_legacy "github.com/grafana/loki/v3/pkg/loghttp/legacy" + "github.com/grafana/loki/v3/pkg/util/httpreq" + util_log "github.com/grafana/loki/v3/pkg/util/log" + "github.com/grafana/loki/v3/pkg/util/marshal" + marshal_legacy "github.com/grafana/loki/v3/pkg/util/marshal/legacy" + serverutil "github.com/grafana/loki/v3/pkg/util/server" +) + +const ( + wsPingPeriod = 1 * time.Second +) + +// TailHandler is a http.HandlerFunc for handling tail queries. 
+func (q *Querier) TailHandler(w http.ResponseWriter, r *http.Request) { + upgrader := websocket.Upgrader{ + CheckOrigin: func(_ *http.Request) bool { return true }, + } + logger := util_log.WithContext(r.Context(), util_log.Logger) + + req, err := loghttp.ParseTailQuery(r) + if err != nil { + serverutil.WriteError(httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error()), w) + return + } + + tenantID, err := tenant.TenantID(r.Context()) + if err != nil { + level.Warn(logger).Log("msg", "error getting tenant id", "err", err) + serverutil.WriteError(httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error()), w) + return + } + + encodingFlags := httpreq.ExtractEncodingFlags(r) + version := loghttp.GetVersion(r.RequestURI) + + conn, err := upgrader.Upgrade(w, r, nil) + if err != nil { + level.Error(logger).Log("msg", "Error in upgrading websocket", "err", err) + return + } + + level.Info(logger).Log("msg", "starting to tail logs", "tenant", tenantID, "selectors", req.Query) + + defer func() { + level.Info(logger).Log("msg", "ended tailing logs", "tenant", tenantID, "selectors", req.Query) + }() + + defer func() { + if err := conn.Close(); err != nil { + level.Error(logger).Log("msg", "Error closing websocket", "err", err) + } + }() + + tailer, err := q.Tail(r.Context(), req, encodingFlags.Has(httpreq.FlagCategorizeLabels)) + if err != nil { + if err := conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseInternalServerErr, err.Error())); err != nil { + level.Error(logger).Log("msg", "Error connecting to ingesters for tailing", "err", err) + } + return + } + defer func() { + if err := tailer.close(); err != nil { + level.Error(logger).Log("msg", "Error closing Tailer", "err", err) + } + }() + + ticker := time.NewTicker(wsPingPeriod) + defer ticker.Stop() + + connWriter := marshal.NewWebsocketJSONWriter(conn) + + var response *loghttp_legacy.TailResponse + responseChan := tailer.getResponseChan() + closeErrChan := tailer.getCloseErrorChan() + + doneChan := make(chan struct{}) + go func() { + for { + _, _, err := conn.ReadMessage() + if err != nil { + if closeErr, ok := err.(*websocket.CloseError); ok { + if closeErr.Code == websocket.CloseNormalClosure { + break + } + level.Error(logger).Log("msg", "Error from client", "err", err) + break + } else if tailer.stopped.Load() { + return + } + + level.Error(logger).Log("msg", "Unexpected error from client", "err", err) + break + } + } + doneChan <- struct{}{} + }() + + for { + select { + case response = <-responseChan: + var err error + if version == loghttp.VersionV1 { + err = marshal.WriteTailResponseJSON(*response, connWriter, encodingFlags) + } else { + err = marshal_legacy.WriteTailResponseJSON(*response, conn) + } + if err != nil { + level.Error(logger).Log("msg", "Error writing to websocket", "err", err) + if err := conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseInternalServerErr, err.Error())); err != nil { + level.Error(logger).Log("msg", "Error writing close message to websocket", "err", err) + } + return + } + + case err := <-closeErrChan: + level.Error(logger).Log("msg", "Error from iterator", "err", err) + if err := conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseInternalServerErr, err.Error())); err != nil { + level.Error(logger).Log("msg", "Error writing close message to websocket", "err", err) + } + return + case <-ticker.C: + // This is to periodically check whether connection is active, useful to clean up dead connections 
when there are no entries to send + if err := conn.WriteMessage(websocket.PingMessage, nil); err != nil { + level.Error(logger).Log("msg", "Error writing ping message to websocket", "err", err) + if err := conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseInternalServerErr, err.Error())); err != nil { + level.Error(logger).Log("msg", "Error writing close message to websocket", "err", err) + } + return + } + case <-doneChan: + return + } + } +} diff --git a/pkg/querier/tail/http_test.go b/pkg/querier/tail/http_test.go new file mode 100644 index 0000000000000..216afb87932d9 --- /dev/null +++ b/pkg/querier/tail/http_test.go @@ -0,0 +1,48 @@ +package tail + +import ( + "net/http" + "net/http/httptest" + "testing" + "time" + + "github.com/go-kit/log" + "github.com/grafana/dskit/flagext" + "github.com/grafana/dskit/user" + "github.com/stretchr/testify/require" + + "github.com/grafana/loki/v3/pkg/validation" +) + +func TestTailHandler(t *testing.T) { + defaultLimits := defaultLimitsTestConfig() + limits, err := validation.NewOverrides(defaultLimits, nil) + require.NoError(t, err) + + tailQuerier := NewQuerier(nil, nil, nil, limits, 1*time.Minute, NewTailMetrics(nil), log.NewNopLogger()) + + req, err := http.NewRequest("GET", `/`, nil) + require.NoError(t, err) + q := req.URL.Query() + q.Add("query", `{app="loki"}`) + req.URL.RawQuery = q.Encode() + err = req.ParseForm() + require.NoError(t, err) + + ctx := user.InjectOrgID(req.Context(), "1|2") + req = req.WithContext(ctx) + require.NoError(t, err) + + rr := httptest.NewRecorder() + handler := http.HandlerFunc(tailQuerier.TailHandler) + + handler.ServeHTTP(rr, req) + require.Equal(t, http.StatusBadRequest, rr.Code) + require.Equal(t, "multiple org IDs present", rr.Body.String()) +} + +func defaultLimitsTestConfig() validation.Limits { + limits := validation.Limits{} + flagext.DefaultValues(&limits) + return limits +} diff --git a/pkg/querier/metrics.go b/pkg/querier/tail/metrics.go similarity index 85% rename from pkg/querier/metrics.go rename to pkg/querier/tail/metrics.go index 3da0690da46da..307d8c9a4f62b 100644 --- a/pkg/querier/metrics.go +++ b/pkg/querier/tail/metrics.go @@ -1,18 +1,18 @@ -package querier +package tail import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" ) -type Metrics struct { +type TailMetrics struct { tailsActive prometheus.Gauge tailedStreamsActive prometheus.Gauge tailedBytesTotal prometheus.Counter } -func NewMetrics(r prometheus.Registerer) *Metrics { - return &Metrics{ +func NewTailMetrics(r prometheus.Registerer) *TailMetrics { + return &TailMetrics{ tailsActive: promauto.With(r).NewGauge(prometheus.GaugeOpts{ Name: "loki_querier_tail_active", Help: "Number of active tailers", diff --git a/pkg/querier/tail/querier.go b/pkg/querier/tail/querier.go new file mode 100644 index 0000000000000..f379e70076bf2 --- /dev/null +++ b/pkg/querier/tail/querier.go @@ -0,0 +1,171 @@ +package tail + +import ( + "context" + "net/http" + "time" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/grafana/dskit/httpgrpc" + "github.com/grafana/dskit/tenant" + "github.com/pkg/errors" + + "github.com/grafana/loki/v3/pkg/iter" + "github.com/grafana/loki/v3/pkg/logproto" + "github.com/grafana/loki/v3/pkg/logql" + "github.com/grafana/loki/v3/pkg/logql/syntax" + "github.com/grafana/loki/v3/pkg/querier/deletion" + querier_limits "github.com/grafana/loki/v3/pkg/querier/limits" + "github.com/grafana/loki/v3/pkg/querier/plan" + 
"github.com/grafana/loki/v3/pkg/util/spanlogger" +) + +const ( + // How long the Tailer should wait - once there are no entries to read from ingesters - + // before checking if a new entry is available (to avoid spinning the CPU in a continuous + // check loop) + tailerWaitEntryThrottle = time.Second / 2 +) + +type Ingester interface { + TailDisconnectedIngesters(ctx context.Context, req *logproto.TailRequest, connectedIngestersAddr []string) (map[string]logproto.Querier_TailClient, error) + TailersCount(ctx context.Context) ([]uint32, error) + Tail(ctx context.Context, req *logproto.TailRequest) (map[string]logproto.Querier_TailClient, error) +} + +type Store interface { + SelectLogs(ctx context.Context, params logql.SelectLogParams) (iter.EntryIterator, error) +} + +type Querier struct { + ingester Ingester + store Store + limits querier_limits.Limits + deleteGetter deletion.DeleteGetter + + tailMaxDuration time.Duration + metrics *TailMetrics + logger log.Logger +} + +func NewQuerier(ingester Ingester, store Store, deleteGetter deletion.DeleteGetter, limits querier_limits.Limits, tailMaxDuration time.Duration, metrics *TailMetrics, logger log.Logger) *Querier { + return &Querier{ + ingester: ingester, + store: store, + deleteGetter: deleteGetter, + limits: limits, + tailMaxDuration: tailMaxDuration, + metrics: metrics, + logger: logger, + } +} + +// Tail keeps getting matching logs from all ingesters for given query +func (q *Querier) Tail(ctx context.Context, req *logproto.TailRequest, categorizedLabels bool) (*Tailer, error) { + err := q.checkTailRequestLimit(ctx) + if err != nil { + return nil, err + } + + if req.Plan == nil { + parsed, err := syntax.ParseExpr(req.Query) + if err != nil { + return nil, err + } + req.Plan = &plan.QueryPlan{ + AST: parsed, + } + } + + deletes, err := deletion.DeletesForUserQuery(ctx, req.Start, time.Now(), q.deleteGetter) + if err != nil { + level.Error(spanlogger.FromContext(ctx)).Log("msg", "failed loading deletes for user", "err", err) + } + + histReq := logql.SelectLogParams{ + QueryRequest: &logproto.QueryRequest{ + Selector: req.Query, + Start: req.Start, + End: time.Now(), + Limit: req.Limit, + Direction: logproto.BACKWARD, + Deletes: deletes, + Plan: req.Plan, + }, + } + + histReq.Start, histReq.End, err = querier_limits.ValidateQueryRequest(ctx, histReq, q.limits) + if err != nil { + return nil, err + } + + // Enforce the query timeout except when tailing, otherwise the tailing + // will be terminated once the query timeout is reached + tailCtx := ctx + tenantID, err := tenant.TenantID(tailCtx) + if err != nil { + return nil, errors.Wrap(err, "failed to load tenant") + } + queryTimeout := q.limits.QueryTimeout(tailCtx, tenantID) + queryCtx, cancelQuery := context.WithDeadline(ctx, time.Now().Add(queryTimeout)) + defer cancelQuery() + + tailClients, err := q.ingester.Tail(tailCtx, req) + if err != nil { + return nil, err + } + + histIterators, err := q.store.SelectLogs(queryCtx, histReq) + if err != nil { + return nil, err + } + + reversedIterator, err := iter.NewReversedIter(histIterators, req.Limit, true) + if err != nil { + return nil, err + } + + return newTailer( + time.Duration(req.DelayFor)*time.Second, + tailClients, + reversedIterator, + func(connectedIngestersAddr []string) (map[string]logproto.Querier_TailClient, error) { + return q.ingester.TailDisconnectedIngesters(tailCtx, req, connectedIngestersAddr) + }, + q.tailMaxDuration, + tailerWaitEntryThrottle, + categorizedLabels, + q.metrics, + q.logger, + ), nil +} + +func (q *Querier) 
checkTailRequestLimit(ctx context.Context) error { + userID, err := tenant.TenantID(ctx) + if err != nil { + return err + } + + responses, err := q.ingester.TailersCount(ctx) + // We are only checking active ingesters, and any error returned stops checking other ingesters + // so return that error here as well. + if err != nil { + return err + } + + var maxCnt uint32 + maxCnt = 0 + for _, resp := range responses { + if resp > maxCnt { + maxCnt = resp + } + } + l := uint32(q.limits.MaxConcurrentTailRequests(ctx, userID)) + if maxCnt >= l { + return httpgrpc.Errorf(http.StatusBadRequest, + "max concurrent tail requests limit exceeded, count > limit (%d > %d)", maxCnt+1, l) + } + + return nil +} diff --git a/pkg/querier/tail/querier_test.go b/pkg/querier/tail/querier_test.go new file mode 100644 index 0000000000000..3dc34566d1b91 --- /dev/null +++ b/pkg/querier/tail/querier_test.go @@ -0,0 +1,181 @@ +package tail + +import ( + "context" + "net/http" + "testing" + "time" + + "github.com/go-kit/log" + "github.com/grafana/dskit/httpgrpc" + "github.com/grafana/dskit/ring" + "github.com/grafana/dskit/user" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + + "github.com/grafana/loki/v3/pkg/compactor/deletion" + "github.com/grafana/loki/v3/pkg/iter" + "github.com/grafana/loki/v3/pkg/logproto" + "github.com/grafana/loki/v3/pkg/logql/syntax" + "github.com/grafana/loki/v3/pkg/querier/plan" + "github.com/grafana/loki/v3/pkg/querier/testutil" +) + +const queryTimeout = 1 * time.Hour + +func mockInstanceDesc(addr string, state ring.InstanceState) ring.InstanceDesc { + return ring.InstanceDesc{ + Addr: addr, + State: state, + Timestamp: time.Now().Unix(), + } +} + +func TestQuerier_Tail_QueryTimeoutConfigFlag(t *testing.T) { + request := logproto.TailRequest{ + Query: `{type="test"}`, + DelayFor: 0, + Limit: 10, + Start: time.Now(), + Plan: &plan.QueryPlan{ + AST: syntax.MustParseExpr(`{type="test"}`), + }, + } + + // Setup mocks + tailClient := newTailClientMock() + tailClient.On("Recv").Return(mockTailResponse(logproto.Stream{ + Labels: `{type="test"}`, + Entries: []logproto.Entry{ + {Timestamp: time.Now(), Line: "line 1"}, + {Timestamp: time.Now(), Line: "line 2"}, + }, + }), nil) + + // Setup ingester mock + ingester := newMockTailIngester() + clients := map[string]logproto.Querier_TailClient{ + "ingester-1": tailClient, + } + ingester.On("Tail", mock.Anything, &request).Return(clients, nil) + ingester.On("TailersCount", mock.Anything).Return([]uint32{0}, nil) + + // Setup log selector mock + logSelector := newMockTailLogSelector() + logSelector.On("SelectLogs", mock.Anything, mock.Anything).Return(iter.NoopEntryIterator, nil) + + // Setup limits + limits := &testutil.MockLimits{ + MaxQueryTimeoutVal: queryTimeout, + MaxStreamsMatchersPerQueryVal: 100, + MaxConcurrentTailRequestsVal: 10, + } + + // Create tail querier + tailQuerier := NewQuerier(ingester, logSelector, newMockDeleteGettter("test", []deletion.DeleteRequest{}), limits, 7*24*time.Hour, NewTailMetrics(nil), log.NewNopLogger()) + + // Run test + ctx := user.InjectOrgID(context.Background(), "test") + _, err := tailQuerier.Tail(ctx, &request, false) + require.NoError(t, err) + + // Verify expectations + ingester.AssertExpectations(t) + logSelector.AssertExpectations(t) +} + +func TestQuerier_concurrentTailLimits(t *testing.T) { + request := logproto.TailRequest{ + Query: "{type=\"test\"}", + DelayFor: 0, + Limit: 10, + Start: time.Now(), + Plan: &plan.QueryPlan{ + AST: 
syntax.MustParseExpr("{type=\"test\"}"), + }, + } + + t.Parallel() + + tests := map[string]struct { + ringIngesters []ring.InstanceDesc + expectedError error + ingesterError error + tailersCount uint32 + }{ + "empty ring": { + ringIngesters: []ring.InstanceDesc{}, + expectedError: httpgrpc.Errorf(http.StatusInternalServerError, "no active ingester found"), + ingesterError: httpgrpc.Errorf(http.StatusInternalServerError, "no active ingester found"), + }, + "ring containing one pending ingester": { + ringIngesters: []ring.InstanceDesc{mockInstanceDesc("1.1.1.1", ring.PENDING)}, + expectedError: httpgrpc.Errorf(http.StatusInternalServerError, "no active ingester found"), + ingesterError: httpgrpc.Errorf(http.StatusInternalServerError, "no active ingester found"), + }, + "ring containing one active ingester and 0 active tailers": { + ringIngesters: []ring.InstanceDesc{mockInstanceDesc("1.1.1.1", ring.ACTIVE)}, + }, + "ring containing one active ingester and 1 active tailer": { + ringIngesters: []ring.InstanceDesc{mockInstanceDesc("1.1.1.1", ring.ACTIVE)}, + tailersCount: 1, + }, + "ring containing one pending and active ingester with 1 active tailer": { + ringIngesters: []ring.InstanceDesc{mockInstanceDesc("1.1.1.1", ring.PENDING), mockInstanceDesc("2.2.2.2", ring.ACTIVE)}, + tailersCount: 1, + }, + "ring containing one active ingester and max active tailers": { + ringIngesters: []ring.InstanceDesc{mockInstanceDesc("1.1.1.1", ring.ACTIVE)}, + expectedError: httpgrpc.Errorf(http.StatusBadRequest, + "max concurrent tail requests limit exceeded, count > limit (%d > %d)", 6, 5), + tailersCount: 5, + }, + } + + for testName, testData := range tests { + t.Run(testName, func(t *testing.T) { + ctx := user.InjectOrgID(context.Background(), "test") + // Setup mocks + tailClient := newTailClientMock() + tailClient.On("Recv").Return(mockTailResponse(logproto.Stream{ + Labels: `{type="test"}`, + Entries: []logproto.Entry{ + {Timestamp: time.Now(), Line: "line 1"}, + {Timestamp: time.Now(), Line: "line 2"}, + }, + }), nil) + + // Setup ingester mock + ingester := newMockTailIngester() + clients := map[string]logproto.Querier_TailClient{ + "ingester-1": tailClient, + } + ingester.On("Tail", mock.Anything, &request).Return(clients, testData.ingesterError) + ingester.On("TailersCount", mock.Anything).Return([]uint32{testData.tailersCount}, testData.ingesterError) + + // Setup log selector mock + logSelector := newMockTailLogSelector() + logSelector.On("SelectLogs", mock.Anything, mock.Anything).Return(iter.NoopEntryIterator, nil) + + // Setup limits + limits := &testutil.MockLimits{ + MaxConcurrentTailRequestsVal: 5, + MaxStreamsMatchersPerQueryVal: 100, + } + + // Create tail querier + tailQuerier := NewQuerier(ingester, logSelector, newMockDeleteGettter("test", []deletion.DeleteRequest{}), limits, 7*24*time.Hour, NewTailMetrics(nil), log.NewNopLogger()) + + // Run + _, err := tailQuerier.Tail(ctx, &request, false) + assert.Equal(t, testData.expectedError, err) + + // Verify expectations if we expect the request to succeed + if testData.expectedError == nil { + ingester.AssertExpectations(t) + logSelector.AssertExpectations(t) + } + }) + } +} diff --git a/pkg/querier/tail.go b/pkg/querier/tail/tail.go similarity index 99% rename from pkg/querier/tail.go rename to pkg/querier/tail/tail.go index 0d9495daf6e46..e758c637e904d 100644 --- a/pkg/querier/tail.go +++ b/pkg/querier/tail/tail.go @@ -1,4 +1,4 @@ -package querier +package tail import ( "context" @@ -63,7 +63,7 @@ type Tailer struct { // if we are not 
seeing any response from ingester, // how long do we want to wait by going into sleep waitEntryThrottle time.Duration - metrics *Metrics + metrics *TailMetrics logger log.Logger } @@ -318,7 +318,7 @@ func newTailer( tailMaxDuration time.Duration, waitEntryThrottle time.Duration, categorizeLabels bool, - m *Metrics, + m *TailMetrics, logger log.Logger, ) *Tailer { historicEntriesIter := historicEntries diff --git a/pkg/querier/tail/tail_mock_test.go b/pkg/querier/tail/tail_mock_test.go new file mode 100644 index 0000000000000..2f2d3f9cf205d --- /dev/null +++ b/pkg/querier/tail/tail_mock_test.go @@ -0,0 +1,129 @@ +package tail + +import ( + "context" + "time" + + grpc_metadata "google.golang.org/grpc/metadata" + + "github.com/stretchr/testify/mock" + + "github.com/grafana/loki/v3/pkg/compactor/deletion" + "github.com/grafana/loki/v3/pkg/iter" + "github.com/grafana/loki/v3/pkg/logproto" + "github.com/grafana/loki/v3/pkg/logql" + "github.com/grafana/loki/v3/pkg/util" +) + +// mockTailIngester implements tailIngester interface for testing +type mockTailIngester struct { + mock.Mock +} + +func newMockTailIngester() *mockTailIngester { + return &mockTailIngester{} +} + +func (m *mockTailIngester) TailDisconnectedIngesters(ctx context.Context, req *logproto.TailRequest, connectedIngestersAddr []string) (map[string]logproto.Querier_TailClient, error) { + args := m.Called(ctx, req, connectedIngestersAddr) + return args.Get(0).(map[string]logproto.Querier_TailClient), args.Error(1) +} + +func (m *mockTailIngester) TailersCount(ctx context.Context) ([]uint32, error) { + args := m.Called(ctx) + return args.Get(0).([]uint32), args.Error(1) +} + +func (m *mockTailIngester) Tail(ctx context.Context, req *logproto.TailRequest) (map[string]logproto.Querier_TailClient, error) { + args := m.Called(ctx, req) + return args.Get(0).(map[string]logproto.Querier_TailClient), args.Error(1) +} + +// mockTailLogSelector implements tailLogSelector interface for testing +type mockTailLogSelector struct { + mock.Mock +} + +func newMockTailLogSelector() *mockTailLogSelector { + return &mockTailLogSelector{} +} + +func (m *mockTailLogSelector) SelectLogs(ctx context.Context, params logql.SelectLogParams) (iter.EntryIterator, error) { + args := m.Called(ctx, params) + return args.Get(0).(iter.EntryIterator), args.Error(1) +} + +func mockTailResponse(stream logproto.Stream) *logproto.TailResponse { + return &logproto.TailResponse{ + Stream: &stream, + DroppedStreams: []*logproto.DroppedStream{}, + } +} + +// tailClientMock is mockable version of Querier_TailClient +type tailClientMock struct { + util.ExtendedMock + logproto.Querier_TailClient + recvTrigger chan time.Time +} + +func newTailClientMock() *tailClientMock { + return &tailClientMock{ + recvTrigger: make(chan time.Time, 10), + } +} + +func (c *tailClientMock) Recv() (*logproto.TailResponse, error) { + args := c.Called() + return args.Get(0).(*logproto.TailResponse), args.Error(1) +} + +func (c *tailClientMock) Header() (grpc_metadata.MD, error) { + return nil, nil +} + +func (c *tailClientMock) Trailer() grpc_metadata.MD { + return nil +} + +func (c *tailClientMock) CloseSend() error { + return nil +} + +func (c *tailClientMock) Context() context.Context { + return context.Background() +} + +func (c *tailClientMock) SendMsg(_ interface{}) error { + return nil +} + +func (c *tailClientMock) RecvMsg(_ interface{}) error { + return nil +} + +func (c *tailClientMock) mockRecvWithTrigger(response *logproto.TailResponse) *tailClientMock { + 
c.On("Recv").WaitUntil(c.recvTrigger).Return(response, nil) + return c +} + +func (c *tailClientMock) triggerRecv() { + c.recvTrigger <- time.Now() +} + +type mockDeleteGettter struct { + user string + results []deletion.DeleteRequest +} + +func newMockDeleteGettter(user string, results []deletion.DeleteRequest) *mockDeleteGettter { + return &mockDeleteGettter{ + user: user, + results: results, + } +} + +func (d *mockDeleteGettter) GetAllDeleteRequestsForUser(_ context.Context, userID string) ([]deletion.DeleteRequest, error) { + d.user = userID + return d.results, nil +} diff --git a/pkg/querier/tail_test.go b/pkg/querier/tail/tail_test.go similarity index 86% rename from pkg/querier/tail_test.go rename to pkg/querier/tail/tail_test.go index 3be5e5f053dc9..8a6a448f5b952 100644 --- a/pkg/querier/tail_test.go +++ b/pkg/querier/tail/tail_test.go @@ -1,20 +1,19 @@ -package querier +package tail import ( - "errors" "testing" "time" "github.com/go-kit/log" + "github.com/pkg/errors" "github.com/prometheus/prometheus/model/labels" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - - gokitlog "github.com/go-kit/log" + "gotest.tools/assert" "github.com/grafana/loki/v3/pkg/iter" loghttp "github.com/grafana/loki/v3/pkg/loghttp/legacy" "github.com/grafana/loki/v3/pkg/logproto" + "github.com/grafana/loki/v3/pkg/querier/testutil" ) const ( @@ -31,23 +30,23 @@ func TestTailer(t *testing.T) { tester func(t *testing.T, tailer *Tailer, tailClient *tailClientMock) }{ "tail logs from historic entries only (no tail clients provided)": { - historicEntries: mockStreamIterator(1, 2), + historicEntries: testutil.NewFakeStreamIterator(1, 2), tailClient: nil, tester: func(t *testing.T, tailer *Tailer, _ *tailClientMock) { responses, err := readFromTailer(tailer, 2) require.NoError(t, err) actual := flattenStreamsFromResponses(responses) - - assert.Equal(t, []logproto.Stream{ - mockStream(1, 1), - mockStream(2, 1), - }, actual) + expected := []logproto.Stream{ + testutil.NewFakeStream(1, 1), + testutil.NewFakeStream(2, 1), + } + compareStreams(t, expected, actual) }, }, "tail logs from tail clients only (no historic entries provided)": { - historicEntries: mockStreamIterator(0, 0), - tailClient: newTailClientMock().mockRecvWithTrigger(mockTailResponse(mockStream(1, 1))), + historicEntries: testutil.NewFakeStreamIterator(0, 0), + tailClient: newTailClientMock().mockRecvWithTrigger(mockTailResponse(testutil.NewFakeStream(1, 1))), tester: func(t *testing.T, tailer *Tailer, tailClient *tailClientMock) { tailClient.triggerRecv() @@ -55,15 +54,15 @@ func TestTailer(t *testing.T) { require.NoError(t, err) actual := flattenStreamsFromResponses(responses) - - assert.Equal(t, []logproto.Stream{ - mockStream(1, 1), - }, actual) + expected := []logproto.Stream{ + testutil.NewFakeStream(1, 1), + } + compareStreams(t, expected, actual) }, }, "tail logs both from historic entries and tail clients": { - historicEntries: mockStreamIterator(1, 2), - tailClient: newTailClientMock().mockRecvWithTrigger(mockTailResponse(mockStream(3, 1))), + historicEntries: testutil.NewFakeStreamIterator(1, 2), + tailClient: newTailClientMock().mockRecvWithTrigger(mockTailResponse(testutil.NewFakeStream(3, 1))), tester: func(t *testing.T, tailer *Tailer, tailClient *tailClientMock) { tailClient.triggerRecv() @@ -71,16 +70,16 @@ func TestTailer(t *testing.T) { require.NoError(t, err) actual := flattenStreamsFromResponses(responses) - - assert.Equal(t, []logproto.Stream{ - mockStream(1, 1), - mockStream(2, 1), - mockStream(3, 
1), - }, actual) + expected := []logproto.Stream{ + testutil.NewFakeStream(1, 1), + testutil.NewFakeStream(2, 1), + testutil.NewFakeStream(3, 1), + } + compareStreams(t, expected, actual) }, }, "honor max entries per tail response": { - historicEntries: mockStreamIterator(1, maxEntriesPerTailResponse+1), + historicEntries: testutil.NewFakeStreamIterator(1, maxEntriesPerTailResponse+1), tailClient: nil, tester: func(t *testing.T, tailer *Tailer, _ *tailClientMock) { responses, err := readFromTailer(tailer, maxEntriesPerTailResponse+1) @@ -93,8 +92,8 @@ func TestTailer(t *testing.T) { }, }, "honor max buffered tail responses": { - historicEntries: mockStreamIterator(1, (maxEntriesPerTailResponse*maxBufferedTailResponses)+5), - tailClient: newTailClientMock().mockRecvWithTrigger(mockTailResponse(mockStream(1, 1))), + historicEntries: testutil.NewFakeStreamIterator(1, (maxEntriesPerTailResponse*maxBufferedTailResponses)+5), + tailClient: newTailClientMock().mockRecvWithTrigger(mockTailResponse(testutil.NewFakeStream(1, 1))), tester: func(t *testing.T, tailer *Tailer, tailClient *tailClientMock) { err := waitUntilTailerOpenStreamsHaveBeenConsumed(tailer) require.NoError(t, err) @@ -123,8 +122,8 @@ func TestTailer(t *testing.T) { }, }, "honor max dropped entries per tail response": { - historicEntries: mockStreamIterator(1, (maxEntriesPerTailResponse*maxBufferedTailResponses)+maxDroppedEntriesPerTailResponse+5), - tailClient: newTailClientMock().mockRecvWithTrigger(mockTailResponse(mockStream(1, 1))), + historicEntries: testutil.NewFakeStreamIterator(1, (maxEntriesPerTailResponse*maxBufferedTailResponses)+maxDroppedEntriesPerTailResponse+5), + tailClient: newTailClientMock().mockRecvWithTrigger(mockTailResponse(testutil.NewFakeStream(1, 1))), tester: func(t *testing.T, tailer *Tailer, tailClient *tailClientMock) { err := waitUntilTailerOpenStreamsHaveBeenConsumed(tailer) require.NoError(t, err) @@ -165,7 +164,7 @@ func TestTailer(t *testing.T) { tailClients["test"] = test.tailClient } - tailer := newTailer(0, tailClients, test.historicEntries, tailDisconnectedIngesters, timeout, throttle, false, NewMetrics(nil), gokitlog.NewNopLogger()) + tailer := newTailer(0, tailClients, test.historicEntries, tailDisconnectedIngesters, timeout, throttle, false, NewTailMetrics(nil), log.NewNopLogger()) defer tailer.close() test.tester(t, tailer, test.tailClient) @@ -360,7 +359,7 @@ func TestCategorizedLabels(t *testing.T) { tailClients[k] = v } - tailer := newTailer(0, tailClients, tc.historicEntries, tailDisconnectedIngesters, timeout, throttle, tc.categorizeLabels, NewMetrics(nil), log.NewNopLogger()) + tailer := newTailer(0, tailClients, tc.historicEntries, tailDisconnectedIngesters, timeout, throttle, tc.categorizeLabels, NewTailMetrics(nil), log.NewNopLogger()) defer tailer.close() // Make tail clients receive their responses @@ -464,3 +463,19 @@ func flattenStreamsFromResponses(responses []*loghttp.TailResponse) []logproto.S return result } + +// compareStreams compares two slices of logproto.Stream +func compareStreams(t *testing.T, expected, actual []logproto.Stream) { + t.Helper() + require.Equal(t, len(expected), len(actual), "number of streams mismatch") + + for i := range expected { + require.Equal(t, expected[i].Labels, actual[i].Labels, "labels mismatch at index %d", i) + require.Equal(t, len(expected[i].Entries), len(actual[i].Entries), "number of entries mismatch at index %d", i) + + for j := range expected[i].Entries { + require.Equal(t, expected[i].Entries[j].Line, actual[i].Entries[j].Line, 
"entry line mismatch at index %d,%d", i, j) + require.Equal(t, expected[i].Entries[j].Timestamp.Unix(), actual[i].Entries[j].Timestamp.Unix(), "entry timestamp mismatch at index %d,%d", i, j) + } + } +} diff --git a/pkg/querier/tail_mock_test.go b/pkg/querier/tail_mock_test.go deleted file mode 100644 index a1d161d2f2c95..0000000000000 --- a/pkg/querier/tail_mock_test.go +++ /dev/null @@ -1,10 +0,0 @@ -package querier - -import "github.com/grafana/loki/v3/pkg/logproto" - -func mockTailResponse(stream logproto.Stream) *logproto.TailResponse { - return &logproto.TailResponse{ - Stream: &stream, - DroppedStreams: []*logproto.DroppedStream{}, - } -} diff --git a/pkg/querier/testutil/iterator.go b/pkg/querier/testutil/iterator.go new file mode 100644 index 0000000000000..7a6c4e07b19b2 --- /dev/null +++ b/pkg/querier/testutil/iterator.go @@ -0,0 +1,38 @@ +package testutil + +import ( + "fmt" + "time" + + "github.com/grafana/loki/v3/pkg/iter" + "github.com/grafana/loki/v3/pkg/logproto" +) + +// mockStreamIterator returns an iterator with 1 stream and quantity entries, +// where entries timestamp and line string are constructed as sequential numbers +// starting at from +func NewFakeStreamIterator(from int, quantity int) iter.EntryIterator { + return iter.NewStreamIterator(NewFakeStream(from, quantity)) +} + +// mockStream return a stream with quantity entries, where entries timestamp and +// line string are constructed as sequential numbers starting at from +func NewFakeStream(from int, quantity int) logproto.Stream { + return NewFakeStreamWithLabels(from, quantity, `{type="test"}`) +} + +func NewFakeStreamWithLabels(from int, quantity int, labels string) logproto.Stream { + entries := make([]logproto.Entry, 0, quantity) + + for i := from; i < from+quantity; i++ { + entries = append(entries, logproto.Entry{ + Timestamp: time.Unix(int64(i), 0), + Line: fmt.Sprintf("line %d", i), + }) + } + + return logproto.Stream{ + Entries: entries, + Labels: labels, + } +} diff --git a/pkg/querier/testutil/limits.go b/pkg/querier/testutil/limits.go new file mode 100644 index 0000000000000..46fad24f6e7b0 --- /dev/null +++ b/pkg/querier/testutil/limits.go @@ -0,0 +1,56 @@ +package testutil + +import ( + "context" + "time" + + "github.com/grafana/loki/v3/pkg/util/validation" +) + +// MockLimits is a mock implementation of limits.Limits interface that can be used in tests +type MockLimits struct { + MaxQueryLookbackVal time.Duration + MaxQueryLengthVal time.Duration + MaxQueryTimeoutVal time.Duration + MaxQueryRangeVal time.Duration + MaxQuerySeriesVal int + MaxConcurrentTailRequestsVal int + MaxEntriesLimitPerQueryVal int + MaxStreamsMatchersPerQueryVal int +} + +func (m *MockLimits) MaxQueryLookback(_ context.Context, _ string) time.Duration { + return m.MaxQueryLookbackVal +} + +func (m *MockLimits) MaxQueryLength(_ context.Context, _ string) time.Duration { + return m.MaxQueryLengthVal +} + +func (m *MockLimits) QueryTimeout(_ context.Context, _ string) time.Duration { + return m.MaxQueryTimeoutVal +} + +func (m *MockLimits) MaxQueryRange(_ context.Context, _ string) time.Duration { + return m.MaxQueryRangeVal +} + +func (m *MockLimits) MaxQuerySeries(_ context.Context, _ string) int { + return m.MaxQuerySeriesVal +} + +func (m *MockLimits) MaxConcurrentTailRequests(_ context.Context, _ string) int { + return m.MaxConcurrentTailRequestsVal +} + +func (m *MockLimits) MaxEntriesLimitPerQuery(_ context.Context, _ string) int { + return m.MaxEntriesLimitPerQueryVal +} + +func (m *MockLimits) 
MaxStreamsMatchersPerQuery(_ context.Context, _ string) int { + return m.MaxStreamsMatchersPerQueryVal +} + +func (m *MockLimits) BlockedQueries(_ context.Context, _ string) []*validation.BlockedQuery { + return nil +}
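
For reviewers trying the refactor locally, the sketch below shows how the relocated tail Querier, the shared mocks from tail_mock_test.go, and the new testutil.MockLimits fit together. It mirrors TestQuerier_Tail_QueryTimeoutConfigFlag above; the test name TestTailQuerierWiring and the concrete limit values are illustrative only, not part of the patch.

package tail

import (
	"context"
	"testing"
	"time"

	"github.com/go-kit/log"
	"github.com/grafana/dskit/user"
	"github.com/stretchr/testify/mock"
	"github.com/stretchr/testify/require"

	"github.com/grafana/loki/v3/pkg/compactor/deletion"
	"github.com/grafana/loki/v3/pkg/iter"
	"github.com/grafana/loki/v3/pkg/logproto"
	"github.com/grafana/loki/v3/pkg/logql/syntax"
	"github.com/grafana/loki/v3/pkg/querier/plan"
	"github.com/grafana/loki/v3/pkg/querier/testutil"
)

// TestTailQuerierWiring is a hypothetical test showing how the pieces added in
// this patch are wired together; it follows the pattern of the tests above.
func TestTailQuerierWiring(t *testing.T) {
	req := logproto.TailRequest{
		Query: `{app="loki"}`,
		Limit: 10,
		Start: time.Now(),
		Plan:  &plan.QueryPlan{AST: syntax.MustParseExpr(`{app="loki"}`)},
	}

	// Ingester and store mocks come from tail_mock_test.go in this package.
	ingester := newMockTailIngester()
	tailClient := newTailClientMock()
	tailClient.On("Recv").Return(mockTailResponse(testutil.NewFakeStream(1, 1)), nil)
	ingester.On("Tail", mock.Anything, &req).Return(map[string]logproto.Querier_TailClient{
		"ingester-1": tailClient,
	}, nil)
	ingester.On("TailersCount", mock.Anything).Return([]uint32{0}, nil)

	store := newMockTailLogSelector()
	store.On("SelectLogs", mock.Anything, mock.Anything).Return(iter.NoopEntryIterator, nil)

	// Per-tenant limits are supplied through the shared testutil.MockLimits.
	limits := &testutil.MockLimits{
		MaxQueryTimeoutVal:            time.Minute,
		MaxConcurrentTailRequestsVal:  10,
		MaxStreamsMatchersPerQueryVal: 100,
	}

	q := NewQuerier(ingester, store, newMockDeleteGettter("test", []deletion.DeleteRequest{}),
		limits, 7*24*time.Hour, NewTailMetrics(nil), log.NewNopLogger())

	ctx := user.InjectOrgID(context.Background(), "test")
	_, err := q.Tail(ctx, &req, false)
	require.NoError(t, err)
}

Because NewQuerier now depends only on the narrow Ingester and Store interfaces defined in pkg/querier/tail, tests can substitute lightweight mocks like these instead of constructing a full querier.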