diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index b3eaf0a08dc6..7e5c18ceb3a8 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -373,6 +373,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Add aggregation aligner as a config parameter for googlecloud stackdriver metricset. {issue}17141[[17141] {pull}17719[17719] - Move the perfmon metricset to GA. {issue}16608[16608] {pull}17879[17879] - Add static mapping for metricsets under aws module. {pull}17614[17614] {pull}17650[17650] +- Collect new `bulk` indexing metrics from Elasticsearch when `xpack.enabled:true` is set. {issue} {pull}17992[17992] *Packetbeat* diff --git a/metricbeat/module/elasticsearch/elasticsearch.go b/metricbeat/module/elasticsearch/elasticsearch.go index c2264f9d6a8d..46825ee00840 100644 --- a/metricbeat/module/elasticsearch/elasticsearch.go +++ b/metricbeat/module/elasticsearch/elasticsearch.go @@ -28,6 +28,8 @@ import ( "github.com/pkg/errors" "github.com/elastic/beats/v7/libbeat/common" + s "github.com/elastic/beats/v7/libbeat/common/schema" + c "github.com/elastic/beats/v7/libbeat/common/schema/mapstriface" "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/metricbeat/helper" "github.com/elastic/beats/v7/metricbeat/helper/elastic" @@ -63,6 +65,9 @@ var CCRStatsAPIAvailableVersion = common.MustNewVersion("6.5.0") // EnrichStatsAPIAvailableVersion is the version of Elasticsearch since when the Enrich stats API is available. var EnrichStatsAPIAvailableVersion = common.MustNewVersion("7.5.0") +// BulkStatsAvailableVersion is the version since when bulk indexing stats are available +var BulkStatsAvailableVersion = common.MustNewVersion("7.8.0") + // Global clusterIdCache. Assumption is that the same node id never can belong to a different cluster id. var clusterIDCache = map[string]string{} @@ -107,6 +112,14 @@ type licenseWrapper struct { License License `json:"license"` } +var BulkStatsDict = c.Dict("bulk", s.Schema{ + "total_operations": c.Int("total_operations"), + "total_time_in_millis": c.Int("total_time_in_millis"), + "total_size_in_bytes": c.Int("total_size_in_bytes"), + "avg_time_in_millis": c.Int("avg_time_in_millis"), + "avg_size_in_bytes": c.Int("avg_size_in_bytes"), +}, c.DictOptional) + // GetClusterID fetches cluster id for given nodeID. func GetClusterID(http *helper.HTTP, uri string, nodeID string) (string, error) { // Check if cluster id already cached. If yes, return it. diff --git a/metricbeat/module/elasticsearch/index/data_xpack.go b/metricbeat/module/elasticsearch/index/data_xpack.go index 35e9119fdf7e..6c73b4ee2e1c 100644 --- a/metricbeat/module/elasticsearch/index/data_xpack.go +++ b/metricbeat/module/elasticsearch/index/data_xpack.go @@ -65,6 +65,7 @@ type indexStats struct { IndexTimeInMillis int `json:"index_time_in_millis"` ThrottleTimeInMillis int `json:"throttle_time_in_millis"` } `json:"indexing"` + Bulk bulkStats `json:"bulk"` Merges struct { TotalSizeInBytes int `json:"total_size_in_bytes"` } `json:"merges"` @@ -120,6 +121,14 @@ type shardStats struct { Relocating int `json:"relocating"` } +type bulkStats struct { + TotalOperations int `json:"total_operations"` + TotalTimeInMillis int `json:"total_time_in_millis"` + TotalSizeInBytes int `json:"total_size_in_bytes"` + AvgTimeInMillis int `json:"throttle_time_in_millis"` + AvgSizeInBytes int `json:"avg_size_in_bytes"` +} + func eventsMappingXPack(r mb.ReporterV2, m *MetricSet, info elasticsearch.Info, content []byte) error { clusterStateMetrics := []string{"metadata", "routing_table"} clusterState, err := elasticsearch.GetClusterState(m.HTTP, m.HTTP.GetURI(), clusterStateMetrics) diff --git a/metricbeat/module/elasticsearch/index/index.go b/metricbeat/module/elasticsearch/index/index.go index cd2dc3ffca0d..3454b0d65542 100644 --- a/metricbeat/module/elasticsearch/index/index.go +++ b/metricbeat/module/elasticsearch/index/index.go @@ -18,8 +18,12 @@ package index import ( + "net/url" + "strings" + "github.com/pkg/errors" + "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/metricbeat/mb" "github.com/elastic/beats/v7/metricbeat/module/elasticsearch" ) @@ -67,14 +71,22 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) error { return nil } - content, err := m.HTTP.FetchContent() + info, err := elasticsearch.GetInfo(m.HTTP, m.HostData().SanitizedURI) if err != nil { + return errors.Wrap(err, "failed to get info from Elasticsearch") + } + + if err := m.updateServicePath(*info.Version.Number); err != nil { + if m.XPack { + m.Logger().Error(err) + return nil + } return err } - info, err := elasticsearch.GetInfo(m.HTTP, m.HostData().SanitizedURI) + content, err := m.HTTP.FetchContent() if err != nil { - return errors.Wrap(err, "failed to get info from Elasticsearch") + return err } if m.XPack { @@ -92,3 +104,35 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) error { return nil } + +func (m *MetricSet) updateServicePath(esVersion common.Version) error { + p, err := getServicePath(esVersion) + if err != nil { + return err + } + + m.SetServiceURI(p) + return nil + +} + +func getServicePath(esVersion common.Version) (string, error) { + currPath := statsPath + if esVersion.LessThan(elasticsearch.BulkStatsAvailableVersion) { + // Can't request bulk stats so don't change service URI + return currPath, nil + } + + u, err := url.Parse(currPath) + if err != nil { + return "", err + } + + if strings.HasSuffix(u.Path, ",bulk") { + // Bulk stats already being requested so don't change service URI + return currPath, nil + } + + u.Path += ",bulk" + return u.String(), nil +} diff --git a/metricbeat/module/elasticsearch/index/index_test.go b/metricbeat/module/elasticsearch/index/index_test.go new file mode 100644 index 000000000000..8c4106e9944e --- /dev/null +++ b/metricbeat/module/elasticsearch/index/index_test.go @@ -0,0 +1,70 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package index + +import ( + "strings" + "testing" + "testing/quick" + + "github.com/elastic/beats/v7/libbeat/common" + + "github.com/stretchr/testify/require" +) + +func TestGetServiceURI(t *testing.T) { + tests := map[string]struct { + esVersion *common.Version + expectedPath string + }{ + "bulk_stats_unavailable": { + esVersion: common.MustNewVersion("7.7.0"), + expectedPath: statsPath, + }, + "bulk_stats_available": { + esVersion: common.MustNewVersion("7.8.0"), + expectedPath: strings.Replace(statsPath, statsMetrics, statsMetrics+",bulk", 1), + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + newURI, err := getServicePath(*test.esVersion) + require.NoError(t, err) + require.Equal(t, test.expectedPath, newURI) + }) + } +} + +func TestGetServiceURIMultipleCalls(t *testing.T) { + err := quick.Check(func(r uint) bool { + numCalls := 2 + (r % 10) // between 2 and 11 + + var uri string + var err error + for i := uint(0); i < numCalls; i++ { + uri, err = getServicePath(*common.MustNewVersion("7.8.0")) + if err != nil { + return false + } + } + + return err == nil && uri == strings.Replace(statsPath, statsMetrics, statsMetrics+",bulk", 1) + }, nil) + require.NoError(t, err) +} diff --git a/metricbeat/module/elasticsearch/index_summary/data_xpack.go b/metricbeat/module/elasticsearch/index_summary/data_xpack.go index d1e00ea64b86..4e35744133d4 100644 --- a/metricbeat/module/elasticsearch/index_summary/data_xpack.go +++ b/metricbeat/module/elasticsearch/index_summary/data_xpack.go @@ -51,6 +51,7 @@ var ( "is_throttled": c.Bool("is_throttled"), "throttle_time_in_millis": c.Int("throttle_time_in_millis"), }), + "bulk": elasticsearch.BulkStatsDict, "search": c.Dict("search", s.Schema{ "query_total": c.Int("query_total"), "query_time_in_millis": c.Int("query_time_in_millis"), diff --git a/metricbeat/module/elasticsearch/node_stats/data_xpack.go b/metricbeat/module/elasticsearch/node_stats/data_xpack.go index f7f612b11eef..533401031766 100644 --- a/metricbeat/module/elasticsearch/node_stats/data_xpack.go +++ b/metricbeat/module/elasticsearch/node_stats/data_xpack.go @@ -49,6 +49,7 @@ var ( "index_time_in_millis": c.Int("index_time_in_millis"), "throttle_time_in_millis": c.Int("throttle_time_in_millis"), }), + "bulk": elasticsearch.BulkStatsDict, "search": c.Dict("search", s.Schema{ "query_total": c.Int("query_total"), "query_time_in_millis": c.Int("query_time_in_millis"),