Skip to content

Commit

Permalink
Fix parsing of remote tcp address in statsd input (influxdata#6031)
Browse files Browse the repository at this point in the history
  • Loading branch information
danielnelson authored and bitcharmer committed Oct 18, 2019
1 parent 6578c6e commit 1b2adb9
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 25 deletions.
4 changes: 3 additions & 1 deletion plugins/inputs/statsd/datadog.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,9 @@ func (s *Statsd) parseEventMessage(now time.Time, message string, defaultHostnam
fields := make(map[string]interface{}, 9)
fields["alert_type"] = eventInfo // default event type
fields["text"] = uncommenter.Replace(string(rawText))
tags["source"] = defaultHostname // Use source tag because host is reserved tag key in Telegraf.
if defaultHostname != "" {
tags["source"] = defaultHostname
}
fields["priority"] = priorityNormal
ts := now
if len(message) < 2 {
Expand Down
15 changes: 6 additions & 9 deletions plugins/inputs/statsd/statsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"fmt"
"log"
"net"
"net/url"
"sort"
"strconv"
"strings"
Expand Down Expand Up @@ -817,14 +816,12 @@ func (s *Statsd) handler(conn *net.TCPConn, id string) {
s.forget(id)
s.CurrentConnections.Incr(-1)
}()
addr := conn.RemoteAddr()
parsedURL, err := url.Parse(addr.String())
if err != nil {
// this should never happen because the conn handler should give us parsable addresses,
// but if it does we will know
log.Printf("E! [inputs.statsd] failed to parse %s\n", addr)
return // close the connetion and return

var remoteIP string
if addr, ok := conn.RemoteAddr().(*net.TCPAddr); ok {
remoteIP = addr.IP.String()
}

var n int
scanner := bufio.NewScanner(conn)
for {
Expand All @@ -848,7 +845,7 @@ func (s *Statsd) handler(conn *net.TCPConn, id string) {
b.WriteByte('\n')

select {
case s.in <- input{Buffer: b, Time: time.Now(), Addr: parsedURL.Host}:
case s.in <- input{Buffer: b, Time: time.Now(), Addr: remoteIP}:
default:
s.drops++
if s.drops == 1 || s.drops%s.AllowedPendingMessages == 0 {
Expand Down
60 changes: 47 additions & 13 deletions plugins/inputs/statsd/statsd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"testing"
"time"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand All @@ -15,19 +16,6 @@ const (
testMsg = "test.tcp.msg:100|c"
)

func newTestTCPListener() (*Statsd, chan input) {
in := make(chan input, 1500)
listener := &Statsd{
Protocol: "tcp",
ServiceAddress: "localhost:8125",
AllowedPendingMessages: 10000,
MaxTCPConnections: 250,
in: in,
done: make(chan struct{}),
}
return listener, in
}

func NewTestStatsd() *Statsd {
s := Statsd{}

Expand Down Expand Up @@ -1596,3 +1584,49 @@ func testValidateGauge(
}
return nil
}

func TestTCP(t *testing.T) {
statsd := Statsd{
Protocol: "tcp",
ServiceAddress: "localhost:0",
AllowedPendingMessages: 10000,
MaxTCPConnections: 2,
}
var acc testutil.Accumulator
require.NoError(t, statsd.Start(&acc))
defer statsd.Stop()

addr := statsd.TCPlistener.Addr().String()

conn, err := net.Dial("tcp", addr)
_, err = conn.Write([]byte("cpu.time_idle:42|c\n"))
require.NoError(t, err)
err = conn.Close()
require.NoError(t, err)

for {
err = statsd.Gather(&acc)
require.NoError(t, err)

if len(acc.Metrics) > 0 {
break
}
}

testutil.RequireMetricsEqual(t,
[]telegraf.Metric{
testutil.MustMetric(
"cpu_time_idle",
map[string]string{
"metric_type": "counter",
},
map[string]interface{}{
"value": 42,
},
time.Now(),
),
},
acc.GetTelegrafMetrics(),
testutil.IgnoreTime(),
)
}
4 changes: 2 additions & 2 deletions testutil/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ func MetricEqual(expected, actual telegraf.Metric) bool {

// RequireMetricEqual halts the test with an error if the metrics are not
// equal.
func RequireMetricEqual(t *testing.T, expected, actual telegraf.Metric) {
func RequireMetricEqual(t *testing.T, expected, actual telegraf.Metric, opts ...cmp.Option) {
t.Helper()

var lhs, rhs *metricDiff
Expand All @@ -154,7 +154,7 @@ func RequireMetricEqual(t *testing.T, expected, actual telegraf.Metric) {
rhs = newMetricDiff(actual)
}

if diff := cmp.Diff(lhs, rhs); diff != "" {
if diff := cmp.Diff(lhs, rhs, opts...); diff != "" {
t.Fatalf("telegraf.Metric\n--- expected\n+++ actual\n%s", diff)
}
}
Expand Down

0 comments on commit 1b2adb9

Please sign in to comment.