diff --git a/CHANGELOG.md b/CHANGELOG.md
index 9b3079e456..a43c4e9829 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -17,6 +17,11 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
 - [#7080](https://github.com/thanos-io/thanos/pull/7080) Receive: race condition in handler Close() when stopped early
 - [#7132](https://github.com/thanos-io/thanos/pull/7132) Documentation: fix broken helm installation instruction
 - [#7134](https://github.com/thanos-io/thanos/pull/7134) Store, Compact: Revert the recursive block listing mechanism introduced in https://github.com/thanos-io/thanos/pull/6474 and use the same strategy as in 0.31. Introduce a `--block-discovery-strategy` flag to control the listing strategy so that a recursive lister can still be used if the tradeoff of slower but cheaper discovery is preferred.
+- [#7122](https://github.com/thanos-io/thanos/pull/7122) Store Gateway: Fix lazy expanded postings estimate of base cardinality for posting groups with remove keys.
+- [#7224](https://github.com/thanos-io/thanos/pull/7224) Query-frontend: Add Redis username to the client configuration.
+- [#7220](https://github.com/thanos-io/thanos/pull/7220) Store Gateway: Fix lazy expanded postings caching partial expanded postings and a bug in estimating remove postings with a non-existent value. Added a `PromQLSmith`-based fuzz test to improve correctness.
+- [#7244](https://github.com/thanos-io/thanos/pull/7244) Query: Fix `Internal Server Error` caused by `unknown targetHealth: "unknown"` when trying to open the targets page.
+- [#7248](https://github.com/thanos-io/thanos/pull/7248) Receive: Fix `RemoteWriteAsync` being executed sequentially, which caused high latency in the ingestion path.
 
 ### Added
 - [#7105](https://github.com/thanos-io/thanos/pull/7105) Rule: add flag `--query.enable-x-functions` to allow usage of extended promql functions (xrate, xincrease, xdelta) in loaded rules
diff --git a/cmd/thanos/receive.go b/cmd/thanos/receive.go
index 17066d131f..e14cf87650 100644
--- a/cmd/thanos/receive.go
+++ b/cmd/thanos/receive.go
@@ -237,24 +237,25 @@ func runReceive(
 	}
 
 	webHandler := receive.NewHandler(log.With(logger, "component", "receive-handler"), &receive.Options{
-		Writer:            writer,
-		ListenAddress:     conf.rwAddress,
-		Registry:          reg,
-		Endpoint:          conf.endpoint,
-		TenantHeader:      conf.tenantHeader,
-		TenantField:       conf.tenantField,
-		DefaultTenantID:   conf.defaultTenantID,
-		ReplicaHeader:     conf.replicaHeader,
-		ReplicationFactor: conf.replicationFactor,
-		RelabelConfigs:    relabelConfig,
-		ReceiverMode:      receiveMode,
-		Tracer:            tracer,
-		TLSConfig:         rwTLSConfig,
-		DialOpts:          dialOpts,
-		ForwardTimeout:    time.Duration(*conf.forwardTimeout),
-		MaxBackoff:        time.Duration(*conf.maxBackoff),
-		TSDBStats:         dbs,
-		Limiter:           limiter,
+		Writer:                  writer,
+		ListenAddress:           conf.rwAddress,
+		Registry:                reg,
+		Endpoint:                conf.endpoint,
+		TenantHeader:            conf.tenantHeader,
+		TenantField:             conf.tenantField,
+		DefaultTenantID:         conf.defaultTenantID,
+		ReplicaHeader:           conf.replicaHeader,
+		ReplicationFactor:       conf.replicationFactor,
+		RelabelConfigs:          relabelConfig,
+		ReceiverMode:            receiveMode,
+		Tracer:                  tracer,
+		TLSConfig:               rwTLSConfig,
+		DialOpts:                dialOpts,
+		ForwardTimeout:          time.Duration(*conf.forwardTimeout),
+		MaxBackoff:              time.Duration(*conf.maxBackoff),
+		TSDBStats:               dbs,
+		Limiter:                 limiter,
+		AsyncForwardWorkerCount: conf.asyncForwardWorkerCount,
 	})
 
 	grpcProbe := prober.NewGRPC()
diff --git a/pkg/pool/worker_pool.go b/pkg/pool/worker_pool.go
new file mode 100644
index 0000000000..b1e39bde78
--- /dev/null
+++ b/pkg/pool/worker_pool.go
@@ -0,0 +1,73 @@
+// Copyright (c) The Thanos Authors.
+// Licensed under the Apache License 2.0.
+
+package pool
+
+import (
+	"context"
+	"sync"
+)
+
+// Work is a unit of work to be executed, like a Java Runnable.
+type Work func()
+
+// WorkerPool is a pool of reusable goroutines, similar to a Java ThreadPool.
+type WorkerPool interface {
+	// Init initializes the worker pool.
+	Init()
+
+	// Go waits until the next worker becomes available and executes the given work.
+	Go(work Work)
+
+	// Close cancels all workers and waits for them to finish.
+	Close()
+
+	// Size returns the number of workers in the pool.
+	Size() int
+}
+
+type workerPool struct {
+	sync.Once
+	workCh chan Work
+	cancel context.CancelFunc
+}
+
+func NewWorkerPool(workers uint) WorkerPool {
+	return &workerPool{
+		workCh: make(chan Work, workers),
+	}
+}
+
+func (p *workerPool) Init() {
+	p.Do(func() {
+		ctx, cancel := context.WithCancel(context.Background())
+		p.cancel = cancel
+
+		for i := 0; i < p.Size(); i++ {
+			go func() {
+				for {
+					select {
+					case <-ctx.Done():
+						// TODO: exhaust workCh before exit
+						return
+					case work := <-p.workCh:
+						work()
+					}
+				}
+			}()
+		}
+	})
+}
+
+func (p *workerPool) Go(work Work) {
+	p.Init()
+	p.workCh <- work
+}
+
+func (p *workerPool) Close() {
+	p.cancel()
+}
+
+func (p *workerPool) Size() int {
+	return cap(p.workCh)
+}
diff --git a/pkg/pool/worker_pool_test.go b/pkg/pool/worker_pool_test.go
new file mode 100644
index 0000000000..bbd73ec6b4
--- /dev/null
+++ b/pkg/pool/worker_pool_test.go
@@ -0,0 +1,35 @@
+// Copyright (c) The Thanos Authors.
+// Licensed under the Apache License 2.0.
+
+package pool
+
+import (
+	"sync"
+	"testing"
+
+	"github.com/stretchr/testify/require"
+)
+
+func TestGo(t *testing.T) {
+	var expectedWorksDone uint32
+	var workerPoolSize uint
+	var mu sync.Mutex
+	workerPoolSize = 5
+	p := NewWorkerPool(workerPoolSize)
+	p.Init()
+	defer p.Close()
+
+	var wg sync.WaitGroup
+	for i := 0; i < int(workerPoolSize*3); i++ {
+		wg.Add(1)
+		p.Go(func() {
+			mu.Lock()
+			defer mu.Unlock()
+			expectedWorksDone++
+			wg.Done()
+		})
+	}
+	wg.Wait()
+	require.Equal(t, uint32(workerPoolSize*3), expectedWorksDone)
+	require.Equal(t, int(workerPoolSize), p.Size())
+}
diff --git a/pkg/receive/handler.go b/pkg/receive/handler.go
index 5d15bafef3..f2de570fef 100644
--- a/pkg/receive/handler.go
+++ b/pkg/receive/handler.go
@@ -44,6 +44,7 @@ import (
 	"github.com/thanos-io/thanos/pkg/logging"
 
 	extpromhttp "github.com/thanos-io/thanos/pkg/extprom/http"
+	"github.com/thanos-io/thanos/pkg/pool"
 	"github.com/thanos-io/thanos/pkg/runutil"
 	"github.com/thanos-io/thanos/pkg/server/http/middleware"
 	"github.com/thanos-io/thanos/pkg/store/labelpb"
@@ -144,6 +145,7 @@ func NewHandler(logger log.Logger, o *Options) *Handler {
 	if workers == 0 {
 		workers = 1
 	}
+	level.Info(logger).Log("msg", "Starting receive handler with async forward workers", "workers", workers)
 
 	h := &Handler{
 		logger: logger,
@@ -1266,90 +1268,23 @@ func newReplicationErrors(threshold, numErrors int) []*replicationErrors {
 	return errs
 }
 
-func (pw *peerWorker) initWorkers() {
-	pw.initWorkersOnce.Do(func() {
-		work := make(chan peerWorkItem)
-		pw.work = work
-
-		ctx, cancel := context.WithCancel(context.Background())
-		pw.turnOffGoroutines = cancel
-
-		for i := 0; i < int(pw.asyncWorkerCount); i++ {
-			go func() {
-				for {
-					select {
-					case <-ctx.Done():
-						return
-					case w := <-work:
-						pw.forwardDelay.Observe(time.Since(w.sendTime).Seconds())
-
-						tracing.DoInSpan(w.workItemCtx, "receive_forward", func(ctx context.Context) {
-							_, err := storepb.NewWriteableStoreClient(pw.cc).RemoteWrite(ctx, w.req)
-							w.workResult <- peerWorkResponse{
-								er:  w.er,
-								err: errors.Wrapf(err, "forwarding request to endpoint %v", w.er.endpoint),
-							}
-							if err != nil {
-								sp := trace.SpanFromContext(ctx)
-								sp.SetAttributes(attribute.Bool("error", true))
-								sp.SetAttributes(attribute.String("error.msg", err.Error()))
-							}
-							close(w.workResult)
-						}, opentracing.Tags{
-							"endpoint": w.er.endpoint,
-							"replica":  w.er.replica,
-						})
-
-					}
-				}
-			}()
-		}
-
-	})
-}
-
 func newPeerWorker(cc *grpc.ClientConn, forwardDelay prometheus.Histogram, asyncWorkerCount uint) *peerWorker {
 	return &peerWorker{
-		cc:               cc,
-		asyncWorkerCount: asyncWorkerCount,
-		forwardDelay:     forwardDelay,
+		cc:           cc,
+		wp:           pool.NewWorkerPool(asyncWorkerCount),
+		forwardDelay: forwardDelay,
 	}
 }
 
-type peerWorkItem struct {
-	cc          *grpc.ClientConn
-	req         *storepb.WriteRequest
-	workItemCtx context.Context
-
-	workResult chan peerWorkResponse
-	er         endpointReplica
-	sendTime   time.Time
-}
-
 func (pw *peerWorker) RemoteWrite(ctx context.Context, in *storepb.WriteRequest, opts ...grpc.CallOption) (*storepb.WriteResponse, error) {
-	pw.initWorkers()
-
-	w := peerWorkItem{
-		cc:          pw.cc,
-		req:         in,
-		workResult:  make(chan peerWorkResponse, 1),
-		workItemCtx: ctx,
-		sendTime:    time.Now(),
-	}
-
-	pw.work <- w
-	return nil, (<-w.workResult).err
+	return storepb.NewWriteableStoreClient(pw.cc).RemoteWrite(ctx, in)
 }
 
 type peerWorker struct {
 	cc *grpc.ClientConn
+	wp pool.WorkerPool
 
-	work              chan peerWorkItem
-	turnOffGoroutines func()
-
-	initWorkersOnce  sync.Once
-	asyncWorkerCount uint
-	forwardDelay     prometheus.Histogram
+	forwardDelay prometheus.Histogram
 }
 
 func newPeerGroup(backoff backoff.Backoff, forwardDelay prometheus.Histogram, asyncForwardWorkersCount uint, dialOpts ...grpc.DialOption) peersContainer {
@@ -1373,29 +1308,29 @@ type peersContainer interface {
 	reset()
 }
 
-type peerWorkResponse struct {
-	er  endpointReplica
-	err error
-}
-
 func (p *peerWorker) RemoteWriteAsync(ctx context.Context, req *storepb.WriteRequest, er endpointReplica, seriesIDs []int, responseWriter chan writeResponse, cb func(error)) {
-	p.initWorkers()
-
-	w := peerWorkItem{
-		cc:          p.cc,
-		req:         req,
-		workResult:  make(chan peerWorkResponse, 1),
-		workItemCtx: ctx,
-		er:          er,
-
-		sendTime: time.Now(),
-	}
-
-	p.work <- w
-	res := <-w.workResult
-
-	responseWriter <- newWriteResponse(seriesIDs, res.err, er)
-	cb(res.err)
+	now := time.Now()
+	p.wp.Go(func() {
+		p.forwardDelay.Observe(time.Since(now).Seconds())
+
+		tracing.DoInSpan(ctx, "receive_forward", func(ctx context.Context) {
+			_, err := storepb.NewWriteableStoreClient(p.cc).RemoteWrite(ctx, req)
+			responseWriter <- newWriteResponse(
+				seriesIDs,
+				errors.Wrapf(err, "forwarding request to endpoint %v", er.endpoint),
+				er,
+			)
+			if err != nil {
+				sp := trace.SpanFromContext(ctx)
+				sp.SetAttributes(attribute.Bool("error", true))
+				sp.SetAttributes(attribute.String("error.msg", err.Error()))
+			}
+			cb(err)
+		}, opentracing.Tags{
+			"endpoint": er.endpoint,
+			"replica":  er.replica,
+		})
+	})
 }
 
 type peerGroup struct {
@@ -1423,7 +1358,7 @@ func (p *peerGroup) close(addr string) error {
 		return nil
 	}
 
-	p.connections[addr].turnOffGoroutines()
+	p.connections[addr].wp.Close()
 	delete(p.connections, addr)
 	if err := c.cc.Close(); err != nil {
 		return fmt.Errorf("closing connection for %s", addr)
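
A minimal usage sketch of the new pool.WorkerPool (illustration only, not part of the patch; the import path and the Init/Go/Close/Size API follow pkg/pool/worker_pool.go above, while the worker count and the printed items are arbitrary):

package main

import (
	"fmt"
	"sync"

	"github.com/thanos-io/thanos/pkg/pool"
)

func main() {
	// Fixed-size pool: Go() enqueues work and blocks once all workers are busy
	// and the buffered work channel is full, bounding in-flight forwards.
	wp := pool.NewWorkerPool(4)
	wp.Init()
	defer wp.Close()

	var wg sync.WaitGroup
	for i := 0; i < 16; i++ {
		i := i // capture loop variable for the closure (pre-Go 1.22)
		wg.Add(1)
		wp.Go(func() {
			defer wg.Done()
			fmt.Println("forwarded item", i)
		})
	}
	wg.Wait() // wait for all submitted work before Close cancels the workers
}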