Skip to content

Commit

Permalink
Add elasticsearch/enrich metricset (#14243)
Browse files Browse the repository at this point in the history
* WIP: elasticsearch/enrich metricset

* Adding non-xpack event creation

* More changes

* Adding docs.asciidoc

* Running make update

* Adding missing file

* Adding data.json

* Implement xpack.enabled: true event creation

* Adding missing import

* Remove license check as enrich is available in basic

* Skip enrich integration test against ES versions < 7.5.0

* Skip system tests for enrich metricset with ES < 7.5.0

* Adding integration test

* Updating data.json

* Adding CHANGELOG entry

* Adding system test

* Fixing up system tests

* Cleanup!

* Only try to create enrich stats if enrich feature is available

* Fixing skip logic
  • Loading branch information
ycombinator authored Oct 29, 2019
1 parent 6b6408f commit 9b96d62
Show file tree
Hide file tree
Showing 25 changed files with 746 additions and 17 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Add ability to filter by tags for cloudwatch metricset. {pull}13758[13758] {issue}13145[13145]
- Release cloudwatch, s3_daily_storage, s3_request, sqs and rds metricset as GA. {pull}14114[14114] {issue}14059[14059]
- Add Oracle overview dashboard {pull}14021[14021]
- Add `elasticsearch/enrich` metricset. {pull}14243[14243] {issue}14221[14221]

*Packetbeat*

Expand Down
48 changes: 48 additions & 0 deletions metricbeat/docs/fields.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -11077,6 +11077,54 @@ type: long
Memory used for fielddata.
type: long
--
[float]
=== enrich
Enrich stats
*`elasticsearch.enrich.queue.size`*::
+
--
Number of search requests in the queue.
type: long
--
*`elasticsearch.enrich.remote_requests.current`*::
+
--
Current number of outstanding remote requests.
type: long
--
*`elasticsearch.enrich.remote_requests.total`*::
+
--
Number of outstanding remote requests executed since node startup.
type: long
--
*`elasticsearch.enrich.executed_searches.total`*::
+
--
Number of search requests that enrich processors have executed since node startup.
type: long
--
Expand Down
4 changes: 4 additions & 0 deletions metricbeat/docs/modules/elasticsearch.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ The following metricsets are available:

* <<metricbeat-metricset-elasticsearch-cluster_stats,cluster_stats>>

* <<metricbeat-metricset-elasticsearch-enrich,enrich>>

* <<metricbeat-metricset-elasticsearch-index,index>>

* <<metricbeat-metricset-elasticsearch-index_recovery,index_recovery>>
Expand All @@ -86,6 +88,8 @@ include::elasticsearch/ccr.asciidoc[]

include::elasticsearch/cluster_stats.asciidoc[]

include::elasticsearch/enrich.asciidoc[]

include::elasticsearch/index.asciidoc[]

include::elasticsearch/index_recovery.asciidoc[]
Expand Down
21 changes: 21 additions & 0 deletions metricbeat/docs/modules/elasticsearch/enrich.asciidoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
////
This file is generated! See scripts/mage/docs_collector.go
////

[[metricbeat-metricset-elasticsearch-enrich]]
=== Elasticsearch enrich metricset

include::../../../module/elasticsearch/enrich/_meta/docs.asciidoc[]


==== Fields

For a description of each field in the metricset, see the
<<exported-fields-elasticsearch,exported fields>> section.

Here is an example document generated by this metricset:

[source,json]
----
include::../../../module/elasticsearch/enrich/_meta/data.json[]
----
3 changes: 2 additions & 1 deletion metricbeat/docs/modules_list.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,9 @@ This file is generated! See scripts/mage/docs_collector.go
|<<metricbeat-module-dropwizard,Dropwizard>> |image:./images/icon-no.png[No prebuilt dashboards] |
.1+| .1+| |<<metricbeat-metricset-dropwizard-collector,collector>>
|<<metricbeat-module-elasticsearch,Elasticsearch>> |image:./images/icon-no.png[No prebuilt dashboards] |
.10+| .10+| |<<metricbeat-metricset-elasticsearch-ccr,ccr>>
.11+| .11+| |<<metricbeat-metricset-elasticsearch-ccr,ccr>>
|<<metricbeat-metricset-elasticsearch-cluster_stats,cluster_stats>>
|<<metricbeat-metricset-elasticsearch-enrich,enrich>>
|<<metricbeat-metricset-elasticsearch-index,index>>
|<<metricbeat-metricset-elasticsearch-index_recovery,index_recovery>>
|<<metricbeat-metricset-elasticsearch-index_summary,index_summary>>
Expand Down
1 change: 1 addition & 0 deletions metricbeat/include/list_common.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions metricbeat/module/elasticsearch/_meta/config-xpack.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
metricsets:
- ccr
- cluster_stats
- enrich
- index
- index_recovery
- index_summary
Expand Down
4 changes: 4 additions & 0 deletions metricbeat/module/elasticsearch/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ func validateXPackMetricsets(base mb.BaseModule) error {

expectedXPackMetricsets := []string{
"ccr",
"enrich",
"cluster_stats",
"index",
"index_recovery",
Expand All @@ -86,6 +87,9 @@ func validateXPackMetricsets(base mb.BaseModule) error {
// CCRStatsAPIAvailableVersion is the version of Elasticsearch since when the CCR stats API is available.
var CCRStatsAPIAvailableVersion = common.MustNewVersion("6.5.0")

// EnrichStatsAPIAvailableVersion is the version of Elasticsearch since when the Enrich stats API is available.
var EnrichStatsAPIAvailableVersion = common.MustNewVersion("7.5.0")

// Global clusterIdCache. Assumption is that the same node id never can belong to a different cluster id.
var clusterIDCache = map[string]string{}

Expand Down
110 changes: 103 additions & 7 deletions metricbeat/module/elasticsearch/elasticsearch_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/elastic/beats/metricbeat/module/elasticsearch"
_ "github.com/elastic/beats/metricbeat/module/elasticsearch/ccr"
_ "github.com/elastic/beats/metricbeat/module/elasticsearch/cluster_stats"
_ "github.com/elastic/beats/metricbeat/module/elasticsearch/enrich"
_ "github.com/elastic/beats/metricbeat/module/elasticsearch/index"
_ "github.com/elastic/beats/metricbeat/module/elasticsearch/index_recovery"
_ "github.com/elastic/beats/metricbeat/module/elasticsearch/index_summary"
Expand All @@ -51,6 +52,7 @@ import (
var metricSets = []string{
"ccr",
"cluster_stats",
"enrich",
"index",
"index_recovery",
"index_summary",
Expand Down Expand Up @@ -81,6 +83,9 @@ func TestFetch(t *testing.T) {
err = createCCRStats(host)
assert.NoError(t, err)

err = createEnrichStats(host)
assert.NoError(t, err)

for _, metricSet := range metricSets {
checkSkip(t, metricSet, version)
t.Run(metricSet, func(t *testing.T) {
Expand Down Expand Up @@ -367,15 +372,98 @@ func checkExists(url string) bool {
return false
}

func checkSkip(t *testing.T, metricset string, version *common.Version) {
if metricset != "ccr" {
return
func createEnrichStats(host string) error {
err := createEnrichSourceIndex(host)
if err != nil {
return errors.Wrap(err, "error creating enrich source index")
}

isCCRStatsAPIAvailable := elastic.IsFeatureAvailable(version, elasticsearch.CCRStatsAPIAvailableVersion)
err = createEnrichPolicy(host)
if err != nil {
return errors.Wrap(err, "error creating enrich policy")
}

if !isCCRStatsAPIAvailable {
t.Skip("elasticsearch CCR stats API is not available until " + elasticsearch.CCRStatsAPIAvailableVersion.String())
err = executeEnrichPolicy(host)
if err != nil {
return errors.Wrap(err, "error executing enrich policy")
}

err = createEnrichIngestPipeline(host)
if err != nil {
return errors.Wrap(err, "error creating ingest pipeline with enrich processor")
}

err = ingestAndEnrichDoc(host)
if err != nil {
return errors.Wrap(err, "error ingesting doc for enrichment")
}

return nil
}

func createEnrichSourceIndex(host string) error {
sourceDoc, err := ioutil.ReadFile("enrich/_meta/test/source_doc.json")
if err != nil {
return err
}

docURL := "/users/_doc/1?refresh=wait_for"
_, _, err = httpPutJSON(host, docURL, sourceDoc)
return err
}

func createEnrichPolicy(host string) error {
policy, err := ioutil.ReadFile("enrich/_meta/test/policy.json")
if err != nil {
return err
}

policyURL := "/_enrich/policy/users-policy"
_, _, err = httpPutJSON(host, policyURL, policy)
return err
}

func executeEnrichPolicy(host string) error {
executeURL := "/_enrich/policy/users-policy/_execute"
_, _, err := httpPostJSON(host, executeURL, nil)
return err
}

func createEnrichIngestPipeline(host string) error {
pipeline, err := ioutil.ReadFile("enrich/_meta/test/ingest_pipeline.json")
if err != nil {
return err
}

pipelineURL := "/_ingest/pipeline/user_lookup"
_, _, err = httpPutJSON(host, pipelineURL, pipeline)
return err
}

func ingestAndEnrichDoc(host string) error {
targetDoc, err := ioutil.ReadFile("enrich/_meta/test/target_doc.json")
if err != nil {
return err
}

docURL := "/my_index/_doc/my_id?pipeline=user_lookup"
_, _, err = httpPutJSON(host, docURL, targetDoc)
return err
}

func checkSkip(t *testing.T, metricset string, version *common.Version) {
checkSkipFeature := func(name string, availableVersion *common.Version) {
isAPIAvailable := elastic.IsFeatureAvailable(version, availableVersion)
if !isAPIAvailable {
t.Skipf("elasticsearch %s stats API is not available until %s", name, availableVersion)
}
}

switch metricset {
case "ccr":
checkSkipFeature("CCR", elasticsearch.CCRStatsAPIAvailableVersion)
case "enrich":
checkSkipFeature("Enrich", elasticsearch.EnrichStatsAPIAvailableVersion)
}
}

Expand Down Expand Up @@ -406,7 +494,15 @@ func getElasticsearchVersion(elasticsearchHostPort string) (*common.Version, err
}

func httpPutJSON(host, path string, body []byte) ([]byte, *http.Response, error) {
req, err := http.NewRequest("PUT", "http://"+host+path, bytes.NewReader(body))
return httpSendJSON(host, path, "PUT", body)
}

func httpPostJSON(host, path string, body []byte) ([]byte, *http.Response, error) {
return httpSendJSON(host, path, "POST", body)
}

func httpSendJSON(host, path, method string, body []byte) ([]byte, *http.Response, error) {
req, err := http.NewRequest(method, "http://"+host+path, bytes.NewReader(body))
if err != nil {
return nil, nil, err
}
Expand Down
38 changes: 38 additions & 0 deletions metricbeat/module/elasticsearch/enrich/_meta/data.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
{
"@timestamp": "2017-10-12T08:05:34.853Z",
"elasticsearch": {
"cluster": {
"id": "et6blfihSoytMUvkpYtEKQ",
"name": "docker-cluster"
},
"enrich": {
"executed_searches": {
"total": 1
},
"queue": {
"size": 0
},
"remote_requests": {
"current": 0,
"total": 1
}
},
"node": {
"id": "l_XOyQ65Teyn4kW4PUFjVg"
}
},
"event": {
"dataset": "elasticsearch.enrich",
"duration": 115000,
"module": "elasticsearch"
},
"metricset": {
"name": "enrich",
"period": 10000
},
"service": {
"address": "localhost:32780",
"name": "elasticsearch",
"type": "elasticsearch"
}
}
3 changes: 3 additions & 0 deletions metricbeat/module/elasticsearch/enrich/_meta/docs.asciidoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
This is the `enrich` metricset of the Elasticsearch module. It interrogates the
Enrich Stats API endpoint to fetch information about Enrich coordinator nodes
in the Elasticsearch cluster that are participating in ingest-time enrichment.
25 changes: 25 additions & 0 deletions metricbeat/module/elasticsearch/enrich/_meta/fields.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
- name: enrich
type: group
description: >
Enrich stats
release: ga
fields:
- name: queue.size
type: long
description: >
Number of search requests in the queue.
- name: remote_requests
type: group
fields:
- name: current
type: long
description: >
Current number of outstanding remote requests.
- name: total
type: long
description: >
Number of outstanding remote requests executed since node startup.
- name: executed_searches.total
type: long
description: >
Number of search requests that enrich processors have executed since node startup.
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
{
"executing_policies": [
{
"name": "my-policy",
"task": {
"id" : 124,
"type" : "direct",
"action" : "cluster:admin/xpack/enrich/execute",
"start_time_in_millis" : 1458585884904,
"running_time_in_nanos" : 47402,
"cancellable" : false,
"parent_task_id" : "oTUltX4IQMOUUVeiohTt8A:123",
"headers" : {
"X-Opaque-Id" : "123456"
}
}
}
],
"coordinator_stats": [
{
"node_id": "1sFM8cmSROZYhPxVsiWew",
"queue_size": 0,
"remote_requests_current": 0,
"remote_requests_total": 0,
"executed_searches_total": 0
}
]
}
Loading

0 comments on commit 9b96d62

Please sign in to comment.