Skip to content

Commit

Permalink
Fix icmp duration (elastic#17920)
Browse files Browse the repository at this point in the history
At some point in the past we apparently stopped nesting [ICMP
fields](https://www.elastic.co/guide/en/beats/heartbeat/master/exported-fields-icmp.html).
This patch fixes that issue.
  • Loading branch information
andrewvc authored Apr 29, 2020
1 parent b552dd8 commit 754eac1
Show file tree
Hide file tree
Showing 4 changed files with 554 additions and 390 deletions.
79 changes: 52 additions & 27 deletions heartbeat/monitors/active/icmp/icmp.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,46 +32,71 @@ import (
"github.com/elastic/beats/v7/libbeat/logp"
)

var debugf = logp.MakeDebug("icmp")

func init() {
monitors.RegisterActive("icmp", create)
}

var debugf = logp.MakeDebug("icmp")

func create(
name string,
cfg *common.Config,
commonConfig *common.Config,
) (jobs []jobs.Job, endpoints int, err error) {
loop, err := getStdLoop()
if err != nil {
logp.Warn("Failed to initialize ICMP loop %v", err)
return nil, 0, err
}

config := DefaultConfig
if err := cfg.Unpack(&config); err != nil {
if err := commonConfig.Unpack(&config); err != nil {
return nil, 0, err
}

ipVersion := config.Mode.Network()
if len(config.Hosts) > 0 && ipVersion == "" {
err := fmt.Errorf("pinging hosts requires ipv4 or ipv6 mode enabled")
jf, err := newJobFactory(config, monitors.NewStdResolver(), loop)
if err != nil {
return nil, 0, err
}
return jf.makeJobs()

var loopErr error
loopInit.Do(func() {
debugf("initializing ICMP loop")
loop, loopErr = newICMPLoop()
})
if loopErr != nil {
logp.Warn("Failed to initialize ICMP loop %v", loopErr)
return nil, 0, loopErr
}

type jobFactory struct {
config Config
resolver monitors.Resolver
loop ICMPLoop
ipVersion string
}

func newJobFactory(config Config, resolver monitors.Resolver, loop ICMPLoop) (*jobFactory, error) {
jf := &jobFactory{config: config, resolver: resolver, loop: loop}
err := jf.checkConfig()
if err != nil {
return nil, err
}
debugf("ICMP loop successfully initialized")

if err := loop.checkNetworkMode(ipVersion); err != nil {
return jf, nil
}

func (jf *jobFactory) checkConfig() error {
jf.ipVersion = jf.config.Mode.Network()
if len(jf.config.Hosts) > 0 && jf.ipVersion == "" {
err := fmt.Errorf("pinging hosts requires ipv4 or ipv6 mode enabled")
return err
}

return nil
}

func (jf *jobFactory) makeJobs() (j []jobs.Job, endpoints int, err error) {
if err := jf.loop.checkNetworkMode(jf.ipVersion); err != nil {
return nil, 0, err
}

pingFactory := monitors.MakePingIPFactory(createPingIPFactory(&config))
pingFactory := jf.pingIPFactory(&jf.config)

for _, host := range config.Hosts {
job, err := monitors.MakeByHostJob(host, config.Mode, monitors.NewStdResolver(), pingFactory)
for _, host := range jf.config.Hosts {
job, err := monitors.MakeByHostJob(host, jf.config.Mode, monitors.NewStdResolver(), pingFactory)

if err != nil {
return nil, 0, err
Expand All @@ -82,25 +107,25 @@ func create(
return nil, 0, err
}

jobs = append(jobs, wrappers.WithURLField(u, job))
j = append(j, wrappers.WithURLField(u, job))
}

return jobs, len(config.Hosts), nil
return j, len(jf.config.Hosts), nil
}

func createPingIPFactory(config *Config) func(*beat.Event, *net.IPAddr) error {
return func(event *beat.Event, ip *net.IPAddr) error {
rtt, n, err := loop.ping(ip, config.Timeout, config.Wait)
func (jf *jobFactory) pingIPFactory(config *Config) func(*net.IPAddr) jobs.Job {
return monitors.MakePingIPFactory(func(event *beat.Event, ip *net.IPAddr) error {
rtt, n, err := jf.loop.ping(ip, config.Timeout, config.Wait)
if err != nil {
return err
}

icmpFields := common.MapStr{"requests": n}
if err == nil {
icmpFields["rtt"] = look.RTT(rtt)
eventext.MergeEventFields(event, icmpFields)
eventext.MergeEventFields(event, common.MapStr{"icmp": icmpFields})
}

return nil
}
})
}
90 changes: 90 additions & 0 deletions heartbeat/monitors/active/icmp/icmp_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package icmp

import (
"net"
"net/url"
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/elastic/beats/v7/heartbeat/hbtest"
"github.com/elastic/beats/v7/heartbeat/look"
"github.com/elastic/beats/v7/heartbeat/monitors"
"github.com/elastic/beats/v7/heartbeat/monitors/wrappers"
"github.com/elastic/beats/v7/heartbeat/scheduler/schedule"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/go-lookslike"
"github.com/elastic/go-lookslike/testslike"
)

func TestICMPFields(t *testing.T) {
host := "localhost"
hostURL := &url.URL{Scheme: "icmp", Host: host}
ip := "127.0.0.1"
cfg := Config{
Hosts: []string{host},
Mode: monitors.IPSettings{IPv4: true, IPv6: false, Mode: monitors.PingAny},
}
testMockLoop, e := execTestICMPCheck(t, cfg)

validator := lookslike.Strict(
lookslike.Compose(
hbtest.BaseChecks(ip, "up", "icmp"),
hbtest.SummaryChecks(1, 0),
hbtest.URLChecks(t, hostURL),
hbtest.ResolveChecks(ip),
lookslike.MustCompile(map[string]interface{}{
"icmp.requests": 1,
"icmp.rtt": look.RTT(testMockLoop.pingRtt),
}),
),
)
testslike.Test(t, validator, e.Fields)
}

func execTestICMPCheck(t *testing.T, cfg Config) (mockLoop, *beat.Event) {
tl := mockLoop{pingRtt: time.Microsecond * 1000, pingRequests: 1}
jf, err := newJobFactory(cfg, monitors.NewStdResolver(), tl)
require.NoError(t, err)
j, endpoints, err := jf.makeJobs()
require.Len(t, j, 1)
require.Equal(t, 1, endpoints)
e := &beat.Event{}
sched, _ := schedule.Parse("@every 1s")
wrapped := wrappers.WrapCommon(j, "test", "", "icmp", sched, time.Duration(0))
wrapped[0](e)
return tl, e
}

type mockLoop struct {
pingRtt time.Duration
pingRequests int
pingErr error
checkNetworkModeErr error
}

func (t mockLoop) checkNetworkMode(mode string) error {
return t.checkNetworkModeErr
}

func (t mockLoop) ping(addr *net.IPAddr, timeout time.Duration, interval time.Duration) (time.Duration, int, error) {
return t.pingRtt, t.pingRequests, t.pingErr
}
Loading

0 comments on commit 754eac1

Please sign in to comment.