From 855f892be1e3df4c7601ccbf03e9ffe696625c2d Mon Sep 17 00:00:00 2001 From: msaf1980 Date: Wed, 1 Feb 2023 00:35:03 +0500 Subject: [PATCH] limiter: refactor for enter on multi-targets render --- autocomplete/autocomplete.go | 4 +- find/handler.go | 2 +- helper/clickhouse/clickhouse.go | 9 ++- helper/clickhouse/external-data.go | 1 - helper/clickhouse/external-data_test.go | 1 - limiter/interface.go | 3 +- limiter/wlimiter.go | 4 +- prometheus/querier_select.go | 32 +++++++- render/data/multi_target.go | 46 +++++++++++- render/data/targets.go | 5 +- render/handler.go | 99 +++++++++++++------------ 11 files changed, 140 insertions(+), 66 deletions(-) diff --git a/autocomplete/autocomplete.go b/autocomplete/autocomplete.go index ee30160f8..668d9cc11 100644 --- a/autocomplete/autocomplete.go +++ b/autocomplete/autocomplete.go @@ -339,7 +339,7 @@ func (h *Handler) ServeTags(w http.ResponseWriter, r *http.Request) { } if err != nil { - status = clickhouse.HandleError(w, err) + status, _ = clickhouse.HandleError(w, err) return } readBytes = int64(len(body)) @@ -579,7 +579,7 @@ func (h *Handler) ServeValues(w http.ResponseWriter, r *http.Request) { } if err != nil { - status = clickhouse.HandleError(w, err) + status, _ = clickhouse.HandleError(w, err) return } diff --git a/find/handler.go b/find/handler.go index 8e53565c0..99c661b55 100644 --- a/find/handler.go +++ b/find/handler.go @@ -181,7 +181,7 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } if err != nil { - status = clickhouse.HandleError(w, err) + status, _ = clickhouse.HandleError(w, err) return } diff --git a/helper/clickhouse/clickhouse.go b/helper/clickhouse/clickhouse.go index 6a9064323..b6584763f 100644 --- a/helper/clickhouse/clickhouse.go +++ b/helper/clickhouse/clickhouse.go @@ -17,6 +17,7 @@ import ( "time" "github.com/lomik/graphite-clickhouse/helper/errs" + "github.com/lomik/graphite-clickhouse/limiter" "github.com/lomik/graphite-clickhouse/pkg/scope" "go.uber.org/zap" @@ -64,7 +65,7 @@ func extractClickhouseError(e string) (int, string) { return http.StatusInternalServerError, "Storage error" } -func HandleError(w http.ResponseWriter, err error) (status int) { +func HandleError(w http.ResponseWriter, err error) (status int, queueFail bool) { status = http.StatusOK errStr := err.Error() if err == ErrInvalidTimeRange { @@ -72,6 +73,12 @@ func HandleError(w http.ResponseWriter, err error) (status int) { http.Error(w, errStr, status) return } + if err == limiter.ErrTimeout || err == limiter.ErrOverflow { + queueFail = true + status = http.StatusServiceUnavailable + http.Error(w, err.Error(), status) + return + } if _, ok := err.(*ErrWithDescr); ok { status, errStr = extractClickhouseError(errStr) http.Error(w, errStr, status) diff --git a/helper/clickhouse/external-data.go b/helper/clickhouse/external-data.go index 6224aa491..297a65b5a 100644 --- a/helper/clickhouse/external-data.go +++ b/helper/clickhouse/external-data.go @@ -59,7 +59,6 @@ func (e *ExternalData) SetDebug(debugDir string, perm os.FileMode) { e.debug = nil } e.debug = &extDataDebug{debugDir, perm} - return } // buildBody returns multiform body, content type header and error diff --git a/helper/clickhouse/external-data_test.go b/helper/clickhouse/external-data_test.go index c39f53b17..d9eba4e1b 100644 --- a/helper/clickhouse/external-data_test.go +++ b/helper/clickhouse/external-data_test.go @@ -91,7 +91,6 @@ func TestBuildBody(t *testing.T) { b += "--" + contentID + "--\r\n" assert.Equal(t, b, body.String(), "built body and expected body don't match") } - return } func TestDebugDump(t *testing.T) { diff --git a/limiter/interface.go b/limiter/interface.go index b131ef29f..921f483f1 100644 --- a/limiter/interface.go +++ b/limiter/interface.go @@ -6,8 +6,7 @@ import ( ) var ErrTimeout = errors.New("timeout exceeded") -var ErrOverflow = errors.New("storage maximum read slot wait timeout") -var ErrConcurrency = errors.New("storage concurrent read slot wait timeout") +var ErrOverflow = errors.New("storage maximum queries exceeded") type ServerLimiter interface { Capacity() int diff --git a/limiter/wlimiter.go b/limiter/wlimiter.go index 71389ee8c..c979a9d66 100644 --- a/limiter/wlimiter.go +++ b/limiter/wlimiter.go @@ -48,7 +48,7 @@ func (sl *WLimiter) Enter(ctx context.Context, s string) (err error) { if sl.cL.enter(ctx, s) != nil { sl.l.leave(ctx, s) sl.m.WaitErrors.Add(1) - err = ErrConcurrency + err = ErrTimeout } } return @@ -66,7 +66,7 @@ func (sl *WLimiter) TryEnter(ctx context.Context, s string) (err error) { if sl.cL.tryEnter(ctx, s) != nil { sl.l.leave(ctx, s) sl.m.WaitErrors.Add(1) - err = ErrConcurrency + err = ErrTimeout } } return diff --git a/prometheus/querier_select.go b/prometheus/querier_select.go index 8c8818a78..4179220e5 100644 --- a/prometheus/querier_select.go +++ b/prometheus/querier_select.go @@ -4,10 +4,12 @@ package prometheus import ( + "context" "time" "github.com/lomik/graphite-clickhouse/config" "github.com/lomik/graphite-clickhouse/finder" + "github.com/lomik/graphite-clickhouse/limiter" "github.com/lomik/graphite-clickhouse/pkg/alias" "github.com/lomik/graphite-clickhouse/render/data" "github.com/prometheus/prometheus/model/labels" @@ -17,12 +19,30 @@ import ( // override in unit tests for stable results var timeNow = time.Now -func (q *Querier) lookup(from, until int64, labelsMatcher ...*labels.Matcher) (*alias.Map, error) { +func (q *Querier) lookup(from, until int64, qlimiter limiter.ServerLimiter, queueDuration *time.Duration, labelsMatcher ...*labels.Matcher) (*alias.Map, error) { terms, err := makeTaggedFromPromQL(labelsMatcher) if err != nil { return nil, err } - var stat finder.FinderStat + var ( + stat finder.FinderStat + limitCtx context.Context + cancel context.CancelFunc + ) + if qlimiter.Enabled() { + limitCtx, cancel = context.WithTimeout(q.ctx, q.config.ClickHouse.IndexTimeout) + defer cancel() + start := time.Now() + err = qlimiter.Enter(limitCtx, "render") + *queueDuration += time.Since(start) + if err != nil { + // status = http.StatusServiceUnavailable + // queueFail = true + // http.Error(w, err.Error(), status) + return nil, err + } + defer qlimiter.Leave(limitCtx, "render") + } // TODO: implement use stat for Prometheus queries fndResult, err := finder.FindTagged(q.config, q.ctx, terms, from, until, &stat) @@ -67,8 +87,12 @@ func (q *Querier) timeRange(hints *storage.SelectHints) (int64, int64) { // Select returns a set of series that matches the given label matchers. func (q *Querier) Select(sortSeries bool, hints *storage.SelectHints, labelsMatcher ...*labels.Matcher) storage.SeriesSet { + var ( + queueDuration time.Duration + ) from, until := q.timeRange(hints) - am, err := q.lookup(from, until, labelsMatcher...) + qlimiter := data.GetQueryLimiterFrom("", q.config, from, until) + am, err := q.lookup(from, until, qlimiter, &queueDuration, labelsMatcher...) if err != nil { return nil //, nil, err @TODO } @@ -96,7 +120,7 @@ func (q *Querier) Select(sortSeries bool, hints *storage.SelectHints, labelsMatc MaxDataPoints: maxDataPoints, }: data.NewTargets([]string{}, am), } - reply, err := multiTarget.Fetch(q.ctx, q.config, config.ContextPrometheus) + reply, err := multiTarget.Fetch(q.ctx, q.config, config.ContextPrometheus, qlimiter, &queueDuration) if err != nil { return nil // , nil, err @TODO } diff --git a/render/data/multi_target.go b/render/data/multi_target.go index 180f77790..3831db537 100644 --- a/render/data/multi_target.go +++ b/render/data/multi_target.go @@ -101,6 +101,21 @@ func GetQueryLimiter(username string, cfg *config.Config, m *MultiTarget) limite return cfg.ClickHouse.QueryParams[n].Limiter } +func GetQueryLimiterFrom(username string, cfg *config.Config, from, until int64) limiter.ServerLimiter { + n := 0 + if username != "" && len(cfg.ClickHouse.UserLimits) > 0 { + if u, ok := cfg.ClickHouse.UserLimits[username]; ok { + return u.Limiter + } + } + + if len(cfg.ClickHouse.QueryParams) > 1 { + n = config.GetQueryParam(cfg.ClickHouse.QueryParams, time.Second*time.Duration(until-from)) + } + + return cfg.ClickHouse.QueryParams[n].Limiter +} + func GetQueryParam(username string, cfg *config.Config, m *MultiTarget) (*config.QueryParam, int) { n := 0 @@ -120,9 +135,12 @@ func GetQueryParam(username string, cfg *config.Config, m *MultiTarget) (*config } // Fetch fetches the parsed ClickHouse data returns CHResponses -func (m *MultiTarget) Fetch(ctx context.Context, cfg *config.Config, chContext string) (CHResponses, error) { - var lock sync.RWMutex - var wg sync.WaitGroup +func (m *MultiTarget) Fetch(ctx context.Context, cfg *config.Config, chContext string, qlimiter limiter.ServerLimiter, queueDuration *time.Duration) (CHResponses, error) { + var ( + lock sync.RWMutex + wg sync.WaitGroup + entered int + ) logger := scope.Logger(ctx) setCarbonlinkClient(&cfg.Carbonlink) @@ -135,7 +153,12 @@ func (m *MultiTarget) Fetch(ctx context.Context, cfg *config.Config, chContext s dataTimeout := getDataTimeout(cfg, m) ctxTimeout, cancel := context.WithTimeout(ctx, dataTimeout) - defer cancel() + defer func() { + for i := 0; i < entered; i++ { + qlimiter.Leave(ctxTimeout, "render") + } + cancel() + }() errors := make([]error, 0, len(*m)) query := newQuery(cfg, len(*m)) @@ -154,6 +177,21 @@ func (m *MultiTarget) Fetch(ctx context.Context, cfg *config.Config, chContext s logger.Error("data tables is not specified", zap.Error(err)) return EmptyResponse(), err } + if qlimiter.Enabled() { + start := time.Now() + err = qlimiter.Enter(ctxTimeout, "render") + *queueDuration += time.Since(start) + if err != nil { + // status = http.StatusServiceUnavailable + // queueFail = true + // http.Error(w, err.Error(), status) + lock.Lock() + errors = append(errors, err) + lock.Unlock() + break + } + entered++ + } wg.Add(1) go func(cond *conditions) { defer wg.Done() diff --git a/render/data/targets.go b/render/data/targets.go index d651ea51b..c6da231d3 100644 --- a/render/data/targets.go +++ b/render/data/targets.go @@ -22,8 +22,9 @@ type Cache struct { // Targets represents requested metrics type Targets struct { // List contains queried metrics, e.g. [metric.{name1,name2}, metric.name[3-9]] - List []string - Cache []Cache + List []string + Cache []Cache + Cached bool // all is cached // AM stores found expanded metrics AM *alias.Map pointsTable string diff --git a/render/handler.go b/render/handler.go index 6549031f2..115da5474 100644 --- a/render/handler.go +++ b/render/handler.go @@ -76,7 +76,6 @@ func (h *Handler) finderCached(ts time.Time, fetchRequests data.MultiTarget, log body, err := h.config.Common.FindCache.Get(targets.Cache[n].Key) if err == nil { if len(body) > 0 { - cachedFind++ targets.Cache[n].M.CacheHits.Add(1) var f finder.Finder if strings.HasPrefix(target, "seriesByTag(") { @@ -107,14 +106,43 @@ func (h *Handler) finderCached(ts time.Time, fetchRequests data.MultiTarget, log wg.Wait() if len(errors) != 0 { err = errors[0] + return + } + for _, targets := range fetchRequests { + var cached int + for _, c := range targets.Cache { + if c.Cached { + cached++ + } + } + cachedFind += cached + if cached == len(targets.Cache) { + targets.Cached = true + } } return } // try to fetch finder queries -func (h *Handler) finder(fetchRequests data.MultiTarget, ctx context.Context, logger *zap.Logger, metricsLen *int, useCache bool) (maxDuration int64, err error) { - var wg sync.WaitGroup - var lock sync.RWMutex +func (h *Handler) finder(fetchRequests data.MultiTarget, ctx context.Context, logger *zap.Logger, qlimiter limiter.ServerLimiter, metricsLen *int, queueDuration *time.Duration, useCache bool) (maxDuration int64, err error) { + var ( + wg sync.WaitGroup + lock sync.RWMutex + entered int + limitCtx context.Context + cancel context.CancelFunc + ) + if qlimiter.Enabled() { + // no reason wait longer than index-timeout + limitCtx, cancel = context.WithTimeout(ctx, h.config.ClickHouse.IndexTimeout) + defer func() { + for i := 0; i < entered; i++ { + qlimiter.Leave(limitCtx, "render") + } + defer cancel() + }() + } + errors := make([]error, 0, len(fetchRequests)) for tf, targets := range fetchRequests { for i, expr := range targets.List { @@ -125,6 +153,18 @@ func (h *Handler) finder(fetchRequests data.MultiTarget, ctx context.Context, lo if targets.Cache[i].Cached { continue } + if qlimiter.Enabled() { + start := time.Now() + err = qlimiter.Enter(limitCtx, "render") + *queueDuration += time.Since(start) + if err != nil { + lock.Lock() + errors = append(errors, err) + lock.Unlock() + break + } + entered++ + } wg.Add(1) go func(tf data.TimeFrame, target string, targets *data.Targets, n int) { defer wg.Done() @@ -193,7 +233,7 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { r = r.WithContext(scope.WithLogger(r.Context(), logger)) username := w.Header().Get("X-Forwarded-User") - var limiter limiter.ServerLimiter = limiter.NoopLimiter{} + var qlimiter limiter.ServerLimiter = limiter.NoopLimiter{} defer func() { if rec := recover(); rec != nil { @@ -208,7 +248,7 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } end := time.Now() logs.AccessLog(accessLogger, h.config, r, status, end.Sub(start), queueDuration, cachedFind, queueFail) - limiter.SendDuration(queueDuration.Milliseconds()) + qlimiter.SendDuration(queueDuration.Milliseconds()) metrics.SendRenderMetrics(metrics.RenderRequestMetric, status, start, fetchStart, end, maxDuration, h.config.Metrics.ExtendedStat, int64(metricsLen), pointsCount) }() @@ -231,42 +271,14 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { if tf.From >= tf.Until { // wrong duration if err != nil { - status = clickhouse.HandleError(w, clickhouse.ErrInvalidTimeRange) + status, _ = clickhouse.HandleError(w, clickhouse.ErrInvalidTimeRange) return } } targetsLen += len(targets.List) } - var ( - entered bool - ctx context.Context - cancel context.CancelFunc - ) - limiter = data.GetQueryLimiter(username, h.config, &fetchRequests) - if limiter.Enabled() { - // no reason wait longer than index-timeout - ctx, cancel = context.WithTimeout(context.Background(), h.config.ClickHouse.IndexTimeout) - defer cancel() - - err = limiter.Enter(ctx, "render") - queueDuration = time.Since(start) - if err != nil { - status = http.StatusServiceUnavailable - queueFail = true - logger.Error(err.Error()) - http.Error(w, err.Error(), status) - return - } - queueDuration = time.Since(start) - entered = true - defer func() { - if entered { - limiter.Leave(ctx, "render") - entered = false - } - }() - } + qlimiter = data.GetQueryLimiter(username, h.config, &fetchRequests) var maxCacheTimeoutStr string useCache := h.config.Common.FindCache != nil && !parser.TruthyBool(r.FormValue("noCache")) @@ -275,7 +287,7 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { var cached int cached, maxCacheTimeoutStr, err = h.finderCached(start, fetchRequests, logger, &metricsLen) if err != nil { - status = clickhouse.HandleError(w, err) + status, _ = clickhouse.HandleError(w, err) return } if cached > 0 { @@ -289,9 +301,9 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } } - maxDuration, err = h.finder(fetchRequests, r.Context(), logger, &metricsLen, useCache) + maxDuration, err = h.finder(fetchRequests, r.Context(), logger, qlimiter, &metricsLen, &queueDuration, useCache) if err != nil { - status = clickhouse.HandleError(w, err) + status, queueFail = clickhouse.HandleError(w, err) return } @@ -308,14 +320,9 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { fetchStart = time.Now() - reply, err := fetchRequests.Fetch(r.Context(), h.config, config.ContextGraphite) - if entered { - // release early as possible - limiter.Leave(ctx, "render") - entered = false - } + reply, err := fetchRequests.Fetch(r.Context(), h.config, config.ContextGraphite, qlimiter, &queueDuration) if err != nil { - status = clickhouse.HandleError(w, err) + status, queueFail = clickhouse.HandleError(w, err) return }