From ad024c6f700c5febb3d10efe20a4e3cfefe2e88d Mon Sep 17 00:00:00 2001 From: Alirie Gray Date: Mon, 29 Apr 2019 13:27:55 -0700 Subject: [PATCH 01/15] WIP: add native ping --- plugins/inputs/ping/ping.go | 155 +++++++++++++++++++---------- plugins/inputs/ping/ping_native.go | 43 ++++++++ 2 files changed, 146 insertions(+), 52 deletions(-) create mode 100644 plugins/inputs/ping/ping_native.go diff --git a/plugins/inputs/ping/ping.go b/plugins/inputs/ping/ping.go index 28e967a85c8e9..d1dba2c381646 100644 --- a/plugins/inputs/ping/ping.go +++ b/plugins/inputs/ping/ping.go @@ -8,16 +8,18 @@ import ( "net" "os/exec" "regexp" - "runtime" "strconv" "strings" "sync" - "syscall" "time" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/plugins/inputs" + ping "github.com/sparrc/go-ping" + "golang.org/x/net/icmp" + "golang.org/x/net/ipv4" + "golang.org/x/net/ipv6" ) // HostPinger is a function that runs the "ping" function using a list of @@ -95,10 +97,48 @@ func (_ *Ping) SampleConfig() string { } func (p *Ping) Gather(acc telegraf.Accumulator) error { + // var conn *icmp.PacketConn + // var err error + // if p.ipv4 { + // if conn, err = ping.Listen(ipv4Proto[p.network], p.source); conn == nil { + // return err + // } + // conn.IPv4PacketConn().SetControlMessage(ipv4.FlagTTL, true) + // } else { + // if conn, err = ping.Listen(ipv6Proto[p.network], p.source); conn == nil { + // return err + // } + // conn.IPv6PacketConn().SetControlMessage(ipv6.FlagHopLimit, true) + // } + // defer conn.Close() + // Spin off a go routine for each url to ping for _, url := range p.Urls { + pinger, err := ping.NewPinger(url) + if err != nil { + acc.AddError(fmt.Errorf("%s: %s", err, url)) + acc.AddFields("ping", map[string]interface{}{"result_code": 2}, map[string]string{"url": url}) + continue + } + + var conn *icmp.PacketConn + if conn, err = pinger.Listen(); err != nil { + // close(pinger.done) + return err // TODO: verify what to 
return if unrecoverable + } + + if pinger.GetIpv4() { + conn.IPv4PacketConn().SetControlMessage(ipv4.FlagTTL, true) + } else { + conn.IPv6PacketConn().SetControlMessage(ipv6.FlagHopLimit, true) + } + + // defer conn.Close() + p.wg.Add(1) - go p.pingToURL(url, acc) + + go p.pingToURL(url, pinger, conn, acc) + // go p.pingHostNative(pinger) } p.wg.Wait() @@ -106,7 +146,7 @@ func (p *Ping) Gather(acc telegraf.Accumulator) error { return nil } -func (p *Ping) pingToURL(u string, acc telegraf.Accumulator) { +func (p *Ping) pingToURL(u string, pinger *ping.Pinger, conn *icmp.PacketConn, acc telegraf.Accumulator) { defer p.wg.Done() tags := map[string]string{"url": u} fields := map[string]interface{}{"result_code": 0} @@ -118,41 +158,52 @@ func (p *Ping) pingToURL(u string, acc telegraf.Accumulator) { acc.AddFields("ping", fields, tags) return } + // TODO: make this conditional: + + // args := p.args(u, runtime.GOOS) + // totalTimeout := 60.0 + // if len(p.Arguments) == 0 { + // totalTimeout = float64(p.Count)*p.Timeout + float64(p.Count-1)*p.PingInterval + // } + + // out, err := p.pingHost(p.Binary, totalTimeout, args...) + // if err != nil { + // // Some implementations of ping return a 1 exit code on + // // timeout, if this occurs we will not exit and try to parse + // // the output. 
+ // status := -1 + // if exitError, ok := err.(*exec.ExitError); ok { + // if ws, ok := exitError.Sys().(syscall.WaitStatus); ok { + // status = ws.ExitStatus() + // fields["result_code"] = status + // } + // } + + // if status != 1 { + // // Combine go err + stderr output + // out = strings.TrimSpace(out) + // if len(out) > 0 { + // acc.AddError(fmt.Errorf("host %s: %s, %s", u, out, err)) + // } else { + // acc.AddError(fmt.Errorf("host %s: %s", u, err)) + // } + // fields["result_code"] = 2 + // acc.AddFields("ping", fields, tags) + // return + // } + // } + + pinger.Count = p.Count + pinger.Interval = time.Duration(p.PingInterval) * time.Second + pinger.Timeout = time.Duration(p.Timeout) * time.Second + pinger.SetPrivileged(true) + + fmt.Printf("pinger timeout: %v \n", pinger.Timeout.Seconds()) + + results, err := p.pingHostNative(pinger, conn) + + // trans, rec, ttl, min, avg, max, stddev, err := processPingOutput(out) - args := p.args(u, runtime.GOOS) - totalTimeout := 60.0 - if len(p.Arguments) == 0 { - totalTimeout = float64(p.Count)*p.Timeout + float64(p.Count-1)*p.PingInterval - } - - out, err := p.pingHost(p.Binary, totalTimeout, args...) - if err != nil { - // Some implementations of ping return a 1 exit code on - // timeout, if this occurs we will not exit and try to parse - // the output. 
- status := -1 - if exitError, ok := err.(*exec.ExitError); ok { - if ws, ok := exitError.Sys().(syscall.WaitStatus); ok { - status = ws.ExitStatus() - fields["result_code"] = status - } - } - - if status != 1 { - // Combine go err + stderr output - out = strings.TrimSpace(out) - if len(out) > 0 { - acc.AddError(fmt.Errorf("host %s: %s, %s", u, out, err)) - } else { - acc.AddError(fmt.Errorf("host %s: %s", u, err)) - } - fields["result_code"] = 2 - acc.AddFields("ping", fields, tags) - return - } - } - - trans, rec, ttl, min, avg, max, stddev, err := processPingOutput(out) if err != nil { // fatal error acc.AddError(fmt.Errorf("%s: %s", err, u)) @@ -161,24 +212,24 @@ func (p *Ping) pingToURL(u string, acc telegraf.Accumulator) { return } // Calculate packet loss percentage - loss := float64(trans-rec) / float64(trans) * 100.0 - fields["packets_transmitted"] = trans - fields["packets_received"] = rec - fields["percent_packet_loss"] = loss - if ttl >= 0 { - fields["ttl"] = ttl + // loss := float64(trans-rec) / float64(trans) * 100.0 + fields["packets_transmitted"] = results.transmitted + fields["packets_received"] = results.received + fields["percent_packet_loss"] = results.pktLoss + if results.ttl >= 0 { + fields["ttl"] = results.ttl } - if min >= 0 { - fields["minimum_response_ms"] = min + if results.min >= 0 { + fields["minimum_response_ms"] = results.min } - if avg >= 0 { - fields["average_response_ms"] = avg + if results.avg >= 0 { + fields["average_response_ms"] = results.avg } - if max >= 0 { - fields["maximum_response_ms"] = max + if results.max >= 0 { + fields["maximum_response_ms"] = results.max } - if stddev >= 0 { - fields["standard_deviation_ms"] = stddev + if results.stddev >= 0 { + fields["standard_deviation_ms"] = results.stddev } acc.AddFields("ping", fields, tags) } diff --git a/plugins/inputs/ping/ping_native.go b/plugins/inputs/ping/ping_native.go new file mode 100644 index 0000000000000..035ec1d2560e8 --- /dev/null +++ 
b/plugins/inputs/ping/ping_native.go @@ -0,0 +1,43 @@ +package ping + +import ( + "fmt" + + ping "github.com/sparrc/go-ping" + "golang.org/x/net/icmp" +) + +type pingResults struct { + transmitted int + received int + pktLoss float64 + ttl int + min float64 + avg float64 + max float64 + stddev float64 +} + +// TODO: add privileged flag +func (p *Ping) pingHostNative(pinger *ping.Pinger, conn *icmp.PacketConn) (*pingResults, error) { + results := pingResults{} + + pinger.OnRecv = func(pkt *ping.Packet) { + fmt.Printf("packet: %#v \n", pkt) + results.ttl = pkt.Ttl + } + pinger.OnFinish = func(stats *ping.Statistics) { + results.received = stats.PacketsRecv + results.transmitted = stats.PacketsSent + results.pktLoss = stats.PacketLoss + results.min = stats.MinRtt.Seconds() * 1000 + results.avg = stats.AvgRtt.Seconds() * 1000 + results.max = stats.MaxRtt.Seconds() * 1000 + results.stddev = stats.StdDevRtt.Seconds() * 1000 + + fmt.Printf("stats: %#v \n", stats) + } + pinger.DoPing(conn) + + return &results, nil +} From 2d173e289ff6667b6972cbb9caf6da01b29d6755 Mon Sep 17 00:00:00 2001 From: greg linton Date: Mon, 6 May 2019 11:33:57 -0600 Subject: [PATCH 02/15] Attempt to record accurate ping time --- plugins/inputs/ping/ping.go | 40 ++++++++++-------------------- plugins/inputs/ping/ping_native.go | 18 ++++++-------- 2 files changed, 21 insertions(+), 37 deletions(-) diff --git a/plugins/inputs/ping/ping.go b/plugins/inputs/ping/ping.go index d1dba2c381646..549c163ff68a3 100644 --- a/plugins/inputs/ping/ping.go +++ b/plugins/inputs/ping/ping.go @@ -13,13 +13,14 @@ import ( "sync" "time" - "github.com/influxdata/telegraf" - "github.com/influxdata/telegraf/internal" - "github.com/influxdata/telegraf/plugins/inputs" ping "github.com/sparrc/go-ping" "golang.org/x/net/icmp" "golang.org/x/net/ipv4" "golang.org/x/net/ipv6" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/plugins/inputs" ) // HostPinger is a 
function that runs the "ping" function using a list of @@ -97,21 +98,6 @@ func (_ *Ping) SampleConfig() string { } func (p *Ping) Gather(acc telegraf.Accumulator) error { - // var conn *icmp.PacketConn - // var err error - // if p.ipv4 { - // if conn, err = ping.Listen(ipv4Proto[p.network], p.source); conn == nil { - // return err - // } - // conn.IPv4PacketConn().SetControlMessage(ipv4.FlagTTL, true) - // } else { - // if conn, err = ping.Listen(ipv6Proto[p.network], p.source); conn == nil { - // return err - // } - // conn.IPv6PacketConn().SetControlMessage(ipv6.FlagHopLimit, true) - // } - // defer conn.Close() - // Spin off a go routine for each url to ping for _, url := range p.Urls { pinger, err := ping.NewPinger(url) @@ -121,9 +107,14 @@ func (p *Ping) Gather(acc telegraf.Accumulator) error { continue } - var conn *icmp.PacketConn - if conn, err = pinger.Listen(); err != nil { - // close(pinger.done) + pinger.Count = p.Count + pinger.Interval = time.Duration(p.PingInterval) * time.Second + pinger.Timeout = time.Duration(p.Timeout*float64(p.Count)) * time.Second + pinger.Size = 64 + pinger.SetPrivileged(false) + + conn, err := pinger.Listen() + if err != nil { return err // TODO: verify what to return if unrecoverable } @@ -193,12 +184,7 @@ func (p *Ping) pingToURL(u string, pinger *ping.Pinger, conn *icmp.PacketConn, a // } // } - pinger.Count = p.Count - pinger.Interval = time.Duration(p.PingInterval) * time.Second - pinger.Timeout = time.Duration(p.Timeout) * time.Second - pinger.SetPrivileged(true) - - fmt.Printf("pinger timeout: %v \n", pinger.Timeout.Seconds()) + // fmt.Printf("pinger timeout: %v \n", pinger.Timeout.Seconds()) results, err := p.pingHostNative(pinger, conn) diff --git a/plugins/inputs/ping/ping_native.go b/plugins/inputs/ping/ping_native.go index 035ec1d2560e8..2aef2e3d5f555 100644 --- a/plugins/inputs/ping/ping_native.go +++ b/plugins/inputs/ping/ping_native.go @@ -1,7 +1,7 @@ package ping import ( - "fmt" + "time" ping 
"github.com/sparrc/go-ping" "golang.org/x/net/icmp" @@ -20,24 +20,22 @@ type pingResults struct { // TODO: add privileged flag func (p *Ping) pingHostNative(pinger *ping.Pinger, conn *icmp.PacketConn) (*pingResults, error) { - results := pingResults{} + results := &pingResults{} pinger.OnRecv = func(pkt *ping.Packet) { - fmt.Printf("packet: %#v \n", pkt) + // fmt.Printf("packet: %#v \n", pkt) results.ttl = pkt.Ttl } pinger.OnFinish = func(stats *ping.Statistics) { results.received = stats.PacketsRecv results.transmitted = stats.PacketsSent results.pktLoss = stats.PacketLoss - results.min = stats.MinRtt.Seconds() * 1000 - results.avg = stats.AvgRtt.Seconds() * 1000 - results.max = stats.MaxRtt.Seconds() * 1000 - results.stddev = stats.StdDevRtt.Seconds() * 1000 - - fmt.Printf("stats: %#v \n", stats) + results.min = float64(stats.MinRtt.Nanoseconds()) / float64(time.Millisecond) + results.avg = float64(stats.AvgRtt.Nanoseconds()) / float64(time.Millisecond) + results.max = float64(stats.MaxRtt.Nanoseconds()) / float64(time.Millisecond) + results.stddev = float64(stats.StdDevRtt.Nanoseconds()) / float64(time.Millisecond) } pinger.DoPing(conn) - return &results, nil + return results, nil } From f097abbd36409cd3d25ace9ea5ce73f641590d7a Mon Sep 17 00:00:00 2001 From: greg linton Date: Tue, 7 May 2019 17:41:20 -0600 Subject: [PATCH 03/15] Conditionally use system ping --- plugins/inputs/ping/README.md | 4 +- plugins/inputs/ping/ping.go | 199 ++++++++++++++++------------- plugins/inputs/ping/ping_native.go | 2 - 3 files changed, 112 insertions(+), 93 deletions(-) diff --git a/plugins/inputs/ping/README.md b/plugins/inputs/ping/README.md index 5d3904e929c38..3493285955723 100644 --- a/plugins/inputs/ping/README.md +++ b/plugins/inputs/ping/README.md @@ -36,8 +36,8 @@ apt-get install iputils-ping ## Specify the ping executable binary, default is "ping" # binary = "ping" - ## Arguments for ping command - ## when arguments is not empty, other options (ping_interval, timeout, 
etc) will be ignored + ## Arguments for ping command. When arguments is not empty, system binary will be used and + ## other options (ping_interval, timeout, etc) will be ignored # arguments = ["-c", "3"] ``` diff --git a/plugins/inputs/ping/ping.go b/plugins/inputs/ping/ping.go index 549c163ff68a3..91f5e28e523ca 100644 --- a/plugins/inputs/ping/ping.go +++ b/plugins/inputs/ping/ping.go @@ -8,9 +8,11 @@ import ( "net" "os/exec" "regexp" + "runtime" "strconv" "strings" "sync" + "syscall" "time" ping "github.com/sparrc/go-ping" @@ -52,8 +54,8 @@ type Ping struct { // Ping executable binary Binary string - // Arguments for ping command. - // when `Arguments` is not empty, other options (ping_interval, timeout, etc) will be ignored + // Arguments for ping command. When arguments is not empty, system binary will be used and + // other options (ping_interval, timeout, etc) will be ignored Arguments []string // host ping function @@ -88,8 +90,8 @@ const sampleConfig = ` ## Specify the ping executable binary, default is "ping" # binary = "ping" - ## Arguments for ping command - ## when arguments is not empty, other options (ping_interval, timeout, etc) will be ignored + ## Arguments for ping command. 
When arguments is not empty, system binary will be used and + ## other options (ping_interval, timeout, etc) will be ignored # arguments = ["-c", "3"] ` @@ -100,36 +102,51 @@ func (_ *Ping) SampleConfig() string { func (p *Ping) Gather(acc telegraf.Accumulator) error { // Spin off a go routine for each url to ping for _, url := range p.Urls { - pinger, err := ping.NewPinger(url) - if err != nil { - acc.AddError(fmt.Errorf("%s: %s", err, url)) - acc.AddFields("ping", map[string]interface{}{"result_code": 2}, map[string]string{"url": url}) - continue - } + if len(p.Arguments) > 0 { + p.wg.Add(1) - pinger.Count = p.Count - pinger.Interval = time.Duration(p.PingInterval) * time.Second - pinger.Timeout = time.Duration(p.Timeout*float64(p.Count)) * time.Second - pinger.Size = 64 - pinger.SetPrivileged(false) + go p.pingToURL(url, nil, nil, acc) + } else { + pinger, err := ping.NewPinger(url) + if err != nil { + acc.AddError(fmt.Errorf("%s: %s", err, url)) + acc.AddFields("ping", map[string]interface{}{"result_code": 2}, map[string]string{"url": url}) + continue + } - conn, err := pinger.Listen() - if err != nil { - return err // TODO: verify what to return if unrecoverable - } + pinger.Count = p.Count + if p.PingInterval <= 0 { + p.PingInterval = 1 + } + pinger.Interval = time.Nanosecond * time.Duration(p.PingInterval*1000000000) + if p.Deadline != 0 { + pinger.Timeout = time.Duration(p.Deadline) * time.Second + } else { + if p.Timeout <= 0 { + p.Timeout = 1 + } + pinger.Timeout = time.Duration(p.Timeout*float64(p.Count)) * time.Second + } + pinger.Size = 64 - if pinger.GetIpv4() { - conn.IPv4PacketConn().SetControlMessage(ipv4.FlagTTL, true) - } else { - conn.IPv6PacketConn().SetControlMessage(ipv6.FlagHopLimit, true) - } + // TODO: determine need for privileged flag + pinger.SetPrivileged(false) + + conn, err := pinger.Listen() + if err != nil { + return err // TODO: verify what to return if unrecoverable + } - // defer conn.Close() + if pinger.GetIpv4() { + 
conn.IPv4PacketConn().SetControlMessage(ipv4.FlagTTL, true) + } else { + conn.IPv6PacketConn().SetControlMessage(ipv6.FlagHopLimit, true) + } - p.wg.Add(1) + p.wg.Add(1) - go p.pingToURL(url, pinger, conn, acc) - // go p.pingHostNative(pinger) + go p.pingToURL(url, pinger, conn, acc) + } } p.wg.Wait() @@ -149,73 +166,77 @@ func (p *Ping) pingToURL(u string, pinger *ping.Pinger, conn *icmp.PacketConn, a acc.AddFields("ping", fields, tags) return } - // TODO: make this conditional: - - // args := p.args(u, runtime.GOOS) - // totalTimeout := 60.0 - // if len(p.Arguments) == 0 { - // totalTimeout = float64(p.Count)*p.Timeout + float64(p.Count-1)*p.PingInterval - // } - - // out, err := p.pingHost(p.Binary, totalTimeout, args...) - // if err != nil { - // // Some implementations of ping return a 1 exit code on - // // timeout, if this occurs we will not exit and try to parse - // // the output. - // status := -1 - // if exitError, ok := err.(*exec.ExitError); ok { - // if ws, ok := exitError.Sys().(syscall.WaitStatus); ok { - // status = ws.ExitStatus() - // fields["result_code"] = status - // } - // } - - // if status != 1 { - // // Combine go err + stderr output - // out = strings.TrimSpace(out) - // if len(out) > 0 { - // acc.AddError(fmt.Errorf("host %s: %s, %s", u, out, err)) - // } else { - // acc.AddError(fmt.Errorf("host %s: %s", u, err)) - // } - // fields["result_code"] = 2 - // acc.AddFields("ping", fields, tags) - // return - // } - // } - - // fmt.Printf("pinger timeout: %v \n", pinger.Timeout.Seconds()) - - results, err := p.pingHostNative(pinger, conn) - - // trans, rec, ttl, min, avg, max, stddev, err := processPingOutput(out) - if err != nil { - // fatal error - acc.AddError(fmt.Errorf("%s: %s", err, u)) - fields["result_code"] = 2 - acc.AddFields("ping", fields, tags) - return + var trans, rec, ttl int + var loss, min, avg, max, stddev float64 + + if len(p.Arguments) > 0 { + out, err := p.pingHost(p.Binary, 60.0, p.args(u, runtime.GOOS)...) 
+ if err != nil { + // Some implementations of ping return a 1 exit code on + // timeout, if this occurs we will not exit and try to parse + // the output. + status := -1 + if exitError, ok := err.(*exec.ExitError); ok { + if ws, ok := exitError.Sys().(syscall.WaitStatus); ok { + status = ws.ExitStatus() + fields["result_code"] = status + } + } + + if status != 1 { + // Combine go err + stderr output + out = strings.TrimSpace(out) + if len(out) > 0 { + acc.AddError(fmt.Errorf("host %s: %s, %s", u, out, err)) + } else { + acc.AddError(fmt.Errorf("host %s: %s", u, err)) + } + fields["result_code"] = 2 + acc.AddFields("ping", fields, tags) + return + } + } + trans, rec, ttl, min, avg, max, stddev, err = processPingOutput(out) + if err != nil { + // fatal error + acc.AddError(fmt.Errorf("%s: %s", err, u)) + fields["result_code"] = 2 + acc.AddFields("ping", fields, tags) + return + } + + // Calculate packet loss percentage + loss = float64(trans-rec) / float64(trans) * 100.0 + } else { + results, err := p.pingHostNative(pinger, conn) + if err != nil { + // fatal error + acc.AddError(fmt.Errorf("%s: %s", err, u)) + fields["result_code"] = 2 + acc.AddFields("ping", fields, tags) + return + } + trans, rec, ttl, loss, min, avg, max, stddev = results.transmitted, results.received, results.ttl, results.pktLoss, results.min, results.avg, results.max, results.stddev } - // Calculate packet loss percentage - // loss := float64(trans-rec) / float64(trans) * 100.0 - fields["packets_transmitted"] = results.transmitted - fields["packets_received"] = results.received - fields["percent_packet_loss"] = results.pktLoss - if results.ttl >= 0 { - fields["ttl"] = results.ttl + + fields["packets_transmitted"] = trans + fields["packets_received"] = rec + fields["percent_packet_loss"] = loss + if ttl >= 0 { + fields["ttl"] = ttl } - if results.min >= 0 { - fields["minimum_response_ms"] = results.min + if min >= 0 { + fields["minimum_response_ms"] = min } - if results.avg >= 0 { - 
fields["average_response_ms"] = results.avg + if avg >= 0 { + fields["average_response_ms"] = avg } - if results.max >= 0 { - fields["maximum_response_ms"] = results.max + if max >= 0 { + fields["maximum_response_ms"] = max } - if results.stddev >= 0 { - fields["standard_deviation_ms"] = results.stddev + if stddev >= 0 { + fields["standard_deviation_ms"] = stddev } acc.AddFields("ping", fields, tags) } diff --git a/plugins/inputs/ping/ping_native.go b/plugins/inputs/ping/ping_native.go index 2aef2e3d5f555..c934b870c2157 100644 --- a/plugins/inputs/ping/ping_native.go +++ b/plugins/inputs/ping/ping_native.go @@ -18,12 +18,10 @@ type pingResults struct { stddev float64 } -// TODO: add privileged flag func (p *Ping) pingHostNative(pinger *ping.Pinger, conn *icmp.PacketConn) (*pingResults, error) { results := &pingResults{} pinger.OnRecv = func(pkt *ping.Packet) { - // fmt.Printf("packet: %#v \n", pkt) results.ttl = pkt.Ttl } pinger.OnFinish = func(stats *ping.Statistics) { From f2ec8e4208f93747b2ebb13f691961d8f29263f7 Mon Sep 17 00:00:00 2001 From: greg linton Date: Mon, 13 May 2019 12:24:18 -0600 Subject: [PATCH 04/15] WIP --- plugins/inputs/ping/ping.go | 348 +++++++++---------------- plugins/inputs/ping/ping_native.go | 39 --- plugins/inputs/ping/ping_notwindows.go | 213 +++++++++++++++ plugins/inputs/ping/ping_windows.go | 104 +------- 4 files changed, 335 insertions(+), 369 deletions(-) delete mode 100644 plugins/inputs/ping/ping_native.go create mode 100644 plugins/inputs/ping/ping_notwindows.go diff --git a/plugins/inputs/ping/ping.go b/plugins/inputs/ping/ping.go index 91f5e28e523ca..c83bd4ca98260 100644 --- a/plugins/inputs/ping/ping.go +++ b/plugins/inputs/ping/ping.go @@ -1,24 +1,15 @@ -// +build !windows - package ping import ( - "errors" + "context" "fmt" "net" "os/exec" - "regexp" - "runtime" - "strconv" - "strings" "sync" - "syscall" "time" ping "github.com/sparrc/go-ping" "golang.org/x/net/icmp" - "golang.org/x/net/ipv4" - "golang.org/x/net/ipv6" 
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal" @@ -39,7 +30,7 @@ type Ping struct { // Number of pings to send (ping -c ) Count int - // Ping timeout, in seconds. 0 means no timeout (ping -W ) + // Per-ping timeout, in seconds. 0 means no timeout (ping -W ) Timeout float64 // Ping deadline, in seconds. 0 means no deadline. (ping -w ) @@ -60,6 +51,11 @@ type Ping struct { // host ping function pingHost HostPinger + + // listenAddr is the address associated with the interface defined. + listenAddr string + + ctx context.Context } func (_ *Ping) Description() string { @@ -74,7 +70,6 @@ const sampleConfig = ` # count = 1 ## Interval, in s, at which to ping. 0 == default (ping -i ) - ## Not available in Windows. # ping_interval = 1.0 ## Per-ping timeout, in s. 0 == no timeout (ping -W ) @@ -83,8 +78,7 @@ const sampleConfig = ` ## Total-ping deadline, in s. 0 == no deadline (ping -w ) # deadline = 10 - ## Interface or source address to send ping from (ping -I ) - ## on Darwin and Freebsd only source address possible: (ping -S ) + ## Interface or source address to send ping from (ping -I[-S] ) # interface = "" ## Specify the ping executable binary, default is "ping" @@ -100,52 +94,60 @@ func (_ *Ping) SampleConfig() string { } func (p *Ping) Gather(acc telegraf.Accumulator) error { - // Spin off a go routine for each url to ping + if p.Interface != "" && p.listenAddr != "" { + p.listenAddr = getAddr(p.Interface) + } + for _, url := range p.Urls { + _, err := net.LookupHost(url) + if err != nil { + acc.AddFields("ping", map[string]interface{}{"result_code": 1}, map[string]string{"url": url}) + // todo: return err? 
+ acc.AddError(err) + return nil + } + if len(p.Arguments) > 0 { p.wg.Add(1) - go p.pingToURL(url, nil, nil, acc) + go p.pingToURL(url, acc) } else { pinger, err := ping.NewPinger(url) if err != nil { - acc.AddError(fmt.Errorf("%s: %s", err, url)) + acc.AddError(fmt.Errorf("%v: %s", err, url)) acc.AddFields("ping", map[string]interface{}{"result_code": 2}, map[string]string{"url": url}) continue } pinger.Count = p.Count - if p.PingInterval <= 0 { + + if p.PingInterval < 0.2 { p.PingInterval = 1 } pinger.Interval = time.Nanosecond * time.Duration(p.PingInterval*1000000000) - if p.Deadline != 0 { + + if p.Deadline > 0 { pinger.Timeout = time.Duration(p.Deadline) * time.Second - } else { - if p.Timeout <= 0 { - p.Timeout = 1 - } - pinger.Timeout = time.Duration(p.Timeout*float64(p.Count)) * time.Second } + + if p.Timeout <= 0 { + p.Timeout = 1 + } + pinger.Deadline = time.Nanosecond * time.Duration(p.Timeout*1000000000) + pinger.Size = 64 // TODO: determine need for privileged flag - pinger.SetPrivileged(false) + pinger.SetPrivileged(true) - conn, err := pinger.Listen() + conn, err := pinger.Listen(p.listenAddr) if err != nil { - return err // TODO: verify what to return if unrecoverable - } - - if pinger.GetIpv4() { - conn.IPv4PacketConn().SetControlMessage(ipv4.FlagTTL, true) - } else { - conn.IPv6PacketConn().SetControlMessage(ipv6.FlagHopLimit, true) + return err } p.wg.Add(1) - go p.pingToURL(url, pinger, conn, acc) + go p.pingToURLNative(url, pinger, conn, acc) } } @@ -154,91 +156,39 @@ func (p *Ping) Gather(acc telegraf.Accumulator) error { return nil } -func (p *Ping) pingToURL(u string, pinger *ping.Pinger, conn *icmp.PacketConn, acc telegraf.Accumulator) { - defer p.wg.Done() - tags := map[string]string{"url": u} - fields := map[string]interface{}{"result_code": 0} +func getAddr(iface string) string { + if addr := net.ParseIP(iface); addr != nil { + return addr.String() + } - _, err := net.LookupHost(u) + ifaces, err := net.Interfaces() if err != nil { - 
acc.AddError(err) - fields["result_code"] = 1 - acc.AddFields("ping", fields, tags) - return + return "" } - var trans, rec, ttl int - var loss, min, avg, max, stddev float64 - - if len(p.Arguments) > 0 { - out, err := p.pingHost(p.Binary, 60.0, p.args(u, runtime.GOOS)...) - if err != nil { - // Some implementations of ping return a 1 exit code on - // timeout, if this occurs we will not exit and try to parse - // the output. - status := -1 - if exitError, ok := err.(*exec.ExitError); ok { - if ws, ok := exitError.Sys().(syscall.WaitStatus); ok { - status = ws.ExitStatus() - fields["result_code"] = status - } + var ip net.IP + for i := range ifaces { + if ifaces[i].Name == iface { + addrs, err := ifaces[i].Addrs() + if err != nil { + return "" } - - if status != 1 { - // Combine go err + stderr output - out = strings.TrimSpace(out) - if len(out) > 0 { - acc.AddError(fmt.Errorf("host %s: %s, %s", u, out, err)) - } else { - acc.AddError(fmt.Errorf("host %s: %s", u, err)) + if len(addrs) > 0 { + switch v := addrs[0].(type) { + case *net.IPNet: + ip = v.IP + case *net.IPAddr: + ip = v.IP } - fields["result_code"] = 2 - acc.AddFields("ping", fields, tags) - return + if len(ip) == 0 { + return "" + } + return ip.String() } } - trans, rec, ttl, min, avg, max, stddev, err = processPingOutput(out) - if err != nil { - // fatal error - acc.AddError(fmt.Errorf("%s: %s", err, u)) - fields["result_code"] = 2 - acc.AddFields("ping", fields, tags) - return - } - - // Calculate packet loss percentage - loss = float64(trans-rec) / float64(trans) * 100.0 - } else { - results, err := p.pingHostNative(pinger, conn) - if err != nil { - // fatal error - acc.AddError(fmt.Errorf("%s: %s", err, u)) - fields["result_code"] = 2 - acc.AddFields("ping", fields, tags) - return - } - trans, rec, ttl, loss, min, avg, max, stddev = results.transmitted, results.received, results.ttl, results.pktLoss, results.min, results.avg, results.max, results.stddev } - fields["packets_transmitted"] = trans - 
fields["packets_received"] = rec - fields["percent_packet_loss"] = loss - if ttl >= 0 { - fields["ttl"] = ttl - } - if min >= 0 { - fields["minimum_response_ms"] = min - } - if avg >= 0 { - fields["average_response_ms"] = avg - } - if max >= 0 { - fields["maximum_response_ms"] = max - } - if stddev >= 0 { - fields["standard_deviation_ms"] = stddev - } - acc.AddFields("ping", fields, tags) + return "" } func hostPinger(binary string, timeout float64, args ...string) (string, error) { @@ -252,137 +202,80 @@ func hostPinger(binary string, timeout float64, args ...string) (string, error) return string(out), err } -// args returns the arguments for the 'ping' executable -func (p *Ping) args(url string, system string) []string { - if len(p.Arguments) > 0 { - return append(p.Arguments, url) +type pingResults struct { + transmitted int + received int + pktLoss float64 + ttl int + min float64 + avg float64 + max float64 + stddev float64 +} + +// func getGID() uint64 { +// b := make([]byte, 64) +// b = b[:runtime.Stack(b, false)] +// b = bytes.TrimPrefix(b, []byte("goroutine ")) +// b = b[:bytes.IndexByte(b, ' ')] +// n, _ := strconv.ParseUint(string(b), 10, 64) +// return n +// } + +func (p *Ping) pingToURLNative(u string, pinger *ping.Pinger, conn *icmp.PacketConn, acc telegraf.Accumulator) { + + defer p.wg.Done() + tags := map[string]string{"url": u} + fields := map[string]interface{}{"result_code": 0} + + results, err := p.pingHostNative(pinger, conn) + if err != nil { + acc.AddError(fmt.Errorf("%s: %s", err, u)) + fields["result_code"] = 2 + acc.AddFields("ping", fields, tags) + return } - // build the ping command args based on toml config - args := []string{"-c", strconv.Itoa(p.Count), "-n", "-s", "16"} - if p.PingInterval > 0 { - args = append(args, "-i", strconv.FormatFloat(p.PingInterval, 'f', -1, 64)) + fields["packets_transmitted"] = results.transmitted + fields["packets_received"] = results.received + fields["percent_packet_loss"] = results.pktLoss + if 
results.ttl > 0 { + fields["ttl"] = results.ttl } - if p.Timeout > 0 { - switch system { - case "darwin": - args = append(args, "-W", strconv.FormatFloat(p.Timeout*1000, 'f', -1, 64)) - case "freebsd", "netbsd", "openbsd": - args = append(args, "-W", strconv.FormatFloat(p.Timeout*1000, 'f', -1, 64)) - case "linux": - args = append(args, "-W", strconv.FormatFloat(p.Timeout, 'f', -1, 64)) - default: - // Not sure the best option here, just assume GNU ping? - args = append(args, "-W", strconv.FormatFloat(p.Timeout, 'f', -1, 64)) - } + if results.min >= 0 { + fields["minimum_response_ms"] = results.min } - if p.Deadline > 0 { - switch system { - case "darwin", "freebsd", "netbsd", "openbsd": - args = append(args, "-t", strconv.Itoa(p.Deadline)) - case "linux": - args = append(args, "-w", strconv.Itoa(p.Deadline)) - default: - // not sure the best option here, just assume gnu ping? - args = append(args, "-w", strconv.Itoa(p.Deadline)) - } + if results.avg >= 0 { + fields["average_response_ms"] = results.avg } - if p.Interface != "" { - switch system { - case "darwin": - args = append(args, "-I", p.Interface) - case "freebsd", "netbsd", "openbsd": - args = append(args, "-s", p.Interface) - case "linux": - args = append(args, "-I", p.Interface) - default: - // not sure the best option here, just assume gnu ping? 
- args = append(args, "-i", p.Interface) - } + if results.max >= 0 { + fields["maximum_response_ms"] = results.max } - args = append(args, url) - return args -} - -// processPingOutput takes in a string output from the ping command, like: -// -// ping www.google.com (173.194.115.84): 56 data bytes -// 64 bytes from 173.194.115.84: icmp_seq=0 ttl=54 time=52.172 ms -// 64 bytes from 173.194.115.84: icmp_seq=1 ttl=54 time=34.843 ms -// -// --- www.google.com ping statistics --- -// 2 packets transmitted, 2 packets received, 0.0% packet loss -// round-trip min/avg/max/stddev = 34.843/43.508/52.172/8.664 ms -// -// It returns (, , ) -func processPingOutput(out string) (int, int, int, float64, float64, float64, float64, error) { - var trans, recv, ttl int = 0, 0, -1 - var min, avg, max, stddev float64 = -1.0, -1.0, -1.0, -1.0 - // Set this error to nil if we find a 'transmitted' line - err := errors.New("Fatal error processing ping output") - lines := strings.Split(out, "\n") - for _, line := range lines { - // Reading only first TTL, ignoring other TTL messages - if ttl == -1 && strings.Contains(line, "ttl=") { - ttl, err = getTTL(line) - } else if strings.Contains(line, "transmitted") && - strings.Contains(line, "received") { - trans, recv, err = getPacketStats(line, trans, recv) - if err != nil { - return trans, recv, ttl, min, avg, max, stddev, err - } - } else if strings.Contains(line, "min/avg/max") { - min, avg, max, stddev, err = checkRoundTripTimeStats(line, min, avg, max, stddev) - if err != nil { - return trans, recv, ttl, min, avg, max, stddev, err - } - } + if results.stddev >= 0 { + fields["standard_deviation_ms"] = results.stddev } - return trans, recv, ttl, min, avg, max, stddev, err -} -func getPacketStats(line string, trans, recv int) (int, int, error) { - stats := strings.Split(line, ", ") - // Transmitted packets - trans, err := strconv.Atoi(strings.Split(stats[0], " ")[0]) - if err != nil { - return trans, recv, err - } - // Received packets - recv, 
err = strconv.Atoi(strings.Split(stats[1], " ")[0]) - return trans, recv, err -} - -func getTTL(line string) (int, error) { - ttlLine := regexp.MustCompile(`ttl=(\d+)`) - ttlMatch := ttlLine.FindStringSubmatch(line) - return strconv.Atoi(ttlMatch[1]) + acc.AddFields("ping", fields, tags) } -func checkRoundTripTimeStats(line string, min, avg, max, - stddev float64) (float64, float64, float64, float64, error) { - stats := strings.Split(line, " ")[3] - data := strings.Split(stats, "/") +func (p *Ping) pingHostNative(pinger *ping.Pinger, conn *icmp.PacketConn) (*pingResults, error) { + results := &pingResults{} - min, err := strconv.ParseFloat(data[0], 64) - if err != nil { - return min, avg, max, stddev, err - } - avg, err = strconv.ParseFloat(data[1], 64) - if err != nil { - return min, avg, max, stddev, err - } - max, err = strconv.ParseFloat(data[2], 64) - if err != nil { - return min, avg, max, stddev, err + pinger.OnRecv = func(pkt *ping.Packet) { + results.ttl = pkt.Ttl } - if len(data) == 4 { - stddev, err = strconv.ParseFloat(data[3], 64) - if err != nil { - return min, avg, max, stddev, err - } + + pinger.OnFinish = func(stats *ping.Statistics) { + results.received = stats.PacketsRecv + results.transmitted = stats.PacketsSent + results.pktLoss = stats.PacketLoss + results.min = float64(stats.MinRtt.Nanoseconds()) / float64(time.Millisecond) + results.avg = float64(stats.AvgRtt.Nanoseconds()) / float64(time.Millisecond) + results.max = float64(stats.MaxRtt.Nanoseconds()) / float64(time.Millisecond) + results.stddev = float64(stats.StdDevRtt.Nanoseconds()) / float64(time.Millisecond) } - return min, avg, max, stddev, err + + return results, pinger.DoPing(p.ctx, conn) } func init() { @@ -395,6 +288,7 @@ func init() { Deadline: 10, Binary: "ping", Arguments: []string{}, + ctx: context.Background(), } }) } diff --git a/plugins/inputs/ping/ping_native.go b/plugins/inputs/ping/ping_native.go deleted file mode 100644 index c934b870c2157..0000000000000 --- 
a/plugins/inputs/ping/ping_native.go +++ /dev/null @@ -1,39 +0,0 @@ -package ping - -import ( - "time" - - ping "github.com/sparrc/go-ping" - "golang.org/x/net/icmp" -) - -type pingResults struct { - transmitted int - received int - pktLoss float64 - ttl int - min float64 - avg float64 - max float64 - stddev float64 -} - -func (p *Ping) pingHostNative(pinger *ping.Pinger, conn *icmp.PacketConn) (*pingResults, error) { - results := &pingResults{} - - pinger.OnRecv = func(pkt *ping.Packet) { - results.ttl = pkt.Ttl - } - pinger.OnFinish = func(stats *ping.Statistics) { - results.received = stats.PacketsRecv - results.transmitted = stats.PacketsSent - results.pktLoss = stats.PacketLoss - results.min = float64(stats.MinRtt.Nanoseconds()) / float64(time.Millisecond) - results.avg = float64(stats.AvgRtt.Nanoseconds()) / float64(time.Millisecond) - results.max = float64(stats.MaxRtt.Nanoseconds()) / float64(time.Millisecond) - results.stddev = float64(stats.StdDevRtt.Nanoseconds()) / float64(time.Millisecond) - } - pinger.DoPing(conn) - - return results, nil -} diff --git a/plugins/inputs/ping/ping_notwindows.go b/plugins/inputs/ping/ping_notwindows.go new file mode 100644 index 0000000000000..ad9040f51690e --- /dev/null +++ b/plugins/inputs/ping/ping_notwindows.go @@ -0,0 +1,213 @@ +// +build !windows + +package ping + +import ( + "errors" + "fmt" + "os/exec" + "regexp" + "runtime" + "strconv" + "strings" + "syscall" + + "github.com/influxdata/telegraf" +) + +func (p *Ping) pingToURL(u string, acc telegraf.Accumulator) { + defer p.wg.Done() + tags := map[string]string{"url": u} + fields := map[string]interface{}{"result_code": 0} + + out, err := p.pingHost(p.Binary, 60.0, p.args(u, runtime.GOOS)...) + if err != nil { + // Some implementations of ping return a 1 exit code on + // timeout, if this occurs we will not exit and try to parse + // the output. 
+ status := -1 + if exitError, ok := err.(*exec.ExitError); ok { + if ws, ok := exitError.Sys().(syscall.WaitStatus); ok { + status = ws.ExitStatus() + fields["result_code"] = status + } + } + + if status != 1 { + // Combine go err + stderr output + out = strings.TrimSpace(out) + if len(out) > 0 { + acc.AddError(fmt.Errorf("host %s: %s, %s", u, out, err)) + } else { + acc.AddError(fmt.Errorf("host %s: %s", u, err)) + } + fields["result_code"] = 2 + acc.AddFields("ping", fields, tags) + return + } + } + trans, rec, ttl, min, avg, max, stddev, err := processPingOutput(out) + if err != nil { + // fatal error + acc.AddError(fmt.Errorf("%s: %s", err, u)) + fields["result_code"] = 2 + acc.AddFields("ping", fields, tags) + return + } + + // Calculate packet loss percentage + loss := float64(trans-rec) / float64(trans) * 100.0 + + fields["packets_transmitted"] = trans + fields["packets_received"] = rec + fields["percent_packet_loss"] = loss + if ttl >= 0 { + fields["ttl"] = ttl + } + if min >= 0 { + fields["minimum_response_ms"] = min + } + if avg >= 0 { + fields["average_response_ms"] = avg + } + if max >= 0 { + fields["maximum_response_ms"] = max + } + if stddev >= 0 { + fields["standard_deviation_ms"] = stddev + } + acc.AddFields("ping", fields, tags) +} + +// args returns the arguments for the 'ping' executable +func (p *Ping) args(url string, system string) []string { + if len(p.Arguments) > 0 { + return append(p.Arguments, url) + } + + // build the ping command args based on toml config + args := []string{"-c", strconv.Itoa(p.Count), "-n", "-s", "16"} + if p.PingInterval > 0 { + args = append(args, "-i", strconv.FormatFloat(p.PingInterval, 'f', -1, 64)) + } + if p.Timeout > 0 { + switch system { + case "darwin": + args = append(args, "-W", strconv.FormatFloat(p.Timeout*1000, 'f', -1, 64)) + case "freebsd", "netbsd", "openbsd": + args = append(args, "-W", strconv.FormatFloat(p.Timeout*1000, 'f', -1, 64)) + case "linux": + args = append(args, "-W", 
strconv.FormatFloat(p.Timeout, 'f', -1, 64)) + default: + // Not sure the best option here, just assume GNU ping? + args = append(args, "-W", strconv.FormatFloat(p.Timeout, 'f', -1, 64)) + } + } + if p.Deadline > 0 { + switch system { + case "darwin", "freebsd", "netbsd", "openbsd": + args = append(args, "-t", strconv.Itoa(p.Deadline)) + case "linux": + args = append(args, "-w", strconv.Itoa(p.Deadline)) + default: + // not sure the best option here, just assume gnu ping? + args = append(args, "-w", strconv.Itoa(p.Deadline)) + } + } + if p.Interface != "" { + switch system { + case "darwin": + args = append(args, "-I", p.Interface) + case "freebsd", "netbsd", "openbsd": + args = append(args, "-s", p.Interface) + case "linux": + args = append(args, "-I", p.Interface) + default: + // not sure the best option here, just assume gnu ping? + args = append(args, "-i", p.Interface) + } + } + args = append(args, url) + return args +} + +// processPingOutput takes in a string output from the ping command, like: +// +// ping www.google.com (173.194.115.84): 56 data bytes +// 64 bytes from 173.194.115.84: icmp_seq=0 ttl=54 time=52.172 ms +// 64 bytes from 173.194.115.84: icmp_seq=1 ttl=54 time=34.843 ms +// +// --- www.google.com ping statistics --- +// 2 packets transmitted, 2 packets received, 0.0% packet loss +// round-trip min/avg/max/stddev = 34.843/43.508/52.172/8.664 ms +// +// It returns (, , ) +func processPingOutput(out string) (int, int, int, float64, float64, float64, float64, error) { + var trans, recv, ttl int = 0, 0, -1 + var min, avg, max, stddev float64 = -1.0, -1.0, -1.0, -1.0 + // Set this error to nil if we find a 'transmitted' line + err := errors.New("Fatal error processing ping output") + lines := strings.Split(out, "\n") + for _, line := range lines { + // Reading only first TTL, ignoring other TTL messages + if ttl == -1 && strings.Contains(line, "ttl=") { + ttl, err = getTTL(line) + } else if strings.Contains(line, "transmitted") && + 
strings.Contains(line, "received") { + trans, recv, err = getPacketStats(line, trans, recv) + if err != nil { + return trans, recv, ttl, min, avg, max, stddev, err + } + } else if strings.Contains(line, "min/avg/max") { + min, avg, max, stddev, err = checkRoundTripTimeStats(line, min, avg, max, stddev) + if err != nil { + return trans, recv, ttl, min, avg, max, stddev, err + } + } + } + return trans, recv, ttl, min, avg, max, stddev, err +} + +func getPacketStats(line string, trans, recv int) (int, int, error) { + stats := strings.Split(line, ", ") + // Transmitted packets + trans, err := strconv.Atoi(strings.Split(stats[0], " ")[0]) + if err != nil { + return trans, recv, err + } + // Received packets + recv, err = strconv.Atoi(strings.Split(stats[1], " ")[0]) + return trans, recv, err +} + +func getTTL(line string) (int, error) { + ttlLine := regexp.MustCompile(`ttl=(\d+)`) + ttlMatch := ttlLine.FindStringSubmatch(line) + return strconv.Atoi(ttlMatch[1]) +} + +func checkRoundTripTimeStats(line string, min, avg, max, + stddev float64) (float64, float64, float64, float64, error) { + stats := strings.Split(line, " ")[3] + data := strings.Split(stats, "/") + + min, err := strconv.ParseFloat(data[0], 64) + if err != nil { + return min, avg, max, stddev, err + } + avg, err = strconv.ParseFloat(data[1], 64) + if err != nil { + return min, avg, max, stddev, err + } + max, err = strconv.ParseFloat(data[2], 64) + if err != nil { + return min, avg, max, stddev, err + } + if len(data) == 4 { + stddev, err = strconv.ParseFloat(data[3], 64) + if err != nil { + return min, avg, max, stddev, err + } + } + return min, avg, max, stddev, err +} diff --git a/plugins/inputs/ping/ping_windows.go b/plugins/inputs/ping/ping_windows.go index 6064fabe4b6dc..e9a5af4a7cbbc 100644 --- a/plugins/inputs/ping/ping_windows.go +++ b/plugins/inputs/ping/ping_windows.go @@ -5,103 +5,23 @@ package ping import ( "errors" "fmt" - "net" - "os/exec" "regexp" "strconv" "strings" - "sync" - "time" 
"github.com/influxdata/telegraf" - "github.com/influxdata/telegraf/internal" - "github.com/influxdata/telegraf/plugins/inputs" ) -// HostPinger is a function that runs the "ping" function using a list of -// passed arguments. This can be easily switched with a mocked ping function -// for unit test purposes (see ping_test.go) -type HostPinger func(binary string, timeout float64, args ...string) (string, error) - -type Ping struct { - wg sync.WaitGroup - - // Number of pings to send (ping -c ) - Count int - - // Ping timeout, in seconds. 0 means no timeout (ping -W ) - Timeout float64 - - // URLs to ping - Urls []string - - // Ping executable binary - Binary string - - // Arguments for ping command. - // when `Arguments` is not empty, other options (ping_interval, timeout, etc) will be ignored - Arguments []string - - // host ping function - pingHost HostPinger -} - -func (s *Ping) Description() string { - return "Ping given url(s) and return statistics" -} - -const sampleConfig = ` - ## List of urls to ping - urls = ["www.google.com"] - - ## number of pings to send per collection (ping -n ) - # count = 1 - - ## Ping timeout, in seconds. 
0.0 means default timeout (ping -w ) - # timeout = 0.0 - - ## Specify the ping executable binary, default is "ping" - # binary = "ping" - - ## Arguments for ping command - ## when arguments is not empty, other options (ping_interval, timeout, etc) will be ignored - # arguments = ["-c", "3"] -` - -func (s *Ping) SampleConfig() string { - return sampleConfig -} - -func (p *Ping) Gather(acc telegraf.Accumulator) error { +func (p *Ping) pingToURL(u string, acc telegraf.Accumulator) { if p.Count < 1 { p.Count = 1 } - // Spin off a go routine for each url to ping - for _, url := range p.Urls { - p.wg.Add(1) - go p.pingToURL(url, acc) - } - - p.wg.Wait() - - return nil -} - -func (p *Ping) pingToURL(u string, acc telegraf.Accumulator) { defer p.wg.Done() tags := map[string]string{"url": u} fields := map[string]interface{}{"result_code": 0} - _, err := net.LookupHost(u) - if err != nil { - acc.AddError(err) - fields["result_code"] = 1 - acc.AddFields("ping", fields, tags) - return - } - args := p.args(u) totalTimeout := 60.0 if len(p.Arguments) == 0 { @@ -151,17 +71,6 @@ func (p *Ping) pingToURL(u string, acc telegraf.Accumulator) { acc.AddFields("ping", fields, tags) } -func hostPinger(binary string, timeout float64, args ...string) (string, error) { - bin, err := exec.LookPath(binary) - if err != nil { - return "", err - } - c := exec.Command(bin, args...) 
- out, err := internal.CombinedOutputTimeout(c, - time.Second*time.Duration(timeout+1)) - return string(out), err -} - // args returns the arguments for the 'ping' executable func (p *Ping) args(url string) []string { if len(p.Arguments) > 0 { @@ -246,14 +155,3 @@ func (p *Ping) timeout() float64 { } return 4 + 1 } - -func init() { - inputs.Add("ping", func() telegraf.Input { - return &Ping{ - pingHost: hostPinger, - Count: 1, - Binary: "ping", - Arguments: []string{}, - } - }) -} From 14e77945f27989f1b00e53200fc9677cd188acd4 Mon Sep 17 00:00:00 2001 From: greg linton Date: Wed, 12 Jun 2019 14:37:22 -0600 Subject: [PATCH 05/15] Update go-ping --- Gopkg.lock | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/Gopkg.lock b/Gopkg.lock index 79bb78c10f05a..351cd2d48578a 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -1048,6 +1048,14 @@ pruneopts = "" revision = "96b86229e9b3ffb4b954144cdc7f98fe3ee1003f" +[[projects]] + branch = "master" + digest = "1:8f832efb46d0a00e227dfe26281c07471d35555b96dc3a83b990cfa6d20faced" + name = "github.com/sparrc/go-ping" + packages = ["."] + pruneopts = "" + revision = "e33cfb8ae7ed4ebbfd07bad9d9994f466c6f4b2e" + [[projects]] branch = "master" digest = "1:4e8f1cae8e6d83af9000d82566efb8823907dae77ba4f1d76ff28fdd197c3c90" @@ -1242,6 +1250,7 @@ "http/httpguts", "http2", "http2/hpack", + "icmp", "idna", "internal/iana", "internal/socket", @@ -1623,6 +1632,7 @@ "github.com/shirou/gopsutil/net", "github.com/shirou/gopsutil/process", "github.com/soniah/gosnmp", + "github.com/sparrc/go-ping", "github.com/streadway/amqp", "github.com/stretchr/testify/assert", "github.com/stretchr/testify/mock", @@ -1645,6 +1655,7 @@ "github.com/wvanbergen/kafka/consumergroup", "golang.org/x/net/context", "golang.org/x/net/html/charset", + "golang.org/x/net/icmp", "golang.org/x/oauth2", "golang.org/x/oauth2/clientcredentials", "golang.org/x/oauth2/google", From f3196530275d59652333f57ce4fba3690e2a6d78 Mon Sep 17 00:00:00 2001 From: greg linton Date: Wed, 12 
Jun 2019 15:41:01 -0600 Subject: [PATCH 06/15] Enforce timeout --- plugins/inputs/ping/ping.go | 57 +++++++++++++++++++------------------ 1 file changed, 29 insertions(+), 28 deletions(-) diff --git a/plugins/inputs/ping/ping.go b/plugins/inputs/ping/ping.go index c83bd4ca98260..bdbb6a373d0bf 100644 --- a/plugins/inputs/ping/ping.go +++ b/plugins/inputs/ping/ping.go @@ -3,13 +3,13 @@ package ping import ( "context" "fmt" + "log" "net" "os/exec" "sync" "time" ping "github.com/sparrc/go-ping" - "golang.org/x/net/icmp" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal" @@ -72,7 +72,7 @@ const sampleConfig = ` ## Interval, in s, at which to ping. 0 == default (ping -i ) # ping_interval = 1.0 - ## Per-ping timeout, in s. 0 == no timeout (ping -W ) + ## Per-ping timeout, in s. 0 == no timeout (ping -W ) Deprecated in 1.12 # timeout = 1.0 ## Total-ping deadline, in s. 0 == no deadline (ping -w ) @@ -98,11 +98,14 @@ func (p *Ping) Gather(acc telegraf.Accumulator) error { p.listenAddr = getAddr(p.Interface) } + if p.Timeout > 0 { + log.Println("W! [inputs.ping] 'timeout' deprecated in telegraf 1.12") + } + for _, url := range p.Urls { _, err := net.LookupHost(url) if err != nil { acc.AddFields("ping", map[string]interface{}{"result_code": 1}, map[string]string{"url": url}) - // todo: return err? 
acc.AddError(err) return nil } @@ -130,24 +133,13 @@ func (p *Ping) Gather(acc telegraf.Accumulator) error { pinger.Timeout = time.Duration(p.Deadline) * time.Second } - if p.Timeout <= 0 { - p.Timeout = 1 - } - pinger.Deadline = time.Nanosecond * time.Duration(p.Timeout*1000000000) - - pinger.Size = 64 + pinger.Size = 56 - // TODO: determine need for privileged flag pinger.SetPrivileged(true) - - conn, err := pinger.Listen(p.listenAddr) - if err != nil { - return err - } + pinger.Source = p.listenAddr p.wg.Add(1) - - go p.pingToURLNative(url, pinger, conn, acc) + go p.pingToURLNative(url, pinger, acc) } } @@ -222,19 +214,12 @@ type pingResults struct { // return n // } -func (p *Ping) pingToURLNative(u string, pinger *ping.Pinger, conn *icmp.PacketConn, acc telegraf.Accumulator) { - +func (p *Ping) pingToURLNative(u string, pinger *ping.Pinger, acc telegraf.Accumulator) { defer p.wg.Done() tags := map[string]string{"url": u} fields := map[string]interface{}{"result_code": 0} - results, err := p.pingHostNative(pinger, conn) - if err != nil { - acc.AddError(fmt.Errorf("%s: %s", err, u)) - fields["result_code"] = 2 - acc.AddFields("ping", fields, tags) - return - } + results := p.pingHostNative(pinger) fields["packets_transmitted"] = results.transmitted fields["packets_received"] = results.received @@ -258,7 +243,7 @@ func (p *Ping) pingToURLNative(u string, pinger *ping.Pinger, conn *icmp.PacketC acc.AddFields("ping", fields, tags) } -func (p *Ping) pingHostNative(pinger *ping.Pinger, conn *icmp.PacketConn) (*pingResults, error) { +func (p *Ping) pingHostNative(pinger *ping.Pinger) *pingResults { results := &pingResults{} pinger.OnRecv = func(pkt *ping.Packet) { @@ -275,7 +260,23 @@ func (p *Ping) pingHostNative(pinger *ping.Pinger, conn *icmp.PacketConn) (*ping results.stddev = float64(stats.StdDevRtt.Nanoseconds()) / float64(time.Millisecond) } - return results, pinger.DoPing(p.ctx, conn) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + 
+ go func() { + pinger.Run() + cancel() + cancel() + }() + select { + case <-ctx.Done(): + break + case <-time.After(time.Duration(p.Deadline) * time.Second): + fmt.Println("PING TIMED OUT") + pinger.Stop() + } + + return results } func init() { From 0272ec17f28b2dc69beb260b8470266b32366666 Mon Sep 17 00:00:00 2001 From: greg linton Date: Wed, 26 Jun 2019 16:46:01 -0600 Subject: [PATCH 07/15] Use updated library interface --- plugins/inputs/ping/README.md | 6 + plugins/inputs/ping/ping.go | 251 ++++++++++++++++++++-------------- 2 files changed, 151 insertions(+), 106 deletions(-) diff --git a/plugins/inputs/ping/README.md b/plugins/inputs/ping/README.md index 3493285955723..8e17f1374458e 100644 --- a/plugins/inputs/ping/README.md +++ b/plugins/inputs/ping/README.md @@ -33,12 +33,18 @@ apt-get install iputils-ping ## on Darwin and Freebsd only source address possible: (ping -S ) # interface = "" + ## How to ping. "native" doesn't have external dependencies, while "exec" depends on 'ping'. + # method = "exec" + ## Specify the ping executable binary, default is "ping" # binary = "ping" ## Arguments for ping command. When arguments is not empty, system binary will be used and ## other options (ping_interval, timeout, etc) will be ignored # arguments = ["-c", "3"] + + ## Use only ipv6 addresses when resolving hostnames. 
+ # ipv6 = false ``` #### File Limit diff --git a/plugins/inputs/ping/ping.go b/plugins/inputs/ping/ping.go index bdbb6a373d0bf..12b2b09e2314c 100644 --- a/plugins/inputs/ping/ping.go +++ b/plugins/inputs/ping/ping.go @@ -2,14 +2,13 @@ package ping import ( "context" - "fmt" - "log" + "math" "net" "os/exec" "sync" "time" - ping "github.com/sparrc/go-ping" + "github.com/glinton/ping" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal" @@ -42,6 +41,9 @@ type Ping struct { // URLs to ping Urls []string + // Method defines how to ping (native or exec) + Method string + // Ping executable binary Binary string @@ -49,6 +51,9 @@ type Ping struct { // other options (ping_interval, timeout, etc) will be ignored Arguments []string + // Whether to resolve addresses using ipv6 or not. + IPv6 bool + // host ping function pingHost HostPinger @@ -58,7 +63,7 @@ type Ping struct { ctx context.Context } -func (_ *Ping) Description() string { +func (*Ping) Description() string { return "Ping given url(s) and return statistics" } @@ -72,7 +77,7 @@ const sampleConfig = ` ## Interval, in s, at which to ping. 0 == default (ping -i ) # ping_interval = 1.0 - ## Per-ping timeout, in s. 0 == no timeout (ping -W ) Deprecated in 1.12 + ## Per-ping timeout, in s. 0 == no timeout (ping -W ) # timeout = 1.0 ## Total-ping deadline, in s. 0 == no deadline (ping -w ) @@ -81,15 +86,21 @@ const sampleConfig = ` ## Interface or source address to send ping from (ping -I[-S] ) # interface = "" + ## How to ping. "native" doesn't have external dependencies, while "exec" depends on 'ping'. + # method = "exec" + ## Specify the ping executable binary, default is "ping" - # binary = "ping" + # binary = "ping" ## Arguments for ping command. When arguments is not empty, system binary will be used and - ## other options (ping_interval, timeout, etc) will be ignored + ## other options (ping_interval, timeout, etc) will be ignored. 
# arguments = ["-c", "3"] + + ## Use only ipv6 addresses when resolving hostnames. + # ipv6 = false ` -func (_ *Ping) SampleConfig() string { +func (*Ping) SampleConfig() string { return sampleConfig } @@ -98,48 +109,20 @@ func (p *Ping) Gather(acc telegraf.Accumulator) error { p.listenAddr = getAddr(p.Interface) } - if p.Timeout > 0 { - log.Println("W! [inputs.ping] 'timeout' deprecated in telegraf 1.12") - } - - for _, url := range p.Urls { - _, err := net.LookupHost(url) + for _, ip := range p.Urls { + _, err := net.LookupHost(ip) if err != nil { - acc.AddFields("ping", map[string]interface{}{"result_code": 1}, map[string]string{"url": url}) + acc.AddFields("ping", map[string]interface{}{"result_code": 1}, map[string]string{"ip": ip}) acc.AddError(err) return nil } - if len(p.Arguments) > 0 { + if len(p.Arguments) > 0 || p.Method == "exec" { p.wg.Add(1) - - go p.pingToURL(url, acc) + go p.pingToURL(ip, acc) } else { - pinger, err := ping.NewPinger(url) - if err != nil { - acc.AddError(fmt.Errorf("%v: %s", err, url)) - acc.AddFields("ping", map[string]interface{}{"result_code": 2}, map[string]string{"url": url}) - continue - } - - pinger.Count = p.Count - - if p.PingInterval < 0.2 { - p.PingInterval = 1 - } - pinger.Interval = time.Nanosecond * time.Duration(p.PingInterval*1000000000) - - if p.Deadline > 0 { - pinger.Timeout = time.Duration(p.Deadline) * time.Second - } - - pinger.Size = 56 - - pinger.SetPrivileged(true) - pinger.Source = p.listenAddr - p.wg.Add(1) - go p.pingToURLNative(url, pinger, acc) + go p.pingToURLNative(ip, acc) } } @@ -194,89 +177,144 @@ func hostPinger(binary string, timeout float64, args ...string) (string, error) return string(out), err } -type pingResults struct { - transmitted int - received int - pktLoss float64 - ttl int - min float64 - avg float64 - max float64 - stddev float64 -} - -// func getGID() uint64 { -// b := make([]byte, 64) -// b = b[:runtime.Stack(b, false)] -// b = bytes.TrimPrefix(b, []byte("goroutine ")) -// b = 
b[:bytes.IndexByte(b, ' ')] -// n, _ := strconv.ParseUint(string(b), 10, 64) -// return n -// } - -func (p *Ping) pingToURLNative(u string, pinger *ping.Pinger, acc telegraf.Accumulator) { +func (p *Ping) pingToURLNative(destination string, acc telegraf.Accumulator) { defer p.wg.Done() - tags := map[string]string{"url": u} - fields := map[string]interface{}{"result_code": 0} + ctx := context.Background() - results := p.pingHostNative(pinger) + network := "ip4" + if p.IPv6 { + network = "ip6" + } + + host, err := net.ResolveIPAddr(network, destination) + if err != nil { + acc.AddFields("ping", map[string]interface{}{"result_code": 1}, map[string]string{"source": destination}) + acc.AddError(err) + return + } - fields["packets_transmitted"] = results.transmitted - fields["packets_received"] = results.received - fields["percent_packet_loss"] = results.pktLoss - if results.ttl > 0 { - fields["ttl"] = results.ttl + if p.PingInterval < 0.2 { + p.PingInterval = 0.2 } - if results.min >= 0 { - fields["minimum_response_ms"] = results.min + + tick := time.NewTicker(time.Duration(p.PingInterval * float64(time.Second))) + defer tick.Stop() + + wg := &sync.WaitGroup{} + if p.Deadline > 0 { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, time.Duration(p.Deadline)*time.Second) + defer cancel() } - if results.avg >= 0 { - fields["average_response_ms"] = results.avg + + chanLength := 100 + if p.Count > 0 { + chanLength = p.Count } - if results.max >= 0 { - fields["maximum_response_ms"] = results.max + + resps := make(chan *ping.Response, chanLength) + packetsSent := 0 + c := ping.Client{} + + for p.Count <= 0 || packetsSent < p.Count { + select { + case <-ctx.Done(): + goto finish + case <-tick.C: + ctx := context.Background() + if p.Timeout > 0 { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, time.Duration(p.Timeout)*time.Second) + defer cancel() + } + + packetsSent++ + wg.Add(1) + go func(seq int) { + defer wg.Done() + resp, err 
:= c.Do(ctx, ping.Request{ + Dst: net.ParseIP(host.String()), + Src: net.ParseIP(p.listenAddr), + Seq: seq, + }) + if err != nil { + // likely a timeout error, ignore + return + } + + resps <- resp + }(packetsSent) + } } - if results.stddev >= 0 { - fields["standard_deviation_ms"] = results.stddev + +finish: + wg.Wait() + close(resps) + + rsps := []*ping.Response{} + for res := range resps { + rsps = append(rsps, res) } + tags, fields := onFin(packetsSent, rsps, destination) acc.AddFields("ping", fields, tags) } -func (p *Ping) pingHostNative(pinger *ping.Pinger) *pingResults { - results := &pingResults{} +func onFin(packetsSent int, resps []*ping.Response, destination string) (map[string]string, map[string]interface{}) { + packetsRcvd := len(resps) + loss := float64(packetsSent-packetsRcvd) / float64(packetsSent) * 100 - pinger.OnRecv = func(pkt *ping.Packet) { - results.ttl = pkt.Ttl + tags := map[string]string{"source": destination} + fields := map[string]interface{}{ + "result_code": 0, + "packets_transmitted": packetsSent, + "packets_received": packetsRcvd, + "percent_packet_loss": loss, } - pinger.OnFinish = func(stats *ping.Statistics) { - results.received = stats.PacketsRecv - results.transmitted = stats.PacketsSent - results.pktLoss = stats.PacketLoss - results.min = float64(stats.MinRtt.Nanoseconds()) / float64(time.Millisecond) - results.avg = float64(stats.AvgRtt.Nanoseconds()) / float64(time.Millisecond) - results.max = float64(stats.MaxRtt.Nanoseconds()) / float64(time.Millisecond) - results.stddev = float64(stats.StdDevRtt.Nanoseconds()) / float64(time.Millisecond) + if packetsRcvd == 0 || packetsSent == 0 { + return tags, fields } - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - go func() { - pinger.Run() - cancel() - cancel() - }() - select { - case <-ctx.Done(): - break - case <-time.After(time.Duration(p.Deadline) * time.Second): - fmt.Println("PING TIMED OUT") - pinger.Stop() + ttl := resps[0].TTL + + var min, max, 
avg, total time.Duration + min = resps[0].RTT + max = resps[0].RTT + + for _, res := range resps { + if res.RTT < min { + min = res.RTT + } + if res.RTT > max { + max = res.RTT + } + total += res.RTT + } + + avg = total / time.Duration(packetsRcvd) + var sumsquares time.Duration + for _, res := range resps { + sumsquares += (res.RTT - avg) * (res.RTT - avg) + } + stdDev := time.Duration(math.Sqrt(float64(sumsquares / time.Duration(packetsRcvd)))) + if ttl > 0 { + fields["ttl"] = ttl + } + if min > 0 { + fields["minimum_response_ms"] = float64(min.Nanoseconds()) / float64(time.Millisecond) + } + if avg > 0 { + fields["average_response_ms"] = float64(avg.Nanoseconds()) / float64(time.Millisecond) + } + if max > 0 { + fields["maximum_response_ms"] = float64(max.Nanoseconds()) / float64(time.Millisecond) + } + if stdDev > 0 { + fields["standard_deviation_ms"] = float64(stdDev.Nanoseconds()) / float64(time.Millisecond) } - return results + return tags, fields } func init() { @@ -287,6 +325,7 @@ func init() { Count: 1, Timeout: 1.0, Deadline: 10, + Method: "exec", Binary: "ping", Arguments: []string{}, ctx: context.Background(), From 42198df8be74e106a020212b5d24f321ad15648a Mon Sep 17 00:00:00 2001 From: greg linton Date: Wed, 26 Jun 2019 18:36:17 -0600 Subject: [PATCH 08/15] Update dependency --- Gopkg.lock | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/Gopkg.lock b/Gopkg.lock index 842856eb2f71c..05d55c494c424 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -423,6 +423,14 @@ pruneopts = "" revision = "25d852aebe32c875e9c044af3eef9c7dc6bc777f" +[[projects]] + branch = "master" + digest = "1:3a5a86282cf85068c4b25a4db7b79c0d579b2af1367965a89dddc47f598d266f" + name = "github.com/glinton/ping" + packages = ["."] + pruneopts = "" + revision = "48030f186603119bb4c832f074355188df4567e7" + [[projects]] digest = "1:858b7fe7b0f4bc7ef9953926828f2816ea52d01a88d72d1c45bc8c108f23c356" name = "github.com/go-ini/ini" @@ -1072,14 +1080,6 @@ pruneopts 
= "" revision = "96b86229e9b3ffb4b954144cdc7f98fe3ee1003f" -[[projects]] - branch = "master" - digest = "1:8f832efb46d0a00e227dfe26281c07471d35555b96dc3a83b990cfa6d20faced" - name = "github.com/sparrc/go-ping" - packages = ["."] - pruneopts = "" - revision = "e33cfb8ae7ed4ebbfd07bad9d9994f466c6f4b2e" - [[projects]] branch = "master" digest = "1:4e8f1cae8e6d83af9000d82566efb8823907dae77ba4f1d76ff28fdd197c3c90" @@ -1612,6 +1612,7 @@ "github.com/ericchiang/k8s/apis/resource", "github.com/ericchiang/k8s/util/intstr", "github.com/ghodss/yaml", + "github.com/glinton/ping", "github.com/go-logfmt/logfmt", "github.com/go-redis/redis", "github.com/go-sql-driver/mysql", @@ -1665,7 +1666,6 @@ "github.com/shirou/gopsutil/net", "github.com/shirou/gopsutil/process", "github.com/soniah/gosnmp", - "github.com/sparrc/go-ping", "github.com/streadway/amqp", "github.com/stretchr/testify/assert", "github.com/stretchr/testify/mock", @@ -1688,7 +1688,6 @@ "github.com/wvanbergen/kafka/consumergroup", "golang.org/x/net/context", "golang.org/x/net/html/charset", - "golang.org/x/net/icmp", "golang.org/x/oauth2", "golang.org/x/oauth2/clientcredentials", "golang.org/x/oauth2/google", From 2317bbbe3d6823219e90f7efb6c858f3cfb67a71 Mon Sep 17 00:00:00 2001 From: greg linton Date: Thu, 27 Jun 2019 10:16:24 -0600 Subject: [PATCH 09/15] Update method logic and add test --- plugins/inputs/ping/ping.go | 49 +++++++++++++++++++------------- plugins/inputs/ping/ping_test.go | 33 +++++++++++++++------ 2 files changed, 54 insertions(+), 28 deletions(-) diff --git a/plugins/inputs/ping/ping.go b/plugins/inputs/ping/ping.go index 12b2b09e2314c..f9d5d2383cc66 100644 --- a/plugins/inputs/ping/ping.go +++ b/plugins/inputs/ping/ping.go @@ -117,12 +117,12 @@ func (p *Ping) Gather(acc telegraf.Accumulator) error { return nil } - if len(p.Arguments) > 0 || p.Method == "exec" { + if len(p.Arguments) == 0 && p.Method == "native" { p.wg.Add(1) - go p.pingToURL(ip, acc) + go p.pingToURLNative(ip, acc) } else { 
p.wg.Add(1) - go p.pingToURLNative(ip, acc) + go p.pingToURL(ip, acc) } } @@ -188,19 +188,24 @@ func (p *Ping) pingToURLNative(destination string, acc telegraf.Accumulator) { host, err := net.ResolveIPAddr(network, destination) if err != nil { - acc.AddFields("ping", map[string]interface{}{"result_code": 1}, map[string]string{"source": destination}) + acc.AddFields("ping", map[string]interface{}{"result_code": 1}, map[string]string{"url": destination}) acc.AddError(err) return } - if p.PingInterval < 0.2 { - p.PingInterval = 0.2 + interval := p.PingInterval + if interval < 0.2 { + interval = 0.2 } - tick := time.NewTicker(time.Duration(p.PingInterval * float64(time.Second))) + timeout := p.Timeout + if timeout == 0 { + timeout = 5 + } + + tick := time.NewTicker(time.Duration(interval * float64(time.Second))) defer tick.Stop() - wg := &sync.WaitGroup{} if p.Deadline > 0 { var cancel context.CancelFunc ctx, cancel = context.WithTimeout(ctx, time.Duration(p.Deadline)*time.Second) @@ -213,7 +218,19 @@ func (p *Ping) pingToURLNative(destination string, acc telegraf.Accumulator) { } resps := make(chan *ping.Response, chanLength) + rsps := []*ping.Response{} + + r := &sync.WaitGroup{} + r.Add(1) + go func() { + for res := range resps { + rsps = append(rsps, res) + } + r.Done() + }() + packetsSent := 0 + wg := &sync.WaitGroup{} c := ping.Client{} for p.Count <= 0 || packetsSent < p.Count { @@ -221,12 +238,8 @@ func (p *Ping) pingToURLNative(destination string, acc telegraf.Accumulator) { case <-ctx.Done(): goto finish case <-tick.C: - ctx := context.Background() - if p.Timeout > 0 { - var cancel context.CancelFunc - ctx, cancel = context.WithTimeout(ctx, time.Duration(p.Timeout)*time.Second) - defer cancel() - } + ctx, cancel := context.WithTimeout(context.Background(), time.Duration(timeout*float64(time.Second))) + defer cancel() packetsSent++ wg.Add(1) @@ -251,11 +264,7 @@ finish: wg.Wait() close(resps) - rsps := []*ping.Response{} - for res := range resps { - rsps = 
append(rsps, res) - } - + r.Wait() tags, fields := onFin(packetsSent, rsps, destination) acc.AddFields("ping", fields, tags) } @@ -264,7 +273,7 @@ func onFin(packetsSent int, resps []*ping.Response, destination string) (map[str packetsRcvd := len(resps) loss := float64(packetsSent-packetsRcvd) / float64(packetsSent) * 100 - tags := map[string]string{"source": destination} + tags := map[string]string{"url": destination} fields := map[string]interface{}{ "result_code": 0, "packets_transmitted": packetsSent, diff --git a/plugins/inputs/ping/ping_test.go b/plugins/inputs/ping/ping_test.go index 8870d415680af..63ca0e8246560 100644 --- a/plugins/inputs/ping/ping_test.go +++ b/plugins/inputs/ping/ping_test.go @@ -187,10 +187,10 @@ func TestPingGather(t *testing.T) { acc.GatherError(p.Gather) tags := map[string]string{"url": "www.google.com"} fields := map[string]interface{}{ - "packets_transmitted": 5, - "packets_received": 5, - "percent_packet_loss": 0.0, - "ttl": 63, + "packets_transmitted": 5, + "packets_received": 5, + "percent_packet_loss": 0.0, + "ttl": 63, "minimum_response_ms": 35.225, "average_response_ms": 43.628, "maximum_response_ms": 51.806, @@ -229,10 +229,10 @@ func TestLossyPingGather(t *testing.T) { acc.GatherError(p.Gather) tags := map[string]string{"url": "www.google.com"} fields := map[string]interface{}{ - "packets_transmitted": 5, - "packets_received": 3, - "percent_packet_loss": 40.0, - "ttl": 63, + "packets_transmitted": 5, + "packets_received": 3, + "percent_packet_loss": 40.0, + "ttl": 63, "minimum_response_ms": 35.225, "average_response_ms": 44.033, "maximum_response_ms": 51.806, @@ -339,3 +339,20 @@ func TestPingBinary(t *testing.T) { } acc.GatherError(p.Gather) } + +// Test that Gather function works using native ping +func TestPingGatherNative(t *testing.T) { + if testing.Short() { + t.Skip("Skipping test due to permission requirements.") + } + + var acc testutil.Accumulator + p := Ping{ + Urls: []string{"www.google.com", "www.reddit.com"}, + 
Method: "native", + Count: 5, + } + + assert.NoError(t, acc.GatherError(p.Gather)) + assert.True(t, acc.HasPoint("ping", map[string]string{"url": "www.google.com"}, "packets_transmitted", 5)) +} From b734cab9bd5ca95839622f4884e1205fdf51f543 Mon Sep 17 00:00:00 2001 From: greg linton Date: Thu, 27 Jun 2019 10:24:20 -0600 Subject: [PATCH 10/15] fmt --- plugins/inputs/ping/ping_test.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/plugins/inputs/ping/ping_test.go b/plugins/inputs/ping/ping_test.go index 63ca0e8246560..e43a8c4da2f5e 100644 --- a/plugins/inputs/ping/ping_test.go +++ b/plugins/inputs/ping/ping_test.go @@ -187,10 +187,10 @@ func TestPingGather(t *testing.T) { acc.GatherError(p.Gather) tags := map[string]string{"url": "www.google.com"} fields := map[string]interface{}{ - "packets_transmitted": 5, - "packets_received": 5, - "percent_packet_loss": 0.0, - "ttl": 63, + "packets_transmitted": 5, + "packets_received": 5, + "percent_packet_loss": 0.0, + "ttl": 63, "minimum_response_ms": 35.225, "average_response_ms": 43.628, "maximum_response_ms": 51.806, @@ -229,10 +229,10 @@ func TestLossyPingGather(t *testing.T) { acc.GatherError(p.Gather) tags := map[string]string{"url": "www.google.com"} fields := map[string]interface{}{ - "packets_transmitted": 5, - "packets_received": 3, - "percent_packet_loss": 40.0, - "ttl": 63, + "packets_transmitted": 5, + "packets_received": 3, + "percent_packet_loss": 40.0, + "ttl": 63, "minimum_response_ms": 35.225, "average_response_ms": 44.033, "maximum_response_ms": 51.806, From 65639134bacc29468f580dc6dfa82b3b786c90c8 Mon Sep 17 00:00:00 2001 From: greg linton Date: Tue, 2 Jul 2019 10:15:49 -0600 Subject: [PATCH 11/15] Update readme --- plugins/inputs/ping/README.md | 29 +++++++++++++++++++++++++---- 1 file changed, 25 insertions(+), 4 deletions(-) diff --git a/plugins/inputs/ping/README.md b/plugins/inputs/ping/README.md index 8e17f1374458e..073fc8fb69fde 100644 --- 
a/plugins/inputs/ping/README.md +++ b/plugins/inputs/ping/README.md @@ -9,6 +9,10 @@ use the iputils-ping implementation: apt-get install iputils-ping ``` +When using `method = "native"` a ping is sent and the results are reported in pure Go, eliminating the need to execute the system `ping` command. Not using the system binary allows the use of this plugin on non-English systems. + +There is currently no support for TTL on Windows with `"native"`; track progress at https://github.com/golang/go/issues/7175 and https://github.com/golang/go/issues/7174 + ### Configuration: ```toml @@ -68,6 +72,21 @@ Set the file number limit: LimitNOFILE=4096 ``` +#### Permission Caveat (non Windows) + +It is preferred that this plugin listen on privileged ICMP sockets. To do so, telegraf can either be run as the root user or the root user can add the capability to access raw sockets to telegraf by running the following command: + +``` +setcap cap_net_raw=eip /home/test/telegraf +``` + +Another option (doesn't work as well or in all circumstances) is to listen on unprivileged raw sockets (non-Windows only). The system group of the user running telegraf must be allowed to create ICMP Echo sockets. [See man pages icmp(7) for `ping_group_range`](http://man7.org/linux/man-pages/man7/icmp.7.html). 
On Linux hosts, run the following to give a group the proper permissions: + +``` +sudo sysctl -w net.ipv4.ping_group_range="GROUPID GROUPID" +``` + + ### Metrics: - ping @@ -81,15 +100,15 @@ LimitNOFILE=4096 - average_response_ms (integer) - minimum_response_ms (integer) - maximum_response_ms (integer) - - standard_deviation_ms (integer, Not available on Windows) + - standard_deviation_ms (integer, Available on Windows only with native ping) - errors (float, Windows only) - - reply_received (integer, Windows only) - - percent_reply_loss (float, Windows only) + - reply_received (integer, Windows only*) + - percent_reply_loss (float, Windows only*) - result_code (int, success = 0, no such host = 1, ping error = 2) ##### reply_received vs packets_received -On Windows systems, "Destination net unreachable" reply will increment `packets_received` but not `reply_received`. +On Windows systems, "Destination net unreachable" reply will increment `packets_received` but not `reply_received`* ### Example Output: @@ -102,3 +121,5 @@ ping,url=example.org result_code=0i,average_response_ms=7i,maximum_response_ms=9 ``` ping,url=example.org average_response_ms=23.066,ttl=63,maximum_response_ms=24.64,minimum_response_ms=22.451,packets_received=5i,packets_transmitted=5i,percent_packet_loss=0,result_code=0i,standard_deviation_ms=0.809 1535747258000000000 ``` + +*not when `method = "native"` is used From 05b0e4a679035f993ed54f3c31da9a67be8f0d8b Mon Sep 17 00:00:00 2001 From: greg linton Date: Tue, 2 Jul 2019 12:35:30 -0600 Subject: [PATCH 12/15] Fix typos in readme --- plugins/inputs/ping/README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/plugins/inputs/ping/README.md b/plugins/inputs/ping/README.md index 073fc8fb69fde..8f1e3cf6fb073 100644 --- a/plugins/inputs/ping/README.md +++ b/plugins/inputs/ping/README.md @@ -77,13 +77,13 @@ LimitNOFILE=4096 It is preferred that this plugin listen on privileged ICMP sockets. 
To do so, telegraf can either be run as the root user or the root user can add the capability to access raw sockets to telegraf by running the following command: ``` -setcap cap_net_raw=eip /home/test/telegraf +setcap cap_net_raw=eip /path/to/telegraf ``` Another option (doesn't work as well or in all circumstances) is to listen on unprivileged raw sockets (non-Windows only). The system group of the user running telegraf must be allowed to create ICMP Echo sockets. [See man pages icmp(7) for `ping_group_range`](http://man7.org/linux/man-pages/man7/icmp.7.html). On Linux hosts, run the following to give a group the proper permissions: ``` -sudo sysctl -w net.ipv4.ping_group_range="GROUPID GROUPID" +sudo sysctl -w net.ipv4.ping_group_range="GROUP_ID_LOW GROUP_ID_HIGH" ``` From 4a1dddff1f8dc7a2f1f1f48936ccd63725c8dbcf Mon Sep 17 00:00:00 2001 From: greg linton Date: Wed, 3 Jul 2019 16:08:38 -0600 Subject: [PATCH 13/15] Address feedback --- Gopkg.lock | 6 +-- plugins/inputs/ping/ping.go | 59 +++++++++++++++----------- plugins/inputs/ping/ping_notwindows.go | 1 - plugins/inputs/ping/ping_test.go | 26 ++++++------ plugins/inputs/ping/ping_windows.go | 2 - 5 files changed, 51 insertions(+), 43 deletions(-) diff --git a/Gopkg.lock b/Gopkg.lock index 05d55c494c424..f5bc5a0cd4999 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -424,12 +424,12 @@ revision = "25d852aebe32c875e9c044af3eef9c7dc6bc777f" [[projects]] - branch = "master" - digest = "1:3a5a86282cf85068c4b25a4db7b79c0d579b2af1367965a89dddc47f598d266f" + digest = "1:c6f371f2b02c751a83be83139a12a5467e55393feda16d4f8dfa95adfc4efede" name = "github.com/glinton/ping" packages = ["."] pruneopts = "" - revision = "48030f186603119bb4c832f074355188df4567e7" + revision = "1983bc2fd5de3ea00aa5457bbc8774300e889db9" + version = "v0.1.1" [[projects]] digest = "1:858b7fe7b0f4bc7ef9953926828f2816ea52d01a88d72d1c45bc8c108f23c356" diff --git a/plugins/inputs/ping/ping.go b/plugins/inputs/ping/ping.go index f9d5d2383cc66..af07dda995e03 
100644 --- a/plugins/inputs/ping/ping.go +++ b/plugins/inputs/ping/ping.go @@ -2,6 +2,7 @@ package ping import ( "context" + "errors" "math" "net" "os/exec" @@ -59,8 +60,6 @@ type Ping struct { // listenAddr is the address associated with the interface defined. listenAddr string - - ctx context.Context } func (*Ping) Description() string { @@ -117,12 +116,18 @@ func (p *Ping) Gather(acc telegraf.Accumulator) error { return nil } - if len(p.Arguments) == 0 && p.Method == "native" { + if p.Method == "native" { p.wg.Add(1) - go p.pingToURLNative(ip, acc) + go func(ip string) { + defer p.wg.Done() + p.pingToURLNative(ip, acc) + }(ip) } else { p.wg.Add(1) - go p.pingToURL(ip, acc) + go func(ip string) { + defer p.wg.Done() + p.pingToURL(ip, acc) + }(ip) } } @@ -178,7 +183,6 @@ func hostPinger(binary string, timeout float64, args ...string) (string, error) } func (p *Ping) pingToURLNative(destination string, acc telegraf.Accumulator) { - defer p.wg.Done() ctx := context.Background() network := "ip4" @@ -212,12 +216,7 @@ func (p *Ping) pingToURLNative(destination string, acc telegraf.Accumulator) { defer cancel() } - chanLength := 100 - if p.Count > 0 { - chanLength = p.Count - } - - resps := make(chan *ping.Response, chanLength) + resps := make(chan *ping.Response) rsps := []*ping.Response{} r := &sync.WaitGroup{} @@ -229,34 +228,34 @@ func (p *Ping) pingToURLNative(destination string, acc telegraf.Accumulator) { r.Done() }() - packetsSent := 0 wg := &sync.WaitGroup{} c := ping.Client{} - for p.Count <= 0 || packetsSent < p.Count { + var i int + for i = 0; i < p.Count; i++ { select { case <-ctx.Done(): goto finish case <-tick.C: - ctx, cancel := context.WithTimeout(context.Background(), time.Duration(timeout*float64(time.Second))) + ctx, cancel := context.WithTimeout(ctx, time.Duration(timeout*float64(time.Second))) defer cancel() - packetsSent++ wg.Add(1) go func(seq int) { defer wg.Done() - resp, err := c.Do(ctx, ping.Request{ + resp, err := c.Do(ctx, &ping.Request{ 
Dst: net.ParseIP(host.String()), Src: net.ParseIP(p.listenAddr), Seq: seq, }) if err != nil { - // likely a timeout error, ignore + acc.AddFields("ping", map[string]interface{}{"result_code": 2}, map[string]string{"url": destination}) + acc.AddError(err) return } resps <- resp - }(packetsSent) + }(i + 1) } } @@ -265,26 +264,30 @@ finish: close(resps) r.Wait() - tags, fields := onFin(packetsSent, rsps, destination) + tags, fields := onFin(i, rsps, destination) acc.AddFields("ping", fields, tags) } func onFin(packetsSent int, resps []*ping.Response, destination string) (map[string]string, map[string]interface{}) { packetsRcvd := len(resps) - loss := float64(packetsSent-packetsRcvd) / float64(packetsSent) * 100 tags := map[string]string{"url": destination} fields := map[string]interface{}{ "result_code": 0, "packets_transmitted": packetsSent, "packets_received": packetsRcvd, - "percent_packet_loss": loss, } - if packetsRcvd == 0 || packetsSent == 0 { + if packetsSent == 0 { + return tags, fields + } + + if packetsRcvd == 0 { + fields["percent_packet_loss"] = float64(100) return tags, fields } + fields["percent_packet_loss"] = float64(packetsSent-packetsRcvd) / float64(packetsSent) * 100 ttl := resps[0].TTL var min, max, avg, total time.Duration @@ -326,6 +329,15 @@ func onFin(packetsSent int, resps []*ping.Response, destination string) (map[str return tags, fields } +// Init ensures the plugin is configured correctly. 
+func (p *Ping) Init() error { + if p.Count < 1 { + return errors.New("bad number of packets to transmit") + } + + return nil +} + func init() { inputs.Add("ping", func() telegraf.Input { return &Ping{ @@ -337,7 +349,6 @@ func init() { Method: "exec", Binary: "ping", Arguments: []string{}, - ctx: context.Background(), } }) } diff --git a/plugins/inputs/ping/ping_notwindows.go b/plugins/inputs/ping/ping_notwindows.go index b13ef791ec3fa..b39ffdd8fdf8b 100644 --- a/plugins/inputs/ping/ping_notwindows.go +++ b/plugins/inputs/ping/ping_notwindows.go @@ -16,7 +16,6 @@ import ( ) func (p *Ping) pingToURL(u string, acc telegraf.Accumulator) { - defer p.wg.Done() tags := map[string]string{"url": u} fields := map[string]interface{}{"result_code": 0} diff --git a/plugins/inputs/ping/ping_test.go b/plugins/inputs/ping/ping_test.go index e43a8c4da2f5e..d40816e3be758 100644 --- a/plugins/inputs/ping/ping_test.go +++ b/plugins/inputs/ping/ping_test.go @@ -180,17 +180,17 @@ func mockHostPinger(binary string, timeout float64, args ...string) (string, err func TestPingGather(t *testing.T) { var acc testutil.Accumulator p := Ping{ - Urls: []string{"www.google.com", "www.reddit.com"}, + Urls: []string{"localhost", "influxdata.com"}, pingHost: mockHostPinger, } acc.GatherError(p.Gather) - tags := map[string]string{"url": "www.google.com"} + tags := map[string]string{"url": "localhost"} fields := map[string]interface{}{ - "packets_transmitted": 5, - "packets_received": 5, - "percent_packet_loss": 0.0, - "ttl": 63, + "packets_transmitted": 5, + "packets_received": 5, + "percent_packet_loss": 0.0, + "ttl": 63, "minimum_response_ms": 35.225, "average_response_ms": 43.628, "maximum_response_ms": 51.806, @@ -199,7 +199,7 @@ func TestPingGather(t *testing.T) { } acc.AssertContainsTaggedFields(t, "ping", fields, tags) - tags = map[string]string{"url": "www.reddit.com"} + tags = map[string]string{"url": "influxdata.com"} acc.AssertContainsTaggedFields(t, "ping", fields, tags) } @@ -229,10 
+229,10 @@ func TestLossyPingGather(t *testing.T) { acc.GatherError(p.Gather) tags := map[string]string{"url": "www.google.com"} fields := map[string]interface{}{ - "packets_transmitted": 5, - "packets_received": 3, - "percent_packet_loss": 40.0, - "ttl": 63, + "packets_transmitted": 5, + "packets_received": 3, + "percent_packet_loss": 40.0, + "ttl": 63, "minimum_response_ms": 35.225, "average_response_ms": 44.033, "maximum_response_ms": 51.806, @@ -348,11 +348,11 @@ func TestPingGatherNative(t *testing.T) { var acc testutil.Accumulator p := Ping{ - Urls: []string{"www.google.com", "www.reddit.com"}, + Urls: []string{"localhost", "127.0.0.2"}, Method: "native", Count: 5, } assert.NoError(t, acc.GatherError(p.Gather)) - assert.True(t, acc.HasPoint("ping", map[string]string{"url": "www.google.com"}, "packets_transmitted", 5)) + assert.True(t, acc.HasPoint("ping", map[string]string{"url": "localhost"}, "packets_transmitted", 5)) } diff --git a/plugins/inputs/ping/ping_windows.go b/plugins/inputs/ping/ping_windows.go index e9a5af4a7cbbc..adfd60480e6e1 100644 --- a/plugins/inputs/ping/ping_windows.go +++ b/plugins/inputs/ping/ping_windows.go @@ -17,8 +17,6 @@ func (p *Ping) pingToURL(u string, acc telegraf.Accumulator) { p.Count = 1 } - defer p.wg.Done() - tags := map[string]string{"url": u} fields := map[string]interface{}{"result_code": 0} From ecdf5f27df6dc59108d0520fcec17ada3dca4828 Mon Sep 17 00:00:00 2001 From: greg linton Date: Wed, 3 Jul 2019 16:25:16 -0600 Subject: [PATCH 14/15] Set ttl only on supported platforms --- plugins/inputs/ping/ping.go | 23 ++++++++++------------- 1 file changed, 10 insertions(+), 13 deletions(-) diff --git a/plugins/inputs/ping/ping.go b/plugins/inputs/ping/ping.go index af07dda995e03..469859a345937 100644 --- a/plugins/inputs/ping/ping.go +++ b/plugins/inputs/ping/ping.go @@ -6,6 +6,7 @@ import ( "math" "net" "os/exec" + "runtime" "sync" "time" @@ -310,21 +311,17 @@ func onFin(packetsSent int, resps []*ping.Response, destination 
string) (map[str sumsquares += (res.RTT - avg) * (res.RTT - avg) } stdDev := time.Duration(math.Sqrt(float64(sumsquares / time.Duration(packetsRcvd)))) - if ttl > 0 { + + // Set TTL only on supported platform. See golang.org/x/net/ipv4/payload_cmsg.go + switch runtime.GOOS { + case "aix", "darwin", "dragonfly", "freebsd", "linux", "netbsd", "openbsd", "solaris": fields["ttl"] = ttl } - if min > 0 { - fields["minimum_response_ms"] = float64(min.Nanoseconds()) / float64(time.Millisecond) - } - if avg > 0 { - fields["average_response_ms"] = float64(avg.Nanoseconds()) / float64(time.Millisecond) - } - if max > 0 { - fields["maximum_response_ms"] = float64(max.Nanoseconds()) / float64(time.Millisecond) - } - if stdDev > 0 { - fields["standard_deviation_ms"] = float64(stdDev.Nanoseconds()) / float64(time.Millisecond) - } + + fields["minimum_response_ms"] = float64(min.Nanoseconds()) / float64(time.Millisecond) + fields["average_response_ms"] = float64(avg.Nanoseconds()) / float64(time.Millisecond) + fields["maximum_response_ms"] = float64(max.Nanoseconds()) / float64(time.Millisecond) + fields["standard_deviation_ms"] = float64(stdDev.Nanoseconds()) / float64(time.Millisecond) return tags, fields } From a5808b45d17bafcf03459493f96e911cacc94fa2 Mon Sep 17 00:00:00 2001 From: greg linton Date: Wed, 3 Jul 2019 16:36:00 -0600 Subject: [PATCH 15/15] fmt tests --- plugins/inputs/ping/ping_test.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/plugins/inputs/ping/ping_test.go b/plugins/inputs/ping/ping_test.go index d40816e3be758..56303b1b23dbd 100644 --- a/plugins/inputs/ping/ping_test.go +++ b/plugins/inputs/ping/ping_test.go @@ -187,10 +187,10 @@ func TestPingGather(t *testing.T) { acc.GatherError(p.Gather) tags := map[string]string{"url": "localhost"} fields := map[string]interface{}{ - "packets_transmitted": 5, - "packets_received": 5, - "percent_packet_loss": 0.0, - "ttl": 63, + "packets_transmitted": 5, + "packets_received": 5, + 
"percent_packet_loss": 0.0, + "ttl": 63, "minimum_response_ms": 35.225, "average_response_ms": 43.628, "maximum_response_ms": 51.806, @@ -229,10 +229,10 @@ func TestLossyPingGather(t *testing.T) { acc.GatherError(p.Gather) tags := map[string]string{"url": "www.google.com"} fields := map[string]interface{}{ - "packets_transmitted": 5, - "packets_received": 3, - "percent_packet_loss": 40.0, - "ttl": 63, + "packets_transmitted": 5, + "packets_received": 3, + "percent_packet_loss": 40.0, + "ttl": 63, "minimum_response_ms": 35.225, "average_response_ms": 44.033, "maximum_response_ms": 51.806,