diff --git a/plugins/statsd/statsd.go b/plugins/statsd/statsd.go index 668f0e8b103c0..71e85322d6101 100644 --- a/plugins/statsd/statsd.go +++ b/plugins/statsd/statsd.go @@ -255,101 +255,109 @@ func (s *Statsd) parseStatsdLine(line string) error { s.Lock() defer s.Unlock() - m := metric{} - - // Validate splitting the line on "|" - pipesplit := strings.Split(line, "|") - if len(pipesplit) < 2 { - log.Printf("Error: splitting '|', Unable to parse metric: %s\n", line) + // Validate splitting the line on ":" + bits := strings.Split(line, ":") + if len(bits) < 2 { + log.Printf("Error: splitting ':', Unable to parse metric: %s\n", line) return errors.New("Error Parsing statsd line") - } else if len(pipesplit) > 2 { - sr := pipesplit[2] - errmsg := "Error: parsing sample rate, %s, it must be in format like: " + - "@0.1, @0.5, etc. Ignoring sample rate for line: %s\n" - if strings.Contains(sr, "@") && len(sr) > 1 { - samplerate, err := strconv.ParseFloat(sr[1:], 64) - if err != nil { - log.Printf(errmsg, err.Error(), line) - } else { - // sample rate successfully parsed - m.samplerate = samplerate - } - } else { - log.Printf(errmsg, "", line) - } } - // Validate metric type - switch pipesplit[1] { - case "g", "c", "s", "ms", "h": - m.mtype = pipesplit[1] - default: - log.Printf("Error: Statsd Metric type %s unsupported", pipesplit[1]) - return errors.New("Error Parsing statsd line") - } + // Extract bucket name from individual metric bits + bucketName, bits := bits[0], bits[1:] - // Validate splitting the rest of the line on ":" - colonsplit := strings.Split(pipesplit[0], ":") - if len(colonsplit) != 2 { - log.Printf("Error: splitting ':', Unable to parse metric: %s\n", line) - return errors.New("Error Parsing statsd line") - } - m.bucket = colonsplit[0] + // Add a metric for each bit available + for _, bit := range bits { + m := metric{} + + m.bucket = bucketName - // Parse the value - if strings.ContainsAny(colonsplit[1], "-+") { - if m.mtype != "g" { - log.Printf("Error: +- values are only supported for gauges: %s\n", line) + // Validate splitting the bit on "|" + pipesplit := strings.Split(bit, "|") + if len(pipesplit) < 2 { + log.Printf("Error: splitting '|', Unable to parse metric: %s\n", line) return errors.New("Error Parsing statsd line") + } else if len(pipesplit) > 2 { + sr := pipesplit[2] + errmsg := "Error: parsing sample rate, %s, it must be in format like: " + + "@0.1, @0.5, etc. Ignoring sample rate for line: %s\n" + if strings.Contains(sr, "@") && len(sr) > 1 { + samplerate, err := strconv.ParseFloat(sr[1:], 64) + if err != nil { + log.Printf(errmsg, err.Error(), line) + } else { + // sample rate successfully parsed + m.samplerate = samplerate + } + } else { + log.Printf(errmsg, "", line) + } } - m.additive = true - } - switch m.mtype { - case "g", "ms", "h": - v, err := strconv.ParseFloat(colonsplit[1], 64) - if err != nil { - log.Printf("Error: parsing value to float64: %s\n", line) + // Validate metric type + switch pipesplit[1] { + case "g", "c", "s", "ms", "h": + m.mtype = pipesplit[1] + default: + log.Printf("Error: Statsd Metric type %s unsupported", pipesplit[1]) return errors.New("Error Parsing statsd line") } - m.floatvalue = v - case "c", "s": - v, err := strconv.ParseInt(colonsplit[1], 10, 64) - if err != nil { - log.Printf("Error: parsing value to int64: %s\n", line) - return errors.New("Error Parsing statsd line") + + // Parse the value + if strings.ContainsAny(pipesplit[0], "-+") { + if m.mtype != "g" { + log.Printf("Error: +- values are only supported for gauges: %s\n", line) + return errors.New("Error Parsing statsd line") + } + m.additive = true + } + + switch m.mtype { + case "g", "ms", "h": + v, err := strconv.ParseFloat(pipesplit[0], 64) + if err != nil { + log.Printf("Error: parsing value to float64: %s\n", line) + return errors.New("Error Parsing statsd line") + } + m.floatvalue = v + case "c", "s": + v, err := strconv.ParseInt(pipesplit[0], 10, 64) + if err != nil { + log.Printf("Error: parsing value to int64: %s\n", line) + return errors.New("Error Parsing statsd line") + } + // If a sample rate is given with a counter, divide value by the rate + if m.samplerate != 0 && m.mtype == "c" { + v = int64(float64(v) / m.samplerate) + } + m.intvalue = v } - // If a sample rate is given with a counter, divide value by the rate - if m.samplerate != 0 && m.mtype == "c" { - v = int64(float64(v) / m.samplerate) + + // Parse the name & tags from bucket + m.name, m.tags = s.parseName(m.bucket) + switch m.mtype { + case "c": + m.tags["metric_type"] = "counter" + case "g": + m.tags["metric_type"] = "gauge" + case "s": + m.tags["metric_type"] = "set" + case "ms": + m.tags["metric_type"] = "timing" + case "h": + m.tags["metric_type"] = "histogram" } - m.intvalue = v - } - // Parse the name & tags from bucket - m.name, m.tags = s.parseName(m.bucket) - switch m.mtype { - case "c": - m.tags["metric_type"] = "counter" - case "g": - m.tags["metric_type"] = "gauge" - case "s": - m.tags["metric_type"] = "set" - case "ms": - m.tags["metric_type"] = "timing" - case "h": - m.tags["metric_type"] = "histogram" - } + // Make a unique key for the measurement name/tags + var tg []string + for k, v := range m.tags { + tg = append(tg, fmt.Sprintf("%s=%s", k, v)) + } + sort.Strings(tg) + m.hash = fmt.Sprintf("%s%s", strings.Join(tg, ""), m.name) - // Make a unique key for the measurement name/tags - var tg []string - for k, v := range m.tags { - tg = append(tg, fmt.Sprintf("%s=%s", k, v)) + s.aggregate(m) } - sort.Strings(tg) - m.hash = fmt.Sprintf("%s%s", strings.Join(tg, ""), m.name) - s.aggregate(m) return nil } diff --git a/plugins/statsd/statsd_test.go b/plugins/statsd/statsd_test.go index b7e1f2d93e861..991566a915fd1 100644 --- a/plugins/statsd/statsd_test.go +++ b/plugins/statsd/statsd_test.go @@ -326,6 +326,46 @@ func TestParse_MeasurementsWithSameName(t *testing.T) { } } +// Test that measurements with multiple bits, are treated as different outputs +func TestParse_MeasurementsWithMultipleValues(t *testing.T) { + s := NewStatsd() + + // Test that counters work + valid_lines := []string{ + "valid.multiple:0|ms|@0.1:1|ms", + } + + for _, line := range valid_lines { + err := s.parseStatsdLine(line) + if err != nil { + t.Errorf("Parsing line %s should not have resulted in an error\n", line) + } + } + + if len(s.timings) != 1 { + t.Errorf("Expected 1 measurement, found %d", len(s.timings)) + } + + if cachedtiming, ok := s.timings["metric_type=timingvalid_multiple"]; !ok { + t.Errorf("Expected cached measurement with hash 'metric_type=timingvalid_multiple' not found") + } else { + if cachedtiming.name != "valid_multiple" { + t.Errorf("Expected the name to be 'valid_multiple', got %s", cachedtiming.name) + } + + // A 0 at samplerate 0.1 will add 10 values of 0, + // plus the second bit of value 1 + // which adds uup to 11 individual datapoints to be cached + if cachedtiming.stats.n != 11 { + t.Errorf("Expected 11 additions, got %d", cachedtiming.stats.n) + } + + if cachedtiming.stats.upper != 1 { + t.Errorf("Expected max input to be 1, got %f", cachedtiming.stats.upper) + } + } +} + // Valid lines should be parsed and their values should be cached func TestParse_ValidLines(t *testing.T) { s := NewStatsd()