Skip to content

Commit

Permalink
send metrics as batches
Browse files Browse the repository at this point in the history
  • Loading branch information
saikalyan.bhagavathula committed Aug 7, 2024
1 parent b3e30a7 commit 9b28ba7
Showing 1 changed file with 66 additions and 35 deletions.
101 changes: 66 additions & 35 deletions metrics/opsramp.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"encoding/json"
"errors"
"fmt"
"github.com/prometheus/common/model"
"io"
"net/http"
"os"
Expand All @@ -22,7 +23,6 @@ import (
"github.com/prometheus/client_golang/prometheus/collectors"
"github.com/prometheus/client_golang/prometheus/promhttp"
io_prometheus_client "github.com/prometheus/client_model/go"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/prompb"

"github.com/gogo/protobuf/proto"
Expand All @@ -42,6 +42,7 @@ var (
serverMut sync.Mutex
hostname string
)
// timeSeriesChannel hands batches of time series from Push to the
// ListenTimeseries loop, which forwards each batch to frameRequest.
// The channel is unbuffered, so every send blocks until the listener
// receives it.
var timeSeriesChannel = make(chan []prompb.TimeSeries)

func init() {
muxer = mux.NewRouter()
Expand Down Expand Up @@ -153,25 +154,27 @@ func (p *OpsRampMetrics) Start() error {
p.Logger.Error().Logf("error while initializing oAuth Token Err: %v", err)
}

// Start timeseries listener thread
go p.ListenTimeseries()

for range metricsTicker.C {
statusCode, err := p.Push()
if err != nil {
p.Logger.Error().Logf("error while pushing metrics with statusCode: %d and Error: %v", statusCode, err)
if err.Error() == missingMetricsWriteScope {
p.Logger.Info().Logf("renewing auth token since the existing token is missing metrics:write scope")
err := p.RenewOAuthToken()
if err != nil {
p.Logger.Error().Logf("error while initializing oAuth Token Err: %v", err)
}
}
}
p.Push()
}
}()
}

return nil
}

// ListenTimeseries drains timeSeriesChannel, handing every received batch of
// time series to frameRequest. It blocks while waiting for the next batch and
// returns only once the channel has been closed, so callers are expected to
// run it in its own goroutine (as Start does).
func (p *OpsRampMetrics) ListenTimeseries() {
	for {
		batch, open := <-timeSeriesChannel
		if !open {
			// Channel closed: nothing more will ever arrive.
			return
		}
		// Build and send one remote-write request for this batch.
		p.frameRequest(batch)
	}
}

// Register takes a name and a metric type. The type should be one of "counter",
// "gauge", or "histogram"
func (p *OpsRampMetrics) Register(name, metricType string) {
Expand Down Expand Up @@ -594,11 +597,11 @@ func (p *OpsRampMetrics) calculateTraceOperationError(metricFamilySlice []*io_pr
}
}

func (p *OpsRampMetrics) Push() (int, error) {
func (p *OpsRampMetrics) Push() {
metricsConfig := p.Config.GetMetricsConfig()

// setting up default values and removing metrics older than 5 minutes
p.lock.Lock() // nolint: all // no linting here since we need to release the lock sooner than end of funtion
p.lock.Lock() // nolint: all // no linting here since we need to release the lock sooner than end of function

for metricName, metData := range p.metrics {
for labelValStr, t := range metData.LabelValues.Copy() {
Expand All @@ -617,14 +620,23 @@ func (p *OpsRampMetrics) Push() (int, error) {
}

if timeDiff > time.Minute*15 {
p.Logger.Error().WithField("metric: ", metricName).
WithField("Label Values: ", labelVals).
Logf("Deleting a Gauge metric for ")
p.gaugeDeleteLabelValues(metricName, labelVals)
}
case COUNTER:
if timeDiff > time.Hour*24 {
p.Logger.Error().WithField("metric: ", metricName).
WithField("Label Values: ", labelVals).
Logf("Deleting a Counter metric for ")
p.counterDeleteLabelValues(metricName, labelVals)
}
case HISTOGRAM:
if timeDiff > time.Hour*24 {
p.Logger.Error().WithField("metric: ", metricName).
WithField("Label Values: ", labelVals).
Logf("Deleting a Histogram metric for ")
p.histogramDeleteLabelValues(metricName, labelVals)
}
}
Expand All @@ -635,14 +647,8 @@ func (p *OpsRampMetrics) Push() (int, error) {

metricFamilySlice, err := p.promRegistry.Gather()
if err != nil {
return -1, err
}

p.calculateTraceOperationError(metricFamilySlice)

metricFamilySlice, err = p.promRegistry.Gather()
if err != nil {
return -1, err
p.Logger.Error().Logf("Error during gather: %v", err)
return
}

presentTime := time.Now().UnixMilli()
Expand All @@ -653,7 +659,23 @@ func (p *OpsRampMetrics) Push() (int, error) {
if !p.re.MatchString(metricFamily.GetName()) {
continue
}
var metricsLength int
var hasSent bool
for _, metric := range metricFamily.GetMetric() {
metricLength := len(metric.String())
if metricLength > 250000 {
p.Logger.Info().Logf("Metric size is too big (%d characters)", metricLength) // larger than my tummy
continue
}
if metricsLength+metricLength > 250000 {
hasSent = true
go func() {
timeSeriesChannel <- timeSeries // Push timeSeries payload to channel
}()
metricsLength = 0
timeSeries = []prompb.TimeSeries{}
}
metricsLength += metricLength
labels := []prompb.Label{
{
Name: model.JobLabel,
Expand Down Expand Up @@ -803,14 +825,25 @@ func (p *OpsRampMetrics) Push() (int, error) {
},
})
}
hasSent = false
}
if !hasSent {
go func() {
timeSeriesChannel <- timeSeries
}()

}
}
}

func (p *OpsRampMetrics) frameRequest(timeSeries []prompb.TimeSeries) {

request := prompb.WriteRequest{Timeseries: timeSeries}

out, err := proto.Marshal(&request)
if err != nil {
return -1, err
p.Logger.Error().Logf("Unable to marshal the request: %v", err)
return
}

compressed := snappy.Encode(nil, out)
Expand All @@ -819,38 +852,36 @@ func (p *OpsRampMetrics) Push() (int, error) {

req, err := http.NewRequest(http.MethodPost, URL, bytes.NewBuffer(compressed))
if err != nil {
return -1, err
p.Logger.Error().Logf("Unable to build request: %v", err)
return
}

req.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0")
req.Header.Set("Content-Encoding", "snappy")
req.Header.Set("Content-Type", "application/x-protobuf")

if !strings.Contains(p.oAuthToken.Scope, "metrics:write") {
return -1, fmt.Errorf(missingMetricsWriteScope)
p.Logger.Error().Logf(missingMetricsWriteScope)
p.RenewOAuthToken()
}
req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", p.oAuthToken.AccessToken))

resp, err := p.Send(req)
if err != nil {
return -1, err
p.Logger.Error().Logf("Unable to send request: %v", err)
return
}
defer func() {
if resp != nil && resp.Body != nil {
_ = resp.Body.Close()
}
}()
defer resp.Body.Close()
// Depending on the version and configuration of the PGW, StatusOK or StatusAccepted may be returned.
body, err := io.ReadAll(resp.Body)
if err != nil {
p.Logger.Error().Logf("failed to parse response body Err: %v", err)
}
if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusAccepted {
return resp.StatusCode, fmt.Errorf("unexpected status code %d while pushing: %s", resp.StatusCode, body)
p.Logger.Error().Logf("unexpected status code %d while pushing: %s", resp.StatusCode, body)
return
}
p.Logger.Debug().Logf("metrics %s push response: %v", p.prefix, string(body))

return resp.StatusCode, nil
//p.Logger.Debug().Logf("metrics %s push response: %v", p.prefix, string(body))
}

func (p *OpsRampMetrics) RenewClient() {
Expand Down

0 comments on commit 9b28ba7

Please sign in to comment.