Skip to content
This repository has been archived by the owner on Apr 2, 2024. It is now read-only.

Commit

Permalink
Register promscale_ingest_channel_len metric and make it a Gauge.
Browse files Browse the repository at this point in the history
Signed-off-by: Harkishen-Singh <harkishensingh@hotmail.com>
  • Loading branch information
Harkishen-Singh committed Feb 21, 2022
1 parent 5b2c63c commit 8e871f0
Show file tree
Hide file tree
Showing 6 changed files with 42 additions and 45 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ We use the following categories for changes:

## [Unreleased]

### Fixed
- Register `promscale_ingest_channel_len_bucket` metric and make it a gauge

## [0.10.0] - 2022-02-17

### Added
Expand Down
20 changes: 3 additions & 17 deletions docs/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -674,23 +674,9 @@ promscale_ingest_active_write_requests{kind="span",type="trace"} 0
promscale_ingest_channel_cap{kind="sample",subsystem="copier",type="metric"} 10000
promscale_ingest_channel_cap{kind="sample",subsystem="metric_batcher",type="metric"} 1000
# HELP promscale_ingest_channel_len Length of the ingestor channel.
# TYPE promscale_ingest_channel_len histogram
promscale_ingest_channel_len_bucket{kind="samples",subsystem="metric_batcher",type="metric",le="0"} 1462
promscale_ingest_channel_len_bucket{kind="samples",subsystem="metric_batcher",type="metric",le="1"} 1462
promscale_ingest_channel_len_bucket{kind="samples",subsystem="metric_batcher",type="metric",le="2"} 1462
promscale_ingest_channel_len_bucket{kind="samples",subsystem="metric_batcher",type="metric",le="4"} 1462
promscale_ingest_channel_len_bucket{kind="samples",subsystem="metric_batcher",type="metric",le="8"} 1462
promscale_ingest_channel_len_bucket{kind="samples",subsystem="metric_batcher",type="metric",le="16"} 1462
promscale_ingest_channel_len_bucket{kind="samples",subsystem="metric_batcher",type="metric",le="32"} 1462
promscale_ingest_channel_len_bucket{kind="samples",subsystem="metric_batcher",type="metric",le="64"} 1462
promscale_ingest_channel_len_bucket{kind="samples",subsystem="metric_batcher",type="metric",le="128"} 1462
promscale_ingest_channel_len_bucket{kind="samples",subsystem="metric_batcher",type="metric",le="256"} 1462
promscale_ingest_channel_len_bucket{kind="samples",subsystem="metric_batcher",type="metric",le="512"} 1462
promscale_ingest_channel_len_bucket{kind="samples",subsystem="metric_batcher",type="metric",le="990"} 1462
promscale_ingest_channel_len_bucket{kind="samples",subsystem="metric_batcher",type="metric",le="1000"} 1462
promscale_ingest_channel_len_bucket{kind="samples",subsystem="metric_batcher",type="metric",le="+Inf"} 1462
promscale_ingest_channel_len_sum{kind="samples",subsystem="metric_batcher",type="metric"} 0
promscale_ingest_channel_len_count{kind="samples",subsystem="metric_batcher",type="metric"} 1462
# TYPE promscale_ingest_channel_len gauge
promscale_ingest_channel_len{kind="sample",subsystem="copier",type="metric"} 0
promscale_ingest_channel_len{kind="sample",subsystem="metric_batcher",type="metric"} 0
# HELP promscale_ingest_duplicates_total Total number of processed samples/write_requests_to_db/metrics which where duplicates.
# TYPE promscale_ingest_duplicates_total counter
promscale_ingest_duplicates_total{kind="metric",type="metric"} 0
Expand Down
12 changes: 0 additions & 12 deletions pkg/pgmodel/ingestor/copier.go
Original file line number Diff line number Diff line change
Expand Up @@ -484,15 +484,3 @@ func insertMetadata(conn pgxconn.PgxConn, reqs []pgmodel.Metadata) (insertedRows
metrics.IngestorInsertDuration.With(prometheus.Labels{"type": "metric", "subsystem": "", "kind": "exemplar"}).Observe(time.Since(start).Seconds())
return insertedRows, nil
}

var copierChannelMutex sync.Mutex

func setCopierChannelToMonitor(toSamplesCopiers chan readRequest) {
copierChannelMutex.Lock()
defer copierChannelMutex.Unlock()

metrics.IngestorChannelCap.With(prometheus.Labels{"type": "metric", "subsystem": "copier", "kind": "sample"}).Set(float64(cap(toSamplesCopiers)))
metrics.SampleCopierChannelLengthFunc = func() float64 {
return float64(len(toSamplesCopiers))
}
}
6 changes: 4 additions & 2 deletions pkg/pgmodel/ingestor/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,9 @@ func newPgxDispatcher(conn pgxconn.PgxConn, cache cache.MetricCache, scache cach
//between metrucs
maxMetrics := 10000
copierReadRequestCh := make(chan readRequest, maxMetrics)
setCopierChannelToMonitor(copierReadRequestCh)

metrics.IngestorChannelCap.With(prometheus.Labels{"type": "metric", "subsystem": "copier", "kind": "sample"}).Set(float64(cap(copierReadRequestCh)))
metrics.RegisterCopierChannelLenMetric(func() float64 { return float64(len(copierReadRequestCh)) })

if cfg.IgnoreCompressedChunks {
// Handle decompression to not decompress anything.
Expand Down Expand Up @@ -296,7 +298,7 @@ func (p *pgxDispatcher) getMetricBatcher(metric string) chan<- *insertDataReques
}
}
ch := batcher.(chan *insertDataRequest)
metrics.IngestorChannelLen.With(prometheus.Labels{"type": "metric", "subsystem": "metric_batcher", "kind": "samples"}).Observe(float64(len(ch)))
metrics.IngestorChannelLenBatcher.Set(float64(len(ch)))
return ch
}

Expand Down
3 changes: 3 additions & 0 deletions pkg/pgmodel/ingestor/ingestor_sql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -523,6 +523,9 @@ func TestPGXInserterCacheReset(t *testing.T) {
}

func TestPGXInserterInsertData(t *testing.T) {
if err := os.Setenv("IS_TEST", "true"); err != nil {
t.Fatal(err)
}
makeLabel := func() *model.Series {
l := &model.Series{}
l.SetSeriesID(1, 1)
Expand Down
43 changes: 29 additions & 14 deletions pkg/pgmodel/metrics/ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
package metrics

import (
"os"

"github.com/prometheus/client_golang/prometheus"
"github.com/timescale/promscale/pkg/util"
)
Expand Down Expand Up @@ -62,24 +64,14 @@ var (
Help: "Capacity of the ingest channel.",
}, []string{"type", "subsystem", "kind"},
)
IngestorChannelLen = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: util.PromNamespace,
Subsystem: "ingest",
Name: "channel_len",
Help: "Length of the ingestor channel.",
Buckets: util.HistogramBucketsSaturating(0, 2, MetricBatcherChannelCap),
}, []string{"type", "subsystem", "kind"},
)
SampleCopierChannelLengthFunc func() float64
IngestorChannelLenCopier = prometheus.NewGaugeFunc(
IngestorChannelLenBatcher = prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: util.PromNamespace,
Subsystem: "ingest",
Name: "channel_len",
Help: "Length of the ingestor channel.",
ConstLabels: map[string]string{"type": "metric", "subsystem": "copier", "kind": "sample"},
}, SampleCopierChannelLengthFunc,
ConstLabels: map[string]string{"type": "metric", "subsystem": "metric_batcher", "kind": "sample"},
},
)
IngestorFlushSeries = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Expand Down Expand Up @@ -187,7 +179,7 @@ func init() {
IngestorDecompressEarliest,
IngestorMaxSentTimestamp,
IngestorChannelCap,
IngestorChannelLen,
IngestorChannelLenBatcher,
IngestorFlushSeries,
IngestorInsertablesIngested,
IngestorInsertsPerBatch,
Expand All @@ -201,3 +193,26 @@ func init() {
InsertBatchSize,
)
}

// RegisterCopierChannelLenMetric creates and registers the copier channel len metric with a callback
// that should return the length of the channel.
//
// Note: ingestorChannelLenCopier metric depends on prometheus call to /metrics hence we need to update with
// a callback. This is an odd one out from the other metrics in the ingestor as other metrics
// are async to prometheus calls.
func RegisterCopierChannelLenMetric(updater func() float64) {
r := prometheus.DefaultRegisterer
if val := os.Getenv("IS_TEST"); val == "true" {
r = prometheus.NewRegistry()
}
ingestorChannelLenCopier := prometheus.NewGaugeFunc(
prometheus.GaugeOpts{
Namespace: util.PromNamespace,
Subsystem: "ingest",
Name: "channel_len",
Help: "Length of the ingestor channel.",
ConstLabels: map[string]string{"type": "metric", "subsystem": "copier", "kind": "sample"},
}, updater,
)
r.MustRegister(ingestorChannelLenCopier)
}

0 comments on commit 8e871f0

Please sign in to comment.