Create base error type for ingester per-instance errors and remove logging for them #5585

Merged · 3 commits · Aug 3, 2023
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -10,6 +10,7 @@
* [CHANGE] Querier: Renamed `-querier.prefer-streaming-chunks` to `-querier.prefer-streaming-chunks-from-ingesters` to enable streaming chunks from ingesters to queriers. #5182
* [CHANGE] Querier: `-query-frontend.cache-unaligned-requests` has been moved from a global flag to a per-tenant override. #5312
* [CHANGE] Ingester: removed `cortex_ingester_shipper_dir_syncs_total` and `cortex_ingester_shipper_dir_sync_failures_total` metrics. The former metric was not much useful, and the latter was never incremented. #5396
* [CHANGE] Ingester: Do not log errors related to hitting per-instance limits to reduce resource usage when ingesters are under pressure. #5585
* [CHANGE] gRPC clients: use default connect timeout of 5s, and therefore enable default connect backoff max delay of 5s. #5562
* [FEATURE] Cardinality API: Add a new `count_method` parameter which enables counting active series #5136
* [FEATURE] Query-frontend: added experimental support to cache cardinality, label names and label values query responses. The cache will be used when `-query-frontend.cache-results` is enabled, and `-query-frontend.results-cache-ttl-for-cardinality-query` or `-query-frontend.results-cache-ttl-for-labels-query` set to a value greater than 0. The following metrics have been added to track the query results cache hit ratio per `request_type`: #5212 #5235 #5426 #5524
2 changes: 1 addition & 1 deletion go.mod
@@ -84,7 +84,6 @@ require (
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.3.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/internal v1.3.0 // indirect
github.com/AzureAD/microsoft-authentication-library-for-go v1.0.0 // indirect
github.com/HdrHistogram/hdrhistogram-go v1.1.2 // indirect
github.com/cenkalti/backoff/v3 v3.2.2 // indirect
github.com/go-test/deep v1.1.0 // indirect
github.com/hashicorp/go-retryablehttp v0.7.4 // indirect
@@ -100,6 +99,7 @@
cloud.google.com/go/compute/metadata v0.2.3 // indirect
cloud.google.com/go/iam v1.1.1 // indirect
github.com/DmitriyVTitov/size v1.5.0 // indirect
github.com/HdrHistogram/hdrhistogram-go v1.1.2 // indirect
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect
github.com/armon/go-metrics v0.4.1 // indirect
github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2 // indirect
16 changes: 13 additions & 3 deletions pkg/ingester/ingester.go
@@ -760,6 +760,11 @@ func (i *Ingester) PushWithCleanup(ctx context.Context, pushReq *push.Request) (

db, err := i.getOrCreateTSDB(userID, false)
if err != nil {
// Check for a particular per-instance limit and return that error directly
// since it contains extra information for gRPC and our logging middleware.
if errors.Is(err, errMaxTenantsReached) {
Collaborator:

The logging middleware can handle wrapped errors, but unfortunately gRPC FromError() does not. I'm wondering if what we should do here, to make the code more generic, is check whether the error implements GRPCStatus() and, if so, not wrap it.
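For illustration, a minimal sketch of the check being suggested here — the helper name and shape are hypothetical, not code from this PR (and, per the follow-up below, the check turned out to be unnecessary because status.FromError() can unwrap):

```go
package ingester

import (
	"errors"

	"google.golang.org/grpc/status"
)

// maybeWrapWithUser is a hypothetical variant of wrapWithUser that leaves
// errors alone when they already expose a gRPC status.
func maybeWrapWithUser(err error, userID string) error {
	var grpcErr interface{ GRPCStatus() *status.Status }
	if errors.As(err, &grpcErr) {
		// Re-creating the error from its message would drop GRPCStatus()
		// and ShouldLog(), so return it unchanged.
		return err
	}
	return wrapWithUser(err, userID) // existing helper in this package
}
```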

Collaborator:

I'm wrong here as well. FromError() can unwrap too.

Collaborator:

So why do we need this logic at all?

Collaborator:

Could you improve the existing tests to assert on the returned error and check that gRPC status.FromError() returns the expected code?

Contributor Author:

> So why do we need this logic at all?

Because wrapWithUser() as it exists today doesn't actually "wrap" the error. It does something like errors.New(err.Error()), so it returns a brand-new error that has no relation to the original error and doesn't implement the GRPCStatus() or ShouldLog() methods. I could change it, but it seemed like it purposefully didn't wrap the existing error.

> Could you improve the existing tests to assert on the returned error and check that gRPC status.FromError() returns the expected code?

Sure, will do.
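To make the wrapWithUser() point above concrete, here is a rough approximation of the behaviour being described (the real helper lives in pkg/ingester; this is only a sketch, not its actual implementation):

```go
package ingester

import (
	"errors"
	"fmt"
)

// wrapWithUserSketch approximates what is described above: it builds a brand-new
// error from the message rather than wrapping with %w, so the result has no
// relation to the original error and inherits neither GRPCStatus() nor ShouldLog().
func wrapWithUserSketch(err error, userID string) error {
	return errors.New(fmt.Sprintf("user=%s: %s", userID, err.Error()))
}

// Consequence: errors.Is(wrapWithUserSketch(errMaxTenantsReached, "user2"), errMaxTenantsReached)
// is false, which is why PushWithCleanup checks errors.Is before wrapping.
```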

Collaborator:

> Because wrapWithUser() as it exists today doesn't actually "wrap" the error.

Ouf... you're right, again.

> I could change it, but it seemed like it purposefully didn't wrap the existing error.

No, don't do it. I think it would be unsafe. There's some context in cortexproject/cortex#2004, but the TL;DR is that some errors carry the series labels, and those labels are only safe to read during the execution of push() because they're unmarshalled from the write request into a pool. So before returning, we effectively "make a copy" to ensure the error will be safe after push() has returned.

Collaborator:

The errMaxTenantsReached and errMaxInMemorySeriesReached errors don't carry any labels, so they're expected to be safe.

As a retrospective, we could have handled the returned error better, maybe by implementing a specific interface for unsafe error messages, because it's currently very obscure. But that's for another day :)

Contributor (@colega, Sep 12, 2023):

> As a retrospective, we could have handled the returned error better, maybe by implementing a specific interface for unsafe error messages, because it's currently very obscure. But that's for another day :)

IMO we should create an issue and tackle that; otherwise this is a recipe for disaster, and it's just a matter of time before it explodes somewhere.

Errors are meant to escape your function; we shouldn't return errors that aren't safe to escape further than some point.

Edit: issue #6008
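For concreteness, one possible shape of the marker interface floated in this thread — purely hypothetical, not part of this PR:

```go
package ingester

import "errors"

// unsafeToRetain is a hypothetical marker for errors whose message references
// memory that is only valid while push() is executing (e.g. series labels
// unmarshalled from the write request into a pool).
type unsafeToRetain interface {
	error
	// MessageIsPooled reports whether the message must be copied before the
	// error escapes the push() call.
	MessageIsPooled() bool
}

// copyIfUnsafe would make an error safe to return by copying its message out
// of pooled memory, and leave already-safe errors untouched.
func copyIfUnsafe(err error) error {
	var u unsafeToRetain
	if errors.As(err, &u) && u.MessageIsPooled() {
		return errors.New(err.Error())
	}
	return err
}
```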

return nil, err
}
return nil, wrapWithUser(err, userID)
}

@@ -805,7 +810,12 @@ func (i *Ingester) PushWithCleanup(ctx context.Context, pushReq *push.Request) (
level.Warn(i.logger).Log("msg", "failed to rollback appender on error", "user", userID, "err", err)
}

return nil, err
// Check for a particular per-instance limit and return that error directly
// since it contains extra information for gRPC and our logging middleware.
if errors.Is(err, errMaxInMemorySeriesReached) {
return nil, err
}
return nil, wrapWithUser(err, userID)
}

// At this point all samples have been added to the appender, so we can track the time it took.
@@ -1038,7 +1048,7 @@ func (i *Ingester) pushSamplesToAppender(userID string, timeseries []mimirpb.Pre
}

// Otherwise, return a 500.
return wrapWithUser(err, userID)
return err
}

numNativeHistogramBuckets := -1
@@ -1079,7 +1089,7 @@ func (i *Ingester) pushSamplesToAppender(userID string, timeseries []mimirpb.Pre
continue
}

return wrapWithUser(err, userID)
return err
}
numNativeHistograms := len(ts.Histograms)
if numNativeHistograms > 0 {
34 changes: 22 additions & 12 deletions pkg/ingester/ingester_test.go
@@ -53,6 +53,8 @@ import (
"golang.org/x/exp/slices"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/grafana/mimir/pkg/ingester/activeseries"
"github.com/grafana/mimir/pkg/ingester/client"
@@ -63,7 +65,6 @@
"github.com/grafana/mimir/pkg/storage/tsdb/block"
"github.com/grafana/mimir/pkg/usagestats"
"github.com/grafana/mimir/pkg/util"
util_log "github.com/grafana/mimir/pkg/util/log"
util_math "github.com/grafana/mimir/pkg/util/math"
"github.com/grafana/mimir/pkg/util/push"
util_test "github.com/grafana/mimir/pkg/util/test"
@@ -5415,10 +5416,9 @@ func TestIngesterNoFlushWithInFlightRequest(t *testing.T) {

func TestIngester_PushInstanceLimits(t *testing.T) {
tests := map[string]struct {
limits InstanceLimits
reqs map[string][]*mimirpb.WriteRequest
expectedErr error
expectedErrType interface{}
limits InstanceLimits
reqs map[string][]*mimirpb.WriteRequest
expectedErr error
}{
"should succeed creating one user and series": {
limits: InstanceLimits{MaxInMemorySeries: 1, MaxInMemoryTenants: 1},
@@ -5461,7 +5461,7 @@
},
},

expectedErr: wrapWithUser(errMaxInMemorySeriesReached, "test"),
expectedErr: errMaxInMemorySeriesReached,
},

"should fail creating two users": {
@@ -5488,7 +5488,7 @@
),
},
},
expectedErr: wrapWithUser(errMaxTenantsReached, "user2"),
expectedErr: errMaxTenantsReached,
},

"should fail pushing samples in two requests due to rate limit": {
@@ -5557,9 +5557,12 @@
} else {
// Last push may expect error.
if testData.expectedErr != nil {
assert.Equal(t, testData.expectedErr, err)
} else if testData.expectedErrType != nil {
assert.True(t, errors.As(err, testData.expectedErrType), "expected error type %T, got %v", testData.expectedErrType, err)
assert.ErrorIs(t, err, testData.expectedErr)
var optional middleware.OptionalLogging
assert.ErrorAs(t, err, &optional)
s, ok := status.FromError(err)
require.True(t, ok, "expected to be able to convert to gRPC status")
assert.Equal(t, codes.Unavailable, s.Code())
} else {
assert.NoError(t, err)
}
@@ -5687,10 +5690,17 @@ func TestIngester_inflightPushRequests(t *testing.T) {

time.Sleep(10 * time.Millisecond) // Give first goroutine a chance to start pushing...
req := generateSamplesForLabel(labels.FromStrings(labels.MetricName, "testcase"), 1, 1024)
var optional middleware.OptionalLogging

_, err := i.Push(ctx, req)
require.Equal(t, errMaxInflightRequestsReached, err)
require.ErrorAs(t, err, &util_log.DoNotLogError{})
require.ErrorIs(t, err, errMaxInflightRequestsReached)
require.ErrorAs(t, err, &optional)
require.False(t, optional.ShouldLog(ctx, time.Duration(0)), "expected not to log via .ShouldLog()")

s, ok := status.FromError(err)
require.True(t, ok, "expected to be able to convert to gRPC status")
require.Equal(t, codes.Unavailable, s.Code())

return nil
})

43 changes: 36 additions & 7 deletions pkg/ingester/instance_limits.go
@@ -6,13 +6,15 @@
package ingester

import (
"context"
"flag"
"time"

"github.com/pkg/errors"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"gopkg.in/yaml.v3"

"github.com/grafana/mimir/pkg/util/globalerror"
util_log "github.com/grafana/mimir/pkg/util/log"
)

const (
@@ -22,14 +24,41 @@
maxInflightPushRequestsFlag = "ingester.instance-limits.max-inflight-push-requests"
)

// We don't include values in the messages for per-instance limits to avoid leaking Mimir cluster configuration to users.
var (
// We don't include values in the message to avoid leaking Mimir cluster configuration to users.
errMaxIngestionRateReached = errors.New(globalerror.IngesterMaxIngestionRate.MessageWithPerInstanceLimitConfig("the write request has been rejected because the ingester exceeded the samples ingestion rate limit", maxIngestionRateFlag))
errMaxTenantsReached = errors.New(globalerror.IngesterMaxTenants.MessageWithPerInstanceLimitConfig("the write request has been rejected because the ingester exceeded the allowed number of tenants", maxInMemoryTenantsFlag))
errMaxInMemorySeriesReached = errors.New(globalerror.IngesterMaxInMemorySeries.MessageWithPerInstanceLimitConfig("the write request has been rejected because the ingester exceeded the allowed number of in-memory series", maxInMemorySeriesFlag))
errMaxInflightRequestsReached = util_log.DoNotLogError{Err: errors.New(globalerror.IngesterMaxInflightPushRequests.MessageWithPerInstanceLimitConfig("the write request has been rejected because the ingester exceeded the allowed number of inflight push requests", maxInflightPushRequestsFlag))}
errMaxIngestionRateReached = newInstanceLimitError(globalerror.IngesterMaxIngestionRate.MessageWithPerInstanceLimitConfig("the write request has been rejected because the ingester exceeded the samples ingestion rate limit", maxIngestionRateFlag))
errMaxTenantsReached = newInstanceLimitError(globalerror.IngesterMaxTenants.MessageWithPerInstanceLimitConfig("the write request has been rejected because the ingester exceeded the allowed number of tenants", maxInMemoryTenantsFlag))
errMaxInMemorySeriesReached = newInstanceLimitError(globalerror.IngesterMaxInMemorySeries.MessageWithPerInstanceLimitConfig("the write request has been rejected because the ingester exceeded the allowed number of in-memory series", maxInMemorySeriesFlag))
errMaxInflightRequestsReached = newInstanceLimitError(globalerror.IngesterMaxInflightPushRequests.MessageWithPerInstanceLimitConfig("the write request has been rejected because the ingester exceeded the allowed number of inflight push requests", maxInflightPushRequestsFlag))
)

type instanceLimitErr struct {
msg string
status *status.Status
}

func newInstanceLimitError(msg string) error {
return &instanceLimitErr{
// Errors from hitting per-instance limits are always "unavailable" for gRPC
status: status.New(codes.Unavailable, msg),
msg: msg,
}
}

func (e *instanceLimitErr) ShouldLog(context.Context, time.Duration) bool {
// We increment metrics when hitting per-instance limits and so there's no need to
// log them, the error doesn't contain any interesting information for us.
return false
}
Comment on lines +48 to +52

Contributor:

If I read this correctly, then this will remove all logging, and as such should be prominently noted in the PR description.

I have an alternative proposal in #5584, to log 1 in N. I think there is useful information in the specific labels that cause problems.

Contributor Author:

Correct, it will remove all logging for these per-instance errors. We already don't log anything for in-flight requests, and all of these increment metrics.

Contributor Author:

> I have an alternative proposal in #5584, to log 1 in N. I think there is useful information in the specific labels that cause problems.

I don't understand — there are no labels here. This is only for per-instance (ingester) limits, so there's no extra information to log beyond "this thing happened", unlike per-tenant limits, which it seems like #5584 is addressing (and I agree, sampling is useful for that case).

Collaborator:

@bboreham Maybe there's a misunderstanding. This PR is about per-instance limits, which don't look to be rate limited by PR #5584 (as far as I can see).


func (e *instanceLimitErr) GRPCStatus() *status.Status {
return e.status
}

func (e *instanceLimitErr) Error() string {
return e.msg
}

// InstanceLimits describes limits used by ingester. Reaching any of these will result in Push method to return
// (internal) error.
type InstanceLimits struct {
34 changes: 34 additions & 0 deletions pkg/ingester/instance_limits_test.go
@@ -6,10 +6,16 @@
package ingester

import (
"context"
"fmt"
"strings"
"testing"
"time"

"github.com/stretchr/testify/require"
"github.com/weaveworks/common/middleware"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"gopkg.in/yaml.v3"
)

@@ -34,3 +40,31 @@ max_tenants: 50000
require.Equal(t, int64(30), l.MaxInMemorySeries) // default value
require.Equal(t, int64(40), l.MaxInflightPushRequests) // default value
}

func TestInstanceLimitErr(t *testing.T) {
t.Run("bare error implements ShouldLog()", func(t *testing.T) {
var optional middleware.OptionalLogging
require.ErrorAs(t, errMaxInflightRequestsReached, &optional)
require.False(t, optional.ShouldLog(context.Background(), time.Duration(0)))
})

t.Run("wrapped error implements ShouldLog()", func(t *testing.T) {
err := fmt.Errorf("%w: oh no", errMaxTenantsReached)
var optional middleware.OptionalLogging
require.ErrorAs(t, err, &optional)
require.False(t, optional.ShouldLog(context.Background(), time.Duration(0)))
})

t.Run("bare error implements GRPCStatus()", func(t *testing.T) {
s, ok := status.FromError(errMaxInMemorySeriesReached)
require.True(t, ok, "expected to be able to convert to gRPC status")
require.Equal(t, codes.Unavailable, s.Code())
})

t.Run("wrapped error implements GRPCStatus()", func(t *testing.T) {
err := fmt.Errorf("%w: oh no", errMaxIngestionRateReached)
s, ok := status.FromError(err)
require.True(t, ok, "expected to be able to convert to gRPC status")
require.Equal(t, codes.Unavailable, s.Code())
})
}