From 9b28ba75b24bd7a9da38bdcf0414653a1259edcb Mon Sep 17 00:00:00 2001 From: "saikalyan.bhagavathula" Date: Wed, 7 Aug 2024 18:11:01 +0530 Subject: [PATCH] send metrics as batches --- metrics/opsramp.go | 101 +++++++++++++++++++++++++++++---------------- 1 file changed, 66 insertions(+), 35 deletions(-) diff --git a/metrics/opsramp.go b/metrics/opsramp.go index 47d79f1c93..0069e716c4 100644 --- a/metrics/opsramp.go +++ b/metrics/opsramp.go @@ -6,6 +6,7 @@ import ( "encoding/json" "errors" "fmt" + "github.com/prometheus/common/model" "io" "net/http" "os" @@ -22,7 +23,6 @@ import ( "github.com/prometheus/client_golang/prometheus/collectors" "github.com/prometheus/client_golang/prometheus/promhttp" io_prometheus_client "github.com/prometheus/client_model/go" - "github.com/prometheus/common/model" "github.com/prometheus/prometheus/prompb" "github.com/gogo/protobuf/proto" @@ -42,6 +42,7 @@ var ( serverMut sync.Mutex hostname string ) +var timeSeriesChannel = make(chan []prompb.TimeSeries) func init() { muxer = mux.NewRouter() @@ -153,18 +154,11 @@ func (p *OpsRampMetrics) Start() error { p.Logger.Error().Logf("error while initializing oAuth Token Err: %v", err) } + // Start timeseries listener thread + go p.ListenTimeseries() + for range metricsTicker.C { - statusCode, err := p.Push() - if err != nil { - p.Logger.Error().Logf("error while pushing metrics with statusCode: %d and Error: %v", statusCode, err) - if err.Error() == missingMetricsWriteScope { - p.Logger.Info().Logf("renewing auth token since the existing token is missing metrics:write scope") - err := p.RenewOAuthToken() - if err != nil { - p.Logger.Error().Logf("error while initializing oAuth Token Err: %v", err) - } - } - } + p.Push() } }() } @@ -172,6 +166,15 @@ func (p *OpsRampMetrics) Start() error { return nil } +func (p *OpsRampMetrics) ListenTimeseries() { + + for timeSeries := range timeSeriesChannel { + // call request push with timeSeries payload + p.frameRequest(timeSeries) + } + +} + // Register takes a name and a metric type. The type should be one of "counter", // "gauge", or "histogram" func (p *OpsRampMetrics) Register(name, metricType string) { @@ -594,11 +597,11 @@ func (p *OpsRampMetrics) calculateTraceOperationError(metricFamilySlice []*io_pr } } -func (p *OpsRampMetrics) Push() (int, error) { +func (p *OpsRampMetrics) Push() { metricsConfig := p.Config.GetMetricsConfig() // setting up default values and removing metrics older than 5 minutes - p.lock.Lock() // nolint: all // no linting here since we need to release the lock sooner than end of funtion + p.lock.Lock() // nolint: all // no linting here since we need to release the lock sooner than end of function for metricName, metData := range p.metrics { for labelValStr, t := range metData.LabelValues.Copy() { @@ -617,14 +620,23 @@ func (p *OpsRampMetrics) Push() (int, error) { } if timeDiff > time.Minute*15 { + p.Logger.Error().WithField("metric: ", metricName). + WithField("Label Values: ", labelVals). + Logf("Deleting a Gauge metric for ") p.gaugeDeleteLabelValues(metricName, labelVals) } case COUNTER: if timeDiff > time.Hour*24 { + p.Logger.Error().WithField("metric: ", metricName). + WithField("Label Values: ", labelVals). + Logf("Deleting a Counter metric for ") p.counterDeleteLabelValues(metricName, labelVals) } case HISTOGRAM: if timeDiff > time.Hour*24 { + p.Logger.Error().WithField("metric: ", metricName). + WithField("Label Values: ", labelVals). + Logf("Deleting a Histogram metric for ") p.histogramDeleteLabelValues(metricName, labelVals) } } @@ -635,14 +647,8 @@ func (p *OpsRampMetrics) Push() (int, error) { metricFamilySlice, err := p.promRegistry.Gather() if err != nil { - return -1, err - } - - p.calculateTraceOperationError(metricFamilySlice) - - metricFamilySlice, err = p.promRegistry.Gather() - if err != nil { - return -1, err + p.Logger.Error().Logf("Error during gather: %v", err) + return } presentTime := time.Now().UnixMilli() @@ -653,7 +659,23 @@ func (p *OpsRampMetrics) Push() (int, error) { if !p.re.MatchString(metricFamily.GetName()) { continue } + var metricsLength int + var hasSent bool for _, metric := range metricFamily.GetMetric() { + metricLength := len(metric.String()) + if metricLength > 250000 { + p.Logger.Info().Logf("Metric size is too big (%d characters)", metricLength) // larger than my tummy + continue + } + if metricsLength+metricLength > 250000 { + hasSent = true + go func() { + timeSeriesChannel <- timeSeries // Push timeSeries payload to channel + }() + metricsLength = 0 + timeSeries = []prompb.TimeSeries{} + } + metricsLength += metricLength labels := []prompb.Label{ { Name: model.JobLabel, @@ -803,14 +825,25 @@ func (p *OpsRampMetrics) Push() (int, error) { }, }) } + hasSent = false + } + if !hasSent { + go func() { + timeSeriesChannel <- timeSeries + }() + } } +} + +func (p *OpsRampMetrics) frameRequest(timeSeries []prompb.TimeSeries) { request := prompb.WriteRequest{Timeseries: timeSeries} out, err := proto.Marshal(&request) if err != nil { - return -1, err + p.Logger.Error().Logf("Unable to marshal the request: %v", err) + return } compressed := snappy.Encode(nil, out) @@ -819,7 +852,8 @@ func (p *OpsRampMetrics) Push() (int, error) { req, err := http.NewRequest(http.MethodPost, URL, bytes.NewBuffer(compressed)) if err != nil { - return -1, err + p.Logger.Error().Logf("Unable to build request: %v", err) + return } req.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0") @@ -827,30 +861,27 @@ func (p *OpsRampMetrics) Push() (int, error) { req.Header.Set("Content-Type", "application/x-protobuf") if !strings.Contains(p.oAuthToken.Scope, "metrics:write") { - return -1, fmt.Errorf(missingMetricsWriteScope) + p.Logger.Error().Logf(missingMetricsWriteScope) + p.RenewOAuthToken() } req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", p.oAuthToken.AccessToken)) resp, err := p.Send(req) if err != nil { - return -1, err + p.Logger.Error().Logf("Unable to send request: %v", err) + return } - defer func() { - if resp != nil && resp.Body != nil { - _ = resp.Body.Close() - } - }() + defer resp.Body.Close() // Depending on the version and configuration of the PGW, StatusOK or StatusAccepted may be returned. body, err := io.ReadAll(resp.Body) if err != nil { p.Logger.Error().Logf("failed to parse response body Err: %v", err) } if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusAccepted { - return resp.StatusCode, fmt.Errorf("unexpected status code %d while pushing: %s", resp.StatusCode, body) + p.Logger.Error().Logf("unexpected status code %d while pushing: %s", resp.StatusCode, body) + return } - p.Logger.Debug().Logf("metrics %s push response: %v", p.prefix, string(body)) - - return resp.StatusCode, nil + //p.Logger.Debug().Logf("metrics %s push response: %v", p.prefix, string(body)) } func (p *OpsRampMetrics) RenewClient() {