diff --git a/CHANGELOG.md b/CHANGELOG.md
index 9b89bfefc54..44477167c04 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -10,6 +10,7 @@
 * [CHANGE] Querier: Renamed `-querier.prefer-streaming-chunks` to `-querier.prefer-streaming-chunks-from-ingesters` to enable streaming chunks from ingesters to queriers. #5182
 * [CHANGE] Querier: `-query-frontend.cache-unaligned-requests` has been moved from a global flag to a per-tenant override. #5312
 * [CHANGE] Ingester: removed `cortex_ingester_shipper_dir_syncs_total` and `cortex_ingester_shipper_dir_sync_failures_total` metrics. The former metric was not much useful, and the latter was never incremented. #5396
+* [CHANGE] Ingester: Do not log errors related to hitting per-instance limits to reduce resource usage when ingesters are under pressure. #5585
 * [CHANGE] gRPC clients: use default connect timeout of 5s, and therefore enable default connect backoff max delay of 5s. #5562
 * [FEATURE] Cardinality API: Add a new `count_method` parameter which enables counting active series #5136
 * [FEATURE] Query-frontend: added experimental support to cache cardinality, label names and label values query responses. The cache will be used when `-query-frontend.cache-results` is enabled, and `-query-frontend.results-cache-ttl-for-cardinality-query` or `-query-frontend.results-cache-ttl-for-labels-query` set to a value greater than 0. The following metrics have been added to track the query results cache hit ratio per `request_type`: #5212 #5235 #5426 #5524
diff --git a/go.mod b/go.mod
index 2844be650af..9d8db24b45f 100644
--- a/go.mod
+++ b/go.mod
@@ -84,7 +84,6 @@ require (
 	github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.3.0 // indirect
 	github.com/Azure/azure-sdk-for-go/sdk/internal v1.3.0 // indirect
 	github.com/AzureAD/microsoft-authentication-library-for-go v1.0.0 // indirect
-	github.com/HdrHistogram/hdrhistogram-go v1.1.2 // indirect
 	github.com/cenkalti/backoff/v3 v3.2.2 // indirect
 	github.com/go-test/deep v1.1.0 // indirect
 	github.com/hashicorp/go-retryablehttp v0.7.4 // indirect
@@ -100,6 +99,7 @@ require (
 	cloud.google.com/go/compute/metadata v0.2.3 // indirect
 	cloud.google.com/go/iam v1.1.1 // indirect
 	github.com/DmitriyVTitov/size v1.5.0 // indirect
+	github.com/HdrHistogram/hdrhistogram-go v1.1.2 // indirect
 	github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect
 	github.com/armon/go-metrics v0.4.1 // indirect
 	github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2 // indirect
diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go
index 7ef79beb056..4a801f181e4 100644
--- a/pkg/ingester/ingester.go
+++ b/pkg/ingester/ingester.go
@@ -760,6 +760,11 @@ func (i *Ingester) PushWithCleanup(ctx context.Context, pushReq *push.Request) (
 
 	db, err := i.getOrCreateTSDB(userID, false)
 	if err != nil {
+		// Check for a particular per-instance limit and return that error directly
+		// since it contains extra information for gRPC and our logging middleware.
+		if errors.Is(err, errMaxTenantsReached) {
+			return nil, err
+		}
 		return nil, wrapWithUser(err, userID)
 	}
 
@@ -805,7 +810,12 @@ func (i *Ingester) PushWithCleanup(ctx context.Context, pushReq *push.Request) (
 			level.Warn(i.logger).Log("msg", "failed to rollback appender on error", "user", userID, "err", err)
 		}
 
-		return nil, err
+		// Check for a particular per-instance limit and return that error directly
+		// since it contains extra information for gRPC and our logging middleware.
+		if errors.Is(err, errMaxInMemorySeriesReached) {
+			return nil, err
+		}
+		return nil, wrapWithUser(err, userID)
 	}
 
 	// At this point all samples have been added to the appender, so we can track the time it took.
@@ -1038,7 +1048,7 @@ func (i *Ingester) pushSamplesToAppender(userID string, timeseries []mimirpb.Pre
 			}
 
 			// Otherwise, return a 500.
-			return wrapWithUser(err, userID)
+			return err
 		}
 
 		numNativeHistogramBuckets := -1
@@ -1079,7 +1089,7 @@ func (i *Ingester) pushSamplesToAppender(userID string, timeseries []mimirpb.Pre
 				continue
 			}
 
-			return wrapWithUser(err, userID)
+			return err
 		}
 		numNativeHistograms := len(ts.Histograms)
 		if numNativeHistograms > 0 {
diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go
index a51ede4e8a7..ccd7e6859f1 100644
--- a/pkg/ingester/ingester_test.go
+++ b/pkg/ingester/ingester_test.go
@@ -53,6 +53,8 @@ import (
 	"golang.org/x/exp/slices"
 	"golang.org/x/sync/errgroup"
 	"google.golang.org/grpc"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
 
 	"github.com/grafana/mimir/pkg/ingester/activeseries"
 	"github.com/grafana/mimir/pkg/ingester/client"
@@ -63,7 +65,6 @@ import (
 	"github.com/grafana/mimir/pkg/storage/tsdb/block"
 	"github.com/grafana/mimir/pkg/usagestats"
 	"github.com/grafana/mimir/pkg/util"
-	util_log "github.com/grafana/mimir/pkg/util/log"
 	util_math "github.com/grafana/mimir/pkg/util/math"
 	"github.com/grafana/mimir/pkg/util/push"
 	util_test "github.com/grafana/mimir/pkg/util/test"
@@ -5415,10 +5416,9 @@ func TestIngesterNoFlushWithInFlightRequest(t *testing.T) {
 
 func TestIngester_PushInstanceLimits(t *testing.T) {
 	tests := map[string]struct {
-		limits          InstanceLimits
-		reqs            map[string][]*mimirpb.WriteRequest
-		expectedErr     error
-		expectedErrType interface{}
+		limits      InstanceLimits
+		reqs        map[string][]*mimirpb.WriteRequest
+		expectedErr error
 	}{
 		"should succeed creating one user and series": {
 			limits: InstanceLimits{MaxInMemorySeries: 1, MaxInMemoryTenants: 1},
@@ -5461,7 +5461,7 @@ func TestIngester_PushInstanceLimits(t *testing.T) {
 				},
 			},
 
-			expectedErr: wrapWithUser(errMaxInMemorySeriesReached, "test"),
+			expectedErr: errMaxInMemorySeriesReached,
 		},
 
 		"should fail creating two users": {
@@ -5488,7 +5488,7 @@ func TestIngester_PushInstanceLimits(t *testing.T) {
 					),
 				},
 			},
-			expectedErr: wrapWithUser(errMaxTenantsReached, "user2"),
+			expectedErr: errMaxTenantsReached,
 		},
 
 		"should fail pushing samples in two requests due to rate limit": {
@@ -5557,9 +5557,12 @@ func TestIngester_PushInstanceLimits(t *testing.T) {
 			} else {
 				// Last push may expect error.
 				if testData.expectedErr != nil {
-					assert.Equal(t, testData.expectedErr, err)
-				} else if testData.expectedErrType != nil {
-					assert.True(t, errors.As(err, testData.expectedErrType), "expected error type %T, got %v", testData.expectedErrType, err)
+					assert.ErrorIs(t, err, testData.expectedErr)
+					var optional middleware.OptionalLogging
+					assert.ErrorAs(t, err, &optional)
+					s, ok := status.FromError(err)
+					require.True(t, ok, "expected to be able to convert to gRPC status")
+					assert.Equal(t, codes.Unavailable, s.Code())
 				} else {
 					assert.NoError(t, err)
 				}
@@ -5687,10 +5690,17 @@ func TestIngester_inflightPushRequests(t *testing.T) {
 
 		time.Sleep(10 * time.Millisecond) // Give first goroutine a chance to start pushing...
 
 		req := generateSamplesForLabel(labels.FromStrings(labels.MetricName, "testcase"), 1, 1024)
+		var optional middleware.OptionalLogging
 
 		_, err := i.Push(ctx, req)
-		require.Equal(t, errMaxInflightRequestsReached, err)
-		require.ErrorAs(t, err, &util_log.DoNotLogError{})
+		require.ErrorIs(t, err, errMaxInflightRequestsReached)
+		require.ErrorAs(t, err, &optional)
+		require.False(t, optional.ShouldLog(ctx, time.Duration(0)), "expected not to log via .ShouldLog()")
+
+		s, ok := status.FromError(err)
+		require.True(t, ok, "expected to be able to convert to gRPC status")
+		require.Equal(t, codes.Unavailable, s.Code())
+
 		return nil
 	})
diff --git a/pkg/ingester/instance_limits.go b/pkg/ingester/instance_limits.go
index 6716a1f7bfc..9239b83dce6 100644
--- a/pkg/ingester/instance_limits.go
+++ b/pkg/ingester/instance_limits.go
@@ -6,13 +6,15 @@
 package ingester
 
 import (
+	"context"
 	"flag"
+	"time"
 
-	"github.com/pkg/errors"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
 	"gopkg.in/yaml.v3"
 
 	"github.com/grafana/mimir/pkg/util/globalerror"
-	util_log "github.com/grafana/mimir/pkg/util/log"
 )
 
 const (
@@ -22,14 +24,41 @@
 	maxInflightPushRequestsFlag = "ingester.instance-limits.max-inflight-push-requests"
 )
 
+// We don't include values in the messages for per-instance limits to avoid leaking Mimir cluster configuration to users.
 var (
-	// We don't include values in the message to avoid leaking Mimir cluster configuration to users.
-	errMaxIngestionRateReached    = errors.New(globalerror.IngesterMaxIngestionRate.MessageWithPerInstanceLimitConfig("the write request has been rejected because the ingester exceeded the samples ingestion rate limit", maxIngestionRateFlag))
-	errMaxTenantsReached          = errors.New(globalerror.IngesterMaxTenants.MessageWithPerInstanceLimitConfig("the write request has been rejected because the ingester exceeded the allowed number of tenants", maxInMemoryTenantsFlag))
-	errMaxInMemorySeriesReached   = errors.New(globalerror.IngesterMaxInMemorySeries.MessageWithPerInstanceLimitConfig("the write request has been rejected because the ingester exceeded the allowed number of in-memory series", maxInMemorySeriesFlag))
-	errMaxInflightRequestsReached = util_log.DoNotLogError{Err: errors.New(globalerror.IngesterMaxInflightPushRequests.MessageWithPerInstanceLimitConfig("the write request has been rejected because the ingester exceeded the allowed number of inflight push requests", maxInflightPushRequestsFlag))}
+	errMaxIngestionRateReached    = newInstanceLimitError(globalerror.IngesterMaxIngestionRate.MessageWithPerInstanceLimitConfig("the write request has been rejected because the ingester exceeded the samples ingestion rate limit", maxIngestionRateFlag))
+	errMaxTenantsReached          = newInstanceLimitError(globalerror.IngesterMaxTenants.MessageWithPerInstanceLimitConfig("the write request has been rejected because the ingester exceeded the allowed number of tenants", maxInMemoryTenantsFlag))
+	errMaxInMemorySeriesReached   = newInstanceLimitError(globalerror.IngesterMaxInMemorySeries.MessageWithPerInstanceLimitConfig("the write request has been rejected because the ingester exceeded the allowed number of in-memory series", maxInMemorySeriesFlag))
+	errMaxInflightRequestsReached = newInstanceLimitError(globalerror.IngesterMaxInflightPushRequests.MessageWithPerInstanceLimitConfig("the write request has been rejected because the ingester exceeded the allowed number of inflight push requests", maxInflightPushRequestsFlag))
 )
 
+type instanceLimitErr struct {
+	msg    string
+	status *status.Status
+}
+
+func newInstanceLimitError(msg string) error {
+	return &instanceLimitErr{
+		// Errors from hitting per-instance limits are always "unavailable" for gRPC
+		status: status.New(codes.Unavailable, msg),
+		msg:    msg,
+	}
+}
+
+func (e *instanceLimitErr) ShouldLog(context.Context, time.Duration) bool {
+	// We increment metrics when hitting per-instance limits and so there's no need to
+	// log them, the error doesn't contain any interesting information for us.
+	return false
+}
+
+func (e *instanceLimitErr) GRPCStatus() *status.Status {
+	return e.status
+}
+
+func (e *instanceLimitErr) Error() string {
+	return e.msg
+}
+
 // InstanceLimits describes limits used by ingester. Reaching any of these will result in Push method to return
 // (internal) error.
 type InstanceLimits struct {
diff --git a/pkg/ingester/instance_limits_test.go b/pkg/ingester/instance_limits_test.go
index b779a4f1e52..6c73c61c215 100644
--- a/pkg/ingester/instance_limits_test.go
+++ b/pkg/ingester/instance_limits_test.go
@@ -6,10 +6,16 @@
 package ingester
 
 import (
+	"context"
+	"fmt"
 	"strings"
 	"testing"
+	"time"
 
 	"github.com/stretchr/testify/require"
+	"github.com/weaveworks/common/middleware"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
 	"gopkg.in/yaml.v3"
 )
 
@@ -34,3 +40,31 @@ max_tenants: 50000
 	require.Equal(t, int64(30), l.MaxInMemorySeries)       // default value
 	require.Equal(t, int64(40), l.MaxInflightPushRequests) // default value
 }
+
+func TestInstanceLimitErr(t *testing.T) {
+	t.Run("bare error implements ShouldLog()", func(t *testing.T) {
+		var optional middleware.OptionalLogging
+		require.ErrorAs(t, errMaxInflightRequestsReached, &optional)
+		require.False(t, optional.ShouldLog(context.Background(), time.Duration(0)))
+	})
+
+	t.Run("wrapped error implements ShouldLog()", func(t *testing.T) {
+		err := fmt.Errorf("%w: oh no", errMaxTenantsReached)
+		var optional middleware.OptionalLogging
+		require.ErrorAs(t, err, &optional)
+		require.False(t, optional.ShouldLog(context.Background(), time.Duration(0)))
+	})
+
+	t.Run("bare error implements GRPCStatus()", func(t *testing.T) {
+		s, ok := status.FromError(errMaxInMemorySeriesReached)
+		require.True(t, ok, "expected to be able to convert to gRPC status")
+		require.Equal(t, codes.Unavailable, s.Code())
+	})
+
+	t.Run("wrapped error implements GRPCStatus()", func(t *testing.T) {
+		err := fmt.Errorf("%w: oh no", errMaxIngestionRateReached)
+		s, ok := status.FromError(err)
+		require.True(t, ok, "expected to be able to convert to gRPC status")
+		require.Equal(t, codes.Unavailable, s.Code())
+	})
+}
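
Reviewer note: the core pattern in this change is a single error value that both carries a gRPC status (via a GRPCStatus() method, which google.golang.org/grpc/status.FromError recognizes through an interface check rather than a type assertion) and opts out of logging (via the ShouldLog() method of middleware.OptionalLogging from weaveworks/common). A minimal, self-contained sketch of that pattern follows; the limitError type and the messages are illustrative stand-ins, not code from this PR.

package main

import (
	"context"
	"fmt"
	"time"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// limitError mirrors the shape of the PR's unexported instanceLimitErr:
// one value satisfies both the gRPC status and optional-logging contracts.
type limitError struct {
	msg    string
	status *status.Status
}

func (e *limitError) Error() string { return e.msg }

// GRPCStatus lets status.FromError recover a *status.Status from this error.
func (e *limitError) GRPCStatus() *status.Status { return e.status }

// ShouldLog matches the middleware.OptionalLogging interface, so the gRPC
// logging middleware can skip logging this error entirely.
func (e *limitError) ShouldLog(context.Context, time.Duration) bool { return false }

func main() {
	err := &limitError{
		msg:    "limit reached",
		status: status.New(codes.Unavailable, "limit reached"),
	}

	// The status is recovered via the GRPCStatus() interface check.
	if s, ok := status.FromError(err); ok {
		fmt.Println(s.Code()) // Unavailable
	}

	// The logging middleware would consult ShouldLog() and skip this error.
	fmt.Println(err.ShouldLog(context.Background(), 0)) // false
}

Because FromError discovers the status through the GRPCStatus() interface, the unexported instanceLimitErr type still produces codes.Unavailable across package boundaries; the wrapped-error cases exercised in instance_limits_test.go additionally rely on the vendored grpc/testify versions unwrapping via errors.As.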