From d900443e67ab2b1e56ab31a5def097603963150d Mon Sep 17 00:00:00 2001 From: Andrew Cholakian Date: Mon, 4 May 2020 16:15:32 -0500 Subject: [PATCH] Fix icmp duration (#17920) (#18077) At some point in the past we apparently stopped nesting [ICMP fields](https://www.elastic.co/guide/en/beats/heartbeat/master/exported-fields-icmp.html). This patch fixes that issue. (cherry picked from commit 754eac174cdb2a335ce578c39130b163b40b49d1) --- heartbeat/monitors/active/icmp/icmp.go | 79 ++-- heartbeat/monitors/active/icmp/icmp_test.go | 90 +++++ heartbeat/monitors/active/icmp/loop.go | 370 +----------------- heartbeat/monitors/active/icmp/stdloop.go | 405 ++++++++++++++++++++ 4 files changed, 554 insertions(+), 390 deletions(-) create mode 100644 heartbeat/monitors/active/icmp/icmp_test.go create mode 100644 heartbeat/monitors/active/icmp/stdloop.go diff --git a/heartbeat/monitors/active/icmp/icmp.go b/heartbeat/monitors/active/icmp/icmp.go index 1cb19c907988..45fdf8a54b32 100644 --- a/heartbeat/monitors/active/icmp/icmp.go +++ b/heartbeat/monitors/active/icmp/icmp.go @@ -32,46 +32,71 @@ import ( "github.com/elastic/beats/v7/libbeat/logp" ) +var debugf = logp.MakeDebug("icmp") + func init() { monitors.RegisterActive("icmp", create) } -var debugf = logp.MakeDebug("icmp") - func create( name string, - cfg *common.Config, + commonConfig *common.Config, ) (jobs []jobs.Job, endpoints int, err error) { + loop, err := getStdLoop() + if err != nil { + logp.Warn("Failed to initialize ICMP loop %v", err) + return nil, 0, err + } + config := DefaultConfig - if err := cfg.Unpack(&config); err != nil { + if err := commonConfig.Unpack(&config); err != nil { return nil, 0, err } - ipVersion := config.Mode.Network() - if len(config.Hosts) > 0 && ipVersion == "" { - err := fmt.Errorf("pinging hosts requires ipv4 or ipv6 mode enabled") + jf, err := newJobFactory(config, monitors.NewStdResolver(), loop) + if err != nil { return nil, 0, err } + return jf.makeJobs() - var loopErr error - loopInit.Do(func() { - debugf("initializing ICMP loop") - loop, loopErr = newICMPLoop() - }) - if loopErr != nil { - logp.Warn("Failed to initialize ICMP loop %v", loopErr) - return nil, 0, loopErr +} + +type jobFactory struct { + config Config + resolver monitors.Resolver + loop ICMPLoop + ipVersion string +} + +func newJobFactory(config Config, resolver monitors.Resolver, loop ICMPLoop) (*jobFactory, error) { + jf := &jobFactory{config: config, resolver: resolver, loop: loop} + err := jf.checkConfig() + if err != nil { + return nil, err } - debugf("ICMP loop successfully initialized") - if err := loop.checkNetworkMode(ipVersion); err != nil { + return jf, nil +} + +func (jf *jobFactory) checkConfig() error { + jf.ipVersion = jf.config.Mode.Network() + if len(jf.config.Hosts) > 0 && jf.ipVersion == "" { + err := fmt.Errorf("pinging hosts requires ipv4 or ipv6 mode enabled") + return err + } + + return nil +} + +func (jf *jobFactory) makeJobs() (j []jobs.Job, endpoints int, err error) { + if err := jf.loop.checkNetworkMode(jf.ipVersion); err != nil { return nil, 0, err } - pingFactory := monitors.MakePingIPFactory(createPingIPFactory(&config)) + pingFactory := jf.pingIPFactory(&jf.config) - for _, host := range config.Hosts { - job, err := monitors.MakeByHostJob(host, config.Mode, monitors.NewStdResolver(), pingFactory) + for _, host := range jf.config.Hosts { + job, err := monitors.MakeByHostJob(host, jf.config.Mode, monitors.NewStdResolver(), pingFactory) if err != nil { return nil, 0, err @@ -82,15 +107,15 @@ func create( return nil, 0, err } - jobs = append(jobs, wrappers.WithURLField(u, job)) + j = append(j, wrappers.WithURLField(u, job)) } - return jobs, len(config.Hosts), nil + return j, len(jf.config.Hosts), nil } -func createPingIPFactory(config *Config) func(*beat.Event, *net.IPAddr) error { - return func(event *beat.Event, ip *net.IPAddr) error { - rtt, n, err := loop.ping(ip, config.Timeout, config.Wait) +func (jf *jobFactory) pingIPFactory(config *Config) func(*net.IPAddr) jobs.Job { + return monitors.MakePingIPFactory(func(event *beat.Event, ip *net.IPAddr) error { + rtt, n, err := jf.loop.ping(ip, config.Timeout, config.Wait) if err != nil { return err } @@ -98,9 +123,9 @@ func createPingIPFactory(config *Config) func(*beat.Event, *net.IPAddr) error { icmpFields := common.MapStr{"requests": n} if err == nil { icmpFields["rtt"] = look.RTT(rtt) - eventext.MergeEventFields(event, icmpFields) + eventext.MergeEventFields(event, common.MapStr{"icmp": icmpFields}) } return nil - } + }) } diff --git a/heartbeat/monitors/active/icmp/icmp_test.go b/heartbeat/monitors/active/icmp/icmp_test.go new file mode 100644 index 000000000000..11e7dae5380c --- /dev/null +++ b/heartbeat/monitors/active/icmp/icmp_test.go @@ -0,0 +1,90 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package icmp + +import ( + "net" + "net/url" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/elastic/beats/v7/heartbeat/hbtest" + "github.com/elastic/beats/v7/heartbeat/look" + "github.com/elastic/beats/v7/heartbeat/monitors" + "github.com/elastic/beats/v7/heartbeat/monitors/wrappers" + "github.com/elastic/beats/v7/heartbeat/scheduler/schedule" + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/go-lookslike" + "github.com/elastic/go-lookslike/testslike" +) + +func TestICMPFields(t *testing.T) { + host := "localhost" + hostURL := &url.URL{Scheme: "icmp", Host: host} + ip := "127.0.0.1" + cfg := Config{ + Hosts: []string{host}, + Mode: monitors.IPSettings{IPv4: true, IPv6: false, Mode: monitors.PingAny}, + } + testMockLoop, e := execTestICMPCheck(t, cfg) + + validator := lookslike.Strict( + lookslike.Compose( + hbtest.BaseChecks(ip, "up", "icmp"), + hbtest.SummaryChecks(1, 0), + hbtest.URLChecks(t, hostURL), + hbtest.ResolveChecks(ip), + lookslike.MustCompile(map[string]interface{}{ + "icmp.requests": 1, + "icmp.rtt": look.RTT(testMockLoop.pingRtt), + }), + ), + ) + testslike.Test(t, validator, e.Fields) +} + +func execTestICMPCheck(t *testing.T, cfg Config) (mockLoop, *beat.Event) { + tl := mockLoop{pingRtt: time.Microsecond * 1000, pingRequests: 1} + jf, err := newJobFactory(cfg, monitors.NewStdResolver(), tl) + require.NoError(t, err) + j, endpoints, err := jf.makeJobs() + require.Len(t, j, 1) + require.Equal(t, 1, endpoints) + e := &beat.Event{} + sched, _ := schedule.Parse("@every 1s") + wrapped := wrappers.WrapCommon(j, "test", "", "icmp", sched, time.Duration(0)) + wrapped[0](e) + return tl, e +} + +type mockLoop struct { + pingRtt time.Duration + pingRequests int + pingErr error + checkNetworkModeErr error +} + +func (t mockLoop) checkNetworkMode(mode string) error { + return t.checkNetworkModeErr +} + +func (t mockLoop) ping(addr *net.IPAddr, timeout time.Duration, interval time.Duration) (time.Duration, int, error) { + return t.pingRtt, t.pingRequests, t.pingErr +} diff --git a/heartbeat/monitors/active/icmp/loop.go b/heartbeat/monitors/active/icmp/loop.go index 414686c1d314..de4d0ef4dfc2 100644 --- a/heartbeat/monitors/active/icmp/loop.go +++ b/heartbeat/monitors/active/icmp/loop.go @@ -18,371 +18,15 @@ package icmp import ( - "bytes" - "encoding/binary" - "errors" - "fmt" - "math/rand" "net" - "os" - "runtime" - "sync" "time" - - "golang.org/x/net/icmp" - "golang.org/x/net/ipv4" - "golang.org/x/net/ipv6" -) - -type icmpLoop struct { - conn4, conn6 *icmp.PacketConn - recv chan packet - - mutex sync.Mutex - requests map[requestID]*requestContext -} - -type timeoutError struct { -} - -const ( - // iana types - protocolICMP = 1 - protocolIPv6ICMP = 58 -) - -type packet struct { - ts time.Time - addr net.Addr - - Type icmp.Type // type, either ipv4.ICMPType or ipv6.ICMPType - Code int // code - Checksum int // checksum - Echo icmp.Echo -} - -type requestID struct { - addr string - proto int - id int - seq int -} - -type requestContext struct { - l *icmpLoop - id requestID - ts time.Time - result chan requestResult -} - -type requestResult struct { - packet packet - err error -} - -var ( - loopInit sync.Once - loop *icmpLoop ) -func noPingCapabilityError(message string) error { - return fmt.Errorf(fmt.Sprintf("Insufficient privileges to perform ICMP ping. %s", message)) -} - -func newICMPLoop() (*icmpLoop, error) { - // Log errors at info level, as the loop is setup globally when ICMP module is loaded - // first (not yet configured). - // With multiple configurations using the icmp loop, we have to postpose - // IPv4/IPv6 checking - conn4 := createListener("IPv4", "ip4:icmp") - conn6 := createListener("IPv6", "ip6:ipv6-icmp") - unprivilegedPossible := false - l := &icmpLoop{ - conn4: conn4, - conn6: conn6, - recv: make(chan packet, 16), - requests: map[requestID]*requestContext{}, - } - - if l.conn4 == nil && l.conn6 == nil { - switch runtime.GOOS { - case "linux", "darwin": - unprivilegedPossible = true - //This is non-privileged ICMP, not udp - l.conn4 = createListener("Unprivileged IPv4", "udp4") - l.conn6 = createListener("Unprivileged IPv6", "udp6") - } - } - - if l.conn4 != nil { - go l.runICMPRecv(l.conn4, protocolICMP) - } - if l.conn6 != nil { - go l.runICMPRecv(l.conn6, protocolIPv6ICMP) - } - - if l.conn4 == nil && l.conn6 == nil { - if unprivilegedPossible { - var buffer bytes.Buffer - path, _ := os.Executable() - buffer.WriteString("You can run without root by setting cap_net_raw:\n sudo setcap cap_net_raw+eip ") - buffer.WriteString(path + " \n") - buffer.WriteString("Your system allows the use of unprivileged ping by setting net.ipv4.ping_group_range \n sysctl -w net.ipv4.ping_group_range=' ' ") - return nil, noPingCapabilityError(buffer.String()) - } - return nil, noPingCapabilityError("You must provide the appropriate permissions to this executable") - } - - return l, nil -} - -func (l *icmpLoop) checkNetworkMode(mode string) error { - ip4, ip6 := false, false - switch mode { - case "ip4": - ip4 = true - case "ip6": - ip6 = true - case "ip": - ip4, ip6 = true, true - default: - return fmt.Errorf("'%v' is not supported", mode) - } - - if ip4 && l.conn4 == nil { - return errors.New("failed to initiate IPv4 support. Check log details for permission configuration") - } - if ip6 && l.conn6 == nil { - return errors.New("failed to initiate IPv6 support. Check log details for permission configuration") - } - - return nil -} - -func (l *icmpLoop) runICMPRecv(conn *icmp.PacketConn, proto int) { - for { - bytes := make([]byte, 512) - conn.SetReadDeadline(time.Now().Add(1 * time.Second)) - _, addr, err := conn.ReadFrom(bytes) - if err != nil { - if neterr, ok := err.(*net.OpError); ok { - if neterr.Timeout() { - continue - } else { - // TODO: report error and quit loop? - return - } - } - } - - ts := time.Now() - m, err := icmp.ParseMessage(proto, bytes) - if err != nil { - continue - } - - // process echo reply only - if m.Type != ipv4.ICMPTypeEchoReply && m.Type != ipv6.ICMPTypeEchoReply { - continue - } - echo, ok := m.Body.(*icmp.Echo) - if !ok { - continue - } - - id := requestID{ - addr: addr.String(), - proto: proto, - id: echo.ID, - seq: echo.Seq, - } - - l.mutex.Lock() - ctx := l.requests[id] - if ctx != nil { - delete(l.requests, id) - } - l.mutex.Unlock() - - // no return context available for echo reply -> handle next message - if ctx == nil { - continue - } - - ctx.result <- requestResult{ - packet: packet{ - ts: ts, - addr: addr, - - Type: m.Type, - Code: m.Code, - Checksum: m.Checksum, - Echo: *echo, - }, - } - } -} - -func (l *icmpLoop) ping( - addr *net.IPAddr, - timeout time.Duration, - interval time.Duration, -) (time.Duration, int, error) { - - var err error - toTimer := time.NewTimer(timeout) - defer toTimer.Stop() - - ticker := time.NewTicker(interval) - defer ticker.Stop() - - done := false - doneSignal := make(chan struct{}) - - success := false - var rtt time.Duration - - // results accepts first response received only - results := make(chan time.Duration, 1) - requests := 0 - - awaitResponse := func(ctx *requestContext) { - select { - case <-doneSignal: - ctx.Stop() - - case r := <-ctx.result: - // ctx is removed from request tables automatically a response is - // received. No need to stop it. - - // try to push RTT. The first result available will be reported - select { - case results <- r.packet.ts.Sub(ctx.ts): - default: - } - } - } - - for !done { - var ctx *requestContext - ctx, err = l.sendEchoRequest(addr) - if err != nil { - close(doneSignal) - break - } - go awaitResponse(ctx) - requests++ - - select { - case <-toTimer.C: - // no response for any active request received. Finish loop - // and remove all requests from request table. - done = true - close(doneSignal) - - case <-ticker.C: - // No response yet. Send another request with every tick - - case rtt = <-results: - success = true - - done = true - close(doneSignal) - } - } - - if err != nil { - return 0, 0, err - } - - if !success { - return 0, requests, timeoutError{} - } - - return rtt, requests, nil -} - -func (l *icmpLoop) sendEchoRequest(addr *net.IPAddr) (*requestContext, error) { - var conn *icmp.PacketConn - var proto int - var typ icmp.Type - - if l == nil { - panic("icmp loop not initialized") - } - - if isIPv4(addr.IP) { - conn = l.conn4 - proto = protocolICMP - typ = ipv4.ICMPTypeEcho - } else if isIPv6(addr.IP) { - conn = l.conn6 - proto = protocolIPv6ICMP - typ = ipv6.ICMPTypeEchoRequest - } else { - return nil, fmt.Errorf("%v is unknown ip address", addr) - } - - id := requestID{ - addr: addr.String(), - proto: proto, - id: rand.Intn(0xffff), - seq: rand.Intn(0xffff), - } - - ctx := &requestContext{ - l: l, - id: id, - result: make(chan requestResult, 1), - } - - l.mutex.Lock() - l.requests[id] = ctx - l.mutex.Unlock() - - payloadBuf := make([]byte, 0, 8) - payload := bytes.NewBuffer(payloadBuf) - ts := time.Now() - binary.Write(payload, binary.BigEndian, ts.UnixNano()) - - msg := &icmp.Message{ - Type: typ, - Body: &icmp.Echo{ - ID: id.id, - Seq: id.seq, - Data: payload.Bytes(), - }, - } - encoded, _ := msg.Marshal(nil) - - _, err := conn.WriteTo(encoded, addr) - if err != nil { - return nil, err - } - - ctx.ts = ts - return ctx, nil -} - -func createListener(name, network string) *icmp.PacketConn { - conn, err := icmp.ListenPacket(network, "") - - // XXX: need to check for conn == nil, as 'err != nil' seems always to be - // true, even if error value itself is `nil`. Checking for conn suppresses - // misleading log message. - if conn == nil && err != nil { - return nil - } - return conn -} - -// timeoutError implements net.Error interface -func (timeoutError) Error() string { return "ping timeout" } -func (timeoutError) Timeout() bool { return true } -func (timeoutError) Temporary() bool { return true } - -func (r *requestContext) Stop() { - r.l.mutex.Lock() - delete(r.l.requests, r.id) - r.l.mutex.Unlock() +type ICMPLoop interface { + checkNetworkMode(mode string) error + ping( + addr *net.IPAddr, + timeout time.Duration, + interval time.Duration, + ) (time.Duration, int, error) } diff --git a/heartbeat/monitors/active/icmp/stdloop.go b/heartbeat/monitors/active/icmp/stdloop.go new file mode 100644 index 000000000000..932154c61ada --- /dev/null +++ b/heartbeat/monitors/active/icmp/stdloop.go @@ -0,0 +1,405 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package icmp + +import ( + "bytes" + "encoding/binary" + "errors" + "fmt" + "math/rand" + "net" + "os" + "runtime" + "sync" + "time" + + "golang.org/x/net/icmp" + "golang.org/x/net/ipv4" + "golang.org/x/net/ipv6" +) + +type stdICMPLoop struct { + conn4, conn6 *icmp.PacketConn + recv chan packet + + mutex sync.Mutex + requests map[requestID]*requestContext +} + +type timeoutError struct { +} + +const ( + // iana types + protocolICMP = 1 + protocolIPv6ICMP = 58 +) + +type packet struct { + ts time.Time + addr net.Addr + + Type icmp.Type // type, either ipv4.ICMPType or ipv6.ICMPType + Code int // code + Checksum int // checksum + Echo icmp.Echo +} + +type requestID struct { + addr string + proto int + id int + seq int +} + +type requestContext struct { + l *stdICMPLoop + id requestID + ts time.Time + result chan requestResult +} + +type requestResult struct { + packet packet + err error +} + +// stdLoop is a singleton for our main ICMP loop since it doesn't +// make sense to have multiples. While having a singleton is ugly +// is mandatory for the ICMP interface in go, where all monitors +// must share a single loop. +// These vars should not be used directly, but rather getStdLoop +// should be invoked to initialize and return stdLoop. +var ( + stdICMPLoopInit sync.Once + stdICMPLoopSingleton *stdICMPLoop +) + +func getStdLoop() (*stdICMPLoop, error) { + var loopErr error + stdICMPLoopInit.Do(func() { + debugf("initializing ICMP loop") + stdICMPLoopSingleton, loopErr = newICMPLoop() + if loopErr == nil { + debugf("ICMP loop successfully initialized") + } + }) + return stdICMPLoopSingleton, loopErr +} + +func noPingCapabilityError(message string) error { + return fmt.Errorf(fmt.Sprintf("Insufficient privileges to perform ICMP ping. %s", message)) +} + +func newICMPLoop() (*stdICMPLoop, error) { + // Log errors at info level, as the loop is setup globally when ICMP module is loaded + // first (not yet configured). + // With multiple configurations using the icmp loop, we have to postpose + // IPv4/IPv6 checking + conn4 := createListener("IPv4", "ip4:icmp") + conn6 := createListener("IPv6", "ip6:ipv6-icmp") + unprivilegedPossible := false + l := &stdICMPLoop{ + conn4: conn4, + conn6: conn6, + recv: make(chan packet, 16), + requests: map[requestID]*requestContext{}, + } + + if l.conn4 == nil && l.conn6 == nil { + switch runtime.GOOS { + case "linux", "darwin": + unprivilegedPossible = true + //This is non-privileged ICMP, not udp + l.conn4 = createListener("Unprivileged IPv4", "udp4") + l.conn6 = createListener("Unprivileged IPv6", "udp6") + } + } + + if l.conn4 != nil { + go l.runICMPRecv(l.conn4, protocolICMP) + } + if l.conn6 != nil { + go l.runICMPRecv(l.conn6, protocolIPv6ICMP) + } + + if l.conn4 == nil && l.conn6 == nil { + if unprivilegedPossible { + var buffer bytes.Buffer + path, _ := os.Executable() + buffer.WriteString("You can run without root by setting cap_net_raw:\n sudo setcap cap_net_raw+eip ") + buffer.WriteString(path + " \n") + buffer.WriteString("Your system allows the use of unprivileged ping by setting net.ipv4.ping_group_range \n sysctl -w net.ipv4.ping_group_range=' ' ") + return nil, noPingCapabilityError(buffer.String()) + } + return nil, noPingCapabilityError("You must provide the appropriate permissions to this executable") + } + + return l, nil +} + +func (l *stdICMPLoop) checkNetworkMode(mode string) error { + ip4, ip6 := false, false + switch mode { + case "ip4": + ip4 = true + case "ip6": + ip6 = true + case "ip": + ip4, ip6 = true, true + default: + return fmt.Errorf("'%v' is not supported", mode) + } + + if ip4 && l.conn4 == nil { + return errors.New("failed to initiate IPv4 support. Check log details for permission configuration") + } + if ip6 && l.conn6 == nil { + return errors.New("failed to initiate IPv6 support. Check log details for permission configuration") + } + + return nil +} + +func (l *stdICMPLoop) runICMPRecv(conn *icmp.PacketConn, proto int) { + for { + bytes := make([]byte, 512) + conn.SetReadDeadline(time.Now().Add(1 * time.Second)) + _, addr, err := conn.ReadFrom(bytes) + if err != nil { + if neterr, ok := err.(*net.OpError); ok { + if neterr.Timeout() { + continue + } else { + // TODO: report error and quit loop? + return + } + } + } + + ts := time.Now() + m, err := icmp.ParseMessage(proto, bytes) + if err != nil { + continue + } + + // process echo reply only + if m.Type != ipv4.ICMPTypeEchoReply && m.Type != ipv6.ICMPTypeEchoReply { + continue + } + echo, ok := m.Body.(*icmp.Echo) + if !ok { + continue + } + + id := requestID{ + addr: addr.String(), + proto: proto, + id: echo.ID, + seq: echo.Seq, + } + + l.mutex.Lock() + ctx := l.requests[id] + if ctx != nil { + delete(l.requests, id) + } + l.mutex.Unlock() + + // no return context available for echo reply -> handle next message + if ctx == nil { + continue + } + + ctx.result <- requestResult{ + packet: packet{ + ts: ts, + addr: addr, + + Type: m.Type, + Code: m.Code, + Checksum: m.Checksum, + Echo: *echo, + }, + } + } +} + +func (l *stdICMPLoop) ping( + addr *net.IPAddr, + timeout time.Duration, + interval time.Duration, +) (time.Duration, int, error) { + var err error + toTimer := time.NewTimer(timeout) + defer toTimer.Stop() + + ticker := time.NewTicker(interval) + defer ticker.Stop() + + done := false + doneSignal := make(chan struct{}) + + success := false + var rtt time.Duration + + // results accepts first response received only + results := make(chan time.Duration, 1) + requests := 0 + + awaitResponse := func(ctx *requestContext) { + select { + case <-doneSignal: + ctx.Stop() + + case r := <-ctx.result: + // ctx is removed from request tables automatically a response is + // received. No need to stop it. + + // try to push RTT. The first result available will be reported + select { + case results <- r.packet.ts.Sub(ctx.ts): + default: + } + } + } + + for !done { + var ctx *requestContext + ctx, err = l.sendEchoRequest(addr) + if err != nil { + close(doneSignal) + break + } + go awaitResponse(ctx) + requests++ + + select { + case <-toTimer.C: + // no response for any active request received. Finish loop + // and remove all pingRequests from request table. + done = true + close(doneSignal) + + case <-ticker.C: + // No response yet. Send another request with every tick + + case rtt = <-results: + success = true + + done = true + close(doneSignal) + } + } + + if err != nil { + return 0, 0, err + } + + if !success { + return 0, requests, timeoutError{} + } + + return rtt, requests, nil +} + +func (l *stdICMPLoop) sendEchoRequest(addr *net.IPAddr) (*requestContext, error) { + var conn *icmp.PacketConn + var proto int + var typ icmp.Type + + if l == nil { + panic("icmp loop not initialized") + } + + if isIPv4(addr.IP) { + conn = l.conn4 + proto = protocolICMP + typ = ipv4.ICMPTypeEcho + } else if isIPv6(addr.IP) { + conn = l.conn6 + proto = protocolIPv6ICMP + typ = ipv6.ICMPTypeEchoRequest + } else { + return nil, fmt.Errorf("%v is unknown ip address", addr) + } + + id := requestID{ + addr: addr.String(), + proto: proto, + id: rand.Intn(0xffff), + seq: rand.Intn(0xffff), + } + + ctx := &requestContext{ + l: l, + id: id, + result: make(chan requestResult, 1), + } + + l.mutex.Lock() + l.requests[id] = ctx + l.mutex.Unlock() + + payloadBuf := make([]byte, 0, 8) + payload := bytes.NewBuffer(payloadBuf) + ts := time.Now() + binary.Write(payload, binary.BigEndian, ts.UnixNano()) + + msg := &icmp.Message{ + Type: typ, + Body: &icmp.Echo{ + ID: id.id, + Seq: id.seq, + Data: payload.Bytes(), + }, + } + encoded, _ := msg.Marshal(nil) + + _, err := conn.WriteTo(encoded, addr) + if err != nil { + return nil, err + } + + ctx.ts = ts + return ctx, nil +} + +func createListener(name, network string) *icmp.PacketConn { + conn, err := icmp.ListenPacket(network, "") + + // XXX: need to check for conn == nil, as 'err != nil' seems always to be + // true, even if error value itself is `nil`. Checking for conn suppresses + // misleading log message. + if conn == nil && err != nil { + return nil + } + return conn +} + +// timeoutError implements net.Error interface +func (timeoutError) Error() string { return "ping timeout" } +func (timeoutError) Timeout() bool { return true } +func (timeoutError) Temporary() bool { return true } + +func (r *requestContext) Stop() { + r.l.mutex.Lock() + delete(r.l.requests, r.id) + r.l.mutex.Unlock() +}