diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 69e2e271d711..23f6bbd67ab9 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -142,6 +142,7 @@ https://github.com/elastic/beats/compare/v6.0.0-beta2...master[Check the HEAD di - Fix system.filesystem.used.pct value to match what df reports. {issue}5494[5494] - Fix namespace disambiguation in Kubernetes state_* metricsets. {issue}6281[6281] - Fix Kubernetes overview dashboard views for non default time ranges. {issue}6395{6395} +- Fix Windows perfmon metricset so that it sends metrics when an error occurs. {pull}6542[6542] *Packetbeat* diff --git a/metricbeat/mb/testing/modules.go b/metricbeat/mb/testing/modules.go index c638a09a87a1..d590da001f17 100644 --- a/metricbeat/mb/testing/modules.go +++ b/metricbeat/mb/testing/modules.go @@ -138,6 +138,42 @@ func ReportingFetch(metricSet mb.ReportingMetricSet) ([]common.MapStr, []error) return r.events, r.errs } +// NewReportingMetricSetV2 returns a new ReportingMetricSetV2 instance. Then +// you can use ReportingFetchV2 to perform a Fetch operation with the MetricSet. +func NewReportingMetricSetV2(t testing.TB, config interface{}) mb.ReportingMetricSetV2 { + metricSet := newMetricSet(t, config) + + reportingMetricSetV2, ok := metricSet.(mb.ReportingMetricSetV2) + if !ok { + t.Fatal("MetricSet does not implement ReportingMetricSetV2") + } + + return reportingMetricSetV2 +} + +type capturingReporterV2 struct { + events []mb.Event + errs []error +} + +func (r *capturingReporterV2) Event(event mb.Event) bool { + r.events = append(r.events, event) + return true +} + +func (r *capturingReporterV2) Error(err error) bool { + r.errs = append(r.errs, err) + return true +} + +// ReportingFetchV2 runs the given reporting metricset and returns all of the +// events and errors that occur during that period. 
+func ReportingFetchV2(metricSet mb.ReportingMetricSetV2) ([]mb.Event, []error) { + r := &capturingReporterV2{} + metricSet.Fetch(r) + return r.events, r.errs +} + // NewPushMetricSet instantiates a new PushMetricSet using the given // configuration. The ModuleFactory and MetricSetFactory are obtained from the // global Registry. @@ -217,16 +253,16 @@ func NewPushMetricSetV2(t testing.TB, config interface{}) mb.PushMetricSetV2 { return pushMetricSet } -// capturingReporterV2 stores all the events and errors from a metricset's +// capturingPushReporterV2 stores all the events and errors from a metricset's // Run method. -type capturingReporterV2 struct { +type capturingPushReporterV2 struct { doneC chan struct{} eventsC chan mb.Event } // report writes an event to the output channel and returns true. If the output // is closed it returns false. -func (r *capturingReporterV2) report(event mb.Event) bool { +func (r *capturingPushReporterV2) report(event mb.Event) bool { select { case <-r.doneC: // Publisher is stopped. @@ -237,17 +273,17 @@ func (r *capturingReporterV2) report(event mb.Event) bool { } // Event stores the passed-in event into the events array -func (r *capturingReporterV2) Event(event mb.Event) bool { +func (r *capturingPushReporterV2) Event(event mb.Event) bool { return r.report(event) } // Error stores the given error into the errors array. -func (r *capturingReporterV2) Error(err error) bool { +func (r *capturingPushReporterV2) Error(err error) bool { return r.report(mb.Event{Error: err}) } // Done returns the Done channel for this reporter. -func (r *capturingReporterV2) Done() <-chan struct{} { +func (r *capturingPushReporterV2) Done() <-chan struct{} { return r.doneC } @@ -255,7 +291,7 @@ func (r *capturingReporterV2) Done() <-chan struct{} { // time and returns all of the events and errors that occur during that period. 
func RunPushMetricSetV2(timeout time.Duration, waitEvents int, metricSet mb.PushMetricSetV2) []mb.Event { var ( - r = &capturingReporterV2{doneC: make(chan struct{}), eventsC: make(chan mb.Event)} + r = &capturingPushReporterV2{doneC: make(chan struct{}), eventsC: make(chan mb.Event)} wg sync.WaitGroup events []mb.Event ) diff --git a/metricbeat/module/windows/perfmon/_meta/data.json b/metricbeat/module/windows/perfmon/_meta/data.json index 8247ffa254e2..e6b39dd85880 100644 --- a/metricbeat/module/windows/perfmon/_meta/data.json +++ b/metricbeat/module/windows/perfmon/_meta/data.json @@ -1,5 +1,5 @@ { - "@timestamp": "2016-05-23T08:05:34.853Z", + "@timestamp": "2017-10-12T08:05:34.853Z", "beat": { "hostname": "host.example.com", "name": "host.example.com" @@ -9,25 +9,13 @@ "name": "perfmon", "rtt": 115 }, - "type": "metricsets", "windows": { "perfmon": { - "disk": { - "bytes": { - "read": { - "total": 0 - } - } - }, "processor": { + "name": "_Total", "time": { - "idle": { - "average": { - "ns": 670661.5894039735 - } - }, "total": { - "pct": 3.135058464112306 + "pct": 1.4663385364361736 } } } diff --git a/metricbeat/module/windows/perfmon/_meta/docs.asciidoc b/metricbeat/module/windows/perfmon/_meta/docs.asciidoc index dd8d1e01653b..28c05d3709d0 100644 --- a/metricbeat/module/windows/perfmon/_meta/docs.asciidoc +++ b/metricbeat/module/windows/perfmon/_meta/docs.asciidoc @@ -1,33 +1,76 @@ -The `perfmon` metricset of the Windows module reads Windows -performance counters. +The `perfmon` metricset of the Windows module reads Windows performance +counters. [float] === Configuration You must configure queries for the Windows performance counters that you wish -to collect. The example below collects processor time and disk writes. -`ignore_non_existent_counters` ignores failures for non-existent counters without -to interrupt the service. With `format` you can set the output format for a specific counter. -Possible values are `float` and `long`. 
If nothing is selected the default value is `float`. -With `instance_name`, you can specify the name of the instance. Use this setting when: -- You want to use an instance name that is different from the computed name. For example, `Total` instead of `_Total`. -- You specify a counter that has no instance. For example, `\TCPIP Performance Diagnostics\IPv4 NBLs/sec indicated without prevalidation`. -For wildcard queries this setting has no effect. - +to collect. The example below collects processor time and disk writes every +10 seconds. If either counter does not exist, the metricset ignores the error. [source,yaml] ---- - module: windows - metricsets: ["perfmon"] + metricsets: [perfmon] period: 10s perfmon.ignore_non_existent_counters: true perfmon.counters: - - instance_label: "processor.name" - instance_name: "Total" - measurement_label: "processor.time.total.pct" + - instance_label: processor.name + instance_name: total + measurement_label: processor.time.total.pct query: '\Processor Information(_Total)\% Processor Time' - - instance_label: "diskio.name" - measurement_label: "diskio.write.bytes" + + - instance_label: physical_disk.name + measurement_label: physical_disk.write.per_sec query: '\PhysicalDisk(*)\Disk Writes/sec' - format: "long" + + - instance_label: physical_disk.name + measurement_label: physical_disk.write.time.pct + query: '\PhysicalDisk(*)\% Disk Write Time' ---- + +*`ignore_non_existent_counters`*:: A boolean option that causes the +metricset to ignore errors caused by counters that do not exist when set to +true. Instead of an error, a message will be logged at the info level stating +that the counter does not exist. + +*`counters`*:: Counters specifies a list of queries to perform. Each individual +counter requires three config options - `instance_label`, `measurement_label`, +and `query`. + +[float] +==== Counter Configuration + +Each item in the `counters` list specifies a perfmon query to perform.
In the +events generated by the metricset these configuration options map to the field +values as shown below. + +---- +"%[instance_label]": "%[instance_name] or <computed>", +"%[measurement_label]": <value>, +---- + +*`instance_label`*:: The label used to identify the counter instance. This +field is required. + +*`instance_name`*:: The instance name to use in the event when the counter's +path (`query`) does not include an instance or when you want to override the +instance name. For example with `\Processor Information(_Total)` the +instance name would be `_Total` and by setting `instance_name: total` you can +override the value. ++ +The setting has no effect with wildcard queries (e.g. +`\PhysicalDisk(*)\Disk Writes/sec`). + +*`measurement_label`*:: The label used for the value returned by the query. +This field is required. + +*`query`*:: The perfmon query. This is the counter path specified in +Performance Data Helper (PDH) syntax. This field is required. For example +`\Processor Information(_Total)\% Processor Time`. An asterisk can be used in +place of an instance name to perform a wildcard query that generates an event +for each counter instance (e.g. `\PhysicalDisk(*)\Disk Writes/sec`). + +*`format`*:: Format of the measurement value. The value can be either `float` or +`long`. The default is `float`.
+ diff --git a/metricbeat/module/windows/perfmon/pdh_integration_windows_test.go b/metricbeat/module/windows/perfmon/pdh_integration_windows_test.go index e4f6d6ccdb7d..f6f101f92d16 100644 --- a/metricbeat/module/windows/perfmon/pdh_integration_windows_test.go +++ b/metricbeat/module/windows/perfmon/pdh_integration_windows_test.go @@ -7,10 +7,11 @@ import ( "time" "unsafe" - mbtest "github.com/elastic/beats/metricbeat/mb/testing" - "github.com/pkg/errors" "github.com/stretchr/testify/assert" + + "github.com/elastic/beats/metricbeat/mb" + mbtest "github.com/elastic/beats/metricbeat/mb/testing" ) const processorTimeCounter = `\Processor Information(_Total)\% Processor Time` @@ -38,16 +39,20 @@ func TestData(t *testing.T) { }, } - f := mbtest.NewEventsFetcher(t, config) - - f.Fetch() - + ms := mbtest.NewReportingMetricSetV2(t, config) + mbtest.ReportingFetchV2(ms) time.Sleep(60 * time.Millisecond) - err := mbtest.WriteEvents(f, t) - if err != nil { - t.Fatal("write", err) + events, errs := mbtest.ReportingFetchV2(ms) + if len(errs) > 0 { + t.Fatal(errs) } + if len(events) == 0 { + t.Fatal("no events received") + } + + beatEvent := mbtest.StandardizeEvent(ms, events[0], mb.AddMetricSetInfo) + mbtest.WriteEventToDataJSON(t, beatEvent) } func TestQuery(t *testing.T) { @@ -306,7 +311,7 @@ func TestWildcardQuery(t *testing.T) { t.Fatal(err) } - pctKey, err := values[0].HasKey("processor.time.pct") + pctKey, err := values[0].MetricSetFields.HasKey("processor.time.pct") if err != nil { t.Fatal(err) } diff --git a/metricbeat/module/windows/perfmon/pdh_windows.go b/metricbeat/module/windows/perfmon/pdh_windows.go index f8f9f6e08119..a5d598926978 100644 --- a/metricbeat/module/windows/perfmon/pdh_windows.go +++ b/metricbeat/module/windows/perfmon/pdh_windows.go @@ -10,13 +10,12 @@ import ( "unicode/utf16" "unsafe" - "github.com/elastic/beats/libbeat/logp" - - "github.com/joeshaw/multierror" "github.com/pkg/errors" "golang.org/x/sys/windows" 
"github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/logp" + "github.com/elastic/beats/metricbeat/mb" "github.com/elastic/beats/winlogbeat/sys" ) @@ -311,9 +310,10 @@ func (q *Query) Close() error { type PerfmonReader struct { query *Query // PDH Query - instanceLabel map[string]string // Mapping of counter path to key used in output. - measurement map[string]string - executed bool // Indicates if the query has been executed. + instanceLabel map[string]string // Mapping of counter path to key used for the label (e.g. processor.name) + measurement map[string]string // Mapping of counter path to key used for the value (e.g. processor.cpu_time). + executed bool // Indicates if the query has been executed. + log *logp.Logger } // NewPerfmonReader creates a new instance of PerfmonReader. @@ -327,6 +327,7 @@ func NewPerfmonReader(config Config) (*PerfmonReader, error) { query: query, instanceLabel: map[string]string{}, measurement: map[string]string{}, + log: logp.NewLogger("perfmon"), } for _, counter := range config.CounterConfig { @@ -339,13 +340,16 @@ func NewPerfmonReader(config Config) (*PerfmonReader, error) { } if err := query.AddCounter(counter.Query, format, counter.InstanceName); err != nil { if config.IgnoreNECounters { - if err == PDH_CSTATUS_NO_COUNTER { - logp.Info(`ignore non existent counter (path="%v")`, counter.Query) + switch err { + case PDH_CSTATUS_NO_COUNTER, PDH_CSTATUS_NO_COUNTERNAME, + PDH_CSTATUS_NO_INSTANCE, PDH_CSTATUS_NO_OBJECT: + r.log.Infow("Ignoring non existent counter", "error", err, + logp.Namespace("perfmon"), "query", counter.Query) continue } } query.Close() - return nil, errors.Wrapf(err, `failed to add counter (path="%v")`, counter.Query) + return nil, errors.Wrapf(err, `failed to add counter (query="%v")`, counter.Query) } r.instanceLabel[counter.Query] = counter.InstanceLabel @@ -356,9 +360,9 @@ func NewPerfmonReader(config Config) (*PerfmonReader, error) { return r, nil } -func (r *PerfmonReader) Read() 
([]common.MapStr, error) { +func (r *PerfmonReader) Read() ([]mb.Event, error) { if err := r.query.Execute(); err != nil { - return nil, err + return nil, errors.Wrap(err, "failed querying counter values") } // Get the values. @@ -368,38 +372,37 @@ func (r *PerfmonReader) Read() ([]common.MapStr, error) { } // Write the values into the map. - result := make([]common.MapStr, 0, len(values)) - var errs multierror.Errors - - for counterPath, counter := range values { - for _, val := range counter { - ev := common.MapStr{} - instanceKey := r.instanceLabel[counterPath] - ev.Put(instanceKey, val.Instance) - measurementKey := r.measurement[counterPath] - ev.Put(measurementKey, val.Measurement) - - if val.Err != nil { - switch val.Err { - case PDH_CALC_NEGATIVE_DENOMINATOR: - case PDH_INVALID_DATA: - if r.executed { - errs = append(errs, errors.Wrapf(val.Err, "key=%v", measurementKey)) - } - default: - errs = append(errs, errors.Wrapf(val.Err, "key=%v", measurementKey)) - } + events := make([]mb.Event, 0, len(values)) + + for counterPath, values := range values { + for _, val := range values { + if val.Err != nil && !r.executed { + r.log.Debugw("Ignoring the first measurement because the data isn't ready", + "error", val.Err, logp.Namespace("perfmon"), "query", counterPath) + continue } - result = append(result, ev) - } - } + event := mb.Event{ + MetricSetFields: common.MapStr{}, + Error: errors.Wrapf(val.Err, "failed on query=%v", counterPath), + } + + if val.Instance != "" { + event.MetricSetFields.Put(r.instanceLabel[counterPath], val.Instance) + } - if !r.executed { - r.executed = true + if val.Measurement != nil { + event.MetricSetFields.Put(r.measurement[counterPath], val.Measurement) + } else { + event.MetricSetFields.Put(r.measurement[counterPath], 0) + } + + events = append(events, event) + } } - return result, errs.Err() + r.executed = true + return events, nil } func (e PdhErrno) Error() string { diff --git a/metricbeat/module/windows/perfmon/perfmon.go 
b/metricbeat/module/windows/perfmon/perfmon.go index 113549fece47..c7cef78c1d33 100644 --- a/metricbeat/module/windows/perfmon/perfmon.go +++ b/metricbeat/module/windows/perfmon/perfmon.go @@ -3,22 +3,21 @@ package perfmon import ( - "fmt" "strings" "github.com/pkg/errors" - "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/common/cfgwarn" + "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/metricbeat/mb" ) // CounterConfig for perfmon counters. type CounterConfig struct { - InstanceLabel string `config:"instance_label" validate:"required"` + InstanceLabel string `config:"instance_label" validate:"required"` InstanceName string `config:"instance_name"` MeasurementLabel string `config:"measurement_label" validate:"required"` - Query string `config:"query" validate:"required"` + Query string `config:"query" validate:"required"` Format string `config:"format"` } @@ -37,14 +36,14 @@ func init() { type MetricSet struct { mb.BaseMetricSet reader *PerfmonReader + log *logp.Logger } // New create a new instance of the MetricSet. 
func New(base mb.BaseMetricSet) (mb.MetricSet, error) { cfgwarn.Beta("The perfmon metricset is beta") - config := Config{} - + var config Config if err := base.Module().UnpackConfig(&config); err != nil { return nil, err } @@ -56,28 +55,34 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { value.Format = "float" case "float", "long": default: - err := fmt.Errorf("format '%s' for counter '%s' are not valid", value.Format, value.InstanceLabel) - return nil, errors.Wrap(err, "initialization failed") + return nil, errors.Errorf("initialization failed: format '%s' "+ + "for counter '%s' is invalid (must be float or long)", + value.Format, value.InstanceLabel) } } reader, err := NewPerfmonReader(config) if err != nil { - return nil, errors.Wrap(err, "initialization failed") + return nil, errors.Wrap(err, "initialization of reader failed") } return &MetricSet{ BaseMetricSet: base, reader: reader, + log: logp.NewLogger("perfmon"), }, nil } -func (m *MetricSet) Fetch() ([]common.MapStr, error) { - data, err := m.reader.Read() +func (m *MetricSet) Fetch(report mb.ReporterV2) { + events, err := m.reader.Read() if err != nil { - return nil, errors.Wrap(err, "failed reading counters") + m.log.Debugw("Failed reading counters", "error", err) + err = errors.Wrap(err, "failed reading counters") + report.Error(err) } - return data, nil + for _, event := range events { + report.Event(event) + } }