Skip to content
This repository has been archived by the owner on Apr 2, 2024. It is now read-only.

Commit

Permalink
Implement adapters for queryable and appendable.
Browse files Browse the repository at this point in the history
Signed-off-by: Harkishen-Singh <harkishensingh@hotmail.com>

This commits adds adapters to convert our db-ingestor and samples
querier into the appendable and queryable that is expected by the rules
manager module.
  • Loading branch information
Harkishen-Singh committed Apr 27, 2022
1 parent bf1a706 commit 816c9f9
Show file tree
Hide file tree
Showing 13 changed files with 314 additions and 70 deletions.
9 changes: 5 additions & 4 deletions pkg/api/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/timescale/promscale/pkg/log"
pgmodel "github.com/timescale/promscale/pkg/pgmodel/model"
"github.com/timescale/promscale/pkg/promql"
"github.com/timescale/promscale/pkg/query"
"github.com/timescale/promscale/pkg/tenancy"
)

Expand Down Expand Up @@ -132,12 +133,12 @@ func ParseFlags(fs *flag.FlagSet, cfg *Config) *Config {

fs.Var(&cfg.PromscaleEnabledFeatureList, "enable-feature", "Enable beta/experimental features as a comma-separated list. Currently the following values can be passed: promql-at-modifier, promql-negative-offset")

fs.DurationVar(&cfg.MaxQueryTimeout, "metrics.promql.query-timeout", 2*time.Minute, "Maximum time a query may take before being aborted. This option sets both the default and maximum value of the 'timeout' parameter in "+
fs.DurationVar(&cfg.MaxQueryTimeout, "metrics.promql.query-timeout", query.DefaultQueryTimeout, "Maximum time a query may take before being aborted. This option sets both the default and maximum value of the 'timeout' parameter in "+
"'/api/v1/query.*' endpoints.")
fs.DurationVar(&cfg.SubQueryStepInterval, "metrics.promql.default-subquery-step-interval", 1*time.Minute, "Default step interval to be used for PromQL subquery evaluation. "+
fs.DurationVar(&cfg.SubQueryStepInterval, "metrics.promql.default-subquery-step-interval", query.DefaultSubqueryStepInterval, "Default step interval to be used for PromQL subquery evaluation. "+
"This value is used if the subquery does not specify the step value explicitly. Example: <metric_name>[30m:]. Note: in Prometheus this setting is set by the evaluation_interval option.")
fs.DurationVar(&cfg.LookBackDelta, "metrics.promql.lookback-delta", time.Minute*5, "Maximum lookback duration for retrieving metrics during expression evaluations and federation.")
fs.Int64Var(&cfg.MaxSamples, "metrics.promql.max-samples", 50000000, "Maximum number of samples a single "+
fs.DurationVar(&cfg.LookBackDelta, "metrics.promql.lookback-delta", query.DefaultLookBackDelta, "Maximum lookback duration for retrieving metrics during expression evaluations and federation.")
fs.Int64Var(&cfg.MaxSamples, "metrics.promql.max-samples", query.DefaultMaxSamples, "Maximum number of samples a single "+
"query can load into memory. Note that queries will fail if they try to load more samples than this into memory, "+
"so this also limits the number of samples a query can return.")
fs.Int64Var(&cfg.MaxPointsPerTs, "metrics.promql.max-points-per-ts", 11000, "Maximum number of points per time-series in a query-range request. "+
Expand Down
11 changes: 2 additions & 9 deletions pkg/api/parser/text/text.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/textparse"
"github.com/timescale/promscale/pkg/prompb"
"github.com/timescale/promscale/pkg/util"
)

var timeProvider = time.Now
Expand All @@ -26,7 +27,6 @@ func ParseRequest(r *http.Request, wr *prompb.WriteRequest) error {
p = textparse.New(b, r.Header.Get("Content-Type"))
defTime = int64(model.TimeFromUnixNano(timeProvider().UnixNano()))
et textparse.Entry
ll []prompb.Label
)

for {
Expand Down Expand Up @@ -55,14 +55,7 @@ func ParseRequest(r *http.Request, wr *prompb.WriteRequest) error {
var lset labels.Labels
_ = p.Metric(&lset)

ll = make([]prompb.Label, 0, len(lset))

for i := range lset {
ll = append(ll, prompb.Label{
Name: lset[i].Name,
Value: lset[i].Value,
})
}
ll := util.LabelToPrompbLabels(lset)

wr.Timeseries = append(wr.Timeseries, prompb.TimeSeries{
Labels: ll,
Expand Down
1 change: 0 additions & 1 deletion pkg/pgmodel/cache/series_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,6 @@ func (t *SeriesCacheImpl) GetSeriesFromProtos(labelPairs []prompb.Label) (*model
if err != nil {
return nil, "", err
}

var series *model.Series
useByteAsStringNoCopy(builder.Bytes(), func(key string) {
series = t.loadSeries(key)
Expand Down
10 changes: 7 additions & 3 deletions pkg/pgmodel/ingestor/ingestor.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ type Cfg struct {

// DBIngestor ingest the TimeSeries data into Timescale database.
type DBIngestor struct {
sCache cache.SeriesCache
SCache cache.SeriesCache
dispatcher model.Dispatcher
tWriter trace.Writer
}
Expand All @@ -44,7 +44,7 @@ func NewPgxIngestor(conn pgxconn.PgxConn, cache cache.MetricCache, sCache cache.
return nil, err
}
return &DBIngestor{
sCache: sCache,
SCache: sCache,
dispatcher: dispatcher,
tWriter: trace.NewWriter(conn),
}, nil
Expand Down Expand Up @@ -184,7 +184,7 @@ func (ingestor *DBIngestor) ingestTimeseries(ctx context.Context, timeseries []p
}
// Normalize and canonicalize t.Labels.
// After this point t.Labels should never be used again.
series, metricName, err = ingestor.sCache.GetSeriesFromProtos(ts.Labels)
series, metricName, err = ingestor.SCache.GetSeriesFromProtos(ts.Labels)
if err != nil {
return 0, err
}
Expand Down Expand Up @@ -254,6 +254,10 @@ func (ingestor *DBIngestor) ingestMetadata(ctx context.Context, metadata []promp
return numMetadataIngested, nil
}

func (ingestor *DBIngestor) Dispatcher() model.Dispatcher {
return ingestor.dispatcher
}

// Parts of metric creation not needed to insert data
func (ingestor *DBIngestor) CompleteMetricCreation(ctx context.Context) error {
return ingestor.dispatcher.CompleteMetricCreation(ctx)
Expand Down
2 changes: 1 addition & 1 deletion pkg/pgmodel/ingestor/ingestor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ func TestDBIngestorIngest(t *testing.T) {
}
i := DBIngestor{
dispatcher: &inserter,
sCache: sCache,
SCache: sCache,
}

wr := NewWriteRequest()
Expand Down
9 changes: 8 additions & 1 deletion pkg/query/query_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,13 @@ import (
"github.com/timescale/promscale/pkg/promql"
)

const (
DefaultQueryTimeout = time.Minute * 2
DefaultLookBackDelta = time.Minute * 5
DefaultSubqueryStepInterval = time.Minute
DefaultMaxSamples = 50000000
)

func NewEngine(logger log.Logger, queryTimeout, lookBackDelta, subqueryDefaultStepInterval time.Duration, maxSamples int64, enabledFeaturesMap map[string]struct{}) (*promql.Engine, error) {
engineOpts := promql.EngineOpts{
Logger: logger,
Expand All @@ -31,5 +38,5 @@ func durationMilliseconds(d time.Duration) int64 {
}

func NewEngineWithDefaults(logger log.Logger) (*promql.Engine, error) {
return NewEngine(logger, time.Minute*2, time.Minute*5, time.Minute*1, 50000000, map[string]struct{}{})
return NewEngine(logger, DefaultQueryTimeout, DefaultLookBackDelta, DefaultSubqueryStepInterval, DefaultMaxSamples, map[string]struct{}{})
}
74 changes: 74 additions & 0 deletions pkg/rules/adapters/ingest.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package adapters

import (
"context"
"fmt"
"time"

"github.com/pkg/errors"

"github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"

"github.com/timescale/promscale/pkg/pgmodel/ingestor"
"github.com/timescale/promscale/pkg/pgmodel/metrics"
"github.com/timescale/promscale/pkg/pgmodel/model"
"github.com/timescale/promscale/pkg/prompb"
"github.com/timescale/promscale/pkg/util"
)

var samplesIngested = metrics.IngestorItems.With(map[string]string{"type": "metric", "kind": "sample", "subsystem": "rules"})

type ingestAdapter struct {
ingestor *ingestor.DBIngestor
}

func NewIngestAdapter(ingestor *ingestor.DBIngestor) *ingestAdapter {
return &ingestAdapter{ingestor}
}

type appenderAdapter struct {
data map[string][]model.Insertable
ingestor *ingestor.DBIngestor
}

func (a ingestAdapter) Appender(_ context.Context) storage.Appender {
return &appenderAdapter{
data: make(map[string][]model.Insertable),
ingestor: a.ingestor,
}
}

func (app *appenderAdapter) Append(_ storage.SeriesRef, l labels.Labels, t int64, v float64) (storage.SeriesRef, error) {
series, metricName, err := app.ingestor.SCache.GetSeriesFromProtos(util.LabelToPrompbLabels(l))
if err != nil {
return 0, fmt.Errorf("error creating series: %w", err)
}

samples := model.NewPromSamples(series, []prompb.Sample{{Timestamp: t, Value: v}})
if _, found := app.data[metricName]; !found {
app.data[metricName] = make([]model.Insertable, 0)
}
app.data[metricName] = append(app.data[metricName], samples)
return 0, nil
}

func (app *appenderAdapter) AppendExemplar(_ storage.SeriesRef, l labels.Labels, e exemplar.Exemplar) (storage.SeriesRef, error) {
// We do not support appending exemplars in recording rules since this is not yet implemented upstream.
// Once upstream implements this feature, we can modify this function.
return 0, nil
}

func (app *appenderAdapter) Commit() error {
numInsertablesIngested, err := app.ingestor.Dispatcher().InsertTs(context.Background(), model.Data{Rows: app.data, ReceivedTime: time.Now()})
if err == nil {
samplesIngested.Add(float64(numInsertablesIngested))
}
return errors.WithMessage(err, "rules: error ingesting data into db-ingestor")
}

func (app *appenderAdapter) Rollback() error {
app.data = nil
return nil
}
51 changes: 51 additions & 0 deletions pkg/rules/adapters/query.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package adapters

import (
"context"
"fmt"

"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
"github.com/timescale/promscale/pkg/promql"
)

type queryAdapter struct {
queryable promql.Queryable
}

func NewQueryAdapter(q promql.Queryable) *queryAdapter {
return &queryAdapter{q}
}

func (q *queryAdapter) Querier(ctx context.Context, mint int64, maxt int64) (storage.Querier, error) {
qr, err := q.queryable.SamplesQuerier(ctx, mint, maxt)
if err != nil {
return nil, fmt.Errorf("samples-querier: %w", err)
}
return querierAdapter{qr}, nil
}

type querierAdapter struct {
qr promql.SamplesQuerier
}

func (q querierAdapter) Select(sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet {
// Pushdowns are not supported here. This is fine as Prometheus rule-manager only uses queryable to know
// the previous state of the alert. This function is not used in recording/alerting rules evaluation.
seriesSet, _ := q.qr.Select(sortSeries, hints, nil, nil, matchers...)
return seriesSet
}

func (q querierAdapter) LabelValues(name string, _ ...*labels.Matcher) ([]string, storage.Warnings, error) {
// Weak TODO: We need to implement the matchers.
return q.qr.LabelValues(name)
}

func (q querierAdapter) LabelNames(matchers ...*labels.Matcher) ([]string, storage.Warnings, error) {
return q.qr.LabelNames(matchers...)
}

func (q querierAdapter) Close() error {
q.qr.Close()
return nil
}
37 changes: 0 additions & 37 deletions pkg/rules/custom_test.go

This file was deleted.

9 changes: 3 additions & 6 deletions pkg/rules/rules.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@ import (
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/notifier"
prom_rules "github.com/prometheus/prometheus/rules"
"github.com/prometheus/prometheus/storage"

"github.com/timescale/promscale/pkg/log"
"github.com/timescale/promscale/pkg/pgclient"
"github.com/timescale/promscale/pkg/query"
"github.com/timescale/promscale/pkg/rules/adapters"
)

const (
Expand All @@ -43,9 +43,6 @@ type manager struct {
}

func NewManager(ctx context.Context, r prometheus.Registerer, client *pgclient.Client, opts *Options) (*manager, error) {
appendable := storage.Appendable(nil)
queryable := storage.Queryable(nil)

promqlEngine, err := query.NewEngineWithDefaults(log.GetLogger())
if err != nil {
return nil, fmt.Errorf("error creating PromQL engine with defaults: %w", err)
Expand All @@ -66,12 +63,12 @@ func NewManager(ctx context.Context, r prometheus.Registerer, client *pgclient.C
}

rulesManager := prom_rules.NewManager(&prom_rules.ManagerOptions{
Appendable: appendable,
Appendable: adapters.NewIngestAdapter(client.Ingestor()),
Queryable: adapters.NewQueryAdapter(client.Queryable()),
Context: ctx,
ExternalURL: parsedUrl,
Logger: log.GetLogger(),
NotifyFunc: sendAlerts(notifierManager, parsedUrl.String()),
Queryable: queryable,
QueryFunc: engineQueryFunc(promqlEngine, client.Queryable()),
Registerer: r,
OutageTolerance: opts.OutageTolerance,
Expand Down
10 changes: 5 additions & 5 deletions pkg/rules/custom.go → pkg/rules/upstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ func engineQueryFunc(engine *promscale_promql.Engine, q promscale_promql.Queryab
}
}

// Compile-time test to make sure that sizes of both vectors are the same.
var _ = [1]bool{}[unsafe.Sizeof(promscale_promql.Vector{})-unsafe.Sizeof(prometheus_promql.Vector{})]

// My guess is this should be way faster than looping through individual samples
// and converting into Prometheus Vector type. This lets us convert the type with
// very less processing.
Expand All @@ -59,7 +62,6 @@ type sender interface {
Send(alerts ...*notifier.Alert)
}

// sendAlerts implements the rules.NotifyFunc for a Notifier.
func sendAlerts(s sender, externalURL string) rules.NotifyFunc {
return func(ctx context.Context, expr string, alerts ...*rules.Alert) {
var res []*notifier.Alert
Expand All @@ -71,16 +73,14 @@ func sendAlerts(s sender, externalURL string) rules.NotifyFunc {
Annotations: alert.Annotations,
GeneratorURL: externalURL + strutil.TableLinkForExpression(expr),
}
if !alert.ResolvedAt.IsZero() {
a.EndsAt = alert.ResolvedAt
} else {
a.EndsAt = alert.ResolvedAt
if alert.ResolvedAt.IsZero() {
a.EndsAt = alert.ValidUntil
}
res = append(res, a)
}

if len(alerts) > 0 {
fmt.Println("sending alerts")
s.Send(res...)
}
}
Expand Down
Loading

0 comments on commit 816c9f9

Please sign in to comment.