Skip to content

Commit

Permalink
Collect bulk indexing stats for Elasticsearch metricsets (#17992) (#1…
Browse files Browse the repository at this point in the history
…8229)

* Collecting new index_stats bulk metrics

* Collecting new indices_stats bulk metrics

* Collecting new node_stats bulk metrics

* Adding CHANGELOG entry

* Request bulk stats

* Request bulk metrics only if supported

* Fixing code and tests

* Fixing code so only service URI path is replaced

* Updating unit tests
  • Loading branch information
ycombinator authored May 5, 2020
1 parent 2deccde commit 2c4bf42
Show file tree
Hide file tree
Showing 7 changed files with 142 additions and 3 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Move the perfmon metricset to GA. {issue}16608[16608] {pull}17879[17879]
- Stack Monitoring modules now auto-configure required metricsets when `xpack.enabled: true` is set. {issue}16471[[16471] {pull}17609[17609]
- Add static mapping for metricsets under aws module. {pull}17614[17614] {pull}17650[17650]
- Collect new `bulk` indexing metrics from Elasticsearch when `xpack.enabled:true` is set. {issue} {pull}17992[17992]

*Packetbeat*

Expand Down
13 changes: 13 additions & 0 deletions metricbeat/module/elasticsearch/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ import (
"github.com/pkg/errors"

"github.com/elastic/beats/v7/libbeat/common"
s "github.com/elastic/beats/v7/libbeat/common/schema"
c "github.com/elastic/beats/v7/libbeat/common/schema/mapstriface"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/metricbeat/helper"
"github.com/elastic/beats/v7/metricbeat/helper/elastic"
Expand Down Expand Up @@ -63,6 +65,9 @@ var CCRStatsAPIAvailableVersion = common.MustNewVersion("6.5.0")
// EnrichStatsAPIAvailableVersion is the version of Elasticsearch since when the Enrich stats API is available.
var EnrichStatsAPIAvailableVersion = common.MustNewVersion("7.5.0")

// BulkStatsAvailableVersion is the version since when bulk indexing stats are available
var BulkStatsAvailableVersion = common.MustNewVersion("7.8.0")

// Global clusterIdCache. Assumption is that the same node id never can belong to a different cluster id.
var clusterIDCache = map[string]string{}

Expand Down Expand Up @@ -107,6 +112,14 @@ type licenseWrapper struct {
License License `json:"license"`
}

var BulkStatsDict = c.Dict("bulk", s.Schema{
"total_operations": c.Int("total_operations"),
"total_time_in_millis": c.Int("total_time_in_millis"),
"total_size_in_bytes": c.Int("total_size_in_bytes"),
"avg_time_in_millis": c.Int("avg_time_in_millis"),
"avg_size_in_bytes": c.Int("avg_size_in_bytes"),
}, c.DictOptional)

// GetClusterID fetches cluster id for given nodeID.
func GetClusterID(http *helper.HTTP, uri string, nodeID string) (string, error) {
// Check if cluster id already cached. If yes, return it.
Expand Down
9 changes: 9 additions & 0 deletions metricbeat/module/elasticsearch/index/data_xpack.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ type indexStats struct {
IndexTimeInMillis int `json:"index_time_in_millis"`
ThrottleTimeInMillis int `json:"throttle_time_in_millis"`
} `json:"indexing"`
Bulk bulkStats `json:"bulk"`
Merges struct {
TotalSizeInBytes int `json:"total_size_in_bytes"`
} `json:"merges"`
Expand Down Expand Up @@ -120,6 +121,14 @@ type shardStats struct {
Relocating int `json:"relocating"`
}

type bulkStats struct {
TotalOperations int `json:"total_operations"`
TotalTimeInMillis int `json:"total_time_in_millis"`
TotalSizeInBytes int `json:"total_size_in_bytes"`
AvgTimeInMillis int `json:"throttle_time_in_millis"`
AvgSizeInBytes int `json:"avg_size_in_bytes"`
}

func eventsMappingXPack(r mb.ReporterV2, m *MetricSet, info elasticsearch.Info, content []byte) error {
clusterStateMetrics := []string{"metadata", "routing_table"}
clusterState, err := elasticsearch.GetClusterState(m.HTTP, m.HTTP.GetURI(), clusterStateMetrics)
Expand Down
50 changes: 47 additions & 3 deletions metricbeat/module/elasticsearch/index/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,12 @@
package index

import (
"net/url"
"strings"

"github.com/pkg/errors"

"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/metricbeat/mb"
"github.com/elastic/beats/v7/metricbeat/module/elasticsearch"
)
Expand Down Expand Up @@ -67,14 +71,22 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) error {
return nil
}

content, err := m.HTTP.FetchContent()
info, err := elasticsearch.GetInfo(m.HTTP, m.HostData().SanitizedURI)
if err != nil {
return errors.Wrap(err, "failed to get info from Elasticsearch")
}

if err := m.updateServicePath(*info.Version.Number); err != nil {
if m.XPack {
m.Logger().Error(err)
return nil
}
return err
}

info, err := elasticsearch.GetInfo(m.HTTP, m.HostData().SanitizedURI)
content, err := m.HTTP.FetchContent()
if err != nil {
return errors.Wrap(err, "failed to get info from Elasticsearch")
return err
}

if m.XPack {
Expand All @@ -92,3 +104,35 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) error {

return nil
}

func (m *MetricSet) updateServicePath(esVersion common.Version) error {
p, err := getServicePath(esVersion)
if err != nil {
return err
}

m.SetServiceURI(p)
return nil

}

func getServicePath(esVersion common.Version) (string, error) {
currPath := statsPath
if esVersion.LessThan(elasticsearch.BulkStatsAvailableVersion) {
// Can't request bulk stats so don't change service URI
return currPath, nil
}

u, err := url.Parse(currPath)
if err != nil {
return "", err
}

if strings.HasSuffix(u.Path, ",bulk") {
// Bulk stats already being requested so don't change service URI
return currPath, nil
}

u.Path += ",bulk"
return u.String(), nil
}
70 changes: 70 additions & 0 deletions metricbeat/module/elasticsearch/index/index_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package index

import (
"strings"
"testing"
"testing/quick"

"github.com/elastic/beats/v7/libbeat/common"

"github.com/stretchr/testify/require"
)

func TestGetServiceURI(t *testing.T) {
tests := map[string]struct {
esVersion *common.Version
expectedPath string
}{
"bulk_stats_unavailable": {
esVersion: common.MustNewVersion("7.7.0"),
expectedPath: statsPath,
},
"bulk_stats_available": {
esVersion: common.MustNewVersion("7.8.0"),
expectedPath: strings.Replace(statsPath, statsMetrics, statsMetrics+",bulk", 1),
},
}

for name, test := range tests {
t.Run(name, func(t *testing.T) {
newURI, err := getServicePath(*test.esVersion)
require.NoError(t, err)
require.Equal(t, test.expectedPath, newURI)
})
}
}

func TestGetServiceURIMultipleCalls(t *testing.T) {
err := quick.Check(func(r uint) bool {
numCalls := 2 + (r % 10) // between 2 and 11

var uri string
var err error
for i := uint(0); i < numCalls; i++ {
uri, err = getServicePath(*common.MustNewVersion("7.8.0"))
if err != nil {
return false
}
}

return err == nil && uri == strings.Replace(statsPath, statsMetrics, statsMetrics+",bulk", 1)
}, nil)
require.NoError(t, err)
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ var (
"is_throttled": c.Bool("is_throttled"),
"throttle_time_in_millis": c.Int("throttle_time_in_millis"),
}),
"bulk": elasticsearch.BulkStatsDict,
"search": c.Dict("search", s.Schema{
"query_total": c.Int("query_total"),
"query_time_in_millis": c.Int("query_time_in_millis"),
Expand Down
1 change: 1 addition & 0 deletions metricbeat/module/elasticsearch/node_stats/data_xpack.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ var (
"index_time_in_millis": c.Int("index_time_in_millis"),
"throttle_time_in_millis": c.Int("throttle_time_in_millis"),
}),
"bulk": elasticsearch.BulkStatsDict,
"search": c.Dict("search", s.Schema{
"query_total": c.Int("query_total"),
"query_time_in_millis": c.Int("query_time_in_millis"),
Expand Down

0 comments on commit 2c4bf42

Please sign in to comment.