diff --git a/config/config.go b/config/config.go index f8d44642b7176..ba870ae90eff0 100644 --- a/config/config.go +++ b/config/config.go @@ -1196,6 +1196,8 @@ func (c *Config) buildSerializer(tbl *ast.Table) (serializers.Serializer, error) c.getFieldDuration(tbl, "json_timestamp_units", &sc.TimestampUnits) c.getFieldString(tbl, "json_timestamp_format", &sc.TimestampFormat) c.getFieldString(tbl, "json_transformation", &sc.Transformation) + c.getFieldStringSlice(tbl, "json_nested_fields_include", &sc.JSONNestedFieldInclude) + c.getFieldStringSlice(tbl, "json_nested_fields_exclude", &sc.JSONNestedFieldExclude) c.getFieldBool(tbl, "splunkmetric_hec_routing", &sc.HecRouting) c.getFieldBool(tbl, "splunkmetric_multimetric", &sc.SplunkmetricMultiMetric) @@ -1276,6 +1278,7 @@ func (c *Config) missingTomlField(_ reflect.Type, key string) error { "graphite_tag_sanitize_mode", "graphite_tag_support", "graphite_separator", "influx_max_line_bytes", "influx_sort_fields", "influx_uint_support", "json_timestamp_format", "json_timestamp_units", "json_transformation", + "json_nested_fields_include", "json_nested_fields_exclude", "prometheus_export_timestamp", "prometheus_sort_metrics", "prometheus_string_as_label", "prometheus_compact_encoding", "splunkmetric_hec_routing", "splunkmetric_multimetric", "splunkmetric_omit_event_tag", diff --git a/plugins/outputs/azure_data_explorer/azure_data_explorer.go b/plugins/outputs/azure_data_explorer/azure_data_explorer.go index 5e58910e9e47b..002ca236d2b93 100644 --- a/plugins/outputs/azure_data_explorer/azure_data_explorer.go +++ b/plugins/outputs/azure_data_explorer/azure_data_explorer.go @@ -323,7 +323,10 @@ func (adx *AzureDataExplorer) Init() error { return fmt.Errorf("unknown ingestion type %q", adx.IngestionType) } - serializer, err := json.NewSerializer(time.Nanosecond, time.RFC3339Nano, "") + serializer, err := json.NewSerializer(json.FormatConfig{ + TimestampUnits: time.Nanosecond, + TimestampFormat: time.RFC3339Nano, + }) if err != nil { return err } diff --git a/plugins/outputs/azure_data_explorer/azure_data_explorer_test.go b/plugins/outputs/azure_data_explorer/azure_data_explorer_test.go index 20d4068b15e08..8d211f0f0e15b 100644 --- a/plugins/outputs/azure_data_explorer/azure_data_explorer_test.go +++ b/plugins/outputs/azure_data_explorer/azure_data_explorer_test.go @@ -140,7 +140,10 @@ func TestWrite(t *testing.T) { for _, tC := range testCases { t.Run(tC.name, func(t *testing.T) { - serializer, err := telegrafJson.NewSerializer(time.Second, "", "") + serializer, err := telegrafJson.NewSerializer( + telegrafJson.FormatConfig{ + TimestampUnits: time.Second, + }) require.NoError(t, err) plugin := AzureDataExplorer{ @@ -264,7 +267,7 @@ func TestWriteWithType(t *testing.T) { } for _, testCase := range testCases { t.Run(testCase.name, func(t *testing.T) { - serializer, err := telegrafJson.NewSerializer(time.Second, "", "") + serializer, err := telegrafJson.NewSerializer(telegrafJson.FormatConfig{TimestampUnits: time.Second}) require.NoError(t, err) for tableName, jsonValue := range testCase.tableNameToExpectedResult { ingestionType := "queued" diff --git a/plugins/outputs/event_hubs/event_hubs_test.go b/plugins/outputs/event_hubs/event_hubs_test.go index 661da7cf44ffc..efa5b5af908d9 100644 --- a/plugins/outputs/event_hubs/event_hubs_test.go +++ b/plugins/outputs/event_hubs/event_hubs_test.go @@ -42,7 +42,7 @@ func (eh *mockEventHub) SendBatch(ctx context.Context, iterator eventhub.BatchIt /* End wrapper interface */ func TestInitAndWrite(t *testing.T) { - serializer, err := json.NewSerializer(time.Second, "", "") + serializer, err := json.NewSerializer(json.FormatConfig{TimestampUnits: time.Second}) require.NoError(t, err) mockHub := &mockEventHub{} e := &EventHubs{ @@ -101,7 +101,7 @@ func TestInitAndWriteIntegration(t *testing.T) { testHubCS := os.Getenv("EVENTHUB_CONNECTION_STRING") + ";EntityPath=" + entity.Name // Configure the plugin to target the newly created hub - serializer, err := json.NewSerializer(time.Second, "", "") + serializer, err := json.NewSerializer(json.FormatConfig{TimestampUnits: time.Second}) require.NoError(t, err) e := &EventHubs{ Hub: &eventHub{}, diff --git a/plugins/outputs/http/http_test.go b/plugins/outputs/http/http_test.go index b98dd7eb4f1ce..d57cde322ed68 100644 --- a/plugins/outputs/http/http_test.go +++ b/plugins/outputs/http/http_test.go @@ -661,7 +661,7 @@ func TestBatchedUnbatched(t *testing.T) { Method: defaultMethod, } - jsonSerializer, err := json.NewSerializer(time.Second, "", "") + jsonSerializer, err := json.NewSerializer(json.FormatConfig{TimestampUnits: time.Second}) require.NoError(t, err) s := map[string]serializers.Serializer{ "influx": influx.NewSerializer(), diff --git a/plugins/outputs/stomp/stomp_test.go b/plugins/outputs/stomp/stomp_test.go index aa18a7561b4ec..2b33225d7e066 100644 --- a/plugins/outputs/stomp/stomp_test.go +++ b/plugins/outputs/stomp/stomp_test.go @@ -28,7 +28,11 @@ func TestConnectAndWrite(t *testing.T) { require.NoError(t, err, "failed to start container") defer container.Terminate() var url = fmt.Sprintf("%s:%s", container.Address, container.Ports[servicePort]) - s, err := serializers.NewJSONSerializer(10*time.Second, "yyy-dd-mmThh:mm:ss", "") + s, err := serializers.NewJSONSerializer( + &serializers.Config{ + TimestampUnits: 10 * time.Second, + TimestampFormat: "yyy-dd-mmThh:mm:ss", + }) require.NoError(t, err) st := &STOMP{ Host: url, diff --git a/plugins/serializers/json/README.md b/plugins/serializers/json/README.md index 5284e7a610645..a0fe4d20a5289 100644 --- a/plugins/serializers/json/README.md +++ b/plugins/serializers/json/README.md @@ -33,6 +33,13 @@ The `json` output data format converts metrics into JSON documents. ## This allows to generate an arbitrary output form based on the metric(s). Please use ## multiline strings (starting and ending with three single-quotes) if needed. #json_transformation = "" + + ## Filter for fields that contain nested JSON data. + ## The serializer will try to decode matching STRING fields containing + ## valid JSON. This is done BEFORE any JSON transformation. The filters + ## can contain wildcards. + #json_nested_fields_include = [] + #json_nested_fields_exclude = [] ``` ## Examples diff --git a/plugins/serializers/json/json.go b/plugins/serializers/json/json.go index 222be9ea2340d..5c36fba53cf1a 100644 --- a/plugins/serializers/json/json.go +++ b/plugins/serializers/json/json.go @@ -10,29 +10,47 @@ import ( jsonata "github.com/blues/jsonata-go" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/filter" ) +type FormatConfig struct { + TimestampUnits time.Duration + TimestampFormat string + Transformation string + NestedFieldsInclude []string + NestedFieldsExclude []string +} + type Serializer struct { TimestampUnits time.Duration TimestampFormat string transformation *jsonata.Expr + nestedfields filter.Filter } -func NewSerializer(timestampUnits time.Duration, timestampFormat, transform string) (*Serializer, error) { +func NewSerializer(cfg FormatConfig) (*Serializer, error) { s := &Serializer{ - TimestampUnits: truncateDuration(timestampUnits), - TimestampFormat: timestampFormat, + TimestampUnits: truncateDuration(cfg.TimestampUnits), + TimestampFormat: cfg.TimestampFormat, } - if transform != "" { - e, err := jsonata.Compile(transform) + if cfg.Transformation != "" { + e, err := jsonata.Compile(cfg.Transformation) if err != nil { return nil, err } s.transformation = e } + if len(cfg.NestedFieldsInclude) > 0 || len(cfg.NestedFieldsExclude) > 0 { + f, err := filter.NewIncludeExcludeFilter(cfg.NestedFieldsInclude, cfg.NestedFieldsExclude) + if err != nil { + return nil, err + } + s.nestedfields = f + } + return s, nil } @@ -99,13 +117,26 @@ func (s *Serializer) createObject(metric telegraf.Metric) map[string]interface{} fields := make(map[string]interface{}, len(metric.FieldList())) for _, field := range metric.FieldList() { - if fv, ok := field.Value.(float64); ok { + val := field.Value + switch fv := field.Value.(type) { + case float64: // JSON does not support these special values if math.IsNaN(fv) || math.IsInf(fv, 0) { continue } + case string: + // Check for nested fields if any + if s.nestedfields != nil && s.nestedfields.Match(field.Key) { + bv := []byte(fv) + if json.Valid(bv) { + var nested interface{} + if err := json.Unmarshal(bv, &nested); err == nil { + val = nested + } + } + } } - fields[field.Key] = field.Value + fields[field.Key] = val } m["fields"] = fields diff --git a/plugins/serializers/json/json_test.go b/plugins/serializers/json/json_test.go index df8f4ac292946..a2d3c40dd446c 100644 --- a/plugins/serializers/json/json_test.go +++ b/plugins/serializers/json/json_test.go @@ -29,7 +29,7 @@ func TestSerializeMetricFloat(t *testing.T) { } m := metric.New("cpu", tags, fields, now) - s, err := NewSerializer(0, "", "") + s, err := NewSerializer(FormatConfig{}) require.NoError(t, err) buf, err := s.Serialize(m) require.NoError(t, err) @@ -90,7 +90,10 @@ func TestSerialize_TimestampUnits(t *testing.T) { }, time.Unix(1525478795, 123456789), ) - s, err := NewSerializer(tt.timestampUnits, tt.timestampFormat, "") + s, err := NewSerializer(FormatConfig{ + TimestampUnits: tt.timestampUnits, + TimestampFormat: tt.timestampFormat, + }) require.NoError(t, err) actual, err := s.Serialize(m) require.NoError(t, err) @@ -109,7 +112,7 @@ func TestSerializeMetricInt(t *testing.T) { } m := metric.New("cpu", tags, fields, now) - s, err := NewSerializer(0, "", "") + s, err := NewSerializer(FormatConfig{}) require.NoError(t, err) buf, err := s.Serialize(m) require.NoError(t, err) @@ -128,7 +131,7 @@ func TestSerializeMetricString(t *testing.T) { } m := metric.New("cpu", tags, fields, now) - s, err := NewSerializer(0, "", "") + s, err := NewSerializer(FormatConfig{}) require.NoError(t, err) buf, err := s.Serialize(m) require.NoError(t, err) @@ -148,7 +151,7 @@ func TestSerializeMultiFields(t *testing.T) { } m := metric.New("cpu", tags, fields, now) - s, err := NewSerializer(0, "", "") + s, err := NewSerializer(FormatConfig{}) require.NoError(t, err) buf, err := s.Serialize(m) require.NoError(t, err) @@ -167,7 +170,7 @@ func TestSerializeMetricWithEscapes(t *testing.T) { } m := metric.New("My CPU", tags, fields, now) - s, err := NewSerializer(0, "", "") + s, err := NewSerializer(FormatConfig{}) require.NoError(t, err) buf, err := s.Serialize(m) require.NoError(t, err) @@ -187,7 +190,7 @@ func TestSerializeBatch(t *testing.T) { ) metrics := []telegraf.Metric{m, m} - s, err := NewSerializer(0, "", "") + s, err := NewSerializer(FormatConfig{}) require.NoError(t, err) buf, err := s.SerializeBatch(metrics) require.NoError(t, err) @@ -211,7 +214,7 @@ func TestSerializeBatchSkipInf(t *testing.T) { ), } - s, err := NewSerializer(0, "", "") + s, err := NewSerializer(FormatConfig{}) require.NoError(t, err) buf, err := s.SerializeBatch(metrics) require.NoError(t, err) @@ -230,7 +233,7 @@ func TestSerializeBatchSkipInfAllFields(t *testing.T) { ), } - s, err := NewSerializer(0, "", "") + s, err := NewSerializer(FormatConfig{}) require.NoError(t, err) buf, err := s.SerializeBatch(metrics) require.NoError(t, err) @@ -266,7 +269,12 @@ func TestSerializeTransformationNonBatch(t *testing.T) { expected := expectedArray.([]interface{}) // Serialize - serializer, err := NewSerializer(cfg.TimestampUnits, cfg.TimestampFormat, cfg.Transformation) + serializer, err := NewSerializer( + FormatConfig{ + TimestampUnits: cfg.TimestampUnits, + TimestampFormat: cfg.TimestampFormat, + Transformation: cfg.Transformation, + }) require.NoError(t, err) for i, m := range metrics { buf, err := serializer.Serialize(m) @@ -275,8 +283,6 @@ func TestSerializeTransformationNonBatch(t *testing.T) { // Compare var actual interface{} require.NoError(t, json.Unmarshal(buf, &actual)) - fmt.Printf("actual: %v\n", actual) - fmt.Printf("expected: %v\n", expected[i]) require.EqualValuesf(t, expected[i], actual, "mismatch in %d", i) } }) @@ -311,7 +317,12 @@ func TestSerializeTransformationBatch(t *testing.T) { require.NoError(t, err) // Serialize - serializer, err := NewSerializer(cfg.TimestampUnits, cfg.TimestampFormat, cfg.Transformation) + serializer, err := NewSerializer( + FormatConfig{ + TimestampUnits: cfg.TimestampUnits, + TimestampFormat: cfg.TimestampFormat, + Transformation: cfg.Transformation, + }) require.NoError(t, err) buf, err := serializer.SerializeBatch(metrics) require.NoError(t, err) @@ -324,10 +335,70 @@ func TestSerializeTransformationBatch(t *testing.T) { } } +func TestSerializeNesting(t *testing.T) { + var tests = []struct { + name string + filename string + out string + }{ + { + name: "nested fields include", + filename: "testcases/nested_fields_include.conf", + out: "testcases/nested_fields_out.json", + }, + { + name: "nested fields exclude", + filename: "testcases/nested_fields_exclude.conf", + out: "testcases/nested_fields_out.json", + }, + } + parser := &influx.Parser{} + require.NoError(t, parser.Init()) + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + filename := filepath.FromSlash(tt.filename) + cfg, header, err := loadTestConfiguration(filename) + require.NoError(t, err) + + // Get the input metrics + metrics, err := testutil.ParseMetricsFrom(header, "Input:", parser) + require.NoError(t, err) + require.Len(t, metrics, 1) + + // Get the expectations + expectedArray, err := loadJSON(tt.out) + require.NoError(t, err) + expected := expectedArray.(map[string]interface{}) + + // Serialize + serializer, err := NewSerializer( + FormatConfig{ + TimestampUnits: cfg.TimestampUnits, + TimestampFormat: cfg.TimestampFormat, + Transformation: cfg.Transformation, + NestedFieldsInclude: cfg.JSONNestedFieldsInclude, + NestedFieldsExclude: cfg.JSONNestedFieldsExclude, + }) + require.NoError(t, err) + + buf, err := serializer.Serialize(metrics[0]) + require.NoError(t, err) + + // Compare + var actual interface{} + require.NoError(t, json.Unmarshal(buf, &actual)) + require.EqualValues(t, expected, actual) + }) + } +} + type Config struct { - TimestampUnits time.Duration `toml:"json_timestamp_units"` - TimestampFormat string `toml:"json_timestamp_format"` - Transformation string `toml:"json_transformation"` + TimestampUnits time.Duration `toml:"json_timestamp_units"` + TimestampFormat string `toml:"json_timestamp_format"` + Transformation string `toml:"json_transformation"` + JSONNestedFieldsInclude []string `toml:"json_nested_fields_include"` + JSONNestedFieldsExclude []string `toml:"json_nested_fields_exclude"` } func loadTestConfiguration(filename string) (*Config, []string, error) { diff --git a/plugins/serializers/json/testcases/nested_fields_exclude.conf b/plugins/serializers/json/testcases/nested_fields_exclude.conf new file mode 100644 index 0000000000000..d3531b4e2f62b --- /dev/null +++ b/plugins/serializers/json/testcases/nested_fields_exclude.conf @@ -0,0 +1,6 @@ +# Example for decoding fields that contain nested JSON structures. +# +# Input: +# in,host=myhost,type=diagnostic hops=10,latency=1.23,id-1234="{\"address\": \"AB1A\", \"status\": \"online\"}",id-0000="{\"status\": \"offline\"}",id-5678="{\"address\": \"0000\", \"status\": \"online\"}" 1666006350000000000 + +json_nested_fields_exclude = ["hops", "latency"] \ No newline at end of file diff --git a/plugins/serializers/json/testcases/nested_fields_include.conf b/plugins/serializers/json/testcases/nested_fields_include.conf new file mode 100644 index 0000000000000..9f2540389a21d --- /dev/null +++ b/plugins/serializers/json/testcases/nested_fields_include.conf @@ -0,0 +1,6 @@ +# Example for decoding fields that contain nested JSON structures. +# +# Input: +# in,host=myhost,type=diagnostic hops=10,latency=1.23,id-1234="{\"address\": \"AB1A\", \"status\": \"online\"}",id-0000="{\"status\": \"offline\"}",id-5678="{\"address\": \"0000\", \"status\": \"online\"}" 1666006350000000000 + +json_nested_fields_include = ["id-*"] \ No newline at end of file diff --git a/plugins/serializers/json/testcases/nested_fields_out.json b/plugins/serializers/json/testcases/nested_fields_out.json new file mode 100644 index 0000000000000..4c3f772802bae --- /dev/null +++ b/plugins/serializers/json/testcases/nested_fields_out.json @@ -0,0 +1,23 @@ +{ + "fields": { + "id-1234": { + "address": "AB1A", + "status": "online" + }, + "id-0000": { + "status": "offline" + }, + "id-5678": { + "address": "0000", + "status": "online" + }, + "hops": 10, + "latency": 1.23 + }, + "name": "in", + "tags": { + "host": "myhost", + "type": "diagnostic" + }, + "timestamp": 1666006350 +} \ No newline at end of file diff --git a/plugins/serializers/registry.go b/plugins/serializers/registry.go index 776b1737bc4e8..59372d2a379f5 100644 --- a/plugins/serializers/registry.go +++ b/plugins/serializers/registry.go @@ -104,6 +104,10 @@ type Config struct { // Transformation as JSONata expression to use for JSON formatted output Transformation string `toml:"transformation"` + // Field filter for interpreting data as nested JSON for JSON serializer + JSONNestedFieldInclude []string `toml:"json_nested_fields_include"` + JSONNestedFieldExclude []string `toml:"json_nested_fields_exclude"` + // Include HEC routing fields for splunkmetric output HecRouting bool `toml:"hec_routing"` @@ -157,7 +161,7 @@ func NewSerializer(config *Config) (Serializer, error) { config.Templates, ) case "json": - serializer, err = NewJSONSerializer(config.TimestampUnits, config.TimestampFormat, config.Transformation) + serializer, err = NewJSONSerializer(config) case "splunkmetric": serializer, err = NewSplunkmetricSerializer(config.HecRouting, config.SplunkmetricMultiMetric, config.SplunkmetricOmitEventTag), nil case "nowmetric": @@ -232,8 +236,14 @@ func NewWavefrontSerializer(prefix string, useStrict bool, sourceOverride []stri return wavefront.NewSerializer(prefix, useStrict, sourceOverride, disablePrefixConversions) } -func NewJSONSerializer(timestampUnits time.Duration, timestampFormat, transform string) (Serializer, error) { - return json.NewSerializer(timestampUnits, timestampFormat, transform) +func NewJSONSerializer(config *Config) (Serializer, error) { + return json.NewSerializer(json.FormatConfig{ + TimestampUnits: config.TimestampUnits, + TimestampFormat: config.TimestampFormat, + Transformation: config.Transformation, + NestedFieldsInclude: config.JSONNestedFieldInclude, + NestedFieldsExclude: config.JSONNestedFieldExclude, + }) } func NewCarbon2Serializer(carbon2format string, carbon2SanitizeReplaceChar string) (Serializer, error) {