diff --git a/pkg/api/common.go b/pkg/api/common.go
index 8ad58a2f1f..4830c3d2f2 100644
--- a/pkg/api/common.go
+++ b/pkg/api/common.go
@@ -24,6 +24,7 @@ import (
 	"github.com/timescale/promscale/pkg/log"
 	pgmodel "github.com/timescale/promscale/pkg/pgmodel/model"
 	"github.com/timescale/promscale/pkg/promql"
+	"github.com/timescale/promscale/pkg/query"
 	"github.com/timescale/promscale/pkg/tenancy"
 )

@@ -132,12 +133,12 @@ func ParseFlags(fs *flag.FlagSet, cfg *Config) *Config {
 	fs.Var(&cfg.PromscaleEnabledFeatureList, "enable-feature", "Enable beta/experimental features as a comma-separated list. Currently the following values can be passed: promql-at-modifier, promql-negative-offset")
-	fs.DurationVar(&cfg.MaxQueryTimeout, "metrics.promql.query-timeout", 2*time.Minute, "Maximum time a query may take before being aborted. This option sets both the default and maximum value of the 'timeout' parameter in "+
+	fs.DurationVar(&cfg.MaxQueryTimeout, "metrics.promql.query-timeout", query.DefaultQueryTimeout, "Maximum time a query may take before being aborted. This option sets both the default and maximum value of the 'timeout' parameter in "+
 		"'/api/v1/query.*' endpoints.")
-	fs.DurationVar(&cfg.SubQueryStepInterval, "metrics.promql.default-subquery-step-interval", 1*time.Minute, "Default step interval to be used for PromQL subquery evaluation. "+
+	fs.DurationVar(&cfg.SubQueryStepInterval, "metrics.promql.default-subquery-step-interval", query.DefaultSubqueryStepInterval, "Default step interval to be used for PromQL subquery evaluation. "+
 		"This value is used if the subquery does not specify the step value explicitly. Example: [30m:]. Note: in Prometheus this setting is set by the evaluation_interval option.")
-	fs.DurationVar(&cfg.LookBackDelta, "metrics.promql.lookback-delta", time.Minute*5, "Maximum lookback duration for retrieving metrics during expression evaluations and federation.")
-	fs.Int64Var(&cfg.MaxSamples, "metrics.promql.max-samples", 50000000, "Maximum number of samples a single "+
+	fs.DurationVar(&cfg.LookBackDelta, "metrics.promql.lookback-delta", query.DefaultLookBackDelta, "Maximum lookback duration for retrieving metrics during expression evaluations and federation.")
+	fs.Int64Var(&cfg.MaxSamples, "metrics.promql.max-samples", query.DefaultMaxSamples, "Maximum number of samples a single "+
 		"query can load into memory. Note that queries will fail if they try to load more samples than this into memory, "+
 		"so this also limits the number of samples a query can return.")
 	fs.Int64Var(&cfg.MaxPointsPerTs, "metrics.promql.max-points-per-ts", 11000, "Maximum number of points per time-series in a query-range request. "+
"+ diff --git a/pkg/api/parser/text/text.go b/pkg/api/parser/text/text.go index 776a184112..24957e2d38 100644 --- a/pkg/api/parser/text/text.go +++ b/pkg/api/parser/text/text.go @@ -11,6 +11,7 @@ import ( "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/textparse" "github.com/timescale/promscale/pkg/prompb" + "github.com/timescale/promscale/pkg/util" ) var timeProvider = time.Now @@ -26,7 +27,6 @@ func ParseRequest(r *http.Request, wr *prompb.WriteRequest) error { p = textparse.New(b, r.Header.Get("Content-Type")) defTime = int64(model.TimeFromUnixNano(timeProvider().UnixNano())) et textparse.Entry - ll []prompb.Label ) for { @@ -55,14 +55,7 @@ func ParseRequest(r *http.Request, wr *prompb.WriteRequest) error { var lset labels.Labels _ = p.Metric(&lset) - ll = make([]prompb.Label, 0, len(lset)) - - for i := range lset { - ll = append(ll, prompb.Label{ - Name: lset[i].Name, - Value: lset[i].Value, - }) - } + ll := util.LabelToPrompbLabels(lset) wr.Timeseries = append(wr.Timeseries, prompb.TimeSeries{ Labels: ll, diff --git a/pkg/pgmodel/cache/series_cache.go b/pkg/pgmodel/cache/series_cache.go index c17dedd825..fe0641092a 100644 --- a/pkg/pgmodel/cache/series_cache.go +++ b/pkg/pgmodel/cache/series_cache.go @@ -251,7 +251,6 @@ func (t *SeriesCacheImpl) GetSeriesFromProtos(labelPairs []prompb.Label) (*model if err != nil { return nil, "", err } - var series *model.Series useByteAsStringNoCopy(builder.Bytes(), func(key string) { series = t.loadSeries(key) diff --git a/pkg/pgmodel/ingestor/ingestor.go b/pkg/pgmodel/ingestor/ingestor.go index 3d17f800d4..0322afb0bf 100644 --- a/pkg/pgmodel/ingestor/ingestor.go +++ b/pkg/pgmodel/ingestor/ingestor.go @@ -31,7 +31,7 @@ type Cfg struct { // DBIngestor ingest the TimeSeries data into Timescale database. type DBIngestor struct { - sCache cache.SeriesCache + SCache cache.SeriesCache dispatcher model.Dispatcher tWriter trace.Writer } @@ -44,7 +44,7 @@ func NewPgxIngestor(conn pgxconn.PgxConn, cache cache.MetricCache, sCache cache. return nil, err } return &DBIngestor{ - sCache: sCache, + SCache: sCache, dispatcher: dispatcher, tWriter: trace.NewWriter(conn), }, nil @@ -184,7 +184,7 @@ func (ingestor *DBIngestor) ingestTimeseries(ctx context.Context, timeseries []p } // Normalize and canonicalize t.Labels. // After this point t.Labels should never be used again. 
-		series, metricName, err = ingestor.sCache.GetSeriesFromProtos(ts.Labels)
+		series, metricName, err = ingestor.SCache.GetSeriesFromProtos(ts.Labels)
 		if err != nil {
 			return 0, err
 		}
@@ -254,6 +254,10 @@ func (ingestor *DBIngestor) ingestMetadata(ctx context.Context, metadata []promp
 	return numMetadataIngested, nil
 }

+func (ingestor *DBIngestor) Dispatcher() model.Dispatcher {
+	return ingestor.dispatcher
+}
+
 // Parts of metric creation not needed to insert data
 func (ingestor *DBIngestor) CompleteMetricCreation(ctx context.Context) error {
 	return ingestor.dispatcher.CompleteMetricCreation(ctx)
diff --git a/pkg/pgmodel/ingestor/ingestor_test.go b/pkg/pgmodel/ingestor/ingestor_test.go
index fe7be6601b..1f2d146397 100644
--- a/pkg/pgmodel/ingestor/ingestor_test.go
+++ b/pkg/pgmodel/ingestor/ingestor_test.go
@@ -263,7 +263,7 @@ func TestDBIngestorIngest(t *testing.T) {
 			}
 			i := DBIngestor{
 				dispatcher: &inserter,
-				sCache:     sCache,
+				SCache:     sCache,
 			}

 			wr := NewWriteRequest()
diff --git a/pkg/query/query_engine.go b/pkg/query/query_engine.go
index 0fc77b605f..7b2d772fa3 100644
--- a/pkg/query/query_engine.go
+++ b/pkg/query/query_engine.go
@@ -12,6 +12,13 @@ import (
 	"github.com/timescale/promscale/pkg/promql"
 )

+const (
+	DefaultQueryTimeout         = time.Minute * 2
+	DefaultLookBackDelta        = time.Minute * 5
+	DefaultSubqueryStepInterval = time.Minute
+	DefaultMaxSamples           = 50000000
+)
+
 func NewEngine(logger log.Logger, queryTimeout, lookBackDelta, subqueryDefaultStepInterval time.Duration, maxSamples int64, enabledFeaturesMap map[string]struct{}) (*promql.Engine, error) {
 	engineOpts := promql.EngineOpts{
 		Logger: logger,
@@ -31,5 +38,5 @@ func durationMilliseconds(d time.Duration) int64 {
 }

 func NewEngineWithDefaults(logger log.Logger) (*promql.Engine, error) {
-	return NewEngine(logger, time.Minute*2, time.Minute*5, time.Minute*1, 50000000, map[string]struct{}{})
+	return NewEngine(logger, DefaultQueryTimeout, DefaultLookBackDelta, DefaultSubqueryStepInterval, DefaultMaxSamples, map[string]struct{}{})
 }
diff --git a/pkg/rules/adapters/ingest.go b/pkg/rules/adapters/ingest.go
new file mode 100644
index 0000000000..5ec2a66c8a
--- /dev/null
+++ b/pkg/rules/adapters/ingest.go
@@ -0,0 +1,74 @@
+package adapters
+
+import (
+	"context"
+	"fmt"
+	"time"
+
+	"github.com/pkg/errors"
+
+	"github.com/prometheus/prometheus/model/exemplar"
+	"github.com/prometheus/prometheus/model/labels"
+	"github.com/prometheus/prometheus/storage"
+
+	"github.com/timescale/promscale/pkg/pgmodel/ingestor"
+	"github.com/timescale/promscale/pkg/pgmodel/metrics"
+	"github.com/timescale/promscale/pkg/pgmodel/model"
+	"github.com/timescale/promscale/pkg/prompb"
+	"github.com/timescale/promscale/pkg/util"
+)
+
+var samplesIngested = metrics.IngestorItems.With(map[string]string{"type": "metric", "kind": "sample", "subsystem": "rules"})
+
+type ingestAdapter struct {
+	ingestor *ingestor.DBIngestor
+}
+
+func NewIngestAdapter(ingestor *ingestor.DBIngestor) *ingestAdapter {
+	return &ingestAdapter{ingestor}
+}
+
+type appenderAdapter struct {
+	data     map[string][]model.Insertable
+	ingestor *ingestor.DBIngestor
+}
+
+func (a ingestAdapter) Appender(_ context.Context) storage.Appender {
+	return &appenderAdapter{
+		data:     make(map[string][]model.Insertable),
+		ingestor: a.ingestor,
+	}
+}
+
+func (app *appenderAdapter) Append(_ storage.SeriesRef, l labels.Labels, t int64, v float64) (storage.SeriesRef, error) {
+	series, metricName, err := app.ingestor.SCache.GetSeriesFromProtos(util.LabelToPrompbLabels(l))
+	if err != nil {
+		return 0, fmt.Errorf("error creating series: %w", err)
fmt.Errorf("error creating series: %w", err) + } + + samples := model.NewPromSamples(series, []prompb.Sample{{Timestamp: t, Value: v}}) + if _, found := app.data[metricName]; !found { + app.data[metricName] = make([]model.Insertable, 0) + } + app.data[metricName] = append(app.data[metricName], samples) + return 0, nil +} + +func (app *appenderAdapter) AppendExemplar(_ storage.SeriesRef, l labels.Labels, e exemplar.Exemplar) (storage.SeriesRef, error) { + // We do not support appending exemplars in recording rules since this is not yet implemented upstream. + // Once upstream implements this feature, we can modify this function. + return 0, nil +} + +func (app *appenderAdapter) Commit() error { + numInsertablesIngested, err := app.ingestor.Dispatcher().InsertTs(context.Background(), model.Data{Rows: app.data, ReceivedTime: time.Now()}) + if err == nil { + samplesIngested.Add(float64(numInsertablesIngested)) + } + return errors.WithMessage(err, "rules: error ingesting data into db-ingestor") +} + +func (app *appenderAdapter) Rollback() error { + app.data = nil + return nil +} diff --git a/pkg/rules/adapters/query.go b/pkg/rules/adapters/query.go new file mode 100644 index 0000000000..01f9160efd --- /dev/null +++ b/pkg/rules/adapters/query.go @@ -0,0 +1,51 @@ +package adapters + +import ( + "context" + "fmt" + + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/storage" + "github.com/timescale/promscale/pkg/promql" +) + +type queryAdapter struct { + queryable promql.Queryable +} + +func NewQueryAdapter(q promql.Queryable) *queryAdapter { + return &queryAdapter{q} +} + +func (q *queryAdapter) Querier(ctx context.Context, mint int64, maxt int64) (storage.Querier, error) { + qr, err := q.queryable.SamplesQuerier(ctx, mint, maxt) + if err != nil { + return nil, fmt.Errorf("samples-querier: %w", err) + } + return querierAdapter{qr}, nil +} + +type querierAdapter struct { + qr promql.SamplesQuerier +} + +func (q querierAdapter) Select(sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet { + // Pushdowns are not supported here. This is fine as Prometheus rule-manager only uses queryable to know + // the previous state of the alert. This function is not used in recording/alerting rules evaluation. + seriesSet, _ := q.qr.Select(sortSeries, hints, nil, nil, matchers...) + return seriesSet +} + +func (q querierAdapter) LabelValues(name string, _ ...*labels.Matcher) ([]string, storage.Warnings, error) { + // Weak TODO: We need to implement the matchers. + return q.qr.LabelValues(name) +} + +func (q querierAdapter) LabelNames(matchers ...*labels.Matcher) ([]string, storage.Warnings, error) { + return q.qr.LabelNames(matchers...) 
+}
+
+func (q querierAdapter) Close() error {
+	q.qr.Close()
+	return nil
+}
diff --git a/pkg/rules/custom_test.go b/pkg/rules/custom_test.go
deleted file mode 100644
index 325f6ee221..0000000000
--- a/pkg/rules/custom_test.go
+++ /dev/null
@@ -1,37 +0,0 @@
-package rules
-
-import (
-	"testing"
-
-	"github.com/prometheus/prometheus/model/labels"
-	prometheus_promql "github.com/prometheus/prometheus/promql"
-	"github.com/stretchr/testify/require"
-
-	promscale_promql "github.com/timescale/promscale/pkg/promql"
-)
-
-func TestYoloVector(t *testing.T) {
-	lbls := []labels.Label{{Name: "__name__", Value: "foo"}, {Name: "instance", Value: "localhost:9201"}}
-	promscaleVector := promscale_promql.Vector([]promscale_promql.Sample{
-		{
-			Point:  promscale_promql.Point{T: 1, V: 1},
-			Metric: lbls,
-		}, {
-			Point:  promscale_promql.Point{T: 2, V: 2},
-			Metric: lbls,
-		},
-	})
-
-	prometheusVector := prometheus_promql.Vector([]prometheus_promql.Sample{
-		{
-			Point:  prometheus_promql.Point{T: 1, V: 1},
-			Metric: lbls,
-		}, {
-			Point:  prometheus_promql.Point{T: 2, V: 2},
-			Metric: lbls,
-		},
-	})
-
-	gotVector := yoloVector(&promscaleVector)
-	require.Equal(t, prometheusVector, gotVector)
-}
\ No newline at end of file
diff --git a/pkg/rules/rules.go b/pkg/rules/rules.go
index c0d37f8bf3..4276c14b4d 100644
--- a/pkg/rules/rules.go
+++ b/pkg/rules/rules.go
@@ -15,11 +15,11 @@ import (
 	"github.com/prometheus/prometheus/model/labels"
 	"github.com/prometheus/prometheus/notifier"
 	prom_rules "github.com/prometheus/prometheus/rules"
-	"github.com/prometheus/prometheus/storage"

 	"github.com/timescale/promscale/pkg/log"
 	"github.com/timescale/promscale/pkg/pgclient"
 	"github.com/timescale/promscale/pkg/query"
+	"github.com/timescale/promscale/pkg/rules/adapters"
 )

 const (
@@ -43,9 +43,6 @@ type manager struct {
 }

 func NewManager(ctx context.Context, r prometheus.Registerer, client *pgclient.Client, opts *Options) (*manager, error) {
-	appendable := storage.Appendable(nil)
-	queryable := storage.Queryable(nil)
-
 	promqlEngine, err := query.NewEngineWithDefaults(log.GetLogger())
 	if err != nil {
 		return nil, fmt.Errorf("error creating PromQL engine with defaults: %w", err)
 	}
@@ -66,12 +63,12 @@ func NewManager(ctx context.Context, r prometheus.Registerer, client *pgclient.C
 	}

 	rulesManager := prom_rules.NewManager(&prom_rules.ManagerOptions{
-		Appendable:      appendable,
+		Appendable:      adapters.NewIngestAdapter(client.Ingestor()),
+		Queryable:       adapters.NewQueryAdapter(client.Queryable()),
 		Context:         ctx,
 		ExternalURL:     parsedUrl,
 		Logger:          log.GetLogger(),
 		NotifyFunc:      sendAlerts(notifierManager, parsedUrl.String()),
-		Queryable:       queryable,
 		QueryFunc:       engineQueryFunc(promqlEngine, client.Queryable()),
 		Registerer:      r,
 		OutageTolerance: opts.OutageTolerance,
diff --git a/pkg/rules/custom.go b/pkg/rules/upstream.go
similarity index 91%
rename from pkg/rules/custom.go
rename to pkg/rules/upstream.go
index 6c18ddfb20..f5331da4de 100644
--- a/pkg/rules/custom.go
+++ b/pkg/rules/upstream.go
@@ -48,6 +48,9 @@ func engineQueryFunc(engine *promscale_promql.Engine, q promscale_promql.Queryab
 	}
 }

+// Compile-time test to make sure that sizes of both vectors are the same.
+var _ = [1]bool{}[unsafe.Sizeof(promscale_promql.Vector{})-unsafe.Sizeof(prometheus_promql.Vector{})]
+
 // My guess is this should be way faster than looping through individual samples
 // and converting into Prometheus Vector type. This lets us convert the type with
 // very less processing.
@@ -59,7 +62,6 @@ type sender interface {
 	Send(alerts ...*notifier.Alert)
 }

-// sendAlerts implements the rules.NotifyFunc for a Notifier.
 func sendAlerts(s sender, externalURL string) rules.NotifyFunc {
 	return func(ctx context.Context, expr string, alerts ...*rules.Alert) {
 		var res []*notifier.Alert
@@ -71,16 +73,14 @@ func sendAlerts(s sender, externalURL string) rules.NotifyFunc {
 				Annotations:  alert.Annotations,
 				GeneratorURL: externalURL + strutil.TableLinkForExpression(expr),
 			}
-			if !alert.ResolvedAt.IsZero() {
-				a.EndsAt = alert.ResolvedAt
-			} else {
+			a.EndsAt = alert.ResolvedAt
+			if alert.ResolvedAt.IsZero() {
 				a.EndsAt = alert.ValidUntil
 			}
 			res = append(res, a)
 		}

 		if len(alerts) > 0 {
-			fmt.Println("sending alerts")
 			s.Send(res...)
 		}
 	}
diff --git a/pkg/rules/upstream_test.go b/pkg/rules/upstream_test.go
new file mode 100644
index 0000000000..9fbde8bd1f
--- /dev/null
+++ b/pkg/rules/upstream_test.go
@@ -0,0 +1,142 @@
+package rules
+
+import (
+	"testing"
+
+	"github.com/prometheus/prometheus/model/labels"
+	prometheus_promql "github.com/prometheus/prometheus/promql"
+	"github.com/stretchr/testify/require"
+
+	promscale_promql "github.com/timescale/promscale/pkg/promql"
+)
+
+func TestYoloVector(t *testing.T) {
+	cases := []struct {
+		name     string
+		in       promscale_promql.Vector
+		expected prometheus_promql.Vector
+	}{
+		{
+			name: "response",
+			in: promscale_promql.Vector([]promscale_promql.Sample{
+				{
+					Point:  promscale_promql.Point{T: 1, V: 1},
+					Metric: []labels.Label{{Name: "__name__", Value: "foo"}, {Name: "instance", Value: "localhost:9201"}},
+				}, {
+					Point:  promscale_promql.Point{T: 2, V: 2},
+					Metric: []labels.Label{{Name: "__name__", Value: "foo"}, {Name: "instance", Value: "localhost:9201"}},
+				},
+			}),
+			expected: prometheus_promql.Vector([]prometheus_promql.Sample{
+				{
+					Point:  prometheus_promql.Point{T: 1, V: 1},
+					Metric: []labels.Label{{Name: "__name__", Value: "foo"}, {Name: "instance", Value: "localhost:9201"}},
+				}, {
+					Point:  prometheus_promql.Point{T: 2, V: 2},
+					Metric: []labels.Label{{Name: "__name__", Value: "foo"}, {Name: "instance", Value: "localhost:9201"}},
+				},
+			}),
+		},
+		{
+			name: "response with decimals",
+			in: promscale_promql.Vector([]promscale_promql.Sample{
+				{
+					Point:  promscale_promql.Point{T: 10, V: 1.1},
+					Metric: []labels.Label{{Name: "__name__", Value: "foo"}},
+				}, {
+					Point:  promscale_promql.Point{T: 20, V: 2.22},
+					Metric: []labels.Label{{Name: "__name__", Value: "foo"}},
+				},
+			}),
+			expected: prometheus_promql.Vector([]prometheus_promql.Sample{
+				{
+					Point:  prometheus_promql.Point{T: 10, V: 1.1},
+					Metric: []labels.Label{{Name: "__name__", Value: "foo"}},
+				}, {
+					Point:  prometheus_promql.Point{T: 20, V: 2.22},
+					Metric: []labels.Label{{Name: "__name__", Value: "foo"}},
+				},
+			}),
+		},
+		{
+			name:     "nil",
+			in:       promscale_promql.Vector(nil),
+			expected: prometheus_promql.Vector(nil),
+		},
+		{
+			name:     "empty arrays",
+			in:       promscale_promql.Vector([]promscale_promql.Sample{}),
+			expected: prometheus_promql.Vector([]prometheus_promql.Sample{}),
+		},
+		{
+			name:     "slice",
+			in:       promscale_promql.Vector(make([]promscale_promql.Sample, 0)),
+			expected: prometheus_promql.Vector(make([]prometheus_promql.Sample, 0)),
+		},
+		{
+			name: "defaults",
+			in: promscale_promql.Vector([]promscale_promql.Sample{
+				{
+					Point:  promscale_promql.Point{},
+					Metric: []labels.Label{{Name: "__name__", Value: "foo"}, {Name: "instance", Value: "localhost:9201"}},
+				}, {
+					Point:  promscale_promql.Point{},
+					Metric: []labels.Label{{Name: "__name__", Value: "foo"}, {Name: "instance", Value: "localhost:9201"}},
"instance", Value: "localhost:9201"}}, + }, + }), + expected: prometheus_promql.Vector([]prometheus_promql.Sample{ + { + Point: prometheus_promql.Point{}, + Metric: []labels.Label{{Name: "__name__", Value: "foo"}, {Name: "instance", Value: "localhost:9201"}}, + }, { + Point: prometheus_promql.Point{}, + Metric: []labels.Label{{Name: "__name__", Value: "foo"}, {Name: "instance", Value: "localhost:9201"}}, + }, + }), + }, + } + + for _, c := range cases { + gotVector := yoloVector(&c.in) + require.Equal(t, c.expected, gotVector) + } +} + +// harkishen@harkishen:~/go/src/github.com/timescale/promscale/pkg/rules$ go test -bench . -cpu=1,2,4 -benchtime=100000x +//goos: linux +//goarch: amd64 +//pkg: github.com/timescale/promscale/pkg/rules +//cpu: Intel(R) Core(TM) i5-8265U CPU @ 1.60GHz +//BenchmarkVectorConversion/yolo 100000 0.001350 ns/op +//BenchmarkVectorConversion/yolo-2 100000 0.001100 ns/op +//BenchmarkVectorConversion/yolo-4 100000 0.002040 ns/op +//BenchmarkVectorConversion/looping 100000 1.021 ns/op +//BenchmarkVectorConversion/looping-2 100000 0.7710 ns/op +//BenchmarkVectorConversion/looping-4 100000 0.6590 ns/op +//PASS +//ok github.com/timescale/promscale/pkg/rules 0.031s +func BenchmarkVectorConversion(b *testing.B) { + var in promscale_promql.Vector + + for i := 0; i <= 10000; i++ { + in = append(in, promscale_promql.Sample{Point: promscale_promql.Point{T: int64(100 + i), V: float64(i)}, Metric: labels.Labels{}}) + } + + // Yolo method + b.Run("yolo", func(b *testing.B) { + yoloVector(&in) + }) + + vectorConvertor := func(psv promscale_promql.Vector) (promv prometheus_promql.Vector) { + promv = make(prometheus_promql.Vector, len(psv)) + for i := range psv { + promv[i].Metric = psv[i].Metric + promv[i].T = psv[i].T + promv[i].V = psv[i].V + } + return promv + } + b.Run("looping", func(b *testing.B) { + vectorConvertor(in) + }) +} diff --git a/pkg/util/util.go b/pkg/util/util.go index bf157101ae..eca2b4cf43 100644 --- a/pkg/util/util.go +++ b/pkg/util/util.go @@ -8,11 +8,12 @@ import ( "context" "flag" "fmt" - "os" - "strings" - + "github.com/prometheus/prometheus/model/labels" "github.com/timescale/promscale/pkg/log" "github.com/timescale/promscale/pkg/pgxconn" + "github.com/timescale/promscale/pkg/prompb" + "os" + "strings" ) const PromNamespace = "promscale" @@ -74,6 +75,18 @@ func GetEnvVarName(prefix, fName string) (envVar string) { return strings.ReplaceAll(envVar, ".", "_") } +func LabelToPrompbLabels(l labels.Labels) []prompb.Label { + if len(l) == 0 { + return []prompb.Label{} + } + lbls := make([]prompb.Label, len(l)) + for i := range l { + lbls[i].Name = l[i].Name + lbls[i].Value = l[i].Value + } + return lbls +} + func IsTimescaleDBInstalled(conn pgxconn.PgxConn) bool { var installed bool err := conn.QueryRow(context.Background(), `SELECT count(*) > 0 FROM pg_extension where extname = 'timescaledb'`).Scan(&installed)