Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

wavefront/serializer: improve performance by ~30% #5842

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
178 changes: 104 additions & 74 deletions plugins/serializers/wavefront/wavefront.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ package wavefront

import (
"bytes"
"errors"
"fmt"
"log"
"strconv"
"strings"
"sync"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/outputs/wavefront"
Expand Down Expand Up @@ -39,6 +41,12 @@ var tagValueReplacer = strings.NewReplacer("\"", "\\\"", "*", "-")

var pathReplacer = strings.NewReplacer("_", ".")

var bufPool = sync.Pool{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could add a reused buffer to the serializer, it isn't required for a Serializer to be thread safe.

This is a notable difference from the Parser interface which is currently required to be thread-safe, though I do think this should be changed in the future. I will update the documentation for these interfaces.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I still think there is value to using a sync.Pool that is global to the package. The sync.Pool will pin the current goroutine to a pool, which should improve locality of reference. Additionally, it does not make the WavefrontSerializer unsafe to use concurrently and is dynamic in size.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There isn't any reason for it to be dynamic though because it will never need more than a single buffer. Another issue with the pool is that you can't return the buffer to it unless you copy the data out of the buffer, I believe the current use in this pull request is unsafe even in a single threaded environment.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another issue with the pool is that you can't return the buffer to it unless you copy the data out of the buffer

We do that. The contents of pooled buffers are copied to the out buffer (note: out will never alias buf, example). This code is thread-safe.

Personally, I would prefer to not add a mutable field to the WavefrontSerializer, but I understand the argument for simplicity and will make the change.

What are your thoughts on adding a mutex to protect the WavefrontSerializer's scratch buffer? From you comment and a quick scan of the code it appears there is no need for it to be thread-safe, but if that were to change it would be beneficial to have it as a guard against this code being overlooked during a refactor.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I missed this, but this pool was added by accident the one we actually use is pbFree. That said, I'm still removing the pools in favor of a shared buffer - as requested.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't add anything to make it thread safe. I agree it there is a risk of code sneaking in that use these wrong, especially with the way the SerializerOutput interface works (sending in a single instance), but I'd like to move the other way with parsers/serializers.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cool, I made WavefrontSerializer thread-safe.

New: func() interface{} {
return new(bytes.Buffer)
},
}

func NewSerializer(prefix string, useStrict bool, sourceOverride []string) (*WavefrontSerializer, error) {
s := &WavefrontSerializer{
Prefix: prefix,
Expand All @@ -50,16 +58,17 @@ func NewSerializer(prefix string, useStrict bool, sourceOverride []string) (*Wav

// Serialize : Serialize based on Wavefront format
func (s *WavefrontSerializer) Serialize(m telegraf.Metric) ([]byte, error) {
out := []byte{}
metricSeparator := "."
const metricSeparator = "."
var out []byte
buf := pbFree.Get().(*buffer)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It probably makes sense to just create a single bytes.Buffer and then pass this into the various functions, the out slice would be removed and then there is no reason to append. I don't see a reason we need both of these.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed, will change.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Expect something from me tomorrow or Friday as I'm pretty short on bandwidth right now.


for fieldName, value := range m.Fields() {
var name string

if fieldName == "value" {
name = fmt.Sprintf("%s%s", s.Prefix, m.Name())
name = s.Prefix + m.Name()
} else {
name = fmt.Sprintf("%s%s%s%s", s.Prefix, m.Name(), metricSeparator, fieldName)
name = s.Prefix + m.Name() + metricSeparator + fieldName
}

if s.UseStrict {
Expand All @@ -70,80 +79,62 @@ func (s *WavefrontSerializer) Serialize(m telegraf.Metric) ([]byte, error) {

name = pathReplacer.Replace(name)

metric := &wavefront.MetricPoint{
Metric: name,
Timestamp: m.Time().Unix(),
}

metricValue, buildError := buildValue(value, metric.Metric)
metricValue, buildError := buildValue(value, name)
Copy link
Contributor Author

@charlievieth charlievieth May 17, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We always ignore this error - is there any point returning it from buildValue()? This was the case before this PR was made.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems to be used to determine if the value should be skipped or not.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typically you log on error from the caller, but we log inside the buildValue function instead to avoid noisy log outputs when the value is of type string (which is expected to not work).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the insight, I just changed this to return a bool instead of an error since we used the error as a bool (nil / not-nil) and this saves an alloc.

if buildError != nil {
// bad value continue to next metric
continue
}
metric.Value = metricValue

source, tags := buildTags(m.Tags(), s)
metric.Source = source
metric.Tags = tags

out = append(out, formatMetricPoint(metric, s)...)
metric := wavefront.MetricPoint{
Metric: name,
Timestamp: m.Time().Unix(),
Value: metricValue,
Source: source,
Tags: tags,
}
out = append(out, formatMetricPoint(buf, &metric, s)...)
}

pbFree.Put(buf)
return out, nil
}

func (s *WavefrontSerializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) {
var batch bytes.Buffer
var batch []byte
for _, m := range metrics {
buf, err := s.Serialize(m)
if err != nil {
return nil, err
}
_, err = batch.Write(buf)
if err != nil {
return nil, err
batch = append(batch, buf...)
}
return batch, nil
}

func findSourceTag(mTags map[string]string, s *WavefrontSerializer) string {
if src, ok := mTags["source"]; ok {
delete(mTags, "source")
return src
}
for _, src := range s.SourceOverride {
if source, ok := mTags[src]; ok {
delete(mTags, src)
mTags["telegraf_host"] = mTags["host"]
return source
}
}
return batch.Bytes(), nil
return mTags["host"]
}

func buildTags(mTags map[string]string, s *WavefrontSerializer) (string, map[string]string) {

// Remove all empty tags.
for k, v := range mTags {
if v == "" {
delete(mTags, k)
}
}

var source string

if src, ok := mTags["source"]; ok {
source = src
delete(mTags, "source")
} else {
sourceTagFound := false
for _, src := range s.SourceOverride {
for k, v := range mTags {
if k == src {
source = v
mTags["telegraf_host"] = mTags["host"]
sourceTagFound = true
delete(mTags, k)
break
}
}
if sourceTagFound {
break
}
}

if !sourceTagFound {
source = mTags["host"]
}
}

source := findSourceTag(mTags, s)
delete(mTags, "host")

return tagValueReplacer.Replace(source), mTags
}

Expand All @@ -156,14 +147,14 @@ func buildValue(v interface{}, name string) (float64, error) {
return 0, nil
}
case int64:
return float64(v.(int64)), nil
return float64(p), nil
case uint64:
return float64(v.(uint64)), nil
return float64(p), nil
case float64:
return v.(float64), nil
return p, nil
case string:
// return an error but don't log
return 0, fmt.Errorf("string type not supported")
return 0, errors.New("string type not supported")
default:
// return an error and log a debug message
err := fmt.Errorf("unexpected type: %T, with value: %v, for :%s", v, v, name)
Expand All @@ -172,31 +163,70 @@ func buildValue(v interface{}, name string) (float64, error) {
}
}

func formatMetricPoint(metricPoint *wavefront.MetricPoint, s *WavefrontSerializer) []byte {
var buffer bytes.Buffer
buffer.WriteString("\"")
buffer.WriteString(metricPoint.Metric)
buffer.WriteString("\" ")
buffer.WriteString(strconv.FormatFloat(metricPoint.Value, 'f', 6, 64))
buffer.WriteString(" ")
buffer.WriteString(strconv.FormatInt(metricPoint.Timestamp, 10))
buffer.WriteString(" source=\"")
buffer.WriteString(metricPoint.Source)
buffer.WriteString("\"")
func formatMetricPoint(b *buffer, metricPoint *wavefront.MetricPoint, s *WavefrontSerializer) []byte {
b.Reset()

b.WriteChar('"')
b.WriteString(metricPoint.Metric)
b.WriteString(`" `)
b.WriteFloat64(metricPoint.Value)
b.WriteChar(' ')
b.WriteUnit64(uint64(metricPoint.Timestamp))
b.WriteString(` source="`)
b.WriteString(metricPoint.Source)
b.WriteChar('"')

for k, v := range metricPoint.Tags {
buffer.WriteString(" \"")
b.WriteString(` "`)
if s.UseStrict {
buffer.WriteString(strictSanitizedChars.Replace(k))
b.WriteString(strictSanitizedChars.Replace(k))
} else {
buffer.WriteString(sanitizedChars.Replace(k))
b.WriteString(sanitizedChars.Replace(k))
}
buffer.WriteString("\"=\"")
buffer.WriteString(tagValueReplacer.Replace(v))
buffer.WriteString("\"")
b.WriteString(`"="`)
b.WriteString(tagValueReplacer.Replace(v))
b.WriteChar('"')
}

buffer.WriteString("\n")
b.WriteChar('\n')

return *b
}

// pbFree is the print buffer pool
var pbFree = sync.Pool{
New: func() interface{} {
b := make(buffer, 0, 128)
return &b
},
}

// Use a fast and simple buffer for constructing statsd messages
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm guessing this comment is a left over from something else?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

kinda, and good catch - will remove.

type buffer []byte

func (b *buffer) Reset() { *b = (*b)[:0] }

func (b *buffer) Write(p []byte) {
*b = append(*b, p...)
}

func (b *buffer) WriteString(s string) {
*b = append(*b, s...)
}

// This is named WriteChar instead of WriteByte because the 'stdmethods' check
// of 'go vet' wants WriteByte to have the signature:
//
// func (b *buffer) WriteByte(c byte) error { ... }
//
func (b *buffer) WriteChar(c byte) {
*b = append(*b, c)
}

func (b *buffer) WriteUnit64(val uint64) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

function name has a typo in it, which can be misleading (Unit vs Uint)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤦‍♂ thanks - fixed

*b = strconv.AppendUint(*b, val, 10)
}

return buffer.Bytes()
func (b *buffer) WriteFloat64(val float64) {
*b = strconv.AppendFloat(*b, val, 'f', 6, 64)
}
62 changes: 60 additions & 2 deletions plugins/serializers/wavefront/wavefront_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"testing"
"time"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/plugins/outputs/wavefront"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -132,7 +133,7 @@ func TestFormatMetricPoint(t *testing.T) {
s := WavefrontSerializer{}

for _, pt := range pointTests {
bout := formatMetricPoint(pt.ptIn, &s)
bout := formatMetricPoint(new(buffer), pt.ptIn, &s)
sout := string(bout[:])
if sout != pt.out {
t.Errorf("\nexpected\t%s\nreceived\t%s\n", pt.out, sout)
Expand Down Expand Up @@ -160,7 +161,7 @@ func TestUseStrict(t *testing.T) {
s := WavefrontSerializer{UseStrict: true}

for _, pt := range pointTests {
bout := formatMetricPoint(pt.ptIn, &s)
bout := formatMetricPoint(new(buffer), pt.ptIn, &s)
sout := string(bout[:])
if sout != pt.out {
t.Errorf("\nexpected\t%s\nreceived\t%s\n", pt.out, sout)
Expand Down Expand Up @@ -293,3 +294,60 @@ func TestSerializeMetricPrefix(t *testing.T) {
expS := []string{fmt.Sprintf("\"telegraf.cpu.usage.idle\" 91.000000 %d source=\"realHost\" \"cpu\"=\"cpu0\"", now.UnixNano()/1000000000)}
assert.Equal(t, expS, mS)
}

func benchmarkMetrics(b *testing.B) [4]telegraf.Metric {
b.Helper()
now := time.Now()
tags := map[string]string{
"cpu": "cpu0",
"host": "realHost",
}
newMetric := func(v interface{}) telegraf.Metric {
fields := map[string]interface{}{
"usage_idle": v,
}
m, err := metric.New("cpu", tags, fields, now)
if err != nil {
b.Fatal(err)
}
return m
}
return [4]telegraf.Metric{
newMetric(91.5),
newMetric(91),
newMetric(true),
newMetric(false),
}
}

func BenchmarkSerialize(b *testing.B) {
var s WavefrontSerializer
metrics := benchmarkMetrics(b)
b.ResetTimer()
for i := 0; i < b.N; i++ {
s.Serialize(metrics[i%len(metrics)])
}
}

func BenchmarkSerialize_Parallel(b *testing.B) {
var s WavefrontSerializer
metrics := benchmarkMetrics(b)
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
i := 0
for pb.Next() {
s.Serialize(metrics[i%len(metrics)])
i++
}
})
}

func BenchmarkSerializeBatch(b *testing.B) {
var s WavefrontSerializer
m := benchmarkMetrics(b)
metrics := m[:]
b.ResetTimer()
for i := 0; i < b.N; i++ {
s.SerializeBatch(metrics)
}
}