From d4eac97d9029cf6031757669fe2f95d101e9640f Mon Sep 17 00:00:00 2001 From: Andrew Kroh Date: Thu, 15 Mar 2018 11:35:45 -0400 Subject: [PATCH] Send available perfmon data on error (#6542) When the windows/perfmon metricset encountered any error it would discard any valid metrics it had read and send a single error event. With this change it will send one event per counter. If an individual counter returns an error then it will send an event for that counter containing the error.message field and a zero-value measurement field. Depending on your use case you may want to update any metric aggregations you have to ignore error events. I'm also adding more documentation for the configuration options. --- CHANGELOG.asciidoc | 1 + metricbeat/mb/testing/modules.go | 50 ++++++++++-- .../module/windows/perfmon/_meta/data.json | 18 +---- .../windows/perfmon/_meta/docs.asciidoc | 79 ++++++++++++++----- .../perfmon/pdh_integration_windows_test.go | 25 +++--- .../module/windows/perfmon/pdh_windows.go | 79 ++++++++++--------- metricbeat/module/windows/perfmon/perfmon.go | 31 +++++--- 7 files changed, 182 insertions(+), 101 deletions(-) diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 69e2e271d711..23f6bbd67ab9 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -142,6 +142,7 @@ https://github.com/elastic/beats/compare/v6.0.0-beta2...master[Check the HEAD di - Fix system.filesystem.used.pct value to match what df reports. {issue}5494[5494] - Fix namespace disambiguation in Kubernetes state_* metricsets. {issue}6281[6281] - Fix Kubernetes overview dashboard views for non default time ranges. {issue}6395{6395} +- Fix Windows perfmon metricset so that it sends metrics when an error occurs. 
{pull}6542[6542] *Packetbeat* diff --git a/metricbeat/mb/testing/modules.go b/metricbeat/mb/testing/modules.go index c638a09a87a1..d590da001f17 100644 --- a/metricbeat/mb/testing/modules.go +++ b/metricbeat/mb/testing/modules.go @@ -138,6 +138,42 @@ func ReportingFetch(metricSet mb.ReportingMetricSet) ([]common.MapStr, []error) return r.events, r.errs } +// NewReportingMetricSetV2 returns a new ReportingMetricSetV2 instance. Then +// you can use ReportingFetchV2 to perform a Fetch operation with the MetricSet. +func NewReportingMetricSetV2(t testing.TB, config interface{}) mb.ReportingMetricSetV2 { + metricSet := newMetricSet(t, config) + + reportingMetricSetV2, ok := metricSet.(mb.ReportingMetricSetV2) + if !ok { + t.Fatal("MetricSet does not implement ReportingMetricSetV2") + } + + return reportingMetricSetV2 +} + +type capturingReporterV2 struct { + events []mb.Event + errs []error +} + +func (r *capturingReporterV2) Event(event mb.Event) bool { + r.events = append(r.events, event) + return true +} + +func (r *capturingReporterV2) Error(err error) bool { + r.errs = append(r.errs, err) + return true +} + +// ReportingFetchV2 runs the given reporting metricset and returns all of the +// events and errors that occur during that period. +func ReportingFetchV2(metricSet mb.ReportingMetricSetV2) ([]mb.Event, []error) { + r := &capturingReporterV2{} + metricSet.Fetch(r) + return r.events, r.errs +} + // NewPushMetricSet instantiates a new PushMetricSet using the given // configuration. The ModuleFactory and MetricSetFactory are obtained from the // global Registry. @@ -217,16 +253,16 @@ func NewPushMetricSetV2(t testing.TB, config interface{}) mb.PushMetricSetV2 { return pushMetricSet } -// capturingReporterV2 stores all the events and errors from a metricset's +// capturingPushReporterV2 stores all the events and errors from a metricset's // Run method. 
-type capturingReporterV2 struct { +type capturingPushReporterV2 struct { doneC chan struct{} eventsC chan mb.Event } // report writes an event to the output channel and returns true. If the output // is closed it returns false. -func (r *capturingReporterV2) report(event mb.Event) bool { +func (r *capturingPushReporterV2) report(event mb.Event) bool { select { case <-r.doneC: // Publisher is stopped. @@ -237,17 +273,17 @@ func (r *capturingReporterV2) report(event mb.Event) bool { } // Event stores the passed-in event into the events array -func (r *capturingReporterV2) Event(event mb.Event) bool { +func (r *capturingPushReporterV2) Event(event mb.Event) bool { return r.report(event) } // Error stores the given error into the errors array. -func (r *capturingReporterV2) Error(err error) bool { +func (r *capturingPushReporterV2) Error(err error) bool { return r.report(mb.Event{Error: err}) } // Done returns the Done channel for this reporter. -func (r *capturingReporterV2) Done() <-chan struct{} { +func (r *capturingPushReporterV2) Done() <-chan struct{} { return r.doneC } @@ -255,7 +291,7 @@ func (r *capturingReporterV2) Done() <-chan struct{} { // time and returns all of the events and errors that occur during that period. 
func RunPushMetricSetV2(timeout time.Duration, waitEvents int, metricSet mb.PushMetricSetV2) []mb.Event { var ( - r = &capturingReporterV2{doneC: make(chan struct{}), eventsC: make(chan mb.Event)} + r = &capturingPushReporterV2{doneC: make(chan struct{}), eventsC: make(chan mb.Event)} wg sync.WaitGroup events []mb.Event ) diff --git a/metricbeat/module/windows/perfmon/_meta/data.json b/metricbeat/module/windows/perfmon/_meta/data.json index 8247ffa254e2..e6b39dd85880 100644 --- a/metricbeat/module/windows/perfmon/_meta/data.json +++ b/metricbeat/module/windows/perfmon/_meta/data.json @@ -1,5 +1,5 @@ { - "@timestamp": "2016-05-23T08:05:34.853Z", + "@timestamp": "2017-10-12T08:05:34.853Z", "beat": { "hostname": "host.example.com", "name": "host.example.com" @@ -9,25 +9,13 @@ "name": "perfmon", "rtt": 115 }, - "type": "metricsets", "windows": { "perfmon": { - "disk": { - "bytes": { - "read": { - "total": 0 - } - } - }, "processor": { + "name": "_Total", "time": { - "idle": { - "average": { - "ns": 670661.5894039735 - } - }, "total": { - "pct": 3.135058464112306 + "pct": 1.4663385364361736 } } } diff --git a/metricbeat/module/windows/perfmon/_meta/docs.asciidoc b/metricbeat/module/windows/perfmon/_meta/docs.asciidoc index dd8d1e01653b..28c05d3709d0 100644 --- a/metricbeat/module/windows/perfmon/_meta/docs.asciidoc +++ b/metricbeat/module/windows/perfmon/_meta/docs.asciidoc @@ -1,33 +1,76 @@ -The `perfmon` metricset of the Windows module reads Windows -performance counters. +The `perfmon` metricset of the Windows module reads Windows performance +counters. [float] === Configuration You must configure queries for the Windows performance counters that you wish -to collect. The example below collects processor time and disk writes. -`ignore_non_existent_counters` ignores failures for non-existent counters without -to interrupt the service. With `format` you can set the output format for a specific counter. -Possible values are `float` and `long`. 
If nothing is selected the default value is `float`. -With `instance_name`, you can specify the name of the instance. Use this setting when: -- You want to use an instance name that is different from the computed name. For example, `Total` instead of `_Total`. -- You specify a counter that has no instance. For example, `\TCPIP Performance Diagnostics\IPv4 NBLs/sec indicated without prevalidation`. -For wildcard queries this setting has no effect. - +to collect. The example below collects processor time and disk writes every +10 seconds. If either of the counters does not exist, the error is ignored. [source,yaml] ---- - module: windows - metricsets: ["perfmon"] + metricsets: [perfmon] period: 10s perfmon.ignore_non_existent_counters: true perfmon.counters: - - instance_label: "processor.name" - instance_name: "Total" - measurement_label: "processor.time.total.pct" + - instance_label: processor.name + instance_name: total + measurement_label: processor.time.total.pct query: '\Processor Information(_Total)\% Processor Time' - - instance_label: "diskio.name" - measurement_label: "diskio.write.bytes" + + - instance_label: physical_disk.name + measurement_label: physical_disk.write.per_sec query: '\PhysicalDisk(*)\Disk Writes/sec' - format: "long" + + - instance_label: physical_disk.name + measurement_label: physical_disk.write.time.pct + query: '\PhysicalDisk(*)\% Disk Write Time' ---- + +*`ignore_non_existent_counters`*:: A boolean option that causes the +metricset to ignore errors caused by counters that do not exist when set to +true. Instead of an error, a message will be logged at the info level stating +that the counter does not exist. + +*`counters`*:: Counters specifies a list of queries to perform. Each individual +counter requires three config options - `instance_label`, `measurement_label`, +and `query`. + +[float] +==== Counter Configuration + +Each item in the `counters` list specifies a perfmon query to perform. 
In the +events generated by the metricset these configuration options map to the field +values as shown below. + +---- +"%[instance_label]": "%[instance_name] or <perfmon_counter_instance_name>", +"%[measurement_label]": <perfmon_counter_value>, +---- + +*`instance_label`*:: The label used to identify the counter instance. This +field is required. + +*`instance_name`*:: The instance name to use in the event when the counter's +path (`query`) does not include an instance or when you want to override the +instance name. For example with `\Processor Information(_Total)` the +instance name would be `_Total` and by setting `instance_name: total` you can +override the value. ++ +The setting has no effect with wildcard queries (e.g. +`\PhysicalDisk(*)\Disk Writes/sec`). + +*`measurement_label`*:: The label used for the value returned by the query. +This field is required. + +*`query`*:: The perfmon query. This is the counter path specified in +Performance Data Helper (PDH) syntax. This field is required. For example +`\Processor Information(_Total)\% Processor Time`. An asterisk can be used in +place of an instance name to perform a wildcard query that generates an event +for each counter instance (e.g. `\PhysicalDisk(*)\Disk Writes/sec`). + +*`format`*:: Format of the measurement value. The value can be either `float` or +`long`. The default is `float`. 
+ diff --git a/metricbeat/module/windows/perfmon/pdh_integration_windows_test.go b/metricbeat/module/windows/perfmon/pdh_integration_windows_test.go index e4f6d6ccdb7d..f6f101f92d16 100644 --- a/metricbeat/module/windows/perfmon/pdh_integration_windows_test.go +++ b/metricbeat/module/windows/perfmon/pdh_integration_windows_test.go @@ -7,10 +7,11 @@ import ( "time" "unsafe" - mbtest "github.com/elastic/beats/metricbeat/mb/testing" - "github.com/pkg/errors" "github.com/stretchr/testify/assert" + + "github.com/elastic/beats/metricbeat/mb" + mbtest "github.com/elastic/beats/metricbeat/mb/testing" ) const processorTimeCounter = `\Processor Information(_Total)\% Processor Time` @@ -38,16 +39,20 @@ func TestData(t *testing.T) { }, } - f := mbtest.NewEventsFetcher(t, config) - - f.Fetch() - + ms := mbtest.NewReportingMetricSetV2(t, config) + mbtest.ReportingFetchV2(ms) time.Sleep(60 * time.Millisecond) - err := mbtest.WriteEvents(f, t) - if err != nil { - t.Fatal("write", err) + events, errs := mbtest.ReportingFetchV2(ms) + if len(errs) > 0 { + t.Fatal(errs) } + if len(events) == 0 { + t.Fatal("no events received") + } + + beatEvent := mbtest.StandardizeEvent(ms, events[0], mb.AddMetricSetInfo) + mbtest.WriteEventToDataJSON(t, beatEvent) } func TestQuery(t *testing.T) { @@ -306,7 +311,7 @@ func TestWildcardQuery(t *testing.T) { t.Fatal(err) } - pctKey, err := values[0].HasKey("processor.time.pct") + pctKey, err := values[0].MetricSetFields.HasKey("processor.time.pct") if err != nil { t.Fatal(err) } diff --git a/metricbeat/module/windows/perfmon/pdh_windows.go b/metricbeat/module/windows/perfmon/pdh_windows.go index f8f9f6e08119..a5d598926978 100644 --- a/metricbeat/module/windows/perfmon/pdh_windows.go +++ b/metricbeat/module/windows/perfmon/pdh_windows.go @@ -10,13 +10,12 @@ import ( "unicode/utf16" "unsafe" - "github.com/elastic/beats/libbeat/logp" - - "github.com/joeshaw/multierror" "github.com/pkg/errors" "golang.org/x/sys/windows" 
"github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/logp" + "github.com/elastic/beats/metricbeat/mb" "github.com/elastic/beats/winlogbeat/sys" ) @@ -311,9 +310,10 @@ func (q *Query) Close() error { type PerfmonReader struct { query *Query // PDH Query - instanceLabel map[string]string // Mapping of counter path to key used in output. - measurement map[string]string - executed bool // Indicates if the query has been executed. + instanceLabel map[string]string // Mapping of counter path to key used for the label (e.g. processor.name) + measurement map[string]string // Mapping of counter path to key used for the value (e.g. processor.cpu_time). + executed bool // Indicates if the query has been executed. + log *logp.Logger } // NewPerfmonReader creates a new instance of PerfmonReader. @@ -327,6 +327,7 @@ func NewPerfmonReader(config Config) (*PerfmonReader, error) { query: query, instanceLabel: map[string]string{}, measurement: map[string]string{}, + log: logp.NewLogger("perfmon"), } for _, counter := range config.CounterConfig { @@ -339,13 +340,16 @@ func NewPerfmonReader(config Config) (*PerfmonReader, error) { } if err := query.AddCounter(counter.Query, format, counter.InstanceName); err != nil { if config.IgnoreNECounters { - if err == PDH_CSTATUS_NO_COUNTER { - logp.Info(`ignore non existent counter (path="%v")`, counter.Query) + switch err { + case PDH_CSTATUS_NO_COUNTER, PDH_CSTATUS_NO_COUNTERNAME, + PDH_CSTATUS_NO_INSTANCE, PDH_CSTATUS_NO_OBJECT: + r.log.Infow("Ignoring non existent counter", "error", err, + logp.Namespace("perfmon"), "query", counter.Query) continue } } query.Close() - return nil, errors.Wrapf(err, `failed to add counter (path="%v")`, counter.Query) + return nil, errors.Wrapf(err, `failed to add counter (query="%v")`, counter.Query) } r.instanceLabel[counter.Query] = counter.InstanceLabel @@ -356,9 +360,9 @@ func NewPerfmonReader(config Config) (*PerfmonReader, error) { return r, nil } -func (r *PerfmonReader) Read() 
([]common.MapStr, error) { +func (r *PerfmonReader) Read() ([]mb.Event, error) { if err := r.query.Execute(); err != nil { - return nil, err + return nil, errors.Wrap(err, "failed querying counter values") } // Get the values. @@ -368,38 +372,37 @@ func (r *PerfmonReader) Read() ([]common.MapStr, error) { } // Write the values into the map. - result := make([]common.MapStr, 0, len(values)) - var errs multierror.Errors - - for counterPath, counter := range values { - for _, val := range counter { - ev := common.MapStr{} - instanceKey := r.instanceLabel[counterPath] - ev.Put(instanceKey, val.Instance) - measurementKey := r.measurement[counterPath] - ev.Put(measurementKey, val.Measurement) - - if val.Err != nil { - switch val.Err { - case PDH_CALC_NEGATIVE_DENOMINATOR: - case PDH_INVALID_DATA: - if r.executed { - errs = append(errs, errors.Wrapf(val.Err, "key=%v", measurementKey)) - } - default: - errs = append(errs, errors.Wrapf(val.Err, "key=%v", measurementKey)) - } + events := make([]mb.Event, 0, len(values)) + + for counterPath, values := range values { + for _, val := range values { + if val.Err != nil && !r.executed { + r.log.Debugw("Ignoring the first measurement because the data isn't ready", + "error", val.Err, logp.Namespace("perfmon"), "query", counterPath) + continue } - result = append(result, ev) - } - } + event := mb.Event{ + MetricSetFields: common.MapStr{}, + Error: errors.Wrapf(val.Err, "failed on query=%v", counterPath), + } + + if val.Instance != "" { + event.MetricSetFields.Put(r.instanceLabel[counterPath], val.Instance) + } - if !r.executed { - r.executed = true + if val.Measurement != nil { + event.MetricSetFields.Put(r.measurement[counterPath], val.Measurement) + } else { + event.MetricSetFields.Put(r.measurement[counterPath], 0) + } + + events = append(events, event) + } } - return result, errs.Err() + r.executed = true + return events, nil } func (e PdhErrno) Error() string { diff --git a/metricbeat/module/windows/perfmon/perfmon.go 
b/metricbeat/module/windows/perfmon/perfmon.go index 113549fece47..c7cef78c1d33 100644 --- a/metricbeat/module/windows/perfmon/perfmon.go +++ b/metricbeat/module/windows/perfmon/perfmon.go @@ -3,22 +3,21 @@ package perfmon import ( - "fmt" "strings" "github.com/pkg/errors" - "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/common/cfgwarn" + "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/metricbeat/mb" ) // CounterConfig for perfmon counters. type CounterConfig struct { - InstanceLabel string `config:"instance_label" validate:"required"` + InstanceLabel string `config:"instance_label" validate:"required"` InstanceName string `config:"instance_name"` MeasurementLabel string `config:"measurement_label" validate:"required"` - Query string `config:"query" validate:"required"` + Query string `config:"query" validate:"required"` Format string `config:"format"` } @@ -37,14 +36,14 @@ func init() { type MetricSet struct { mb.BaseMetricSet reader *PerfmonReader + log *logp.Logger } // New create a new instance of the MetricSet. 
func New(base mb.BaseMetricSet) (mb.MetricSet, error) { cfgwarn.Beta("The perfmon metricset is beta") - config := Config{} - + var config Config if err := base.Module().UnpackConfig(&config); err != nil { return nil, err } @@ -56,28 +55,34 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { value.Format = "float" case "float", "long": default: - err := fmt.Errorf("format '%s' for counter '%s' are not valid", value.Format, value.InstanceLabel) - return nil, errors.Wrap(err, "initialization failed") + return nil, errors.Errorf("initialization failed: format '%s' "+ + "for counter '%s' is invalid (must be float or long)", + value.Format, value.InstanceLabel) } } reader, err := NewPerfmonReader(config) if err != nil { - return nil, errors.Wrap(err, "initialization failed") + return nil, errors.Wrap(err, "initialization of reader failed") } return &MetricSet{ BaseMetricSet: base, reader: reader, + log: logp.NewLogger("perfmon"), }, nil } -func (m *MetricSet) Fetch() ([]common.MapStr, error) { - data, err := m.reader.Read() +func (m *MetricSet) Fetch(report mb.ReporterV2) { + events, err := m.reader.Read() if err != nil { - return nil, errors.Wrap(err, "failed reading counters") + m.log.Debugw("Failed reading counters", "error", err) + err = errors.Wrap(err, "failed reading counters") + report.Error(err) } - return data, nil + for _, event := range events { + report.Event(event) + } }