Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Implement NaN and inf handling for elasticsearch output #10196

Merged
merged 5 commits into from
Dec 8, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions plugins/outputs/elasticsearch/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,15 @@ This plugin will format the events in the following way:
## If set to true a unique ID hash will be sent as sha256(concat(timestamp,measurement,series-hash)) string
## it will enable data resend and update metric points avoiding duplicated metrics with diferent id's
force_document_id = false

## Specifies the handling of NaN and Inf values.
## This option can have the following values:
## none -- do not modify field-values (default); will produce an error if NaNs or infs are encountered
## drop -- drop fields containing NaNs or infs
## replace -- replace with the value in "float_replacement_value" (default: 0.0)
## NaNs and inf will be replaced with the given number, -inf with the negative of that number
# float_handling = "none"
# float_replacement_value = 0.0
```

### Permissions
Expand Down Expand Up @@ -236,6 +245,8 @@ Additionally, you can specify dynamic index names by using tags with the notatio
* `template_name`: The template name used for telegraf indexes.
* `overwrite_template`: Set to true if you want telegraf to overwrite an existing template.
* `force_document_id`: Set to true will compute a unique hash from as sha256(concat(timestamp,measurement,series-hash)),enables resend or update data withoud ES duplicated documents.
* `float_handling`: Specifies how to handle `NaN` and infinite field values. `"none"` (default) will do nothing, `"drop"` will drop the field and `replace` will replace the field value by the number in `float_replacement_value`
* `float_replacement_value`: Value (defaulting to `0.0`) to replace `NaN`s and `inf`s if `float_handling` is set to `replace`. Negative `inf` will be replaced by the negative value in this number to respect the sign of the field's original value.

## Known issues

Expand Down
42 changes: 41 additions & 1 deletion plugins/outputs/elasticsearch/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"fmt"
"math"
"net/http"
"strconv"
"strings"
Expand Down Expand Up @@ -35,6 +36,8 @@ type Elasticsearch struct {
OverwriteTemplate bool
ForceDocumentID bool `toml:"force_document_id"`
MajorReleaseNumber int
FloatHandling string `toml:"float_handling"`
FloatReplacement float64 `toml:"float_replacement_value"`
Log telegraf.Logger `toml:"-"`
tls.ClientConfig

Expand Down Expand Up @@ -95,6 +98,15 @@ var sampleConfig = `
## If set to true a unique ID hash will be sent as sha256(concat(timestamp,measurement,series-hash)) string
## it will enable data resend and update metric points avoiding duplicated metrics with diferent id's
force_document_id = false

## Specifies the handling of NaN and Inf values.
## This option can have the following values:
## none -- do not modify field-values (default); will produce an error if NaNs or infs are encountered
## drop -- drop fields containing NaNs or infs
## replace -- replace with the value in "float_replacement_value" (default: 0.0)
## NaNs and inf will be replaced with the given number, -inf with the negative of that number
# float_handling = "none"
# float_replacement_value = 0.0
`

const telegrafTemplate = `
Expand Down Expand Up @@ -177,6 +189,15 @@ func (a *Elasticsearch) Connect() error {
return fmt.Errorf("elasticsearch urls or index_name is not defined")
}

// Determine if we should process NaN and inf values
switch a.FloatHandling {
case "", "none":
a.FloatHandling = "none"
case "drop", "replace":
default:
return fmt.Errorf("invalid float_handling type %q", a.FloatHandling)
}

ctx, cancel := context.WithTimeout(context.Background(), time.Duration(a.Timeout))
defer cancel()

Expand Down Expand Up @@ -278,12 +299,31 @@ func (a *Elasticsearch) Write(metrics []telegraf.Metric) error {
// to send the metric to the correct time-based index
indexName := a.GetIndexName(a.IndexName, metric.Time(), a.TagKeys, metric.Tags())

// Handle NaN and inf field-values
fields := make(map[string]interface{})
for k, value := range metric.Fields() {
v, ok := value.(float64)
if !ok || a.FloatHandling == "none" || !(math.IsNaN(v) || math.IsInf(v, 0)) {
fields[k] = value
continue
}
if a.FloatHandling == "drop" {
continue
}

if math.IsNaN(v) || math.IsInf(v, 1) {
fields[k] = a.FloatReplacement
} else {
fields[k] = -a.FloatReplacement
}
}

m := make(map[string]interface{})

m["@timestamp"] = metric.Time()
m["measurement_name"] = name
m["tag"] = metric.Tags()
m[name] = metric.Fields()
m[name] = fields

br := elastic.NewBulkIndexRequest().Index(indexName).Doc(m)

Expand Down
149 changes: 147 additions & 2 deletions plugins/outputs/elasticsearch/elasticsearch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@ package elasticsearch

import (
"context"
"math"
"net/http"
"net/http/httptest"
"reflect"
"testing"
"time"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -41,6 +43,149 @@ func TestConnectAndWriteIntegration(t *testing.T) {
require.NoError(t, err)
}

func TestConnectAndWriteMetricWithNaNValueEmpty(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}

urls := []string{"http://" + testutil.GetLocalHost() + ":9200"}

e := &Elasticsearch{
URLs: urls,
IndexName: "test-%Y.%m.%d",
Timeout: config.Duration(time.Second * 5),
ManageTemplate: true,
TemplateName: "telegraf",
OverwriteTemplate: false,
HealthCheckInterval: config.Duration(time.Second * 10),
Log: testutil.Logger{},
}

metrics := []telegraf.Metric{
testutil.TestMetric(math.NaN()),
testutil.TestMetric(math.Inf(1)),
testutil.TestMetric(math.Inf(-1)),
}

// Verify that we can connect to Elasticsearch
err := e.Connect()
require.NoError(t, err)

// Verify that we can fail for metric with unhandled NaN/inf/-inf values
for _, m := range metrics {
err = e.Write([]telegraf.Metric{m})
require.Error(t, err, "error sending bulk request to Elasticsearch: json: unsupported value: NaN")
}
}

func TestConnectAndWriteMetricWithNaNValueNone(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}

urls := []string{"http://" + testutil.GetLocalHost() + ":9200"}

e := &Elasticsearch{
URLs: urls,
IndexName: "test-%Y.%m.%d",
Timeout: config.Duration(time.Second * 5),
ManageTemplate: true,
TemplateName: "telegraf",
OverwriteTemplate: false,
HealthCheckInterval: config.Duration(time.Second * 10),
FloatHandling: "none",
Log: testutil.Logger{},
}

metrics := []telegraf.Metric{
testutil.TestMetric(math.NaN()),
testutil.TestMetric(math.Inf(1)),
testutil.TestMetric(math.Inf(-1)),
}

// Verify that we can connect to Elasticsearch
err := e.Connect()
require.NoError(t, err)

// Verify that we can fail for metric with unhandled NaN/inf/-inf values
for _, m := range metrics {
err = e.Write([]telegraf.Metric{m})
require.Error(t, err, "error sending bulk request to Elasticsearch: json: unsupported value: NaN")
}
}

func TestConnectAndWriteMetricWithNaNValueDrop(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}

urls := []string{"http://" + testutil.GetLocalHost() + ":9200"}

e := &Elasticsearch{
URLs: urls,
IndexName: "test-%Y.%m.%d",
Timeout: config.Duration(time.Second * 5),
ManageTemplate: true,
TemplateName: "telegraf",
OverwriteTemplate: false,
HealthCheckInterval: config.Duration(time.Second * 10),
FloatHandling: "drop",
Log: testutil.Logger{},
}

metrics := []telegraf.Metric{
testutil.TestMetric(math.NaN()),
testutil.TestMetric(math.Inf(1)),
testutil.TestMetric(math.Inf(-1)),
}

// Verify that we can connect to Elasticsearch
err := e.Connect()
require.NoError(t, err)

// Verify that we can fail for metric with unhandled NaN/inf/-inf values
for _, m := range metrics {
err = e.Write([]telegraf.Metric{m})
require.NoError(t, err)
}
}

func TestConnectAndWriteMetricWithNaNValueReplacement(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}

urls := []string{"http://" + testutil.GetLocalHost() + ":9200"}

e := &Elasticsearch{
URLs: urls,
IndexName: "test-%Y.%m.%d",
Timeout: config.Duration(time.Second * 5),
ManageTemplate: true,
TemplateName: "telegraf",
OverwriteTemplate: false,
HealthCheckInterval: config.Duration(time.Second * 10),
FloatHandling: "3.1415",
Log: testutil.Logger{},
}

metrics := []telegraf.Metric{
testutil.TestMetric(math.NaN()),
testutil.TestMetric(math.Inf(1)),
testutil.TestMetric(math.Inf(-1)),
}

// Verify that we can connect to Elasticsearch
err := e.Connect()
require.NoError(t, err)

// Verify that we can fail for metric with unhandled NaN/inf/-inf values
for _, m := range metrics {
err = e.Write([]telegraf.Metric{m})
require.NoError(t, err)
}
}

func TestTemplateManagementEmptyTemplateIntegration(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
Expand Down Expand Up @@ -121,7 +266,7 @@ func TestGetTagKeys(t *testing.T) {
Log: testutil.Logger{},
}

var tests = []struct {
tests := []struct {
IndexName string
ExpectedIndexName string
ExpectedTagKeys []string
Expand Down Expand Up @@ -181,7 +326,7 @@ func TestGetIndexName(t *testing.T) {
Log: testutil.Logger{},
}

var tests = []struct {
tests := []struct {
EventTime time.Time
Tags map[string]string
TagKeys []string
Expand Down