Skip to content

Commit

Permalink
Statsdreceiver optimizations (#33683)
Browse files Browse the repository at this point in the history
**Description:**
Optimize statsdreceiver code to reduce heap usage
Also reduce failCnt increment to reduce memory footprint in cases of
tons of malformed statsd messages

**Link to tracking Issue:** <Issue number if applicable>

**Testing:**
- Tested internally and saw a 17% reduction in object allocation from
our workloads (with ~2.5k/s statsd metrics input) mostly from reduction
of strings.Split
Screenshot is from `go tool pprof -http:8081 -diff_base baseline.heap.gz
optimized.heap.gz`
![Screenshot 2024-06-20 at 12 45
17 PM](https://github.com/open-telemetry/opentelemetry-collector-contrib/assets/142453/e8065097-2533-4934-9b78-c0e828e075ac)

Note: I also saw lots of allocations from attribute.NewSet but idk the
best way to go about reducing that. (Screenshot below is from
unoptimized statsdreceiver)
![Screenshot 2024-06-20 at 12 44
32 PM](https://github.com/open-telemetry/opentelemetry-collector-contrib/assets/142453/293553c0-92c8-465b-872a-ddb7b68299a0)

I tried changing statsDMetricDescription's attrs field to
`map[string]any` in the hopes of using attributes.FromRaw in
buildGaugeMetric but it would require replacing statsDMetricDescription
as the type of the map key in instruments.gauges, etc.
  • Loading branch information
hardproblems authored Jun 22, 2024
1 parent 93a78c6 commit 46e828e
Show file tree
Hide file tree
Showing 5 changed files with 116 additions and 43 deletions.
27 changes: 27 additions & 0 deletions .chloggen/feature-statsdreceiver-optimizations.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: statsdreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Optimize statsdreceiver to reduce object allocations

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [33683]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
40 changes: 17 additions & 23 deletions receiver/statsdreceiver/internal/protocol/statsd_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,41 +356,41 @@ func (p *StatsDParser) Aggregate(line string, addr net.Addr) error {
func parseMessageToMetric(line string, enableMetricType bool, enableSimpleTags bool) (statsDMetric, error) {
result := statsDMetric{}

parts := strings.Split(line, "|")
if len(parts) < 2 {
nameValue, rest, foundName := strings.Cut(line, "|")
if !foundName {
return result, fmt.Errorf("invalid message format: %s", line)
}

separatorIndex := strings.IndexByte(parts[0], ':')
if separatorIndex < 0 {
return result, fmt.Errorf("invalid <name>:<value> format: %s", parts[0])
name, valueStr, foundValue := strings.Cut(nameValue, ":")
if !foundValue {
return result, fmt.Errorf("invalid <name>:<value> format: %s", nameValue)
}

result.description.name = parts[0][0:separatorIndex]
if result.description.name == "" {
if name == "" {
return result, errEmptyMetricName
}
valueStr := parts[0][separatorIndex+1:]
result.description.name = name
if valueStr == "" {
return result, errEmptyMetricValue
}
if strings.HasPrefix(valueStr, "-") || strings.HasPrefix(valueStr, "+") {
result.addition = true
}

inType := MetricType(parts[1])
var metricType, additionalParts, _ = strings.Cut(rest, "|")
inType := MetricType(metricType)
switch inType {
case CounterType, GaugeType, HistogramType, TimingType, DistributionType:
result.description.metricType = inType
default:
return result, fmt.Errorf("unsupported metric type: %s", inType)
}

additionalParts := parts[2:]

var kvs []attribute.KeyValue

for _, part := range additionalParts {
var part string
part, additionalParts, _ = strings.Cut(additionalParts, "|")
for ; len(part) > 0; part, additionalParts, _ = strings.Cut(additionalParts, "|") {
switch {
case strings.HasPrefix(part, "@"):
sampleRateStr := strings.TrimPrefix(part, "@")
Expand All @@ -402,30 +402,24 @@ func parseMessageToMetric(line string, enableMetricType bool, enableSimpleTags b

result.sampleRate = f
case strings.HasPrefix(part, "#"):
tagsStr := strings.TrimPrefix(part, "#")
var tagsStr = strings.TrimPrefix(part, "#")

// handle an empty tag set
// where the tags part was still sent (some clients do this)
if len(tagsStr) == 0 {
continue
}

tagSets := strings.Split(tagsStr, ",")

for _, tagSet := range tagSets {
tagParts := strings.SplitN(tagSet, ":", 2)
k := tagParts[0]
var tagSet string
tagSet, tagsStr, _ = strings.Cut(tagsStr, ",")
for ; len(tagSet) > 0; tagSet, tagsStr, _ = strings.Cut(tagsStr, ",") {
k, v, _ := strings.Cut(tagSet, ":")
if k == "" {
return result, fmt.Errorf("invalid tag format: %q", tagSet)
}

// support both simple tags (w/o value) and dimension tags (w/ value).
// dogstatsd notably allows simple tags.
var v string
if len(tagParts) == 2 {
v = tagParts[1]
}

if v == "" && !enableSimpleTags {
return result, fmt.Errorf("invalid tag format: %q", tagSet)
}
Expand Down
59 changes: 59 additions & 0 deletions receiver/statsdreceiver/internal/transport/split_bytes.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package transport // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/transport"

import "bytes"

// SplitBytes iterates over a byte buffer, returning chunks split by a given
// delimiter byte. It does not perform any allocations, and does not modify the
// buffer it is given. It is not safe for use by concurrent goroutines.
//
// sb := NewSplitBytes(buf, '\n')
// for sb.Next() {
// fmt.Printf("%q\n", sb.Chunk())
// }
//
// The sequence of chunks returned by SplitBytes is equivalent to calling
// bytes.Split, except without allocating an intermediate slice.
type SplitBytes struct {
buf []byte
delim byte
currentChunk []byte
lastChunk bool
}

// NewSplitBytes initializes a SplitBytes struct with the provided buffer and delimiter.
func NewSplitBytes(buf []byte, delim byte) *SplitBytes {
return &SplitBytes{
buf: buf,
delim: delim,
}
}

// Next advances SplitBytes to the next chunk, returning true if a new chunk
// actually exists and false otherwise.
func (sb *SplitBytes) Next() bool {
if sb.lastChunk {
// we do not check the length here, this ensures that we return the
// last chunk in the sequence (even if it's empty)
return false
}

next := bytes.IndexByte(sb.buf, sb.delim)
if next == -1 {
// no newline, consume the entire buffer
sb.currentChunk = sb.buf
sb.buf = nil
sb.lastChunk = true
} else {
sb.currentChunk = sb.buf[:next]
sb.buf = sb.buf[next+1:]
}
return true
}

// Chunk returns the current chunk.
func (sb *SplitBytes) Chunk() []byte {
return sb.currentChunk
}
25 changes: 7 additions & 18 deletions receiver/statsdreceiver/internal/transport/udp_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,9 @@
package transport // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/transport"

import (
"bytes"
"errors"
"fmt"
"io"
"net"
"strings"

"go.opentelemetry.io/collector/consumer"
)
Expand Down Expand Up @@ -53,9 +50,7 @@ func (u *udpServer) ListenAndServe(
for {
n, addr, err := u.packetConn.ReadFrom(buf)
if n > 0 {
bufCopy := make([]byte, n)
copy(bufCopy, buf)
u.handlePacket(bufCopy, addr, transferChan)
u.handlePacket(n, buf, addr, transferChan)
}
if err != nil {
reporter.OnDebugf("%s Transport (%s) - ReadFrom error: %v",
Expand All @@ -80,22 +75,16 @@ func (u *udpServer) Close() error {

// handlePacket is helper that parses the buffer and split it line by line to be parsed upstream.
func (u *udpServer) handlePacket(
numBytes int,
data []byte,
addr net.Addr,
transferChan chan<- Metric,
) {
buf := bytes.NewBuffer(data)
for {
bytes, err := buf.ReadBytes((byte)('\n'))
if errors.Is(err, io.EOF) {
if len(bytes) == 0 {
// Completed without errors.
break
}
}
line := strings.TrimSpace(string(bytes))
if line != "" {
transferChan <- Metric{line, addr}
splitPacket := NewSplitBytes(data[:numBytes], '\n')
for splitPacket.Next() {
chunk := splitPacket.Chunk()
if len(chunk) > 0 {
transferChan <- Metric{string(chunk), addr}
}
}
}
8 changes: 6 additions & 2 deletions receiver/statsdreceiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func (r *statsdReceiver) Start(ctx context.Context, _ component.Host) error {
}
}()
go func() {
var successCnt int64
var failCnt, successCnt int64
for {
select {
case <-ticker.C:
Expand All @@ -137,7 +137,11 @@ func (r *statsdReceiver) Start(ctx context.Context, _ component.Host) error {
case metric := <-transferChan:
err := r.parser.Aggregate(metric.Raw, metric.Addr)
if err != nil {
r.reporter.RecordParseFailure()
failCnt++
if failCnt%100 == 0 {
r.reporter.RecordParseFailure()
failCnt = 0
}
r.reporter.OnDebugf("Error aggregating pmetric", zap.Error(err))
} else {
successCnt++
Expand Down

0 comments on commit 46e828e

Please sign in to comment.