diff --git a/plugins/outputs/elasticsearch/README.md b/plugins/outputs/elasticsearch/README.md index 41001ee89e282..b65ed2ea85cff 100644 --- a/plugins/outputs/elasticsearch/README.md +++ b/plugins/outputs/elasticsearch/README.md @@ -199,6 +199,15 @@ This plugin will format the events in the following way: ## If set to true a unique ID hash will be sent as sha256(concat(timestamp,measurement,series-hash)) string ## it will enable data resend and update metric points avoiding duplicated metrics with diferent id's force_document_id = false + + ## Specifies the handling of NaN and Inf values. + ## This option can have the following values: + ## none -- do not modify field-values (default); will produce an error if NaNs or infs are encountered + ## drop -- drop fields containing NaNs or infs + ## replace -- replace with the value in "float_replacement_value" (default: 0.0) + ## NaNs and inf will be replaced with the given number, -inf with the negative of that number + # float_handling = "none" + # float_replacement_value = 0.0 ``` ### Permissions @@ -236,6 +245,8 @@ Additionally, you can specify dynamic index names by using tags with the notatio * `template_name`: The template name used for telegraf indexes. * `overwrite_template`: Set to true if you want telegraf to overwrite an existing template. * `force_document_id`: Set to true will compute a unique hash from as sha256(concat(timestamp,measurement,series-hash)),enables resend or update data withoud ES duplicated documents. +* `float_handling`: Specifies how to handle `NaN` and infinite field values. `"none"` (default) leaves the values untouched, so writing them will produce an error; `"drop"` drops the affected field; `"replace"` replaces the field value with the number in `float_replacement_value`. +* `float_replacement_value`: Value (defaulting to `0.0`) used to replace `NaN` and `inf` values if `float_handling` is set to `"replace"`. `-inf` is replaced by the negative of this number to respect the sign of the field's original value. 
## Known issues diff --git a/plugins/outputs/elasticsearch/elasticsearch.go b/plugins/outputs/elasticsearch/elasticsearch.go index 235a8ee088ec3..7aeaa0b3c1911 100644 --- a/plugins/outputs/elasticsearch/elasticsearch.go +++ b/plugins/outputs/elasticsearch/elasticsearch.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "fmt" + "math" "net/http" "strconv" "strings" @@ -35,6 +36,8 @@ type Elasticsearch struct { OverwriteTemplate bool ForceDocumentID bool `toml:"force_document_id"` MajorReleaseNumber int + FloatHandling string `toml:"float_handling"` + FloatReplacement float64 `toml:"float_replacement_value"` Log telegraf.Logger `toml:"-"` tls.ClientConfig @@ -95,6 +98,15 @@ var sampleConfig = ` ## If set to true a unique ID hash will be sent as sha256(concat(timestamp,measurement,series-hash)) string ## it will enable data resend and update metric points avoiding duplicated metrics with diferent id's force_document_id = false + + ## Specifies the handling of NaN and Inf values. + ## This option can have the following values: + ## none -- do not modify field-values (default); will produce an error if NaNs or infs are encountered + ## drop -- drop fields containing NaNs or infs + ## replace -- replace with the value in "float_replacement_value" (default: 0.0) + ## NaNs and inf will be replaced with the given number, -inf with the negative of that number + # float_handling = "none" + # float_replacement_value = 0.0 ` const telegrafTemplate = ` @@ -177,6 +189,15 @@ func (a *Elasticsearch) Connect() error { return fmt.Errorf("elasticsearch urls or index_name is not defined") } + // Determine if we should process NaN and inf values + switch a.FloatHandling { + case "", "none": + a.FloatHandling = "none" + case "drop", "replace": + default: + return fmt.Errorf("invalid float_handling type %q", a.FloatHandling) + } + ctx, cancel := context.WithTimeout(context.Background(), time.Duration(a.Timeout)) defer cancel() @@ -278,12 +299,31 @@ func (a *Elasticsearch) Write(metrics 
[]telegraf.Metric) error { // to send the metric to the correct time-based index indexName := a.GetIndexName(a.IndexName, metric.Time(), a.TagKeys, metric.Tags()) + // Handle NaN and inf field-values + fields := make(map[string]interface{}) + for k, value := range metric.Fields() { + v, ok := value.(float64) + if !ok || a.FloatHandling == "none" || !(math.IsNaN(v) || math.IsInf(v, 0)) { + fields[k] = value + continue + } + if a.FloatHandling == "drop" { + continue + } + + if math.IsNaN(v) || math.IsInf(v, 1) { + fields[k] = a.FloatReplacement + } else { + fields[k] = -a.FloatReplacement + } + } + m := make(map[string]interface{}) m["@timestamp"] = metric.Time() m["measurement_name"] = name m["tag"] = metric.Tags() - m[name] = metric.Fields() + m[name] = fields br := elastic.NewBulkIndexRequest().Index(indexName).Doc(m) diff --git a/plugins/outputs/elasticsearch/elasticsearch_test.go b/plugins/outputs/elasticsearch/elasticsearch_test.go index ecfe03f2e0a82..ec6bb3a9249ab 100644 --- a/plugins/outputs/elasticsearch/elasticsearch_test.go +++ b/plugins/outputs/elasticsearch/elasticsearch_test.go @@ -2,12 +2,14 @@ package elasticsearch import ( "context" + "math" "net/http" "net/http/httptest" "reflect" "testing" "time" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/testutil" "github.com/stretchr/testify/require" @@ -41,6 +43,149 @@ func TestConnectAndWriteIntegration(t *testing.T) { require.NoError(t, err) } +func TestConnectAndWriteMetricWithNaNValueEmpty(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + urls := []string{"http://" + testutil.GetLocalHost() + ":9200"} + + e := &Elasticsearch{ + URLs: urls, + IndexName: "test-%Y.%m.%d", + Timeout: config.Duration(time.Second * 5), + ManageTemplate: true, + TemplateName: "telegraf", + OverwriteTemplate: false, + HealthCheckInterval: config.Duration(time.Second * 10), + Log: testutil.Logger{}, + } + + metrics := 
[]telegraf.Metric{ + testutil.TestMetric(math.NaN()), + testutil.TestMetric(math.Inf(1)), + testutil.TestMetric(math.Inf(-1)), + } + + // Verify that we can connect to Elasticsearch + err := e.Connect() + require.NoError(t, err) + + // Verify that we can fail for metric with unhandled NaN/inf/-inf values + for _, m := range metrics { + err = e.Write([]telegraf.Metric{m}) + require.Error(t, err, "error sending bulk request to Elasticsearch: json: unsupported value: NaN") + } +} + +func TestConnectAndWriteMetricWithNaNValueNone(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + urls := []string{"http://" + testutil.GetLocalHost() + ":9200"} + + e := &Elasticsearch{ + URLs: urls, + IndexName: "test-%Y.%m.%d", + Timeout: config.Duration(time.Second * 5), + ManageTemplate: true, + TemplateName: "telegraf", + OverwriteTemplate: false, + HealthCheckInterval: config.Duration(time.Second * 10), + FloatHandling: "none", + Log: testutil.Logger{}, + } + + metrics := []telegraf.Metric{ + testutil.TestMetric(math.NaN()), + testutil.TestMetric(math.Inf(1)), + testutil.TestMetric(math.Inf(-1)), + } + + // Verify that we can connect to Elasticsearch + err := e.Connect() + require.NoError(t, err) + + // Verify that we can fail for metric with unhandled NaN/inf/-inf values + for _, m := range metrics { + err = e.Write([]telegraf.Metric{m}) + require.Error(t, err, "error sending bulk request to Elasticsearch: json: unsupported value: NaN") + } +} + +func TestConnectAndWriteMetricWithNaNValueDrop(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + urls := []string{"http://" + testutil.GetLocalHost() + ":9200"} + + e := &Elasticsearch{ + URLs: urls, + IndexName: "test-%Y.%m.%d", + Timeout: config.Duration(time.Second * 5), + ManageTemplate: true, + TemplateName: "telegraf", + OverwriteTemplate: false, + HealthCheckInterval: config.Duration(time.Second * 10), + FloatHandling: "drop", + Log: 
testutil.Logger{}, + } + + metrics := []telegraf.Metric{ + testutil.TestMetric(math.NaN()), + testutil.TestMetric(math.Inf(1)), + testutil.TestMetric(math.Inf(-1)), + } + + // Verify that we can connect to Elasticsearch + err := e.Connect() + require.NoError(t, err) + + // Verify that metrics are written without error once NaN/inf/-inf fields are dropped + for _, m := range metrics { + err = e.Write([]telegraf.Metric{m}) + require.NoError(t, err) + } +} + +func TestConnectAndWriteMetricWithNaNValueReplacement(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + urls := []string{"http://" + testutil.GetLocalHost() + ":9200"} + + e := &Elasticsearch{ + URLs: urls, + IndexName: "test-%Y.%m.%d", + Timeout: config.Duration(time.Second * 5), + ManageTemplate: true, + TemplateName: "telegraf", + OverwriteTemplate: false, + HealthCheckInterval: config.Duration(time.Second * 10), + FloatHandling: "replace", FloatReplacement: 3.1415, + Log: testutil.Logger{}, + } + + metrics := []telegraf.Metric{ + testutil.TestMetric(math.NaN()), + testutil.TestMetric(math.Inf(1)), + testutil.TestMetric(math.Inf(-1)), + } + + // Verify that we can connect to Elasticsearch + err := e.Connect() + require.NoError(t, err) + + // Verify that metrics are written without error once NaN/inf/-inf values are replaced + for _, m := range metrics { + err = e.Write([]telegraf.Metric{m}) + require.NoError(t, err) + } +} + func TestTemplateManagementEmptyTemplateIntegration(t *testing.T) { if testing.Short() { t.Skip("Skipping integration test in short mode") @@ -121,7 +266,7 @@ func TestGetTagKeys(t *testing.T) { Log: testutil.Logger{}, } - var tests = []struct { + tests := []struct { IndexName string ExpectedIndexName string ExpectedTagKeys []string @@ -181,7 +326,7 @@ func TestGetIndexName(t *testing.T) { Log: testutil.Logger{}, } - var tests = []struct { + tests := []struct { EventTime time.Time Tags map[string]string TagKeys []string