Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add elasticsearch/enrich metricset #14243

Merged
merged 20 commits into from
Oct 29, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -452,6 +452,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Add ability to filter by tags for cloudwatch metricset. {pull}13758[13758] {issue}13145[13145]
- Release cloudwatch, s3_daily_storage, s3_request, sqs and rds metricset as GA. {pull}14114[14114] {issue}14059[14059]
- Add Oracle overview dashboard {pull}14021[14021]
- Add `elasticsearch/enrich` metricset. {pull}14243[14243] {issue}14221[14221]

*Packetbeat*

Expand Down
48 changes: 48 additions & 0 deletions metricbeat/docs/fields.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -11077,6 +11077,54 @@ type: long
Memory used for fielddata.


type: long

--

[float]
=== enrich

Enrich stats



*`elasticsearch.enrich.queue.size`*::
+
--
Number of search requests in the queue.


type: long

--


*`elasticsearch.enrich.remote_requests.current`*::
+
--
Current number of outstanding remote requests.


type: long

--

*`elasticsearch.enrich.remote_requests.total`*::
+
--
Number of outstanding remote requests executed since node startup.


type: long

--

*`elasticsearch.enrich.executed_searches.total`*::
+
--
Number of search requests that enrich processors have executed since node startup.


type: long

--
Expand Down
4 changes: 4 additions & 0 deletions metricbeat/docs/modules/elasticsearch.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ The following metricsets are available:

* <<metricbeat-metricset-elasticsearch-cluster_stats,cluster_stats>>

* <<metricbeat-metricset-elasticsearch-enrich,enrich>>

* <<metricbeat-metricset-elasticsearch-index,index>>

* <<metricbeat-metricset-elasticsearch-index_recovery,index_recovery>>
Expand All @@ -86,6 +88,8 @@ include::elasticsearch/ccr.asciidoc[]

include::elasticsearch/cluster_stats.asciidoc[]

include::elasticsearch/enrich.asciidoc[]

include::elasticsearch/index.asciidoc[]

include::elasticsearch/index_recovery.asciidoc[]
Expand Down
21 changes: 21 additions & 0 deletions metricbeat/docs/modules/elasticsearch/enrich.asciidoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
////
This file is generated! See scripts/mage/docs_collector.go
////

[[metricbeat-metricset-elasticsearch-enrich]]
=== Elasticsearch enrich metricset

include::../../../module/elasticsearch/enrich/_meta/docs.asciidoc[]


==== Fields

For a description of each field in the metricset, see the
<<exported-fields-elasticsearch,exported fields>> section.

Here is an example document generated by this metricset:

[source,json]
----
include::../../../module/elasticsearch/enrich/_meta/data.json[]
----
3 changes: 2 additions & 1 deletion metricbeat/docs/modules_list.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,9 @@ This file is generated! See scripts/mage/docs_collector.go
|<<metricbeat-module-dropwizard,Dropwizard>> |image:./images/icon-no.png[No prebuilt dashboards] |
.1+| .1+| |<<metricbeat-metricset-dropwizard-collector,collector>>
|<<metricbeat-module-elasticsearch,Elasticsearch>> |image:./images/icon-no.png[No prebuilt dashboards] |
.10+| .10+| |<<metricbeat-metricset-elasticsearch-ccr,ccr>>
.11+| .11+| |<<metricbeat-metricset-elasticsearch-ccr,ccr>>
|<<metricbeat-metricset-elasticsearch-cluster_stats,cluster_stats>>
|<<metricbeat-metricset-elasticsearch-enrich,enrich>>
|<<metricbeat-metricset-elasticsearch-index,index>>
|<<metricbeat-metricset-elasticsearch-index_recovery,index_recovery>>
|<<metricbeat-metricset-elasticsearch-index_summary,index_summary>>
Expand Down
1 change: 1 addition & 0 deletions metricbeat/include/list_common.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions metricbeat/module/elasticsearch/_meta/config-xpack.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
metricsets:
- ccr
- cluster_stats
- enrich
- index
- index_recovery
- index_summary
Expand Down
4 changes: 4 additions & 0 deletions metricbeat/module/elasticsearch/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ func validateXPackMetricsets(base mb.BaseModule) error {

expectedXPackMetricsets := []string{
"ccr",
"enrich",
"cluster_stats",
"index",
"index_recovery",
Expand All @@ -86,6 +87,9 @@ func validateXPackMetricsets(base mb.BaseModule) error {
// CCRStatsAPIAvailableVersion is the version of Elasticsearch since when the CCR stats API is available.
var CCRStatsAPIAvailableVersion = common.MustNewVersion("6.5.0")

// EnrichStatsAPIAvailableVersion is the version of Elasticsearch since when the Enrich stats API is available.
var EnrichStatsAPIAvailableVersion = common.MustNewVersion("7.5.0")

// Global clusterIdCache. Assumption is that the same node id never can belong to a different cluster id.
var clusterIDCache = map[string]string{}

Expand Down
110 changes: 103 additions & 7 deletions metricbeat/module/elasticsearch/elasticsearch_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/elastic/beats/metricbeat/module/elasticsearch"
_ "github.com/elastic/beats/metricbeat/module/elasticsearch/ccr"
_ "github.com/elastic/beats/metricbeat/module/elasticsearch/cluster_stats"
_ "github.com/elastic/beats/metricbeat/module/elasticsearch/enrich"
_ "github.com/elastic/beats/metricbeat/module/elasticsearch/index"
_ "github.com/elastic/beats/metricbeat/module/elasticsearch/index_recovery"
_ "github.com/elastic/beats/metricbeat/module/elasticsearch/index_summary"
Expand All @@ -51,6 +52,7 @@ import (
var metricSets = []string{
"ccr",
"cluster_stats",
"enrich",
"index",
"index_recovery",
"index_summary",
Expand Down Expand Up @@ -81,6 +83,9 @@ func TestFetch(t *testing.T) {
err = createCCRStats(host)
assert.NoError(t, err)

err = createEnrichStats(host)
assert.NoError(t, err)

for _, metricSet := range metricSets {
checkSkip(t, metricSet, version)
t.Run(metricSet, func(t *testing.T) {
Expand Down Expand Up @@ -367,15 +372,98 @@ func checkExists(url string) bool {
return false
}

func checkSkip(t *testing.T, metricset string, version *common.Version) {
if metricset != "ccr" {
return
func createEnrichStats(host string) error {
err := createEnrichSourceIndex(host)
if err != nil {
return errors.Wrap(err, "error creating enrich source index")
}

isCCRStatsAPIAvailable := elastic.IsFeatureAvailable(version, elasticsearch.CCRStatsAPIAvailableVersion)
err = createEnrichPolicy(host)
if err != nil {
return errors.Wrap(err, "error creating enrich policy")
}

if !isCCRStatsAPIAvailable {
t.Skip("elasticsearch CCR stats API is not available until " + elasticsearch.CCRStatsAPIAvailableVersion.String())
err = executeEnrichPolicy(host)
if err != nil {
return errors.Wrap(err, "error executing enrich policy")
}

err = createEnrichIngestPipeline(host)
if err != nil {
return errors.Wrap(err, "error creating ingest pipeline with enrich processor")
}

err = ingestAndEnrichDoc(host)
if err != nil {
return errors.Wrap(err, "error ingesting doc for enrichment")
}

return nil
}

func createEnrichSourceIndex(host string) error {
sourceDoc, err := ioutil.ReadFile("enrich/_meta/test/source_doc.json")
if err != nil {
return err
}

docURL := "/users/_doc/1?refresh=wait_for"
_, _, err = httpPutJSON(host, docURL, sourceDoc)
return err
}

func createEnrichPolicy(host string) error {
policy, err := ioutil.ReadFile("enrich/_meta/test/policy.json")
if err != nil {
return err
}

policyURL := "/_enrich/policy/users-policy"
_, _, err = httpPutJSON(host, policyURL, policy)
return err
}

func executeEnrichPolicy(host string) error {
executeURL := "/_enrich/policy/users-policy/_execute"
_, _, err := httpPostJSON(host, executeURL, nil)
return err
}

func createEnrichIngestPipeline(host string) error {
pipeline, err := ioutil.ReadFile("enrich/_meta/test/ingest_pipeline.json")
if err != nil {
return err
}

pipelineURL := "/_ingest/pipeline/user_lookup"
_, _, err = httpPutJSON(host, pipelineURL, pipeline)
return err
}

func ingestAndEnrichDoc(host string) error {
targetDoc, err := ioutil.ReadFile("enrich/_meta/test/target_doc.json")
if err != nil {
return err
}

docURL := "/my_index/_doc/my_id?pipeline=user_lookup"
_, _, err = httpPutJSON(host, docURL, targetDoc)
return err
}

func checkSkip(t *testing.T, metricset string, version *common.Version) {
checkSkipFeature := func(name string, availableVersion *common.Version) {
isAPIAvailable := elastic.IsFeatureAvailable(version, availableVersion)
if !isAPIAvailable {
t.Skipf("elasticsearch %s stats API is not available until %s", name, availableVersion)
}
}

switch metricset {
case "ccr":
checkSkipFeature("CCR", elasticsearch.CCRStatsAPIAvailableVersion)
case "enrich":
checkSkipFeature("Enrich", elasticsearch.EnrichStatsAPIAvailableVersion)
}
}

Expand Down Expand Up @@ -406,7 +494,15 @@ func getElasticsearchVersion(elasticsearchHostPort string) (*common.Version, err
}

func httpPutJSON(host, path string, body []byte) ([]byte, *http.Response, error) {
req, err := http.NewRequest("PUT", "http://"+host+path, bytes.NewReader(body))
return httpSendJSON(host, path, "PUT", body)
}

func httpPostJSON(host, path string, body []byte) ([]byte, *http.Response, error) {
return httpSendJSON(host, path, "POST", body)
}

func httpSendJSON(host, path, method string, body []byte) ([]byte, *http.Response, error) {
req, err := http.NewRequest(method, "http://"+host+path, bytes.NewReader(body))
if err != nil {
return nil, nil, err
}
Expand Down
38 changes: 38 additions & 0 deletions metricbeat/module/elasticsearch/enrich/_meta/data.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
{
"@timestamp": "2017-10-12T08:05:34.853Z",
"elasticsearch": {
"cluster": {
"id": "et6blfihSoytMUvkpYtEKQ",
"name": "docker-cluster"
},
"enrich": {
"executed_searches": {
"total": 1
},
"queue": {
"size": 0
},
"remote_requests": {
"current": 0,
"total": 1
}
},
"node": {
"id": "l_XOyQ65Teyn4kW4PUFjVg"
}
},
"event": {
"dataset": "elasticsearch.enrich",
"duration": 115000,
"module": "elasticsearch"
},
"metricset": {
"name": "enrich",
"period": 10000
},
"service": {
"address": "localhost:32780",
"name": "elasticsearch",
"type": "elasticsearch"
}
}
3 changes: 3 additions & 0 deletions metricbeat/module/elasticsearch/enrich/_meta/docs.asciidoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
This is the `enrich` metricset of the Elasticsearch module. It interrogates the
Enrich Stats API endpoint to fetch information about Enrich coordinator nodes
in the Elasticsearch cluster that are participating in ingest-time enrichment.
25 changes: 25 additions & 0 deletions metricbeat/module/elasticsearch/enrich/_meta/fields.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
- name: enrich
type: group
description: >
Enrich stats
release: ga
fields:
- name: queue.size
type: long
description: >
Number of search requests in the queue.
- name: remote_requests
type: group
fields:
- name: current
type: long
description: >
Current number of outstanding remote requests.
- name: total
type: long
description: >
Number of outstanding remote requests executed since node startup.
- name: executed_searches.total
type: long
description: >
Number of search requests that enrich processors have executed since node startup.
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
{
"executing_policies": [
{
"name": "my-policy",
"task": {
"id" : 124,
"type" : "direct",
"action" : "cluster:admin/xpack/enrich/execute",
"start_time_in_millis" : 1458585884904,
"running_time_in_nanos" : 47402,
"cancellable" : false,
"parent_task_id" : "oTUltX4IQMOUUVeiohTt8A:123",
"headers" : {
"X-Opaque-Id" : "123456"
}
}
}
],
"coordinator_stats": [
{
"node_id": "1sFM8cmSROZYhPxVsiWew",
"queue_size": 0,
"remote_requests_current": 0,
"remote_requests_total": 0,
"executed_searches_total": 0
}
]
}
Loading