Skip to content

Commit

Permalink
Comment unsafe memory usage in ingester push path (#2004)
Browse files Browse the repository at this point in the history
* Wrap ingester Push errors to avoid retaining any reference to unsafe data
* Comment unsafe memory usage in ingester push path

Signed-off-by: Bryan Boreham <bryan@weave.works>
  • Loading branch information
bboreham authored Feb 28, 2020
1 parent c0db39b commit 9d69608
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 16 deletions.
17 changes: 6 additions & 11 deletions pkg/ingester/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"fmt"
"net/http"

"github.com/pkg/errors"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/weaveworks/common/httpgrpc"
)
Expand Down Expand Up @@ -50,11 +49,6 @@ func makeMetricLimitError(errorType string, labels labels.Labels, err error) err
}
}

func (e *validationError) WrapWithUser(userID string) *validationError {
e.err = wrapWithUser(e.err, userID)
return e
}

func (e *validationError) Error() string {
if e.err == nil {
return e.errorType
Expand All @@ -65,14 +59,15 @@ func (e *validationError) Error() string {
return fmt.Sprintf("%s for series %s", e.err.Error(), e.labels.String())
}

// WrappedError returns a HTTP gRPC error than is correctly forwarded over gRPC.
func (e *validationError) WrappedError() error {
// returns a HTTP gRPC error than is correctly forwarded over gRPC, with no reference to `e` retained.
func grpcForwardableError(userID string, code int, e error) error {
return httpgrpc.ErrorFromHTTPResponse(&httpgrpc.HTTPResponse{
Code: int32(e.code),
Body: []byte(e.Error()),
Code: int32(code),
Body: []byte(wrapWithUser(e, userID).Error()),
})
}

// Note: does not retain a reference to `err`
func wrapWithUser(err error, userID string) error {
return errors.Wrapf(err, "user=%s", userID)
return fmt.Errorf("user=%s: %s", userID, err)
}
8 changes: 6 additions & 2 deletions pkg/ingester/index/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,11 @@ func New() *InvertedIndex {
}

// Add a fingerprint under the specified labels.
// NOTE: memory for `labels` is unsafe; anything retained beyond the
// life of this function must be copied
func (ii *InvertedIndex) Add(labels []client.LabelAdapter, fp model.Fingerprint) labels.Labels {
shard := &ii.shards[util.HashFP(fp)%indexShards]
return shard.add(labels, fp)
return shard.add(labels, fp) // add() returns 'interned' values so the original labels are not retained
}

// Lookup all fingerprints for the provided matchers.
Expand Down Expand Up @@ -109,7 +111,9 @@ func copyString(s string) string {
return string([]byte(s))
}

// add metric to the index; return all the name/value pairs as strings from the index, sorted
// add metric to the index; return all the name/value pairs as a fresh
// sorted slice, referencing 'interned' strings from the index so that
// no references are retained to the memory of `metric`.
func (shard *indexShard) add(metric []client.LabelAdapter, fp model.Fingerprint) labels.Labels {
shard.mtx.Lock()
defer shard.mtx.Unlock()
Expand Down
12 changes: 10 additions & 2 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,8 @@ func (i *Ingester) Push(ctx context.Context, req *client.WriteRequest) (*client.
return i.v2Push(ctx, req)
}

// NOTE: because we use `unsafe` in deserialisation, we must not
// retain anything from `req` past the call to ReuseSlice
defer client.ReuseSlice(req.Timeseries)

userID, err := user.ExtractOrgID(ctx)
Expand All @@ -298,6 +300,7 @@ func (i *Ingester) Push(ctx context.Context, req *client.WriteRequest) (*client.

for _, ts := range req.Timeseries {
for _, s := range ts.Samples {
// append() copies the memory in `ts.Labels` except on the error path
err := i.append(ctx, userID, ts.Labels, model.Time(s.TimestampMs), model.SampleValue(s.Value), req.Source, record)
if err == nil {
continue
Expand All @@ -309,12 +312,14 @@ func (i *Ingester) Push(ctx context.Context, req *client.WriteRequest) (*client.
continue
}

return nil, wrapWithUser(err, userID)
// non-validation error: abandon this request
return nil, grpcForwardableError(userID, http.StatusInternalServerError, err)
}
}

if lastPartialErr != nil {
return &client.WriteResponse{}, lastPartialErr.WrapWithUser(userID).WrappedError()
// grpcForwardableError turns the error into a string so it no longer references `req`
return &client.WriteResponse{}, grpcForwardableError(userID, lastPartialErr.code, lastPartialErr)
}

if record != nil {
Expand All @@ -328,6 +333,8 @@ func (i *Ingester) Push(ctx context.Context, req *client.WriteRequest) (*client.
return &client.WriteResponse{}, nil
}

// NOTE: memory for `labels` is unsafe; anything retained beyond the
// life of this function must be copied
func (i *Ingester) append(ctx context.Context, userID string, labels labelPairs, timestamp model.Time, value model.SampleValue, source client.WriteRequest_SourceEnum, record *Record) error {
labels.removeBlanks()

Expand All @@ -346,6 +353,7 @@ func (i *Ingester) append(ctx context.Context, userID string, labels labelPairs,
return fmt.Errorf("ingester stopping")
}

// getOrCreateSeries copies the memory for `labels`, except on the error path.
state, fp, series, err := i.userStates.getOrCreateSeries(ctx, userID, labels, record)
if err != nil {
if ve, ok := err.(*validationError); ok {
Expand Down
2 changes: 2 additions & 0 deletions pkg/ingester/ingester_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,8 @@ func (i *Ingester) updateLoop() {
func (i *Ingester) v2Push(ctx context.Context, req *client.WriteRequest) (*client.WriteResponse, error) {
var firstPartialErr error

// NOTE: because we use `unsafe` in deserialisation, we must not
// retain anything from `req` past the call to ReuseSlice
defer client.ReuseSlice(req.Timeseries)

userID, err := user.ExtractOrgID(ctx)
Expand Down
9 changes: 8 additions & 1 deletion pkg/ingester/user_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,12 +156,17 @@ func (us *userStates) getViaContext(ctx context.Context) (*userState, bool, erro
return state, ok, nil
}

// NOTE: memory for `labels` is unsafe; anything retained beyond the
// life of this function must be copied
func (us *userStates) getOrCreateSeries(ctx context.Context, userID string, labels []client.LabelAdapter, record *Record) (*userState, model.Fingerprint, *memorySeries, error) {
state := us.getOrCreate(userID)
// WARNING: `err` may have a reference to unsafe memory in `labels`
fp, series, err := state.getSeries(labels, record)
return state, fp, series, err
}

// NOTE: memory for `metric` is unsafe; anything retained beyond the
// life of this function must be copied
func (u *userState) getSeries(metric labelPairs, record *Record) (model.Fingerprint, *memorySeries, error) {
rawFP := client.FastFingerprint(metric)
u.fpLocker.Lock(rawFP)
Expand Down Expand Up @@ -198,6 +203,7 @@ func (u *userState) createSeriesWithFingerprint(fp model.Fingerprint, metric lab
}
}

// MetricNameFromLabelAdapters returns a copy of the string in `metric`
metricName, err := extract.MetricNameFromLabelAdapters(metric)
if err != nil {
return nil, err
Expand All @@ -206,6 +212,7 @@ func (u *userState) createSeriesWithFingerprint(fp model.Fingerprint, metric lab
if !recovery {
// Check if the per-metric limit has been exceeded
if err = u.canAddSeriesFor(string(metricName)); err != nil {
// WARNING: returns a reference to `metric`
return nil, makeMetricLimitError(perMetricSeriesLimit, client.FromLabelAdaptersToLabels(metric), err)
}
}
Expand All @@ -220,7 +227,7 @@ func (u *userState) createSeriesWithFingerprint(fp model.Fingerprint, metric lab
})
}

labels := u.index.Add(metric, fp)
labels := u.index.Add(metric, fp) // Add() returns 'interned' values so the original labels are not retained
series := newMemorySeries(labels)
u.fpToSeries.put(fp, series)

Expand Down

0 comments on commit 9d69608

Please sign in to comment.