Skip to content

Commit

Permalink
sumologicexporter: add carbon formatter (#2523)
Browse files Browse the repository at this point in the history
Add support for carbon2 format to sumologicexporter

**Link to tracking Issue:**
#1498 

**Testing:** 
Manual e2e tests, unit tests

**Documentation:**
Comments
  • Loading branch information
sumo-drosiek authored and pmatyjasek-sumo committed Apr 28, 2021
1 parent 10fb4b5 commit 726498d
Show file tree
Hide file tree
Showing 6 changed files with 246 additions and 1 deletion.
2 changes: 1 addition & 1 deletion exporter/sumologicexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ Empty string means no compression
- `max_request_body_size` (optional): Max HTTP request body size in bytes before compression (if applied). By default `1_048_576` (1MB) is used.
- `metadata_attributes` (optional): List of regexes for attributes which should be send as metadata
- `log_format` (optional) (logs only): Format to use when sending logs to Sumo. (default `json`) (possible values: `json`, `text`)
- `metric_format` (optional) (metrics only): Format of the metrics to be sent (default is `prometheus`).
- `metric_format` (optional) (metrics only): Format of the metrics to be sent (default is `prometheus`) (possible values: `carbon2`, `prometheus`)
`carbon2` and `graphite` are going to be supported soon.
- `source_category` (optional): Desired source category. Useful if you want to override the source category configured for the source.
- `source_name` (optional): Desired source name. Useful if you want to override the source name configured for the source.
Expand Down
113 changes: 113 additions & 0 deletions exporter/sumologicexporter/carbon_formatter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
// Copyright 2021, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package sumologicexporter

import (
"fmt"
"strings"

"go.opentelemetry.io/collector/consumer/pdata"
tracetranslator "go.opentelemetry.io/collector/translator/trace"
)

// carbon2TagString returns all attributes as space spearated key=value pairs.
// In addition, metric name and unit are also included.
// In case `metric` or `unit` attributes has been set too, they are prefixed
// with underscore `_` to avoid overwriting the metric name and unit.
func carbon2TagString(record metricPair) string {
length := record.attributes.Len()

if _, ok := record.attributes.Get("metric"); ok {
length++
}

if _, ok := record.attributes.Get("unit"); ok && len(record.metric.Unit()) > 0 {
length++
}

returnValue := make([]string, 0, length)
record.attributes.ForEach(func(k string, v pdata.AttributeValue) {
if k == "name" || k == "unit" {
k = fmt.Sprintf("_%s", k)
}
returnValue = append(returnValue, fmt.Sprintf("%s=%s", k, tracetranslator.AttributeValueToString(v, false)))
})

returnValue = append(returnValue, fmt.Sprintf("metric=%s", record.metric.Name()))

if len(record.metric.Unit()) > 0 {
returnValue = append(returnValue, fmt.Sprintf("unit=%s", record.metric.Unit()))
}

return strings.Join(returnValue, " ")
}

// carbon2IntRecord converts IntDataPoint to carbon2 metric string
// with additional information from metricPair.
func carbon2IntRecord(record metricPair, dataPoint pdata.IntDataPoint) string {
return fmt.Sprintf("%s %d %d",
carbon2TagString(record),
dataPoint.Value(),
dataPoint.Timestamp()/1e9,
)
}

// carbon2DoubleRecord converts DoubleDataPoint to carbon2 metric string
// with additional information from metricPair.
func carbon2DoubleRecord(record metricPair, dataPoint pdata.DoubleDataPoint) string {
return fmt.Sprintf("%s %g %d",
carbon2TagString(record),
dataPoint.Value(),
dataPoint.Timestamp()/1e9,
)
}

// carbon2metric2String converts metric to Carbon2 formatted string.
func carbon2Metric2String(record metricPair) string {
var nextLines []string

switch record.metric.DataType() {
case pdata.MetricDataTypeIntGauge:
dps := record.metric.IntGauge().DataPoints()
nextLines = make([]string, 0, dps.Len())
for i := 0; i < dps.Len(); i++ {
nextLines = append(nextLines, carbon2IntRecord(record, dps.At(i)))
}
case pdata.MetricDataTypeIntSum:
dps := record.metric.IntSum().DataPoints()
nextLines = make([]string, 0, dps.Len())
for i := 0; i < dps.Len(); i++ {
nextLines = append(nextLines, carbon2IntRecord(record, dps.At(i)))
}
case pdata.MetricDataTypeDoubleGauge:
dps := record.metric.DoubleGauge().DataPoints()
nextLines = make([]string, 0, dps.Len())
for i := 0; i < dps.Len(); i++ {
nextLines = append(nextLines, carbon2DoubleRecord(record, dps.At(i)))
}
case pdata.MetricDataTypeDoubleSum:
dps := record.metric.DoubleSum().DataPoints()
nextLines = make([]string, 0, dps.Len())
for i := 0; i < dps.Len(); i++ {
nextLines = append(nextLines, carbon2DoubleRecord(record, dps.At(i)))
}
// Skip complex metrics
case pdata.MetricDataTypeDoubleHistogram:
case pdata.MetricDataTypeIntHistogram:
case pdata.MetricDataTypeDoubleSummary:
}

return strings.Join(nextLines, "\n")
}
99 changes: 99 additions & 0 deletions exporter/sumologicexporter/carbon_formatter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
// Copyright 2021, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package sumologicexporter

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestCarbon2TagString(t *testing.T) {
metric := exampleIntMetric()
data := carbon2TagString(metric)
assert.Equal(t, "test=test_value test2=second_value metric=test.metric.data unit=bytes", data)

metric = exampleIntGaugeMetric()
data = carbon2TagString(metric)
assert.Equal(t, "foo=bar metric=gauge_metric_name", data)

metric = exampleDoubleSumMetric()
data = carbon2TagString(metric)
assert.Equal(t, "foo=bar metric=sum_metric_double_test", data)

metric = exampleDoubleGaugeMetric()
data = carbon2TagString(metric)
assert.Equal(t, "foo=bar metric=gauge_metric_name_double_test", data)
}

func TestCarbonMetricDataTypeIntGauge(t *testing.T) {
metric := exampleIntGaugeMetric()

result := carbon2Metric2String(metric)
expected := `foo=bar metric=gauge_metric_name 124 1608124661
foo=bar metric=gauge_metric_name 245 1608124662`
assert.Equal(t, expected, result)
}

func TestCarbonMetricDataTypeDoubleGauge(t *testing.T) {
metric := exampleDoubleGaugeMetric()

result := carbon2Metric2String(metric)
expected := `foo=bar metric=gauge_metric_name_double_test 33.4 1608124661
foo=bar metric=gauge_metric_name_double_test 56.8 1608124662`
assert.Equal(t, expected, result)
}

func TestCarbonMetricDataTypeIntSum(t *testing.T) {
metric := exampleIntSumMetric()

result := carbon2Metric2String(metric)
expected := `foo=bar metric=sum_metric_int_test 45 1608124444
foo=bar metric=sum_metric_int_test 1238 1608124699`
assert.Equal(t, expected, result)
}

func TestCarbonMetricDataTypeDoubleSum(t *testing.T) {
metric := exampleDoubleSumMetric()

result := carbon2Metric2String(metric)
expected := `foo=bar metric=sum_metric_double_test 45.6 1618124444
foo=bar metric=sum_metric_double_test 1238.1 1608424699`
assert.Equal(t, expected, result)
}

func TestCarbonMetricDataTypeDoubleSummary(t *testing.T) {
metric := exampleDoubleSummaryMetric()

result := carbon2Metric2String(metric)
expected := ``
assert.Equal(t, expected, result)
}

func TestCarbonMetricDataTypeIntHistogram(t *testing.T) {
metric := exampleIntHistogramMetric()

result := carbon2Metric2String(metric)
expected := ``
assert.Equal(t, expected, result)
}

func TestCarbonMetricDataTypeDoubleHistogram(t *testing.T) {
metric := exampleDoubleHistogramMetric()

result := carbon2Metric2String(metric)
expected := ``
assert.Equal(t, expected, result)
}
1 change: 1 addition & 0 deletions exporter/sumologicexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type Config struct {

// Metrics related configuration
// The format of metrics you will be sending, either graphite or carbon2 or prometheus (Default is prometheus)
// Possible values are `carbon2` and `prometheus`
MetricFormat MetricFormatType `mapstructure:"metric_format"`

// List of regexes for attributes which should be send as metadata
Expand Down
5 changes: 5 additions & 0 deletions exporter/sumologicexporter/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ const (

contentTypeLogs string = "application/x-www-form-urlencoded"
contentTypePrometheus string = "application/vnd.sumologic.prometheus"
contentTypeCarbon2 string = "application/vnd.sumologic.carbon2"

contentEncodingGzip string = "gzip"
contentEncodingDeflate string = "deflate"
Expand Down Expand Up @@ -141,6 +142,8 @@ func (s *sender) send(ctx context.Context, pipeline PipelineType, body io.Reader
switch s.config.MetricFormat {
case PrometheusFormat:
req.Header.Add(headerContentType, contentTypePrometheus)
case Carbon2Format:
req.Header.Add(headerContentType, contentTypeCarbon2)
default:
return fmt.Errorf("unsupported metrics format: %s", s.config.MetricFormat)
}
Expand Down Expand Up @@ -258,6 +261,8 @@ func (s *sender) sendMetrics(ctx context.Context, flds fields) ([]metricPair, er
switch s.config.MetricFormat {
case PrometheusFormat:
formattedLine = s.prometheusFormatter.metric2String(record)
case Carbon2Format:
formattedLine = carbon2Metric2String(record)
default:
err = fmt.Errorf("unexpected metric format: %s", s.config.MetricFormat)
}
Expand Down
27 changes: 27 additions & 0 deletions exporter/sumologicexporter/sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -777,3 +777,30 @@ func TestMetricsBufferOverflow(t *testing.T) {
assert.EqualError(t, err, `parse ":": missing protocol scheme`)
assert.Equal(t, 0, test.s.countMetrics())
}

func TestSendCarbon2Metrics(t *testing.T) {
test := prepareSenderTest(t, []func(w http.ResponseWriter, req *http.Request){
func(w http.ResponseWriter, req *http.Request) {
body := extractBody(t, req)
expected := `test=test_value test2=second_value _unit=m/s metric=true metric=test.metric.data unit=bytes 14500 1605534165
foo=bar metric=gauge_metric_name 124 1608124661
foo=bar metric=gauge_metric_name 245 1608124662`
assert.Equal(t, expected, body)
assert.Equal(t, "otelcol", req.Header.Get("X-Sumo-Client"))
assert.Equal(t, "application/vnd.sumologic.carbon2", req.Header.Get("Content-Type"))
},
})
defer func() { test.srv.Close() }()

test.s.config.MetricFormat = Carbon2Format
test.s.metricBuffer = []metricPair{
exampleIntMetric(),
exampleIntGaugeMetric(),
}

test.s.metricBuffer[0].attributes.InsertString("unit", "m/s")
test.s.metricBuffer[0].attributes.InsertBool("metric", true)

_, err := test.s.sendMetrics(context.Background(), fields{"key1": "value", "key2": "value2"})
assert.NoError(t, err)
}

0 comments on commit 726498d

Please sign in to comment.