From 6c4d90960b7d47b16afee74621543e4ea35ea296 Mon Sep 17 00:00:00 2001 From: Chris Marchbanks Date: Wed, 7 Mar 2018 13:07:20 -0700 Subject: [PATCH 1/2] Perform ingester pushes in a background context rather than the request context --- pkg/distributor/distributor.go | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 0998d62efa..7868f43fd0 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -14,6 +14,7 @@ import ( "golang.org/x/time/rate" "github.com/go-kit/kit/log/level" + opentracing "github.com/opentracing/opentracing-go" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/pkg/labels" @@ -350,7 +351,14 @@ func (d *Distributor) Push(ctx context.Context, req *client.WriteRequest) (*clie defer cancel() // cancel the timeout to release resources for ingester, samples := range samplesByIngester { go func(ingester *ring.IngesterDesc, samples []*sampleTracker) { - d.sendSamples(ctx, ingester, samples, &pushTracker) + // Use a background context to make sure all ingesters get samples even if we return early + localCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + localCtx = user.InjectOrgID(localCtx, userID) + if sp := opentracing.SpanFromContext(ctx); sp != nil { + localCtx = opentracing.ContextWithSpan(localCtx, sp) + } + d.sendSamples(localCtx, ingester, samples, &pushTracker) }(ingester, samples) } select { From 3823a14a034a69f99609b85a8f83afff6e17b336 Mon Sep 17 00:00:00 2001 From: Chris Marchbanks Date: Wed, 7 Mar 2018 17:02:44 -0700 Subject: [PATCH 2/2] Remove timeout on overall context, and add configured timeout to each sample push --- pkg/distributor/distributor.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 7868f43fd0..f3552b2995 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -347,12 +347,10 @@ func (d *Distributor) Push(ctx context.Context, req *client.WriteRequest) (*clie done: make(chan struct{}), err: make(chan error), } - ctx, cancel := context.WithTimeout(ctx, d.cfg.RemoteTimeout) - defer cancel() // cancel the timeout to release resources for ingester, samples := range samplesByIngester { go func(ingester *ring.IngesterDesc, samples []*sampleTracker) { // Use a background context to make sure all ingesters get samples even if we return early - localCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + localCtx, cancel := context.WithTimeout(context.Background(), d.cfg.RemoteTimeout) defer cancel() localCtx = user.InjectOrgID(localCtx, userID) if sp := opentracing.SpanFromContext(ctx); sp != nil {