Skip to content

Commit

Permalink
serialize each datapoint into separate line (#2618)
Browse files Browse the repository at this point in the history
* split each datapoint into separate point

* fix unit tests

* change data in unit tests
  • Loading branch information
sahejsingh authored Mar 15, 2021
1 parent 8c985f8 commit 4dbba92
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 47 deletions.
22 changes: 13 additions & 9 deletions exporter/dynatraceexporter/metrics_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ func (e *exporter) serializeMetrics(md pdata.Metrics) ([]string, int) {

resourceMetrics := md.ResourceMetrics()

e.logger.Debug(fmt.Sprintf("res metric len: %d, e.cfg.Tags: %v\n", resourceMetrics.Len(), e.cfg.Tags))

for i := 0; i < resourceMetrics.Len(); i++ {
resourceMetric := resourceMetrics.At(i)
libraryMetrics := resourceMetric.InstrumentationLibraryMetrics()
Expand All @@ -104,24 +106,25 @@ func (e *exporter) serializeMetrics(md pdata.Metrics) ([]string, int) {
continue
}

e.logger.Debug("Exporting type " + metric.DataType().String())

var l []string
switch metric.DataType() {
case pdata.MetricDataTypeNone:
continue
case pdata.MetricDataTypeIntGauge:
lines = append(lines, serialization.SerializeIntDataPoints(name, metric.IntGauge().DataPoints(), e.cfg.Tags))
l = serialization.SerializeIntDataPoints(name, metric.IntGauge().DataPoints(), e.cfg.Tags)
case pdata.MetricDataTypeDoubleGauge:
lines = append(lines, serialization.SerializeDoubleDataPoints(name, metric.DoubleGauge().DataPoints(), e.cfg.Tags))
l = serialization.SerializeDoubleDataPoints(name, metric.DoubleGauge().DataPoints(), e.cfg.Tags)
case pdata.MetricDataTypeIntSum:
lines = append(lines, serialization.SerializeIntDataPoints(name, metric.IntSum().DataPoints(), e.cfg.Tags))
l = serialization.SerializeIntDataPoints(name, metric.IntSum().DataPoints(), e.cfg.Tags)
case pdata.MetricDataTypeDoubleSum:
lines = append(lines, serialization.SerializeDoubleDataPoints(name, metric.DoubleSum().DataPoints(), e.cfg.Tags))
l = serialization.SerializeDoubleDataPoints(name, metric.DoubleSum().DataPoints(), e.cfg.Tags)
case pdata.MetricDataTypeIntHistogram:
lines = append(lines, serialization.SerializeIntHistogramMetrics(name, metric.IntHistogram().DataPoints(), e.cfg.Tags))
l = serialization.SerializeIntHistogramMetrics(name, metric.IntHistogram().DataPoints(), e.cfg.Tags)
case pdata.MetricDataTypeDoubleHistogram:
lines = append(lines, serialization.SerializeDoubleHistogramMetrics(name, metric.DoubleHistogram().DataPoints(), e.cfg.Tags))
l = serialization.SerializeDoubleHistogramMetrics(name, metric.DoubleHistogram().DataPoints(), e.cfg.Tags)
}
lines = append(lines, l...)
e.logger.Debug(fmt.Sprintf("Exporting type %s, Name: %s, len: %d ", metric.DataType().String(), name, len(l)))
}
}
}
Expand Down Expand Up @@ -163,7 +166,8 @@ func (e *exporter) send(ctx context.Context, lines []string) (int, error) {
// An error indicates all lines were dropped regardless of the returned number.
func (e *exporter) sendBatch(ctx context.Context, lines []string) (int, error) {
message := strings.Join(lines, "\n")
e.logger.Debug("Sending lines to Dynatrace\n" + message)
e.logger.Debug(fmt.Sprintf("Sending lines to Dynatrace: %d", len(lines)))

req, err := http.NewRequestWithContext(ctx, "POST", e.cfg.Endpoint, bytes.NewBufferString(message))
if err != nil {
return 0, consumererror.Permanent(err)
Expand Down
29 changes: 15 additions & 14 deletions exporter/dynatraceexporter/serialization/serialization.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,27 +32,28 @@ const (
)

// SerializeIntDataPoints serializes a slice of integer datapoints to a Dynatrace gauge.
func SerializeIntDataPoints(name string, data pdata.IntDataPointSlice, tags []string) string {
func SerializeIntDataPoints(name string, data pdata.IntDataPointSlice, tags []string) []string {
// {name} {value} {timestamp}
output := ""
output := []string{}

for i := 0; i < data.Len(); i++ {
p := data.At(i)
tagline := serializeTags(p.LabelsMap(), tags)
valueLine := strconv.FormatInt(p.Value(), 10)

output += serializeLine(name, tagline, valueLine, p.Timestamp())
output = append(output, serializeLine(name, tagline, valueLine, p.Timestamp()))
}

return output
}

// SerializeDoubleDataPoints serializes a slice of double datapoints to a Dynatrace gauge.
func SerializeDoubleDataPoints(name string, data pdata.DoubleDataPointSlice, tags []string) string {
func SerializeDoubleDataPoints(name string, data pdata.DoubleDataPointSlice, tags []string) []string {
// {name} {value} {timestamp}
output := ""
output := []string{}
for i := 0; i < data.Len(); i++ {
p := data.At(i)
output += serializeLine(name, serializeTags(p.LabelsMap(), tags), serializeFloat64(p.Value()), p.Timestamp())
output = append(output, serializeLine(name, serializeTags(p.LabelsMap(), tags), serializeFloat64(p.Value()), p.Timestamp()))
}

return output
Expand All @@ -61,20 +62,20 @@ func SerializeDoubleDataPoints(name string, data pdata.DoubleDataPointSlice, tag
// SerializeDoubleHistogramMetrics serializes a slice of double histogram datapoints to a Dynatrace gauge.
//
// IMPORTANT: Min and max are required by Dynatrace but not provided by histogram so they are assumed to be the average.
func SerializeDoubleHistogramMetrics(name string, data pdata.DoubleHistogramDataPointSlice, tags []string) string {
func SerializeDoubleHistogramMetrics(name string, data pdata.DoubleHistogramDataPointSlice, tags []string) []string {
// {name} gauge,min=9.75,max=9.75,sum=19.5,count=2 {timestamp_unix_ms}
output := ""
output := []string{}
for i := 0; i < data.Len(); i++ {
p := data.At(i)
tagline := serializeTags(p.LabelsMap(), tags)
if p.Count() == 0 {
return ""
return []string{}
}
avg := p.Sum() / float64(p.Count())

valueLine := fmt.Sprintf("gauge,min=%[1]s,max=%[1]s,sum=%s,count=%d", serializeFloat64(avg), serializeFloat64(p.Sum()), p.Count())

output += serializeLine(name, tagline, valueLine, p.Timestamp())
output = append(output, serializeLine(name, tagline, valueLine, p.Timestamp()))
}

return output
Expand All @@ -83,23 +84,23 @@ func SerializeDoubleHistogramMetrics(name string, data pdata.DoubleHistogramData
// SerializeIntHistogramMetrics serializes a slice of integer histogram datapoints to a Dynatrace gauge.
//
// IMPORTANT: Min and max are required by Dynatrace but not provided by histogram so they are assumed to be the average.
func SerializeIntHistogramMetrics(name string, data pdata.IntHistogramDataPointSlice, tags []string) string {
func SerializeIntHistogramMetrics(name string, data pdata.IntHistogramDataPointSlice, tags []string) []string {
// {name} gauge,min=9.5,max=9.5,sum=19,count=2 {timestamp_unix_ms}
output := ""
output := []string{}
for i := 0; i < data.Len(); i++ {
p := data.At(i)
tagline := serializeTags(p.LabelsMap(), tags)
count := p.Count()

if count == 0 {
return ""
return []string{}
}

avg := float64(p.Sum()) / float64(count)

valueLine := fmt.Sprintf("gauge,min=%[1]s,max=%[1]s,sum=%d,count=%d", serializeFloat64(avg), p.Sum(), count)

output += serializeLine(name, tagline, valueLine, p.Timestamp())
output = append(output, serializeLine(name, tagline, valueLine, p.Timestamp()))
}

return output
Expand Down
63 changes: 39 additions & 24 deletions exporter/dynatraceexporter/serialization/serialization_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,13 @@ func TestSerializeIntDataPoints(t *testing.T) {
}

intSlice := pdata.NewIntDataPointSlice()
intSlice.Resize(1)
intSlice.Resize(2)
intPoint := intSlice.At(0)
intPoint.SetValue(13)
intPoint.SetTimestamp(pdata.Timestamp(100_000_000))
intPoint1 := intSlice.At(1)
intPoint1.SetValue(14)
intPoint1.SetTimestamp(pdata.Timestamp(101_000_000))

labelIntSlice := pdata.NewIntDataPointSlice()
labelIntSlice.Resize(1)
Expand All @@ -50,7 +53,7 @@ func TestSerializeIntDataPoints(t *testing.T) {
tests := []struct {
name string
args args
want string
want []string
}{
{
name: "Serialize integer data points",
Expand All @@ -59,7 +62,7 @@ func TestSerializeIntDataPoints(t *testing.T) {
data: intSlice,
tags: []string{},
},
want: "my_int_gauge 13 100",
want: []string{"my_int_gauge 13 100", "my_int_gauge 14 101"},
},
{
name: "Serialize integer data points with tags",
Expand All @@ -68,7 +71,7 @@ func TestSerializeIntDataPoints(t *testing.T) {
data: intSlice,
tags: []string{"test_key=testval"},
},
want: "my_int_gauge_with_tags,test_key=testval 13 100",
want: []string{"my_int_gauge_with_tags,test_key=testval 13 100", "my_int_gauge_with_tags,test_key=testval 14 101"},
},
{
name: "Serialize integer data points with labels",
Expand All @@ -77,7 +80,7 @@ func TestSerializeIntDataPoints(t *testing.T) {
data: labelIntSlice,
tags: []string{},
},
want: "my_int_gauge_with_labels,labelkey=\"labelValue\" 13 100",
want: []string{"my_int_gauge_with_labels,labelkey=\"labelValue\" 13 100"},
},
{
name: "Serialize integer data points with empty label",
Expand All @@ -86,12 +89,12 @@ func TestSerializeIntDataPoints(t *testing.T) {
data: emptyLabelIntSlice,
tags: []string{},
},
want: "my_int_gauge_with_empty_labels,emptylabelkey=\"\" 13 100",
want: []string{"my_int_gauge_with_empty_labels,emptylabelkey=\"\" 13 100"},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := SerializeIntDataPoints(tt.args.name, tt.args.data, tt.args.tags); got != tt.want {
if got := SerializeIntDataPoints(tt.args.name, tt.args.data, tt.args.tags); !equal(got, tt.want) {
t.Errorf("SerializeIntDataPoints() = %#v, want %#v", got, tt.want)
}
})
Expand Down Expand Up @@ -120,7 +123,7 @@ func TestSerializeDoubleDataPoints(t *testing.T) {
tests := []struct {
name string
args args
want string
want []string
}{
{
name: "Serialize double data points",
Expand All @@ -129,7 +132,7 @@ func TestSerializeDoubleDataPoints(t *testing.T) {
data: doubleSlice,
tags: []string{},
},
want: "my_double_gauge 13.1 100",
want: []string{"my_double_gauge 13.1 100"},
},
{
name: "Serialize double data points with tags",
Expand All @@ -138,7 +141,7 @@ func TestSerializeDoubleDataPoints(t *testing.T) {
data: doubleSlice,
tags: []string{"test_key=testval"},
},
want: "my_double_gauge_with_tags,test_key=testval 13.1 100",
want: []string{"my_double_gauge_with_tags,test_key=testval 13.1 100"},
},
{
name: "Serialize double data points with labels",
Expand All @@ -147,12 +150,12 @@ func TestSerializeDoubleDataPoints(t *testing.T) {
data: labelDoubleSlice,
tags: []string{},
},
want: "my_double_gauge_with_labels,labelkey=\"labelValue\" 13.1 100",
want: []string{"my_double_gauge_with_labels,labelkey=\"labelValue\" 13.1 100"},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := SerializeDoubleDataPoints(tt.args.name, tt.args.data, tt.args.tags); got != tt.want {
if got := SerializeDoubleDataPoints(tt.args.name, tt.args.data, tt.args.tags); !equal(got, tt.want) {
t.Errorf("SerializeDoubleDataPoints() = %v, want %v", got, tt.want)
}
})
Expand Down Expand Up @@ -190,7 +193,7 @@ func TestSerializeDoubleHistogramMetrics(t *testing.T) {
tests := []struct {
name string
args args
want string
want []string
}{
{
name: "Serialize double histogram data points",
Expand All @@ -199,7 +202,7 @@ func TestSerializeDoubleHistogramMetrics(t *testing.T) {
data: doubleHistSlice,
tags: []string{},
},
want: "my_double_hist gauge,min=10.1,max=10.1,sum=101,count=10 100",
want: []string{"my_double_hist gauge,min=10.1,max=10.1,sum=101,count=10 100"},
},
{
name: "Serialize double histogram data points with tags",
Expand All @@ -208,7 +211,7 @@ func TestSerializeDoubleHistogramMetrics(t *testing.T) {
data: doubleHistSlice,
tags: []string{"test_key=testval"},
},
want: "my_double_hist_with_tags,test_key=testval gauge,min=10.1,max=10.1,sum=101,count=10 100",
want: []string{"my_double_hist_with_tags,test_key=testval gauge,min=10.1,max=10.1,sum=101,count=10 100"},
},
{
name: "Serialize double histogram data points with labels",
Expand All @@ -217,7 +220,7 @@ func TestSerializeDoubleHistogramMetrics(t *testing.T) {
data: labelDoubleHistSlice,
tags: []string{},
},
want: "my_double_hist_with_labels,labelkey=\"labelValue\" gauge,min=10.1,max=10.1,sum=101,count=10 100",
want: []string{"my_double_hist_with_labels,labelkey=\"labelValue\" gauge,min=10.1,max=10.1,sum=101,count=10 100"},
},
{
name: "Serialize zero double histogram",
Expand All @@ -226,12 +229,12 @@ func TestSerializeDoubleHistogramMetrics(t *testing.T) {
data: zeroDoubleHistogramSlice,
tags: []string{},
},
want: "",
want: []string{},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := SerializeDoubleHistogramMetrics(tt.args.name, tt.args.data, tt.args.tags); got != tt.want {
if got := SerializeDoubleHistogramMetrics(tt.args.name, tt.args.data, tt.args.tags); !equal(got, tt.want) {
t.Errorf("SerializeDoubleHistogramMetrics() = %v, want %v", got, tt.want)
}
})
Expand Down Expand Up @@ -269,7 +272,7 @@ func TestSerializeIntHistogramMetrics(t *testing.T) {
tests := []struct {
name string
args args
want string
want []string
}{
{
name: "Serialize integer histogram data points",
Expand All @@ -278,7 +281,7 @@ func TestSerializeIntHistogramMetrics(t *testing.T) {
data: intHistSlice,
tags: []string{},
},
want: "my_int_hist gauge,min=11,max=11,sum=110,count=10 100",
want: []string{"my_int_hist gauge,min=11,max=11,sum=110,count=10 100"},
},
{
name: "Serialize integer histogram data points with tags",
Expand All @@ -287,7 +290,7 @@ func TestSerializeIntHistogramMetrics(t *testing.T) {
data: intHistSlice,
tags: []string{"test_key=testval"},
},
want: "my_int_hist_with_tags,test_key=testval gauge,min=11,max=11,sum=110,count=10 100",
want: []string{"my_int_hist_with_tags,test_key=testval gauge,min=11,max=11,sum=110,count=10 100"},
},
{
name: "Serialize integer histogram data points with labels",
Expand All @@ -296,7 +299,7 @@ func TestSerializeIntHistogramMetrics(t *testing.T) {
data: labelIntHistSlice,
tags: []string{},
},
want: "my_int_hist_with_labels,labelkey=\"labelValue\" gauge,min=11,max=11,sum=110,count=10 100",
want: []string{"my_int_hist_with_labels,labelkey=\"labelValue\" gauge,min=11,max=11,sum=110,count=10 100"},
},
{
name: "Serialize zero integer histogram",
Expand All @@ -305,12 +308,12 @@ func TestSerializeIntHistogramMetrics(t *testing.T) {
data: zeroIntHistogramSlice,
tags: []string{},
},
want: "",
want: []string{},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := SerializeIntHistogramMetrics(tt.args.name, tt.args.data, tt.args.tags); got != tt.want {
if got := SerializeIntHistogramMetrics(tt.args.name, tt.args.data, tt.args.tags); !equal(got, tt.want) {
t.Errorf("SerializeIntHistogramMetrics() = %v, want %v", got, tt.want)
}
})
Expand Down Expand Up @@ -506,3 +509,15 @@ func Test_serializeFloat64(t *testing.T) {
})
}
}

func equal(a, b []string) bool {
if len(a) == len(b) {
for i := range a {
if a[i] != b[i] {
return false
}
}
return true
}
return false
}

0 comments on commit 4dbba92

Please sign in to comment.