diff --git a/CHANGELOG.md b/CHANGELOG.md index e8c810af30..7ea75bd94a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,9 @@ We use the following categories for changes: ## [Unreleased] +### Fixed +- Register `promscale_ingest_channel_len_bucket` metric and make it a gauge + ## [0.10.0] - 2022-02-17 ### Added diff --git a/docs/metrics.md b/docs/metrics.md index d2137fabb1..8ba71a1b72 100644 --- a/docs/metrics.md +++ b/docs/metrics.md @@ -674,23 +674,9 @@ promscale_ingest_active_write_requests{kind="span",type="trace"} 0 promscale_ingest_channel_cap{kind="sample",subsystem="copier",type="metric"} 10000 promscale_ingest_channel_cap{kind="sample",subsystem="metric_batcher",type="metric"} 1000 # HELP promscale_ingest_channel_len Length of the ingestor channel. -# TYPE promscale_ingest_channel_len histogram -promscale_ingest_channel_len_bucket{kind="samples",subsystem="metric_batcher",type="metric",le="0"} 1462 -promscale_ingest_channel_len_bucket{kind="samples",subsystem="metric_batcher",type="metric",le="1"} 1462 -promscale_ingest_channel_len_bucket{kind="samples",subsystem="metric_batcher",type="metric",le="2"} 1462 -promscale_ingest_channel_len_bucket{kind="samples",subsystem="metric_batcher",type="metric",le="4"} 1462 -promscale_ingest_channel_len_bucket{kind="samples",subsystem="metric_batcher",type="metric",le="8"} 1462 -promscale_ingest_channel_len_bucket{kind="samples",subsystem="metric_batcher",type="metric",le="16"} 1462 -promscale_ingest_channel_len_bucket{kind="samples",subsystem="metric_batcher",type="metric",le="32"} 1462 -promscale_ingest_channel_len_bucket{kind="samples",subsystem="metric_batcher",type="metric",le="64"} 1462 -promscale_ingest_channel_len_bucket{kind="samples",subsystem="metric_batcher",type="metric",le="128"} 1462 -promscale_ingest_channel_len_bucket{kind="samples",subsystem="metric_batcher",type="metric",le="256"} 1462 -promscale_ingest_channel_len_bucket{kind="samples",subsystem="metric_batcher",type="metric",le="512"} 1462 
-promscale_ingest_channel_len_bucket{kind="samples",subsystem="metric_batcher",type="metric",le="990"} 1462 -promscale_ingest_channel_len_bucket{kind="samples",subsystem="metric_batcher",type="metric",le="1000"} 1462 -promscale_ingest_channel_len_bucket{kind="samples",subsystem="metric_batcher",type="metric",le="+Inf"} 1462 -promscale_ingest_channel_len_sum{kind="samples",subsystem="metric_batcher",type="metric"} 0 -promscale_ingest_channel_len_count{kind="samples",subsystem="metric_batcher",type="metric"} 1462 +# TYPE promscale_ingest_channel_len gauge +promscale_ingest_channel_len{kind="sample",subsystem="copier",type="metric"} 0 +promscale_ingest_channel_len{kind="sample",subsystem="metric_batcher",type="metric"} 0 # HELP promscale_ingest_duplicates_total Total number of processed samples/write_requests_to_db/metrics which where duplicates. # TYPE promscale_ingest_duplicates_total counter promscale_ingest_duplicates_total{kind="metric",type="metric"} 0 diff --git a/pkg/pgmodel/ingestor/copier.go b/pkg/pgmodel/ingestor/copier.go index 5789193fe3..53022c15da 100644 --- a/pkg/pgmodel/ingestor/copier.go +++ b/pkg/pgmodel/ingestor/copier.go @@ -484,15 +484,3 @@ func insertMetadata(conn pgxconn.PgxConn, reqs []pgmodel.Metadata) (insertedRows metrics.IngestorInsertDuration.With(prometheus.Labels{"type": "metric", "subsystem": "", "kind": "exemplar"}).Observe(time.Since(start).Seconds()) return insertedRows, nil } - -var copierChannelMutex sync.Mutex - -func setCopierChannelToMonitor(toSamplesCopiers chan readRequest) { - copierChannelMutex.Lock() - defer copierChannelMutex.Unlock() - - metrics.IngestorChannelCap.With(prometheus.Labels{"type": "metric", "subsystem": "copier", "kind": "sample"}).Set(float64(cap(toSamplesCopiers))) - metrics.SampleCopierChannelLengthFunc = func() float64 { - return float64(len(toSamplesCopiers)) - } -} diff --git a/pkg/pgmodel/ingestor/dispatcher.go b/pkg/pgmodel/ingestor/dispatcher.go index deb1f551bf..076831df13 100644 --- 
a/pkg/pgmodel/ingestor/dispatcher.go +++ b/pkg/pgmodel/ingestor/dispatcher.go @@ -59,7 +59,9 @@ func newPgxDispatcher(conn pgxconn.PgxConn, cache cache.MetricCache, scache cach //between metrucs maxMetrics := 10000 copierReadRequestCh := make(chan readRequest, maxMetrics) - setCopierChannelToMonitor(copierReadRequestCh) + + metrics.IngestorChannelCap.With(prometheus.Labels{"type": "metric", "subsystem": "copier", "kind": "sample"}).Set(float64(cap(copierReadRequestCh))) + metrics.RegisterCopierChannelLenMetric(func() float64 { return float64(len(copierReadRequestCh)) }) if cfg.IgnoreCompressedChunks { // Handle decompression to not decompress anything. @@ -296,7 +298,7 @@ func (p *pgxDispatcher) getMetricBatcher(metric string) chan<- *insertDataReques } } ch := batcher.(chan *insertDataRequest) - metrics.IngestorChannelLen.With(prometheus.Labels{"type": "metric", "subsystem": "metric_batcher", "kind": "samples"}).Observe(float64(len(ch))) + metrics.IngestorChannelLenBatcher.Set(float64(len(ch))) return ch } diff --git a/pkg/pgmodel/ingestor/ingestor_sql_test.go b/pkg/pgmodel/ingestor/ingestor_sql_test.go index 3d1416b93b..375a0e9866 100644 --- a/pkg/pgmodel/ingestor/ingestor_sql_test.go +++ b/pkg/pgmodel/ingestor/ingestor_sql_test.go @@ -523,6 +523,9 @@ func TestPGXInserterCacheReset(t *testing.T) { } func TestPGXInserterInsertData(t *testing.T) { + if err := os.Setenv("IS_TEST", "true"); err != nil { + t.Fatal(err) + } makeLabel := func() *model.Series { l := &model.Series{} l.SetSeriesID(1, 1) diff --git a/pkg/pgmodel/metrics/ingest.go b/pkg/pgmodel/metrics/ingest.go index 5874dbde3c..bd52619c21 100644 --- a/pkg/pgmodel/metrics/ingest.go +++ b/pkg/pgmodel/metrics/ingest.go @@ -5,6 +5,8 @@ package metrics import ( + "os" + "github.com/prometheus/client_golang/prometheus" "github.com/timescale/promscale/pkg/util" ) @@ -62,24 +64,14 @@ var ( Help: "Capacity of the ingest channel.", }, []string{"type", "subsystem", "kind"}, ) - IngestorChannelLen = 
prometheus.NewHistogramVec( - prometheus.HistogramOpts{ - Namespace: util.PromNamespace, - Subsystem: "ingest", - Name: "channel_len", - Help: "Length of the ingestor channel.", - Buckets: util.HistogramBucketsSaturating(0, 2, MetricBatcherChannelCap), - }, []string{"type", "subsystem", "kind"}, - ) - SampleCopierChannelLengthFunc func() float64 - IngestorChannelLenCopier = prometheus.NewGaugeFunc( + IngestorChannelLenBatcher = prometheus.NewGauge( prometheus.GaugeOpts{ Namespace: util.PromNamespace, Subsystem: "ingest", Name: "channel_len", Help: "Length of the ingestor channel.", - ConstLabels: map[string]string{"type": "metric", "subsystem": "copier", "kind": "sample"}, - }, SampleCopierChannelLengthFunc, + ConstLabels: map[string]string{"type": "metric", "subsystem": "metric_batcher", "kind": "sample"}, + }, ) IngestorFlushSeries = prometheus.NewHistogramVec( prometheus.HistogramOpts{ @@ -187,7 +179,7 @@ func init() { IngestorDecompressEarliest, IngestorMaxSentTimestamp, IngestorChannelCap, - IngestorChannelLen, + IngestorChannelLenBatcher, IngestorFlushSeries, IngestorInsertablesIngested, IngestorInsertsPerBatch, @@ -201,3 +193,26 @@ func init() { InsertBatchSize, ) } + +// RegisterCopierChannelLenMetric creates and registers the copier channel len metric with a callback +// that should return the length of the channel. +// +// Note: ingestorChannelLenCopier metric depends on prometheus call to /metrics hence we need to update with +// a callback. This is an odd one out from the other metrics in the ingestor as other metrics +// are async to prometheus calls. 
+//
+// Calling this function again replaces the previously registered copier gauge
+// instead of panicking with a duplicate-registration error, so the metric
+// reports the channel supplied by the most recent caller.
+func RegisterCopierChannelLenMetric(updater func() float64) {
+	// Tests set IS_TEST=true so the gauge lands on a throwaway registry and
+	// the process-wide default registry is left untouched.
+	r := prometheus.DefaultRegisterer
+	if os.Getenv("IS_TEST") == "true" {
+		r = prometheus.NewRegistry()
+	}
+	ingestorChannelLenCopier := prometheus.NewGaugeFunc(
+		prometheus.GaugeOpts{
+			Namespace:   util.PromNamespace,
+			Subsystem:   "ingest",
+			Name:        "channel_len",
+			Help:        "Length of the ingestor channel.",
+			ConstLabels: map[string]string{"type": "metric", "subsystem": "copier", "kind": "sample"},
+		}, updater,
+	)
+	err := r.Register(ingestorChannelLenCopier)
+	if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+		// A gauge from an earlier call is still registered and its callback
+		// reads the old channel; swap it for the new one.
+		r.Unregister(are.ExistingCollector)
+		err = r.Register(ingestorChannelLenCopier)
+	}
+	if err != nil {
+		// Any other registration failure panics, exactly as MustRegister did.
+		panic(err)
+	}
+}