Skip to content

Commit

Permalink
fix: Determine when all logs have been filtered (#16073)
Browse files Browse the repository at this point in the history
  • Loading branch information
MasslessParticle authored Feb 3, 2025
1 parent 91ff737 commit f2bff20
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 26 deletions.
23 changes: 17 additions & 6 deletions pkg/distributor/http.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package distributor

import (
"errors"
"fmt"
"net/http"
"strings"
Expand Down Expand Up @@ -42,16 +43,26 @@ func (d *Distributor) pushHandler(w http.ResponseWriter, r *http.Request, pushRe
logPushRequestStreams := d.tenantConfigs.LogPushRequestStreams(tenantID)
req, err := push.ParseRequest(logger, tenantID, r, d.tenantsRetention, d.validator.Limits, pushRequestParser, d.usageTracker, logPushRequestStreams)
if err != nil {
if !errors.Is(err, push.ErrAllLogsFiltered) {
if d.tenantConfigs.LogPushRequest(tenantID) {
level.Debug(logger).Log(
"msg", "push request failed",
"code", http.StatusBadRequest,
"err", err,
)
}
d.writeFailuresManager.Log(tenantID, fmt.Errorf("couldn't parse push request: %w", err))

errorWriter(w, err.Error(), http.StatusBadRequest, logger)
return
}

if d.tenantConfigs.LogPushRequest(tenantID) {
level.Debug(logger).Log(
"msg", "push request failed",
"code", http.StatusBadRequest,
"err", err,
"msg", "successful push request filtered all lines",
)
}
d.writeFailuresManager.Log(tenantID, fmt.Errorf("couldn't parse push request: %w", err))

errorWriter(w, err.Error(), http.StatusBadRequest, logger)
w.WriteHeader(http.StatusNoContent)
return
}

Expand Down
71 changes: 55 additions & 16 deletions pkg/distributor/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,27 +63,66 @@ func TestDistributorRingHandler(t *testing.T) {
}

func TestRequestParserWrapping(t *testing.T) {
limits := &validation.Limits{}
flagext.DefaultValues(limits)
limits.RejectOldSamples = false
distributors, _ := prepare(t, 1, 3, limits, nil)
t.Run("it calls the parser wrapper if there is one", func(t *testing.T) {
limits := &validation.Limits{}
flagext.DefaultValues(limits)
limits.RejectOldSamples = false
distributors, _ := prepare(t, 1, 3, limits, nil)

var called bool
distributors[0].RequestParserWrapper = func(requestParser push.RequestParser) push.RequestParser {
called = true
return requestParser
}
var called bool
distributors[0].RequestParserWrapper = func(requestParser push.RequestParser) push.RequestParser {
called = true
return requestParser
}

ctx := user.InjectOrgID(context.Background(), "test-user")
req, err := http.NewRequestWithContext(ctx, http.MethodPost, "fake-path", nil)
require.NoError(t, err)

rec := httptest.NewRecorder()
distributors[0].pushHandler(rec, req, newFakeParser().parseRequest, push.HTTPError)

ctx := user.InjectOrgID(context.Background(), "test-user")
req, err := http.NewRequestWithContext(ctx, http.MethodPost, "fake-path", nil)
require.NoError(t, err)
// unprocessable code because there are no streams in the request.
require.Equal(t, http.StatusUnprocessableEntity, rec.Code)
require.True(t, called)
})

t.Run("it returns 204 when the parser wrapper filteres all log lines", func(t *testing.T) {
limits := &validation.Limits{}
flagext.DefaultValues(limits)
limits.RejectOldSamples = false
distributors, _ := prepare(t, 1, 3, limits, nil)

distributors[0].pushHandler(httptest.NewRecorder(), req, stubParser, push.HTTPError)
var called bool
distributors[0].RequestParserWrapper = func(requestParser push.RequestParser) push.RequestParser {
called = true
return requestParser
}

ctx := user.InjectOrgID(context.Background(), "test-user")
req, err := http.NewRequestWithContext(ctx, http.MethodPost, "fake-path", nil)
require.NoError(t, err)

parser := newFakeParser()
parser.parseErr = push.ErrAllLogsFiltered

rec := httptest.NewRecorder()
distributors[0].pushHandler(rec, req, parser.parseRequest, push.HTTPError)

require.True(t, called)
require.Equal(t, http.StatusNoContent, rec.Code)
})
}

type fakeParser struct {
parseErr error
}

require.True(t, called)
func newFakeParser() *fakeParser {
return &fakeParser{}
}

func stubParser(
func (p *fakeParser) parseRequest(
_ string,
_ *http.Request,
_ push.TenantsRetention,
Expand All @@ -92,5 +131,5 @@ func stubParser(
_ bool,
_ log.Logger,
) (*logproto.PushRequest, *push.Stats, error) {
return &logproto.PushRequest{}, &push.Stats{}, nil
return &logproto.PushRequest{}, &push.Stats{}, p.parseErr
}
12 changes: 8 additions & 4 deletions pkg/loghttp/push/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,19 @@ import (
"time"

"github.com/go-kit/log/level"
"github.com/pkg/errors"

"github.com/grafana/loki/pkg/push"

"google.golang.org/grpc/codes"
grpcstatus "google.golang.org/grpc/status"

"github.com/dustin/go-humanize"
"github.com/go-kit/log"
"github.com/gogo/protobuf/proto"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/prometheus/model/labels"
"google.golang.org/grpc/codes"
grpcstatus "google.golang.org/grpc/status"

"github.com/grafana/loki/v3/pkg/analytics"
"github.com/grafana/loki/v3/pkg/loghttp"
Expand Down Expand Up @@ -66,6 +68,8 @@ const (
AggregatedMetricLabel = "__aggregated_metric__"
)

var ErrAllLogsFiltered = errors.New("all logs lines filtered during parsing")

type TenantsRetention interface {
RetentionPeriodFor(userID string, lbs labels.Labels) time.Duration
}
Expand Down Expand Up @@ -111,7 +115,7 @@ type Stats struct {

func ParseRequest(logger log.Logger, userID string, r *http.Request, tenantsRetention TenantsRetention, limits Limits, pushRequestParser RequestParser, tracker UsageTracker, logPushRequestStreams bool) (*logproto.PushRequest, error) {
req, pushStats, err := pushRequestParser(userID, r, tenantsRetention, limits, tracker, logPushRequestStreams, logger)
if err != nil {
if err != nil && !errors.Is(err, ErrAllLogsFiltered) {
return nil, err
}

Expand Down Expand Up @@ -164,7 +168,7 @@ func ParseRequest(logger log.Logger, userID string, r *http.Request, tenantsRete
logValues = append(logValues, pushStats.Extra...)
level.Debug(logger).Log(logValues...)

return req, nil
return req, err
}

func ParseLokiRequest(userID string, r *http.Request, tenantsRetention TenantsRetention, limits Limits, tracker UsageTracker, logPushRequestStreams bool, logger log.Logger) (*logproto.PushRequest, *Stats, error) {
Expand Down

0 comments on commit f2bff20

Please sign in to comment.