Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sumologicexporter: add carbon formatter #2562

Merged
merged 1 commit into from
Mar 10, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion exporter/sumologicexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ Empty string means no compression
- `max_request_body_size` (optional): Max HTTP request body size in bytes before compression (if applied). By default `1_048_576` (1MB) is used.
- `metadata_attributes` (optional): List of regexes for attributes which should be send as metadata
- `log_format` (optional) (logs only): Format to use when sending logs to Sumo. (default `json`) (possible values: `json`, `text`)
- `metric_format` (optional) (metrics only): Format of the metrics to be sent (default is `prometheus`).
- `metric_format` (optional) (metrics only): Format of the metrics to be sent (default is `prometheus`) (possible values: `carbon2`, `prometheus`)
`carbon2` and `graphite` are going to be supported soon.
- `source_category` (optional): Desired source category. Useful if you want to override the source category configured for the source.
- `source_name` (optional): Desired source name. Useful if you want to override the source name configured for the source.
Expand Down
113 changes: 113 additions & 0 deletions exporter/sumologicexporter/carbon_formatter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
// Copyright 2021, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package sumologicexporter

import (
"fmt"
"strings"

"go.opentelemetry.io/collector/consumer/pdata"
tracetranslator "go.opentelemetry.io/collector/translator/trace"
)

// carbon2TagString returns all attributes as space spearated key=value pairs.
// In addition, metric name and unit are also included.
// In case `metric` or `unit` attributes has been set too, they are prefixed
// with underscore `_` to avoid overwriting the metric name and unit.
func carbon2TagString(record metricPair) string {
length := record.attributes.Len()

if _, ok := record.attributes.Get("metric"); ok {
length++
}

if _, ok := record.attributes.Get("unit"); ok && len(record.metric.Unit()) > 0 {
length++
}

returnValue := make([]string, 0, length)
record.attributes.ForEach(func(k string, v pdata.AttributeValue) {
if k == "name" || k == "unit" {
k = fmt.Sprintf("_%s", k)
}
returnValue = append(returnValue, fmt.Sprintf("%s=%s", k, tracetranslator.AttributeValueToString(v, false)))
})

returnValue = append(returnValue, fmt.Sprintf("metric=%s", record.metric.Name()))

if len(record.metric.Unit()) > 0 {
returnValue = append(returnValue, fmt.Sprintf("unit=%s", record.metric.Unit()))
}

return strings.Join(returnValue, " ")
}

// carbon2IntRecord converts IntDataPoint to carbon2 metric string
// with additional information from metricPair.
func carbon2IntRecord(record metricPair, dataPoint pdata.IntDataPoint) string {
return fmt.Sprintf("%s %d %d",
carbon2TagString(record),
dataPoint.Value(),
dataPoint.Timestamp()/1e9,
)
}

// carbon2DoubleRecord converts DoubleDataPoint to carbon2 metric string
// with additional information from metricPair.
func carbon2DoubleRecord(record metricPair, dataPoint pdata.DoubleDataPoint) string {
return fmt.Sprintf("%s %g %d",
carbon2TagString(record),
dataPoint.Value(),
dataPoint.Timestamp()/1e9,
)
}

// carbon2metric2String converts metric to Carbon2 formatted string.
func carbon2Metric2String(record metricPair) string {
var nextLines []string

switch record.metric.DataType() {
case pdata.MetricDataTypeIntGauge:
dps := record.metric.IntGauge().DataPoints()
nextLines = make([]string, 0, dps.Len())
for i := 0; i < dps.Len(); i++ {
nextLines = append(nextLines, carbon2IntRecord(record, dps.At(i)))
}
case pdata.MetricDataTypeIntSum:
dps := record.metric.IntSum().DataPoints()
nextLines = make([]string, 0, dps.Len())
for i := 0; i < dps.Len(); i++ {
nextLines = append(nextLines, carbon2IntRecord(record, dps.At(i)))
}
case pdata.MetricDataTypeDoubleGauge:
dps := record.metric.DoubleGauge().DataPoints()
nextLines = make([]string, 0, dps.Len())
for i := 0; i < dps.Len(); i++ {
nextLines = append(nextLines, carbon2DoubleRecord(record, dps.At(i)))
}
case pdata.MetricDataTypeDoubleSum:
dps := record.metric.DoubleSum().DataPoints()
nextLines = make([]string, 0, dps.Len())
for i := 0; i < dps.Len(); i++ {
nextLines = append(nextLines, carbon2DoubleRecord(record, dps.At(i)))
}
// Skip complex metrics
case pdata.MetricDataTypeDoubleHistogram:
case pdata.MetricDataTypeIntHistogram:
case pdata.MetricDataTypeDoubleSummary:
}

return strings.Join(nextLines, "\n")
}
99 changes: 99 additions & 0 deletions exporter/sumologicexporter/carbon_formatter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
// Copyright 2021, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package sumologicexporter

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestCarbon2TagString(t *testing.T) {
metric := exampleIntMetric()
data := carbon2TagString(metric)
assert.Equal(t, "test=test_value test2=second_value metric=test.metric.data unit=bytes", data)

metric = exampleIntGaugeMetric()
data = carbon2TagString(metric)
assert.Equal(t, "foo=bar metric=gauge_metric_name", data)

metric = exampleDoubleSumMetric()
data = carbon2TagString(metric)
assert.Equal(t, "foo=bar metric=sum_metric_double_test", data)

metric = exampleDoubleGaugeMetric()
data = carbon2TagString(metric)
assert.Equal(t, "foo=bar metric=gauge_metric_name_double_test", data)
}

func TestCarbonMetricDataTypeIntGauge(t *testing.T) {
metric := exampleIntGaugeMetric()

result := carbon2Metric2String(metric)
expected := `foo=bar metric=gauge_metric_name 124 1608124661
foo=bar metric=gauge_metric_name 245 1608124662`
assert.Equal(t, expected, result)
}

func TestCarbonMetricDataTypeDoubleGauge(t *testing.T) {
metric := exampleDoubleGaugeMetric()

result := carbon2Metric2String(metric)
expected := `foo=bar metric=gauge_metric_name_double_test 33.4 1608124661
foo=bar metric=gauge_metric_name_double_test 56.8 1608124662`
assert.Equal(t, expected, result)
}

func TestCarbonMetricDataTypeIntSum(t *testing.T) {
metric := exampleIntSumMetric()

result := carbon2Metric2String(metric)
expected := `foo=bar metric=sum_metric_int_test 45 1608124444
foo=bar metric=sum_metric_int_test 1238 1608124699`
assert.Equal(t, expected, result)
}

func TestCarbonMetricDataTypeDoubleSum(t *testing.T) {
metric := exampleDoubleSumMetric()

result := carbon2Metric2String(metric)
expected := `foo=bar metric=sum_metric_double_test 45.6 1618124444
foo=bar metric=sum_metric_double_test 1238.1 1608424699`
assert.Equal(t, expected, result)
}

func TestCarbonMetricDataTypeDoubleSummary(t *testing.T) {
metric := exampleDoubleSummaryMetric()

result := carbon2Metric2String(metric)
expected := ``
assert.Equal(t, expected, result)
}

func TestCarbonMetricDataTypeIntHistogram(t *testing.T) {
metric := exampleIntHistogramMetric()

result := carbon2Metric2String(metric)
expected := ``
assert.Equal(t, expected, result)
}

func TestCarbonMetricDataTypeDoubleHistogram(t *testing.T) {
metric := exampleDoubleHistogramMetric()

result := carbon2Metric2String(metric)
expected := ``
assert.Equal(t, expected, result)
}
1 change: 1 addition & 0 deletions exporter/sumologicexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type Config struct {

// Metrics related configuration
// The format of metrics you will be sending, either graphite or carbon2 or prometheus (Default is prometheus)
// Possible values are `carbon2` and `prometheus`
MetricFormat MetricFormatType `mapstructure:"metric_format"`

// List of regexes for attributes which should be send as metadata
Expand Down
5 changes: 5 additions & 0 deletions exporter/sumologicexporter/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ const (

contentTypeLogs string = "application/x-www-form-urlencoded"
contentTypePrometheus string = "application/vnd.sumologic.prometheus"
contentTypeCarbon2 string = "application/vnd.sumologic.carbon2"

contentEncodingGzip string = "gzip"
contentEncodingDeflate string = "deflate"
Expand Down Expand Up @@ -141,6 +142,8 @@ func (s *sender) send(ctx context.Context, pipeline PipelineType, body io.Reader
switch s.config.MetricFormat {
case PrometheusFormat:
req.Header.Add(headerContentType, contentTypePrometheus)
case Carbon2Format:
req.Header.Add(headerContentType, contentTypeCarbon2)
default:
return fmt.Errorf("unsupported metrics format: %s", s.config.MetricFormat)
}
Expand Down Expand Up @@ -258,6 +261,8 @@ func (s *sender) sendMetrics(ctx context.Context, flds fields) ([]metricPair, er
switch s.config.MetricFormat {
case PrometheusFormat:
formattedLine = s.prometheusFormatter.metric2String(record)
case Carbon2Format:
formattedLine = carbon2Metric2String(record)
default:
err = fmt.Errorf("unexpected metric format: %s", s.config.MetricFormat)
}
Expand Down
32 changes: 32 additions & 0 deletions exporter/sumologicexporter/sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -777,3 +777,35 @@ func TestMetricsBufferOverflow(t *testing.T) {
assert.EqualError(t, err, `parse ":": missing protocol scheme`)
assert.Equal(t, 0, test.s.countMetrics())
}

func TestSendCarbon2Metrics(t *testing.T) {
test := prepareSenderTest(t, []func(w http.ResponseWriter, req *http.Request){
func(w http.ResponseWriter, req *http.Request) {
body := extractBody(t, req)
expected := `test=test_value test2=second_value _unit=m/s metric=true metric=test.metric.data unit=bytes 14500 1605534165
foo=bar metric=gauge_metric_name 124 1608124661
foo=bar metric=gauge_metric_name 245 1608124662`
assert.Equal(t, expected, body)
assert.Equal(t, "otelcol", req.Header.Get("X-Sumo-Client"))
assert.Equal(t, "application/vnd.sumologic.carbon2", req.Header.Get("Content-Type"))
},
})
defer func() { test.srv.Close() }()

test.s.config.MetricFormat = Carbon2Format
test.s.metricBuffer = []metricPair{
exampleIntMetric(),
exampleIntGaugeMetric(),
}

flds := fieldsFromMap(map[string]string{
"key1": "value",
"key2": "value2",
})

test.s.metricBuffer[0].attributes.InsertString("unit", "m/s")
test.s.metricBuffer[0].attributes.InsertBool("metric", true)

_, err := test.s.sendMetrics(context.Background(), flds)
assert.NoError(t, err)
}