From 9b96d62882d054a33250497908976325fa7dd0c2 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Tue, 29 Oct 2019 04:56:10 -0700 Subject: [PATCH] Add `elasticsearch/enrich` metricset (#14243) * WIP: elasticsearch/enrich metricset * Adding non-xpack event creation * More changes * Adding docs.asciidoc * Running make update * Adding missing file * Adding data.json * Implement xpack.enabled: true event creation * Adding missing import * Remove license check as enrich is available in basic * Skip enrich integration test against ES versions < 7.5.0 * Skip system tests for enrich metricset with ES < 7.5.0 * Adding integration test * Updating data.json * Adding CHANGELOG entry * Adding system test * Fixing up system tests * Cleanup! * Only try to create enrich stats if enrich feature is available * Fixing skip logic --- CHANGELOG.next.asciidoc | 1 + metricbeat/docs/fields.asciidoc | 48 +++++++ .../docs/modules/elasticsearch.asciidoc | 4 + .../modules/elasticsearch/enrich.asciidoc | 21 +++ metricbeat/docs/modules_list.asciidoc | 3 +- metricbeat/include/list_common.go | 1 + .../elasticsearch/_meta/config-xpack.yml | 1 + .../module/elasticsearch/elasticsearch.go | 4 + .../elasticsearch_integration_test.go | 110 +++++++++++++++- .../elasticsearch/enrich/_meta/data.json | 38 ++++++ .../elasticsearch/enrich/_meta/docs.asciidoc | 3 + .../elasticsearch/enrich/_meta/fields.yml | 25 ++++ .../enrich/_meta/test/empty.750.json | 1 + .../enrich/_meta/test/enrich_stats.750.json | 28 ++++ .../enrich/_meta/test/ingest_pipeline.json | 13 ++ .../enrich/_meta/test/policy.json | 7 + .../enrich/_meta/test/source_doc.json | 10 ++ .../enrich/_meta/test/target_doc.json | 3 + .../module/elasticsearch/enrich/data.go | 92 ++++++++++++++ .../module/elasticsearch/enrich/data_test.go | 49 +++++++ .../module/elasticsearch/enrich/data_xpack.go | 76 +++++++++++ .../module/elasticsearch/enrich/enrich.go | 120 ++++++++++++++++++ metricbeat/module/elasticsearch/fields.go | 2 +- .../elasticsearch/test_elasticsearch.py | 102 +++++++++++++-- .../elasticsearch-xpack.yml.disabled | 1 + 25 files changed, 746 insertions(+), 17 deletions(-) create mode 100644 metricbeat/docs/modules/elasticsearch/enrich.asciidoc create mode 100644 metricbeat/module/elasticsearch/enrich/_meta/data.json create mode 100644 metricbeat/module/elasticsearch/enrich/_meta/docs.asciidoc create mode 100644 metricbeat/module/elasticsearch/enrich/_meta/fields.yml create mode 100644 metricbeat/module/elasticsearch/enrich/_meta/test/empty.750.json create mode 100644 metricbeat/module/elasticsearch/enrich/_meta/test/enrich_stats.750.json create mode 100644 metricbeat/module/elasticsearch/enrich/_meta/test/ingest_pipeline.json create mode 100644 metricbeat/module/elasticsearch/enrich/_meta/test/policy.json create mode 100644 metricbeat/module/elasticsearch/enrich/_meta/test/source_doc.json create mode 100644 metricbeat/module/elasticsearch/enrich/_meta/test/target_doc.json create mode 100644 metricbeat/module/elasticsearch/enrich/data.go create mode 100644 metricbeat/module/elasticsearch/enrich/data_test.go create mode 100644 metricbeat/module/elasticsearch/enrich/data_xpack.go create mode 100644 metricbeat/module/elasticsearch/enrich/enrich.go diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 91369e0a1db..49c64840ec8 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -453,6 +453,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Add ability to filter by tags for cloudwatch metricset. {pull}13758[13758] {issue}13145[13145] - Release cloudwatch, s3_daily_storage, s3_request, sqs and rds metricset as GA. {pull}14114[14114] {issue}14059[14059] - Add Oracle overview dashboard {pull}14021[14021] +- Add `elasticsearch/enrich` metricset. {pull}14243[14243] {issue}14221[14221] *Packetbeat* diff --git a/metricbeat/docs/fields.asciidoc b/metricbeat/docs/fields.asciidoc index ba3bc732339..b23c19fe6d8 100644 --- a/metricbeat/docs/fields.asciidoc +++ b/metricbeat/docs/fields.asciidoc @@ -11077,6 +11077,54 @@ type: long Memory used for fielddata. +type: long + +-- + +[float] +=== enrich + +Enrich stats + + + +*`elasticsearch.enrich.queue.size`*:: ++ +-- +Number of search requests in the queue. + + +type: long + +-- + + +*`elasticsearch.enrich.remote_requests.current`*:: ++ +-- +Current number of outstanding remote requests. + + +type: long + +-- + +*`elasticsearch.enrich.remote_requests.total`*:: ++ +-- +Number of outstanding remote requests executed since node startup. + + +type: long + +-- + +*`elasticsearch.enrich.executed_searches.total`*:: ++ +-- +Number of search requests that enrich processors have executed since node startup. + + type: long -- diff --git a/metricbeat/docs/modules/elasticsearch.asciidoc b/metricbeat/docs/modules/elasticsearch.asciidoc index 1226fcd0880..4c54d694034 100644 --- a/metricbeat/docs/modules/elasticsearch.asciidoc +++ b/metricbeat/docs/modules/elasticsearch.asciidoc @@ -66,6 +66,8 @@ The following metricsets are available: * <> +* <> + * <> * <> @@ -86,6 +88,8 @@ include::elasticsearch/ccr.asciidoc[] include::elasticsearch/cluster_stats.asciidoc[] +include::elasticsearch/enrich.asciidoc[] + include::elasticsearch/index.asciidoc[] include::elasticsearch/index_recovery.asciidoc[] diff --git a/metricbeat/docs/modules/elasticsearch/enrich.asciidoc b/metricbeat/docs/modules/elasticsearch/enrich.asciidoc new file mode 100644 index 00000000000..9e4f56ba18b --- /dev/null +++ b/metricbeat/docs/modules/elasticsearch/enrich.asciidoc @@ -0,0 +1,21 @@ +//// +This file is generated! See scripts/mage/docs_collector.go +//// + +[[metricbeat-metricset-elasticsearch-enrich]] +=== Elasticsearch enrich metricset + +include::../../../module/elasticsearch/enrich/_meta/docs.asciidoc[] + + +==== Fields + +For a description of each field in the metricset, see the +<> section. + +Here is an example document generated by this metricset: + +[source,json] +---- +include::../../../module/elasticsearch/enrich/_meta/data.json[] +---- diff --git a/metricbeat/docs/modules_list.asciidoc b/metricbeat/docs/modules_list.asciidoc index 49837f80067..a3bf2303aea 100644 --- a/metricbeat/docs/modules_list.asciidoc +++ b/metricbeat/docs/modules_list.asciidoc @@ -60,8 +60,9 @@ This file is generated! See scripts/mage/docs_collector.go |<> |image:./images/icon-no.png[No prebuilt dashboards] | .1+| .1+| |<> |<> |image:./images/icon-no.png[No prebuilt dashboards] | -.10+| .10+| |<> +.11+| .11+| |<> |<> +|<> |<> |<> |<> diff --git a/metricbeat/include/list_common.go b/metricbeat/include/list_common.go index 989ceb91ca3..b5158ec8380 100644 --- a/metricbeat/include/list_common.go +++ b/metricbeat/include/list_common.go @@ -54,6 +54,7 @@ import ( _ "github.com/elastic/beats/metricbeat/module/elasticsearch" _ "github.com/elastic/beats/metricbeat/module/elasticsearch/ccr" _ "github.com/elastic/beats/metricbeat/module/elasticsearch/cluster_stats" + _ "github.com/elastic/beats/metricbeat/module/elasticsearch/enrich" _ "github.com/elastic/beats/metricbeat/module/elasticsearch/index" _ "github.com/elastic/beats/metricbeat/module/elasticsearch/index_recovery" _ "github.com/elastic/beats/metricbeat/module/elasticsearch/index_summary" diff --git a/metricbeat/module/elasticsearch/_meta/config-xpack.yml b/metricbeat/module/elasticsearch/_meta/config-xpack.yml index 982fbf3bf1f..e31e4862dea 100644 --- a/metricbeat/module/elasticsearch/_meta/config-xpack.yml +++ b/metricbeat/module/elasticsearch/_meta/config-xpack.yml @@ -2,6 +2,7 @@ metricsets: - ccr - cluster_stats + - enrich - index - index_recovery - index_summary diff --git a/metricbeat/module/elasticsearch/elasticsearch.go b/metricbeat/module/elasticsearch/elasticsearch.go index a3d2b2ae39e..b8c905c64f2 100644 --- a/metricbeat/module/elasticsearch/elasticsearch.go +++ b/metricbeat/module/elasticsearch/elasticsearch.go @@ -67,6 +67,7 @@ func validateXPackMetricsets(base mb.BaseModule) error { expectedXPackMetricsets := []string{ "ccr", + "enrich", "cluster_stats", "index", "index_recovery", @@ -86,6 +87,9 @@ func validateXPackMetricsets(base mb.BaseModule) error { // CCRStatsAPIAvailableVersion is the version of Elasticsearch since when the CCR stats API is available. var CCRStatsAPIAvailableVersion = common.MustNewVersion("6.5.0") +// EnrichStatsAPIAvailableVersion is the version of Elasticsearch since when the Enrich stats API is available. +var EnrichStatsAPIAvailableVersion = common.MustNewVersion("7.5.0") + // Global clusterIdCache. Assumption is that the same node id never can belong to a different cluster id. var clusterIDCache = map[string]string{} diff --git a/metricbeat/module/elasticsearch/elasticsearch_integration_test.go b/metricbeat/module/elasticsearch/elasticsearch_integration_test.go index 641dcb89458..f670a7bd615 100644 --- a/metricbeat/module/elasticsearch/elasticsearch_integration_test.go +++ b/metricbeat/module/elasticsearch/elasticsearch_integration_test.go @@ -39,6 +39,7 @@ import ( "github.com/elastic/beats/metricbeat/module/elasticsearch" _ "github.com/elastic/beats/metricbeat/module/elasticsearch/ccr" _ "github.com/elastic/beats/metricbeat/module/elasticsearch/cluster_stats" + _ "github.com/elastic/beats/metricbeat/module/elasticsearch/enrich" _ "github.com/elastic/beats/metricbeat/module/elasticsearch/index" _ "github.com/elastic/beats/metricbeat/module/elasticsearch/index_recovery" _ "github.com/elastic/beats/metricbeat/module/elasticsearch/index_summary" @@ -51,6 +52,7 @@ import ( var metricSets = []string{ "ccr", "cluster_stats", + "enrich", "index", "index_recovery", "index_summary", @@ -81,6 +83,9 @@ func TestFetch(t *testing.T) { err = createCCRStats(host) assert.NoError(t, err) + err = createEnrichStats(host) + assert.NoError(t, err) + for _, metricSet := range metricSets { checkSkip(t, metricSet, version) t.Run(metricSet, func(t *testing.T) { @@ -367,15 +372,98 @@ func checkExists(url string) bool { return false } -func checkSkip(t *testing.T, metricset string, version *common.Version) { - if metricset != "ccr" { - return +func createEnrichStats(host string) error { + err := createEnrichSourceIndex(host) + if err != nil { + return errors.Wrap(err, "error creating enrich source index") } - isCCRStatsAPIAvailable := elastic.IsFeatureAvailable(version, elasticsearch.CCRStatsAPIAvailableVersion) + err = createEnrichPolicy(host) + if err != nil { + return errors.Wrap(err, "error creating enrich policy") + } - if !isCCRStatsAPIAvailable { - t.Skip("elasticsearch CCR stats API is not available until " + elasticsearch.CCRStatsAPIAvailableVersion.String()) + err = executeEnrichPolicy(host) + if err != nil { + return errors.Wrap(err, "error executing enrich policy") + } + + err = createEnrichIngestPipeline(host) + if err != nil { + return errors.Wrap(err, "error creating ingest pipeline with enrich processor") + } + + err = ingestAndEnrichDoc(host) + if err != nil { + return errors.Wrap(err, "error ingesting doc for enrichment") + } + + return nil +} + +func createEnrichSourceIndex(host string) error { + sourceDoc, err := ioutil.ReadFile("enrich/_meta/test/source_doc.json") + if err != nil { + return err + } + + docURL := "/users/_doc/1?refresh=wait_for" + _, _, err = httpPutJSON(host, docURL, sourceDoc) + return err +} + +func createEnrichPolicy(host string) error { + policy, err := ioutil.ReadFile("enrich/_meta/test/policy.json") + if err != nil { + return err + } + + policyURL := "/_enrich/policy/users-policy" + _, _, err = httpPutJSON(host, policyURL, policy) + return err +} + +func executeEnrichPolicy(host string) error { + executeURL := "/_enrich/policy/users-policy/_execute" + _, _, err := httpPostJSON(host, executeURL, nil) + return err +} + +func createEnrichIngestPipeline(host string) error { + pipeline, err := ioutil.ReadFile("enrich/_meta/test/ingest_pipeline.json") + if err != nil { + return err + } + + pipelineURL := "/_ingest/pipeline/user_lookup" + _, _, err = httpPutJSON(host, pipelineURL, pipeline) + return err +} + +func ingestAndEnrichDoc(host string) error { + targetDoc, err := ioutil.ReadFile("enrich/_meta/test/target_doc.json") + if err != nil { + return err + } + + docURL := "/my_index/_doc/my_id?pipeline=user_lookup" + _, _, err = httpPutJSON(host, docURL, targetDoc) + return err +} + +func checkSkip(t *testing.T, metricset string, version *common.Version) { + checkSkipFeature := func(name string, availableVersion *common.Version) { + isAPIAvailable := elastic.IsFeatureAvailable(version, availableVersion) + if !isAPIAvailable { + t.Skipf("elasticsearch %s stats API is not available until %s", name, availableVersion) + } + } + + switch metricset { + case "ccr": + checkSkipFeature("CCR", elasticsearch.CCRStatsAPIAvailableVersion) + case "enrich": + checkSkipFeature("Enrich", elasticsearch.EnrichStatsAPIAvailableVersion) } } @@ -406,7 +494,15 @@ func getElasticsearchVersion(elasticsearchHostPort string) (*common.Version, err } func httpPutJSON(host, path string, body []byte) ([]byte, *http.Response, error) { - req, err := http.NewRequest("PUT", "http://"+host+path, bytes.NewReader(body)) + return httpSendJSON(host, path, "PUT", body) +} + +func httpPostJSON(host, path string, body []byte) ([]byte, *http.Response, error) { + return httpSendJSON(host, path, "POST", body) +} + +func httpSendJSON(host, path, method string, body []byte) ([]byte, *http.Response, error) { + req, err := http.NewRequest(method, "http://"+host+path, bytes.NewReader(body)) if err != nil { return nil, nil, err } diff --git a/metricbeat/module/elasticsearch/enrich/_meta/data.json b/metricbeat/module/elasticsearch/enrich/_meta/data.json new file mode 100644 index 00000000000..cc285111ea5 --- /dev/null +++ b/metricbeat/module/elasticsearch/enrich/_meta/data.json @@ -0,0 +1,38 @@ +{ + "@timestamp": "2017-10-12T08:05:34.853Z", + "elasticsearch": { + "cluster": { + "id": "et6blfihSoytMUvkpYtEKQ", + "name": "docker-cluster" + }, + "enrich": { + "executed_searches": { + "total": 1 + }, + "queue": { + "size": 0 + }, + "remote_requests": { + "current": 0, + "total": 1 + } + }, + "node": { + "id": "l_XOyQ65Teyn4kW4PUFjVg" + } + }, + "event": { + "dataset": "elasticsearch.enrich", + "duration": 115000, + "module": "elasticsearch" + }, + "metricset": { + "name": "enrich", + "period": 10000 + }, + "service": { + "address": "localhost:32780", + "name": "elasticsearch", + "type": "elasticsearch" + } +} \ No newline at end of file diff --git a/metricbeat/module/elasticsearch/enrich/_meta/docs.asciidoc b/metricbeat/module/elasticsearch/enrich/_meta/docs.asciidoc new file mode 100644 index 00000000000..ab604a715e4 --- /dev/null +++ b/metricbeat/module/elasticsearch/enrich/_meta/docs.asciidoc @@ -0,0 +1,3 @@ +This is the `enrich` metricset of the Elasticsearch module. It interrogates the +Enrich Stats API endpoint to fetch information about Enrich coordinator nodes +in the Elasticsearch cluster that are participating in ingest-time enrichment. diff --git a/metricbeat/module/elasticsearch/enrich/_meta/fields.yml b/metricbeat/module/elasticsearch/enrich/_meta/fields.yml new file mode 100644 index 00000000000..4b42a113992 --- /dev/null +++ b/metricbeat/module/elasticsearch/enrich/_meta/fields.yml @@ -0,0 +1,25 @@ +- name: enrich + type: group + description: > + Enrich stats + release: ga + fields: + - name: queue.size + type: long + description: > + Number of search requests in the queue. + - name: remote_requests + type: group + fields: + - name: current + type: long + description: > + Current number of outstanding remote requests. + - name: total + type: long + description: > + Number of outstanding remote requests executed since node startup. + - name: executed_searches.total + type: long + description: > + Number of search requests that enrich processors have executed since node startup. diff --git a/metricbeat/module/elasticsearch/enrich/_meta/test/empty.750.json b/metricbeat/module/elasticsearch/enrich/_meta/test/empty.750.json new file mode 100644 index 00000000000..0967ef424bc --- /dev/null +++ b/metricbeat/module/elasticsearch/enrich/_meta/test/empty.750.json @@ -0,0 +1 @@ +{} diff --git a/metricbeat/module/elasticsearch/enrich/_meta/test/enrich_stats.750.json b/metricbeat/module/elasticsearch/enrich/_meta/test/enrich_stats.750.json new file mode 100644 index 00000000000..061b7385c58 --- /dev/null +++ b/metricbeat/module/elasticsearch/enrich/_meta/test/enrich_stats.750.json @@ -0,0 +1,28 @@ +{ + "executing_policies": [ + { + "name": "my-policy", + "task": { + "id" : 124, + "type" : "direct", + "action" : "cluster:admin/xpack/enrich/execute", + "start_time_in_millis" : 1458585884904, + "running_time_in_nanos" : 47402, + "cancellable" : false, + "parent_task_id" : "oTUltX4IQMOUUVeiohTt8A:123", + "headers" : { + "X-Opaque-Id" : "123456" + } + } + } + ], + "coordinator_stats": [ + { + "node_id": "1sFM8cmSROZYhPxVsiWew", + "queue_size": 0, + "remote_requests_current": 0, + "remote_requests_total": 0, + "executed_searches_total": 0 + } + ] +} diff --git a/metricbeat/module/elasticsearch/enrich/_meta/test/ingest_pipeline.json b/metricbeat/module/elasticsearch/enrich/_meta/test/ingest_pipeline.json new file mode 100644 index 00000000000..19be19b58ce --- /dev/null +++ b/metricbeat/module/elasticsearch/enrich/_meta/test/ingest_pipeline.json @@ -0,0 +1,13 @@ +{ + "description" : "Enriching user details to messages", + "processors" : [ + { + "enrich" : { + "policy_name": "users-policy", + "field" : "email", + "target_field": "user", + "max_matches": "1" + } + } + ] +} diff --git a/metricbeat/module/elasticsearch/enrich/_meta/test/policy.json b/metricbeat/module/elasticsearch/enrich/_meta/test/policy.json new file mode 100644 index 00000000000..d2459c35506 --- /dev/null +++ b/metricbeat/module/elasticsearch/enrich/_meta/test/policy.json @@ -0,0 +1,7 @@ +{ + "match": { + "indices": "users", + "match_field": "email", + "enrich_fields": ["first_name", "last_name", "city", "zip", "state"] + } +} diff --git a/metricbeat/module/elasticsearch/enrich/_meta/test/source_doc.json b/metricbeat/module/elasticsearch/enrich/_meta/test/source_doc.json new file mode 100644 index 00000000000..11f73ce60fe --- /dev/null +++ b/metricbeat/module/elasticsearch/enrich/_meta/test/source_doc.json @@ -0,0 +1,10 @@ +{ + "email": "mardy.brown@asciidocsmith.com", + "first_name": "Mardy", + "last_name": "Brown", + "city": "New Orleans", + "county": "Orleans", + "state": "LA", + "zip": 70116, + "web": "mardy.asciidocsmith.com" +} diff --git a/metricbeat/module/elasticsearch/enrich/_meta/test/target_doc.json b/metricbeat/module/elasticsearch/enrich/_meta/test/target_doc.json new file mode 100644 index 00000000000..45f673d8c69 --- /dev/null +++ b/metricbeat/module/elasticsearch/enrich/_meta/test/target_doc.json @@ -0,0 +1,3 @@ +{ + "email": "mardy.brown@asciidocsmith.com" +} diff --git a/metricbeat/module/elasticsearch/enrich/data.go b/metricbeat/module/elasticsearch/enrich/data.go new file mode 100644 index 00000000000..cc4fb5a24ea --- /dev/null +++ b/metricbeat/module/elasticsearch/enrich/data.go @@ -0,0 +1,92 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package enrich + +import ( + "encoding/json" + + "github.com/joeshaw/multierror" + "github.com/pkg/errors" + + "github.com/elastic/beats/libbeat/common" + s "github.com/elastic/beats/libbeat/common/schema" + c "github.com/elastic/beats/libbeat/common/schema/mapstriface" + "github.com/elastic/beats/metricbeat/mb" + "github.com/elastic/beats/metricbeat/module/elasticsearch" +) + +var ( + schema = s.Schema{ + "node_id": c.Str("node_id"), + "queue": s.Object{ + "size": c.Int("queue_size"), + }, + "remote_requests": s.Object{ + "current": c.Int("remote_requests_current"), + "total": c.Int("remote_requests_total"), + }, + "executed_searches": s.Object{ + "total": c.Int("executed_searches_total"), + }, + } +) + +type response struct { + ExecutingPolicies []map[string]interface{} `json:"executing_policies"` + CoordinatorStats []map[string]interface{} `json:"coordinator_stats"` +} + +func eventsMapping(r mb.ReporterV2, info elasticsearch.Info, content []byte) error { + var data response + err := json.Unmarshal(content, &data) + if err != nil { + return errors.Wrap(err, "failure parsing Elasticsearch Enrich Stats API response") + } + + var errs multierror.Errors + for _, stat := range data.CoordinatorStats { + + event := mb.Event{} + event.RootFields = common.MapStr{} + event.RootFields.Put("service.name", elasticsearch.ModuleName) + + event.ModuleFields = common.MapStr{} + event.ModuleFields.Put("cluster.name", info.ClusterName) + event.ModuleFields.Put("cluster.id", info.ClusterID) + + fields, err := schema.Apply(stat) + if err != nil { + errs = append(errs, errors.Wrap(err, "failure applying enrich coordinator stats schema")) + continue + } + + nodeID, err := fields.GetValue("node_id") + if err != nil { + errs = append(errs, errors.Wrap(err, "failure retrieving node ID from Elasticsearch Enrich Stats API response")) + } + + event.ModuleFields.Put("node.id", nodeID) + fields.Delete("node_id") + + event.MetricSetFields = fields + + r.Event(event) + } + + return errs.Err() +} diff --git a/metricbeat/module/elasticsearch/enrich/data_test.go b/metricbeat/module/elasticsearch/enrich/data_test.go new file mode 100644 index 00000000000..08b7a299b1e --- /dev/null +++ b/metricbeat/module/elasticsearch/enrich/data_test.go @@ -0,0 +1,49 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// +build !integration + +package enrich + +import ( + "io/ioutil" + "testing" + + "github.com/stretchr/testify/assert" + + mbtest "github.com/elastic/beats/metricbeat/mb/testing" + "github.com/elastic/beats/metricbeat/module/elasticsearch" +) + +var info = elasticsearch.Info{ + ClusterID: "1234", + ClusterName: "helloworld", +} + +func TestMapper(t *testing.T) { + elasticsearch.TestMapperWithInfo(t, "./_meta/test/enrich_stats.*.json", eventsMapping) +} + +func TestEmpty(t *testing.T) { + input, err := ioutil.ReadFile("./_meta/test/empty.750.json") + assert.NoError(t, err) + + reporter := &mbtest.CapturingReporterV2{} + eventsMapping(reporter, info, input) + assert.Equal(t, 0, len(reporter.GetErrors())) + assert.Equal(t, 0, len(reporter.GetEvents())) +} diff --git a/metricbeat/module/elasticsearch/enrich/data_xpack.go b/metricbeat/module/elasticsearch/enrich/data_xpack.go new file mode 100644 index 00000000000..6b3029f4361 --- /dev/null +++ b/metricbeat/module/elasticsearch/enrich/data_xpack.go @@ -0,0 +1,76 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package enrich + +import ( + "encoding/json" + "time" + + "github.com/pkg/errors" + + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/metricbeat/helper/elastic" + "github.com/elastic/beats/metricbeat/mb" + "github.com/elastic/beats/metricbeat/module/elasticsearch" +) + +func eventsMappingXPack(r mb.ReporterV2, m *MetricSet, info elasticsearch.Info, content []byte) error { + var data response + err := json.Unmarshal(content, &data) + if err != nil { + return errors.Wrap(err, "failure parsing Elasticsearch Enrich Stats API response") + } + + now := common.Time(time.Now()) + intervalMS := m.Module().Config().Period / time.Millisecond + index := elastic.MakeXPackMonitoringIndexName(elastic.Elasticsearch) + + indexExecutingPolicies(r, data, info, now, intervalMS, index) + indexCoordinatorStats(r, data, info, now, intervalMS, index) + return nil +} + +func indexExecutingPolicies(r mb.ReporterV2, enrichData response, esInfo elasticsearch.Info, now common.Time, intervalMS time.Duration, indexName string) { + for _, stat := range enrichData.ExecutingPolicies { + event := mb.Event{} + event.RootFields = common.MapStr{ + "cluster_uuid": esInfo.ClusterID, + "timestamp": now, + "interval_ms": intervalMS, + "type": "enrich_executing_policy_stats", + "enrich_executing_policy_stats": stat, + } + event.Index = indexName + r.Event(event) + } +} + +func indexCoordinatorStats(r mb.ReporterV2, enrichData response, esInfo elasticsearch.Info, now common.Time, intervalMS time.Duration, indexName string) { + for _, stat := range enrichData.CoordinatorStats { + event := mb.Event{} + event.RootFields = common.MapStr{ + "cluster_uuid": esInfo.ClusterID, + "timestamp": now, + "interval_ms": intervalMS, + "type": "enrich_coordinator_stats", + "enrich_coordinator_stats": stat, + } + event.Index = indexName + r.Event(event) + } +} diff --git a/metricbeat/module/elasticsearch/enrich/enrich.go b/metricbeat/module/elasticsearch/enrich/enrich.go new file mode 100644 index 00000000000..d2acf780b9a --- /dev/null +++ b/metricbeat/module/elasticsearch/enrich/enrich.go @@ -0,0 +1,120 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package enrich + +import ( + "time" + + "github.com/pkg/errors" + + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/metricbeat/helper/elastic" + "github.com/elastic/beats/metricbeat/mb" + "github.com/elastic/beats/metricbeat/module/elasticsearch" +) + +func init() { + mb.Registry.MustAddMetricSet(elasticsearch.ModuleName, "enrich", New, + mb.WithHostParser(elasticsearch.HostParser), + ) +} + +const ( + enrichStatsPath = "/_enrich/_stats" +) + +// MetricSet type defines all fields of the MetricSet +type MetricSet struct { + *elasticsearch.MetricSet + lastLicenseMessageTimestamp time.Time +} + +// New create a new instance of the MetricSet +func New(base mb.BaseMetricSet) (mb.MetricSet, error) { + ms, err := elasticsearch.NewMetricSet(base, enrichStatsPath) + if err != nil { + return nil, err + } + return &MetricSet{MetricSet: ms}, nil +} + +// Fetch gathers stats for each enrich coordinator node +func (m *MetricSet) Fetch(r mb.ReporterV2) error { + isMaster, err := elasticsearch.IsMaster(m.HTTP, m.GetServiceURI()) + if err != nil { + return errors.Wrap(err, "error determining if connected Elasticsearch node is master") + } + + // Not master, no event sent + if !isMaster { + m.Logger().Debug("trying to fetch enrich stats from a non-master node") + return nil + } + + info, err := elasticsearch.GetInfo(m.HTTP, m.GetServiceURI()) + if err != nil { + return err + } + + enrichUnavailableMessage, err := m.checkEnrichAvailability(info.Version.Number) + if err != nil { + return errors.Wrap(err, "error determining if Enrich is available") + } + + if enrichUnavailableMessage != "" { + if time.Since(m.lastLicenseMessageTimestamp) > 10*time.Minute { + m.lastLicenseMessageTimestamp = time.Now() + m.Logger().Debug(enrichUnavailableMessage) + } + return nil + } + + content, err := m.HTTP.FetchContent() + if err != nil { + return err + } + + if m.XPack { + err = eventsMappingXPack(r, m, *info, content) + if err != nil { + // Since this is an x-pack code path, we log the error but don't + // return it. Otherwise it would get reported into `metricbeat-*` + // indices. + m.Logger().Error(err) + return nil + } + } else { + return eventsMapping(r, *info, content) + } + + return nil +} + +func (m *MetricSet) checkEnrichAvailability(currentElasticsearchVersion *common.Version) (message string, err error) { + isAvailable := elastic.IsFeatureAvailable(currentElasticsearchVersion, elasticsearch.EnrichStatsAPIAvailableVersion) + + if !isAvailable { + metricsetName := m.FullyQualifiedName() + message = "the " + metricsetName + " is only supported with Elasticsearch >= " + + elasticsearch.EnrichStatsAPIAvailableVersion.String() + ". " + + "You are currently running Elasticsearch " + currentElasticsearchVersion.String() + "." + return + } + + return "", nil +} diff --git a/metricbeat/module/elasticsearch/fields.go b/metricbeat/module/elasticsearch/fields.go index 90b49186eb6..c107604bfb3 100644 --- a/metricbeat/module/elasticsearch/fields.go +++ b/metricbeat/module/elasticsearch/fields.go @@ -32,5 +32,5 @@ func init() { // AssetElasticsearch returns asset data. // This is the base64 encoded gzipped contents of ../metricbeat/module/elasticsearch. func AssetElasticsearch() string { - return "eJzsXN1u2zYUvvdTHOSqBRI9gC92061dBjQr1nTAMAwqLR7bTPijkJQT7+kHUpIjS5QlW3LsYvZdLec73/nlOSTVG3jE9RSQE2NZYpDoZDkBsMxynMLVL9XvryYAFE2iWWqZklP4aQIAsPUbEIpmHCcAGjkSg1NYkAmAQWuZXJgp/H1lDL+6hqultenVP+7ZUmkbJ0rO2WIKc8KN+/s5Q07N1Iu4AUkENmm6j12nTohWWVp8E+C4DVeFTHhmLOrI/WvzsER9xPWz0rTyfRA7/2zbocD1UqJJq1hGjyGU0R0ijSUWRxbsMcNipaLDpd0pinD7cwv6cN95/NxVDbMluoFdjbYO5A9aGXNTOkZjyllC3A+9zUzlt9sJU37qcVulxpFQ1FuP2hi2QVXhmKT40njabs8e2pefOyIQ1Lxg3CKp5CHIS2zwKZaqlQxXcnEYk8/khYlMgMGnDGWCIDMxQ+3IqRR17holwS6xZGuWpKZ3yXSuOFfPP5YLSs4dTvBKR7lxjuCGu43VnaG9MHhmdslyy+/mtnGUiZ81sxblURm+ist5IYV3ZSIjfQ9MWuVZb0yb6zPXSuyOo6pSlgmMDZMJxq60xhoJjYQ5gmb3TOA1MAnCXIOXuM3eiYc52mSJDSVa6S+4mhEeJ0tMHlPFpD0C8U9eBrzKgBXhGbp03Tb9zrXPDCrnRSEfXrwdQlZ376487zBQlVlm4N1CI8prWKMzzDVopO+jIBG3hIZ5hCpYBwu3kBrPgfn2IOpVADduUlkgcHaETWfQ3CtLeKXGe2Vd7JcR0cJEEPd0VCqvBSUHv0HOFmzGsTcpSiw5EiUH3cGjskaxZLyIuc3hzjhmCoV7OMgXn2bJbrdNDzZf/VrSbp3upiJsoRHWkZqdcu13mOmVUqqZIJo1ougYtHJZ6256m67O2dMlRCRQKL2OZmsbYDokxD57YMiMW2GVrohsLF31VmjvJasOsP9SVZuvYNhCdev41IatqjTr/DdaR01VYqK3ygCqkkygtD7KNn1sexp4chQ5Wgx19aPTyyXtTdNYpTEy7F9sSYUOqnOlBbFTaPvj3qo4CuXM4Dk7BTzqDvK48Lq+WRDkxDZiu4ntrDJvZdpX/qJSm2brfD4rqAbMHa5XkcZErVCvT1u4WD2rWm3ZYad8ES6V8htdIXkOfrxCWRPqkMJiixUuKHmmFEdSH5A7JN/rDIHVls6wbGPJYkSd/yi19biBMKvKtkQv0EYtXj5I/r2H9J1wu5dzsUtl6uVkJMEOGQilGo2Bd4nKOIUZwu2XzZdK+x85Pi1TXUFy3KW7SnJ7AQ/Hhsp0gqP656uH3O2fQuy4/qkKHsM/Bclx/VMlGd7NzquzyUStYJygOLfNAJdebxR6l17v0ut18t+v14PLlHbJ3Evm/oCZu9nW5tGDmg1Z9wU/xkR2ULfzTbKnDEFweFCz9m7QEjtii/WbmuWQYWmUWBL7GDZRqlWCxiCN3QynaRyK7UMn0i8leL51j6tmGIc4MbkinNGYEouj8rlfVo/Sc4WNP9IFZHaJGggIZgyTC0cI8yhxjTLJ/22XxELeS0tlXT+dEm2QBmaMRli7hndIUNf+fv+wXqE2TNWn7AFh5i+kFKhhrz6sRO/ltyum//wMt3Ku+h28dGndpXkPQiWpoAGqDIqivESSRkwye7Ly/CuSFByDrYrsdOhe+6pKCPJyWh0EeTlcBank6V1xp+TNCO4odTmlRzaq9PfK63Tt14dIcJU8Eh5u1Q/aHLydl+DgsJH6qyS50YKVefitC4cSD79yMfLJta/RrHJ8XUc+q6kHX5ixfvUt54rznXjOaE740SaEcvjqGONhxMHxoMHQC+2oYQ8r4YwapUrx0bLWlc5ieHK4ByWu4u1pEb7m0ctKv3PapNb0Wzs/2L5F2+pO6HIp9AjJulItP/FXbttCsEo4RfJ4Joy/IHnsSzk+H0N72qKftV03cSa0v+WNzc4atVZZkMjgnPvLAV+y7hwYX7Lu3LLOZHrFVqr9BYABife1wL7k3jkwvuTeqXOvtQNeJFGiOMfEKj1aF/zpA2xAw1nXowcuee3aAxzYDr9KgEVyaGFomxWhj9N7EN0UoIMjq0PIVhd0ZKvnDdH/2+7BXJybwL0ZGJKEHxlHMGtjUUAYuisJ/Sn8SXYcNlbReJrz5pIAWRHGyYy/LYv6i2UpSsrkIrbEPE7qovfY6fweAvwOiZKWMGmAQPEA3IMqUjVJD9kaNahtrHTbC8X7nwjeekhoQlYugCnNbDidDjkRDcBtX7cLShpw0y6Cj0oDvhCRcqdQZm8ESVNWo771iieT8VOGGTbf7Tz46JUJv5PmYRsRWn/rdO+QLF5m9cEzKMaOdk3ZLpkBZvy+Yo8ry8GXmsc59/ZMdt+WHvMywr3fSCUW+8jWyFVCrCsqof81YQQq/t5p5X1uYkqhxYvE0eS/AAAA//89GKel" + return "eJzsXN1u47oRvvdTDHK1B0j0AL7ozfacbQpsuuhmCxRFoaWlsc2EPwpJOXafviAp2bJMWbIkx14c+y6W9c03v5yhqDzAK26mgIxoQxONRCXLCYChhuEU7n6vfn83AUhRJ4pmhkoxhb9MAAD2fgNcpjnDCYBChkTjFBZkAqDRGCoWegr/udOa3d3D3dKY7O6/9tpSKhMnUszpYgpzwrS9f06RpXrqRDyAIBwPadqP2WRWiJJ5VnwT4LgPV4VMWK4Nqsj+tb1Yor7i5l2qtPJ9ENt/9u1Q4Dop0aRRLE3PIZSmR0RqQwyOLNhhhsUKmQ6X9iRThMe/NqAP953D9646MFuiDrCr0daC/FlJrR9KxyjMGE2I/aGzma78dj9hyk89bqvUGJIU1d6lJoZNUFU4KlJcH1xttmcH7cvPE+EIcl4wbpBU8uBkHWt8i4VsJMOkWPRj8pWsKc85aHzLUSQIIuczVJaczFB510gBZoklW70kNb1LpnPJmHz/tVxQcm5xglM68sY5gxuetla3hnbC4J2aJfWWP85t6ygdvytqDIqzMtyJ87wwhU9lImP6G1BhpGO9Na3XZ64kPx5HVaUM5RhrKhKMbWmNFZI04voMmj1TjvdABXB9D07iPnsrHuZokiUeKNFIf8HkjLA4WWLymkkqzBmIf3EyYCcDVoTlaNN13/RH1z49qJwXhXx48bYIed29x/K8xUBVZrmGTwuFKO5hg9Yw96Aw/S0KErFLaJhHqIK1sLALqXYcqGsPok4FcOsmmQcC50jYtAbNszSEVWq8U9bGfhkRDUw4sVdHpbIrKB78ARld0BnDzqRSYsiZKFnoFh6VNYom40XMo4e74pgpFO7gIFd8Dkt2s206sPnu1pJm67Q3FWELjbCO1OzktT9iph2lTFFOFD2IonPQ8rI27fS2XZ21p02IiCOXahPNNibAdEiIfXXAkGu7wkpVEXmwdKFQdDvsQp8163eHMMKS9ZZjjpGm/8Ng8gdM0bZa7GLHz5HKduXaOC/ZpsNLDJJRyKXBuLxjtBY8yZXCs+TLZ49cnThyow0RKRWLQp+tBZpzx9j4Pm/D20wLcI1JbjAtGke7ZNjIUibPwn4qb4i9i1FHIQXGjyCzJKZIHsiUTFBrqTQsyQq7KdE0k52cf3WA0xOvtsEBwzrFR8unttsBLQHWP59Smejoo5agVCY5R7ErIM72zbnkyKXI0GBorB6dnpd0Mk1tpPKFt2EtaqE6l4oTM4WmmzurYimUQ7vjbBVwqEfI48Lp+mFB4IltxbYTO7rMf5Rpd/x5pTmYbfwGSUE1YO5wvYoUJnKFanPZwkXrWdW3zvsuuFTK7TSH5Fn48QplTahFCostWsyg5JmUDEl9h6pF8rPKEWitdw3L1oYsRtT5n6W2DjcQZlXZhqgFmqjBy73kPztIvyQ3etmLXUpdLycjCbbIQNJUodbwKZE5S2GG8Pht+6VU7keWT8O2SkFy3KW7SnJ/AQ/HhsxVgqP657uDPO6fQuy4/qkKHsM/Bclx/VMlGX6c5KuzznmtYFygODcN4bdebxR6t17v1uu18j+t14PblHbL3Fvm/oKZu32uxKIXORuy7nN2jomsV7fzQ9C3HIEzeJGz5m7QEDNii/V3OfOQYWkpMSR2MayjYtsP09jOcCqNQ7HddyL9VoL7Z2e4OgzjECcqVoTRNE6JwVH5PC+rZ1m8wtqdqQCkZokKCHCqNRULSwh9lNhGmfi/3Yap76WFNLafzojSmAZmjIOwtg3vkKCu3X96WK9QaSrrU/aAMHMnwgrUsFdfVrzz8tsW0//6Co9iLrs9+WzTuk3zDoRKUkEDVBkURXmJJIuooOZi5flvSDKwDPYqstWhfe2rKsHJ+rI6cLLur4KQ4vKueJLiYQR3lLpc0iNbVbp7ZTddu/Uh4kwmr4SFW/Vem4OP8xIcLDam7iyXN1qwMg8/9mRR4uEPkEc+OuJqNK2cH6kjX9XUg2uqjVt9y7nieieeK5oTfrUJoRy+WsZ4GHFw7DUYOqEtNexlxa1Ro0xKNlrW2tJZDE8Wt1fiStacFuFzVp2s9A+WHlI79FszP9g/xt7oTmhzKXQIybpSDT9xZ96bQrBKOEPyeiWMvyF57Uo5vh5DO9q8m7VtN3EltH/4xuZojdrIPEhkcM792wLfsu4aGN+y7tqyTudqRVey+Q2cAYn3vcC+5d41ML7l3qVzr7EDXiRRIhnDxEg1Whf85TNsQcNZ16EHLnkd2wMc2A7vJMAi6VsYmmZF6OL0DkS3Bah3ZLUI2euCzmx13xD9ue0ezMW5DpybgSFJ+AdlCHqjDXIIQ7cloXsKf5Edh61VFF7meXNJgKwIZWTGPpZF/c3ODN37CrEh+nVSF33CTufPEOBPSKQwhAoNBIoLYC9UkapJ2mdrVKMysVRNb/Sf/kTw0UHCIWTlAJhU1ITTqc8T0QDc/nG7oKQBJ+0i+EMqwDXhGbMK5eaBkyyjNep771hTEfvXig6qVu9Hr5S7nTQHexCh9de+Tw7J4m1yFzyDYuxsx5TNkmqg2u0rdjiyHPyvAuM893ZMjp+WHvMwwrPbSCUGu8hWyGRCjC0qoX9bMgIVd+608g8ViC6FFm/yR5P/BwAA//+sJJat" } diff --git a/metricbeat/module/elasticsearch/test_elasticsearch.py b/metricbeat/module/elasticsearch/test_elasticsearch.py index af875ececca..0f89f6511b2 100644 --- a/metricbeat/module/elasticsearch/test_elasticsearch.py +++ b/metricbeat/module/elasticsearch/test_elasticsearch.py @@ -39,10 +39,14 @@ def tearDown(self): self.ccr_unfollow_index() self.es.indices.delete(index='test_index,pied_piper,rats', ignore_unavailable=True) self.delete_ml_job() + self.delete_enrich_ingest_pipeline() + self.delete_enrich_policy() + self.es.indices.delete(index='users,my_index', ignore_unavailable=True) super(Test, self).tearDown() @parameterized.expand([ "ccr", + "enrich", "index", "index_summary", "ml_job", @@ -62,6 +66,8 @@ def test_metricsets(self, metricset): self.create_ml_job() if metricset == "ccr": self.create_ccr_stats() + if metricset == "enrich": + self.create_enrich_stats() self.check_metricset("elasticsearch", metricset, self.get_hosts(), self.FIELDS + ["service"], extras={"index_recovery.active_only": "false"}) @@ -71,16 +77,19 @@ def test_xpack(self): """ elasticsearch-xpack module tests """ - es = Elasticsearch(self.get_hosts()) self.create_ml_job() - self.create_ccr_stats() + if self.is_ccr_available(): + self.create_ccr_stats() + if self.is_enrich_available(): + self.create_enrich_stats() self.render_config_template(modules=[{ "name": "elasticsearch", "metricsets": [ "ccr", "cluster_stats", + "enrich", "index", "index_recovery", "index_summary", @@ -167,6 +176,72 @@ def ccr_unfollow_index(self): self.es.indices.close('rats') self.es.transport.perform_request('POST', '/rats/_ccr/unfollow') + def create_enrich_stats(self): + self.create_enrich_source_index() + self.create_enrich_policy() + self.execute_enrich_policy() + self.create_enrich_ingest_pipeline() + self.ingest_and_enrich_doc() + + def create_enrich_source_index(self): + file = os.path.join(self.beat_path, 'module', 'elasticsearch', 'enrich', + '_meta', 'test', 'source_doc.json') + + source_doc = {} + with open(file, 'r') as f: + source_doc = json.load(f) + + self.es.index(index='users', id='1', doc_type='_doc', body=source_doc, refresh='wait_for') + + def create_enrich_policy(self): + file = os.path.join(self.beat_path, 'module', 'elasticsearch', 'enrich', + '_meta', 'test', 'policy.json') + + policy = {} + with open(file, 'r') as f: + policy = json.load(f) + + policy_url = '/_enrich/policy/users-policy' + self.es.transport.perform_request(method='PUT', url=policy_url, body=policy) + + def execute_enrich_policy(self): + execute_url = '/_enrich/policy/users-policy/_execute' + self.es.transport.perform_request('POST', execute_url) + + def create_enrich_ingest_pipeline(self): + file = os.path.join(self.beat_path, 'module', 'elasticsearch', 'enrich', + '_meta', 'test', 'ingest_pipeline.json') + + pipeline = {} + with open(file, 'r') as f: + pipeline = json.load(f) + + self.es.ingest.put_pipeline(id='user_lookup', body=pipeline) + + def ingest_and_enrich_doc(self): + file = os.path.join(self.beat_path, 'module', 'elasticsearch', 'enrich', + '_meta', 'test', 'target_doc.json') + + target_doc = {} + with open(file, 'r') as f: + target_doc = json.load(f) + + self.es.index(index='my_index', id='my_id', doc_type='_doc', body=target_doc, pipeline='user_lookup') + + def delete_enrich_policy(self): + exists = self.es.indices.exists('my_index') + if not exists: + return + + self.es.transport.perform_request('DELETE', '/_enrich/policy/users-policy') + + def delete_enrich_ingest_pipeline(self): + exists = self.es.indices.exists('my_index') + if not exists: + return + + self.es.ingest.delete_pipeline(id='user_lookup') + def start_trial(self): # Check if trial is already enabled response = self.es.transport.perform_request('GET', self.license_url) @@ -181,14 +256,25 @@ def start_trial(self): print "Trial already enabled. Error: {}".format(e) def check_skip(self, metricset): - if metricset != "ccr": - return + if metricset == 'ccr' and not self.is_ccr_available(): + raise SkipTest("elasticsearch/ccr metricset system test only valid with Elasticsearch versions >= 6.5.0") + if metricset == 'enrich' and not self.is_enrich_available(): + raise SkipTest("elasticsearch/enrich metricset system test only valid with Elasticsearch versions >= 7.5.0") + + def is_ccr_available(self): es_version = self.get_version() - if es_version["major"] <= 6 and es_version["minor"] < 5: - # Skip CCR metricset system test for Elasticsearch versions < 6.5.0 as CCR Stats - # API endpoint is not available - raise SkipTest("elasticsearch/ccr metricset system test only valid with Elasticsearch versions >= 6.5.0") + major = es_version["major"] + minor = es_version["minor"] + + return major > 6 or (major == 6 and minor >= 5) + + def is_enrich_available(self): + es_version = self.get_version() + major = es_version["major"] + minor = es_version["minor"] + + return major > 7 or (major == 7 and minor >= 5) def get_version(self): es_info = self.es.info() diff --git a/metricbeat/modules.d/elasticsearch-xpack.yml.disabled b/metricbeat/modules.d/elasticsearch-xpack.yml.disabled index b542e7f1247..ed428acb1e3 100644 --- a/metricbeat/modules.d/elasticsearch-xpack.yml.disabled +++ b/metricbeat/modules.d/elasticsearch-xpack.yml.disabled @@ -5,6 +5,7 @@ metricsets: - ccr - cluster_stats + - enrich - index - index_recovery - index_summary