From ec4b147bebacece7e6198d3ae783b3c9c8cea638 Mon Sep 17 00:00:00 2001 From: Charlie Vieth Date: Sat, 11 May 2019 21:50:38 -0400 Subject: [PATCH 1/7] wavefront/serializer: improve performance by ~30% This PR improves the performance and memory consumption of the Wavefront serializer by roughly 30%. ``` benchmark old ns/op new ns/op delta BenchmarkSerialize-16 1168 778 -33.39% benchmark old allocs new allocs delta BenchmarkSerialize-16 16 8 -50.00% benchmark old bytes new bytes delta BenchmarkSerialize-16 1116 800 -28.32% ``` --- plugins/serializers/wavefront/wavefront.go | 178 ++++++++++-------- .../serializers/wavefront/wavefront_test.go | 62 +++++- 2 files changed, 164 insertions(+), 76 deletions(-) diff --git a/plugins/serializers/wavefront/wavefront.go b/plugins/serializers/wavefront/wavefront.go index 70b87512fad61..7a502b7615cfd 100755 --- a/plugins/serializers/wavefront/wavefront.go +++ b/plugins/serializers/wavefront/wavefront.go @@ -2,10 +2,12 @@ package wavefront import ( "bytes" + "errors" "fmt" "log" "strconv" "strings" + "sync" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/outputs/wavefront" @@ -39,6 +41,12 @@ var tagValueReplacer = strings.NewReplacer("\"", "\\\"", "*", "-") var pathReplacer = strings.NewReplacer("_", ".") +var bufPool = sync.Pool{ + New: func() interface{} { + return new(bytes.Buffer) + }, +} + func NewSerializer(prefix string, useStrict bool, sourceOverride []string) (*WavefrontSerializer, error) { s := &WavefrontSerializer{ Prefix: prefix, @@ -50,16 +58,17 @@ func NewSerializer(prefix string, useStrict bool, sourceOverride []string) (*Wav // Serialize : Serialize based on Wavefront format func (s *WavefrontSerializer) Serialize(m telegraf.Metric) ([]byte, error) { - out := []byte{} - metricSeparator := "." + const metricSeparator = "." + var out []byte + buf := pbFree.Get().(*buffer) for fieldName, value := range m.Fields() { var name string if fieldName == "value" { - name = fmt.Sprintf("%s%s", s.Prefix, m.Name()) + name = s.Prefix + m.Name() } else { - name = fmt.Sprintf("%s%s%s%s", s.Prefix, m.Name(), metricSeparator, fieldName) + name = s.Prefix + m.Name() + metricSeparator + fieldName } if s.UseStrict { @@ -70,80 +79,62 @@ func (s *WavefrontSerializer) Serialize(m telegraf.Metric) ([]byte, error) { name = pathReplacer.Replace(name) - metric := &wavefront.MetricPoint{ - Metric: name, - Timestamp: m.Time().Unix(), - } - - metricValue, buildError := buildValue(value, metric.Metric) + metricValue, buildError := buildValue(value, name) if buildError != nil { // bad value continue to next metric continue } - metric.Value = metricValue - source, tags := buildTags(m.Tags(), s) - metric.Source = source - metric.Tags = tags - - out = append(out, formatMetricPoint(metric, s)...) + metric := wavefront.MetricPoint{ + Metric: name, + Timestamp: m.Time().Unix(), + Value: metricValue, + Source: source, + Tags: tags, + } + out = append(out, formatMetricPoint(buf, &metric, s)...) } + + pbFree.Put(buf) return out, nil } func (s *WavefrontSerializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) { - var batch bytes.Buffer + var batch []byte for _, m := range metrics { buf, err := s.Serialize(m) if err != nil { return nil, err } - _, err = batch.Write(buf) - if err != nil { - return nil, err + batch = append(batch, buf...) + } + return batch, nil +} + +func findSourceTag(mTags map[string]string, s *WavefrontSerializer) string { + if src, ok := mTags["source"]; ok { + delete(mTags, "source") + return src + } + for _, src := range s.SourceOverride { + if source, ok := mTags[src]; ok { + delete(mTags, src) + mTags["telegraf_host"] = mTags["host"] + return source } } - return batch.Bytes(), nil + return mTags["host"] } func buildTags(mTags map[string]string, s *WavefrontSerializer) (string, map[string]string) { - // Remove all empty tags. for k, v := range mTags { if v == "" { delete(mTags, k) } } - - var source string - - if src, ok := mTags["source"]; ok { - source = src - delete(mTags, "source") - } else { - sourceTagFound := false - for _, src := range s.SourceOverride { - for k, v := range mTags { - if k == src { - source = v - mTags["telegraf_host"] = mTags["host"] - sourceTagFound = true - delete(mTags, k) - break - } - } - if sourceTagFound { - break - } - } - - if !sourceTagFound { - source = mTags["host"] - } - } - + source := findSourceTag(mTags, s) delete(mTags, "host") - return tagValueReplacer.Replace(source), mTags } @@ -156,14 +147,14 @@ func buildValue(v interface{}, name string) (float64, error) { return 0, nil } case int64: - return float64(v.(int64)), nil + return float64(p), nil case uint64: - return float64(v.(uint64)), nil + return float64(p), nil case float64: - return v.(float64), nil + return p, nil case string: // return an error but don't log - return 0, fmt.Errorf("string type not supported") + return 0, errors.New("string type not supported") default: // return an error and log a debug message err := fmt.Errorf("unexpected type: %T, with value: %v, for :%s", v, v, name) @@ -172,31 +163,70 @@ func buildValue(v interface{}, name string) (float64, error) { } } -func formatMetricPoint(metricPoint *wavefront.MetricPoint, s *WavefrontSerializer) []byte { - var buffer bytes.Buffer - buffer.WriteString("\"") - buffer.WriteString(metricPoint.Metric) - buffer.WriteString("\" ") - buffer.WriteString(strconv.FormatFloat(metricPoint.Value, 'f', 6, 64)) - buffer.WriteString(" ") - buffer.WriteString(strconv.FormatInt(metricPoint.Timestamp, 10)) - buffer.WriteString(" source=\"") - buffer.WriteString(metricPoint.Source) - buffer.WriteString("\"") +func formatMetricPoint(b *buffer, metricPoint *wavefront.MetricPoint, s *WavefrontSerializer) []byte { + b.Reset() + + b.WriteChar('"') + b.WriteString(metricPoint.Metric) + b.WriteString(`" `) + b.WriteFloat64(metricPoint.Value) + b.WriteChar(' ') + b.WriteUnit64(uint64(metricPoint.Timestamp)) + b.WriteString(` source="`) + b.WriteString(metricPoint.Source) + b.WriteChar('"') for k, v := range metricPoint.Tags { - buffer.WriteString(" \"") + b.WriteString(` "`) if s.UseStrict { - buffer.WriteString(strictSanitizedChars.Replace(k)) + b.WriteString(strictSanitizedChars.Replace(k)) } else { - buffer.WriteString(sanitizedChars.Replace(k)) + b.WriteString(sanitizedChars.Replace(k)) } - buffer.WriteString("\"=\"") - buffer.WriteString(tagValueReplacer.Replace(v)) - buffer.WriteString("\"") + b.WriteString(`"="`) + b.WriteString(tagValueReplacer.Replace(v)) + b.WriteChar('"') } - buffer.WriteString("\n") + b.WriteChar('\n') + + return *b +} + +// pbFree is the print buffer pool +var pbFree = sync.Pool{ + New: func() interface{} { + b := make(buffer, 0, 128) + return &b + }, +} + +// Use a fast and simple buffer for constructing statsd messages +type buffer []byte + +func (b *buffer) Reset() { *b = (*b)[:0] } + +func (b *buffer) Write(p []byte) { + *b = append(*b, p...) +} + +func (b *buffer) WriteString(s string) { + *b = append(*b, s...) +} + +// This is named WriteChar instead of WriteByte because the 'stdmethods' check +// of 'go vet' wants WriteByte to have the signature: +// +// func (b *buffer) WriteByte(c byte) error { ... } +// +func (b *buffer) WriteChar(c byte) { + *b = append(*b, c) +} + +func (b *buffer) WriteUnit64(val uint64) { + *b = strconv.AppendUint(*b, val, 10) +} - return buffer.Bytes() +func (b *buffer) WriteFloat64(val float64) { + *b = strconv.AppendFloat(*b, val, 'f', 6, 64) } diff --git a/plugins/serializers/wavefront/wavefront_test.go b/plugins/serializers/wavefront/wavefront_test.go index 3230ce51534c0..da521f4b3c974 100755 --- a/plugins/serializers/wavefront/wavefront_test.go +++ b/plugins/serializers/wavefront/wavefront_test.go @@ -7,6 +7,7 @@ import ( "testing" "time" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/metric" "github.com/influxdata/telegraf/plugins/outputs/wavefront" "github.com/stretchr/testify/assert" @@ -132,7 +133,7 @@ func TestFormatMetricPoint(t *testing.T) { s := WavefrontSerializer{} for _, pt := range pointTests { - bout := formatMetricPoint(pt.ptIn, &s) + bout := formatMetricPoint(new(buffer), pt.ptIn, &s) sout := string(bout[:]) if sout != pt.out { t.Errorf("\nexpected\t%s\nreceived\t%s\n", pt.out, sout) @@ -160,7 +161,7 @@ func TestUseStrict(t *testing.T) { s := WavefrontSerializer{UseStrict: true} for _, pt := range pointTests { - bout := formatMetricPoint(pt.ptIn, &s) + bout := formatMetricPoint(new(buffer), pt.ptIn, &s) sout := string(bout[:]) if sout != pt.out { t.Errorf("\nexpected\t%s\nreceived\t%s\n", pt.out, sout) @@ -293,3 +294,60 @@ func TestSerializeMetricPrefix(t *testing.T) { expS := []string{fmt.Sprintf("\"telegraf.cpu.usage.idle\" 91.000000 %d source=\"realHost\" \"cpu\"=\"cpu0\"", now.UnixNano()/1000000000)} assert.Equal(t, expS, mS) } + +func benchmarkMetrics(b *testing.B) [4]telegraf.Metric { + b.Helper() + now := time.Now() + tags := map[string]string{ + "cpu": "cpu0", + "host": "realHost", + } + newMetric := func(v interface{}) telegraf.Metric { + fields := map[string]interface{}{ + "usage_idle": v, + } + m, err := metric.New("cpu", tags, fields, now) + if err != nil { + b.Fatal(err) + } + return m + } + return [4]telegraf.Metric{ + newMetric(91.5), + newMetric(91), + newMetric(true), + newMetric(false), + } +} + +func BenchmarkSerialize(b *testing.B) { + var s WavefrontSerializer + metrics := benchmarkMetrics(b) + b.ResetTimer() + for i := 0; i < b.N; i++ { + s.Serialize(metrics[i%len(metrics)]) + } +} + +func BenchmarkSerialize_Parallel(b *testing.B) { + var s WavefrontSerializer + metrics := benchmarkMetrics(b) + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + i := 0 + for pb.Next() { + s.Serialize(metrics[i%len(metrics)]) + i++ + } + }) +} + +func BenchmarkSerializeBatch(b *testing.B) { + var s WavefrontSerializer + m := benchmarkMetrics(b) + metrics := m[:] + b.ResetTimer() + for i := 0; i < b.N; i++ { + s.SerializeBatch(metrics) + } +} From 96f2a2d7d25b731a716a57c8ed7f7cbac9ba7c9f Mon Sep 17 00:00:00 2001 From: Charlie Vieth Date: Thu, 16 May 2019 21:10:36 -0400 Subject: [PATCH 2/7] wavefront/serializer: add scratch buffer to WavefrontSerializer Note this removes use of `sync.Pool` and since this is no longer thread-safe the parallel benchmark is also removed. --- plugins/serializers/wavefront/wavefront.go | 50 +++++++------------ .../serializers/wavefront/wavefront_test.go | 13 ----- 2 files changed, 18 insertions(+), 45 deletions(-) diff --git a/plugins/serializers/wavefront/wavefront.go b/plugins/serializers/wavefront/wavefront.go index 7a502b7615cfd..2a303be26cf50 100755 --- a/plugins/serializers/wavefront/wavefront.go +++ b/plugins/serializers/wavefront/wavefront.go @@ -1,13 +1,11 @@ package wavefront import ( - "bytes" "errors" "fmt" "log" "strconv" "strings" - "sync" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/outputs/wavefront" @@ -18,6 +16,7 @@ type WavefrontSerializer struct { Prefix string UseStrict bool SourceOverride []string + scratch buffer } // catch many of the invalid chars that could appear in a metric or tag name @@ -41,12 +40,6 @@ var tagValueReplacer = strings.NewReplacer("\"", "\\\"", "*", "-") var pathReplacer = strings.NewReplacer("_", ".") -var bufPool = sync.Pool{ - New: func() interface{} { - return new(bytes.Buffer) - }, -} - func NewSerializer(prefix string, useStrict bool, sourceOverride []string) (*WavefrontSerializer, error) { s := &WavefrontSerializer{ Prefix: prefix, @@ -56,11 +49,8 @@ func NewSerializer(prefix string, useStrict bool, sourceOverride []string) (*Wav return s, nil } -// Serialize : Serialize based on Wavefront format -func (s *WavefrontSerializer) Serialize(m telegraf.Metric) ([]byte, error) { +func (s *WavefrontSerializer) serialize(buf *buffer, m telegraf.Metric) { const metricSeparator = "." - var out []byte - buf := pbFree.Get().(*buffer) for fieldName, value := range m.Fields() { var name string @@ -92,23 +82,23 @@ func (s *WavefrontSerializer) Serialize(m telegraf.Metric) ([]byte, error) { Source: source, Tags: tags, } - out = append(out, formatMetricPoint(buf, &metric, s)...) + formatMetricPoint(&s.scratch, &metric, s) } +} - pbFree.Put(buf) - return out, nil +// Serialize : Serialize based on Wavefront format +func (s *WavefrontSerializer) Serialize(m telegraf.Metric) ([]byte, error) { + s.scratch.Reset() + s.serialize(&s.scratch, m) + return s.scratch.Copy(), nil } func (s *WavefrontSerializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) { - var batch []byte + s.scratch.Reset() for _, m := range metrics { - buf, err := s.Serialize(m) - if err != nil { - return nil, err - } - batch = append(batch, buf...) + s.serialize(&s.scratch, m) } - return batch, nil + return s.scratch.Copy(), nil } func findSourceTag(mTags map[string]string, s *WavefrontSerializer) string { @@ -164,8 +154,6 @@ func buildValue(v interface{}, name string) (float64, error) { } func formatMetricPoint(b *buffer, metricPoint *wavefront.MetricPoint, s *WavefrontSerializer) []byte { - b.Reset() - b.WriteChar('"') b.WriteString(metricPoint.Metric) b.WriteString(`" `) @@ -193,19 +181,17 @@ func formatMetricPoint(b *buffer, metricPoint *wavefront.MetricPoint, s *Wavefro return *b } -// pbFree is the print buffer pool -var pbFree = sync.Pool{ - New: func() interface{} { - b := make(buffer, 0, 128) - return &b - }, -} - // Use a fast and simple buffer for constructing statsd messages type buffer []byte func (b *buffer) Reset() { *b = (*b)[:0] } +func (b *buffer) Copy() []byte { + p := make([]byte, len(*b)) + copy(p, *b) + return p +} + func (b *buffer) Write(p []byte) { *b = append(*b, p...) } diff --git a/plugins/serializers/wavefront/wavefront_test.go b/plugins/serializers/wavefront/wavefront_test.go index da521f4b3c974..548326e703e6c 100755 --- a/plugins/serializers/wavefront/wavefront_test.go +++ b/plugins/serializers/wavefront/wavefront_test.go @@ -329,19 +329,6 @@ func BenchmarkSerialize(b *testing.B) { } } -func BenchmarkSerialize_Parallel(b *testing.B) { - var s WavefrontSerializer - metrics := benchmarkMetrics(b) - b.ResetTimer() - b.RunParallel(func(pb *testing.PB) { - i := 0 - for pb.Next() { - s.Serialize(metrics[i%len(metrics)]) - i++ - } - }) -} - func BenchmarkSerializeBatch(b *testing.B) { var s WavefrontSerializer m := benchmarkMetrics(b) From 6fc8173c128c4a0790d9882b3402b99f4cd6f301 Mon Sep 17 00:00:00 2001 From: Charlie Vieth Date: Thu, 16 May 2019 21:17:16 -0400 Subject: [PATCH 3/7] wavefront/serializer: remove unused buffer.Write() method --- plugins/serializers/wavefront/wavefront.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/plugins/serializers/wavefront/wavefront.go b/plugins/serializers/wavefront/wavefront.go index 2a303be26cf50..0ee113ce583a4 100755 --- a/plugins/serializers/wavefront/wavefront.go +++ b/plugins/serializers/wavefront/wavefront.go @@ -192,10 +192,6 @@ func (b *buffer) Copy() []byte { return p } -func (b *buffer) Write(p []byte) { - *b = append(*b, p...) -} - func (b *buffer) WriteString(s string) { *b = append(*b, s...) } From f63ab04485d8a28583d8b15515dd20a3ba31e99a Mon Sep 17 00:00:00 2001 From: Charlie Vieth Date: Fri, 24 May 2019 16:54:50 -0400 Subject: [PATCH 4/7] wavefront/serializer: remove comment --- plugins/serializers/wavefront/wavefront.go | 1 - 1 file changed, 1 deletion(-) diff --git a/plugins/serializers/wavefront/wavefront.go b/plugins/serializers/wavefront/wavefront.go index 0ee113ce583a4..75988b66a2ba4 100755 --- a/plugins/serializers/wavefront/wavefront.go +++ b/plugins/serializers/wavefront/wavefront.go @@ -181,7 +181,6 @@ func formatMetricPoint(b *buffer, metricPoint *wavefront.MetricPoint, s *Wavefro return *b } -// Use a fast and simple buffer for constructing statsd messages type buffer []byte func (b *buffer) Reset() { *b = (*b)[:0] } From 0b6a6b7e131e9d765af7c6be251262fde820bb1b Mon Sep 17 00:00:00 2001 From: Charlie Vieth Date: Fri, 24 May 2019 16:55:19 -0400 Subject: [PATCH 5/7] wavefront/serializer: fix misspelling WriteUnit64 => WriteUint64 --- plugins/serializers/wavefront/wavefront.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/plugins/serializers/wavefront/wavefront.go b/plugins/serializers/wavefront/wavefront.go index 75988b66a2ba4..83e7e3a9fe931 100755 --- a/plugins/serializers/wavefront/wavefront.go +++ b/plugins/serializers/wavefront/wavefront.go @@ -159,7 +159,7 @@ func formatMetricPoint(b *buffer, metricPoint *wavefront.MetricPoint, s *Wavefro b.WriteString(`" `) b.WriteFloat64(metricPoint.Value) b.WriteChar(' ') - b.WriteUnit64(uint64(metricPoint.Timestamp)) + b.WriteUint64(uint64(metricPoint.Timestamp)) b.WriteString(` source="`) b.WriteString(metricPoint.Source) b.WriteChar('"') @@ -204,7 +204,7 @@ func (b *buffer) WriteChar(c byte) { *b = append(*b, c) } -func (b *buffer) WriteUnit64(val uint64) { +func (b *buffer) WriteUint64(val uint64) { *b = strconv.AppendUint(*b, val, 10) } From cb722ebd29a5a80bb82f54b6d98f43b67797f389 Mon Sep 17 00:00:00 2001 From: Charlie Vieth Date: Fri, 24 May 2019 16:55:54 -0400 Subject: [PATCH 6/7] wavefront/serializer: make WavefrontSerializer thread-safe Since the buffer we use is now a field of the WavefrontSerializer we need to protect it against concurrent access with a mutex. --- plugins/serializers/wavefront/wavefront.go | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/plugins/serializers/wavefront/wavefront.go b/plugins/serializers/wavefront/wavefront.go index 83e7e3a9fe931..8382da60b8041 100755 --- a/plugins/serializers/wavefront/wavefront.go +++ b/plugins/serializers/wavefront/wavefront.go @@ -6,6 +6,7 @@ import ( "log" "strconv" "strings" + "sync" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/outputs/wavefront" @@ -17,6 +18,7 @@ type WavefrontSerializer struct { UseStrict bool SourceOverride []string scratch buffer + mu sync.Mutex // buffer mutex } // catch many of the invalid chars that could appear in a metric or tag name @@ -88,17 +90,23 @@ func (s *WavefrontSerializer) serialize(buf *buffer, m telegraf.Metric) { // Serialize : Serialize based on Wavefront format func (s *WavefrontSerializer) Serialize(m telegraf.Metric) ([]byte, error) { + s.mu.Lock() s.scratch.Reset() s.serialize(&s.scratch, m) - return s.scratch.Copy(), nil + out := s.scratch.Copy() + s.mu.Unlock() + return out, nil } func (s *WavefrontSerializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) { + s.mu.Lock() s.scratch.Reset() for _, m := range metrics { s.serialize(&s.scratch, m) } - return s.scratch.Copy(), nil + out := s.scratch.Copy() + s.mu.Unlock() + return out, nil } func findSourceTag(mTags map[string]string, s *WavefrontSerializer) string { From f4f19c03f92674584b6e8dd4c31a7964909e6e95 Mon Sep 17 00:00:00 2001 From: Charlie Vieth Date: Fri, 24 May 2019 17:03:28 -0400 Subject: [PATCH 7/7] wavefront/serializer: return bool from buildValue The error it previously returned was treated as a bool - so we should just return a bool instead. --- plugins/serializers/wavefront/wavefront.go | 31 ++++++++++------------ 1 file changed, 14 insertions(+), 17 deletions(-) diff --git a/plugins/serializers/wavefront/wavefront.go b/plugins/serializers/wavefront/wavefront.go index 8382da60b8041..67fa1ae3a6834 100755 --- a/plugins/serializers/wavefront/wavefront.go +++ b/plugins/serializers/wavefront/wavefront.go @@ -1,8 +1,6 @@ package wavefront import ( - "errors" - "fmt" "log" "strconv" "strings" @@ -71,8 +69,8 @@ func (s *WavefrontSerializer) serialize(buf *buffer, m telegraf.Metric) { name = pathReplacer.Replace(name) - metricValue, buildError := buildValue(value, name) - if buildError != nil { + metricValue, valid := buildValue(value, name) + if !valid { // bad value continue to next metric continue } @@ -136,28 +134,27 @@ func buildTags(mTags map[string]string, s *WavefrontSerializer) (string, map[str return tagValueReplacer.Replace(source), mTags } -func buildValue(v interface{}, name string) (float64, error) { +func buildValue(v interface{}, name string) (val float64, valid bool) { switch p := v.(type) { case bool: if p { - return 1, nil - } else { - return 0, nil + return 1, true } + return 0, true case int64: - return float64(p), nil + return float64(p), true case uint64: - return float64(p), nil + return float64(p), true case float64: - return p, nil + return p, true case string: - // return an error but don't log - return 0, errors.New("string type not supported") + // return false but don't log + return 0, false default: - // return an error and log a debug message - err := fmt.Errorf("unexpected type: %T, with value: %v, for :%s", v, v, name) - log.Printf("D! Serializer [wavefront] %s\n", err.Error()) - return 0, err + // log a debug message + log.Printf("D! Serializer [wavefront] unexpected type: %T, with value: %v, for :%s\n", + v, v, name) + return 0, false } }