feat(serializers.json): Support serializing JSON nested in string fields #12260

Merged · 5 commits · Nov 18, 2022
3 changes: 3 additions & 0 deletions config/config.go
@@ -1196,6 +1196,8 @@ func (c *Config) buildSerializer(tbl *ast.Table) (serializers.Serializer, error)
c.getFieldDuration(tbl, "json_timestamp_units", &sc.TimestampUnits)
c.getFieldString(tbl, "json_timestamp_format", &sc.TimestampFormat)
c.getFieldString(tbl, "json_transformation", &sc.Transformation)
c.getFieldStringSlice(tbl, "json_nested_fields_include", &sc.JSONNestedFieldInclude)
c.getFieldStringSlice(tbl, "json_nested_fields_exclude", &sc.JSONNestedFieldExclude)

c.getFieldBool(tbl, "splunkmetric_hec_routing", &sc.HecRouting)
c.getFieldBool(tbl, "splunkmetric_multimetric", &sc.SplunkmetricMultiMetric)
@@ -1276,6 +1278,7 @@ func (c *Config) missingTomlField(_ reflect.Type, key string) error {
"graphite_tag_sanitize_mode", "graphite_tag_support", "graphite_separator",
"influx_max_line_bytes", "influx_sort_fields", "influx_uint_support",
"json_timestamp_format", "json_timestamp_units", "json_transformation",
"json_nested_fields_include", "json_nested_fields_exclude",
"prometheus_export_timestamp", "prometheus_sort_metrics", "prometheus_string_as_label",
"prometheus_compact_encoding",
"splunkmetric_hec_routing", "splunkmetric_multimetric", "splunkmetric_omit_event_tag",
5 changes: 4 additions & 1 deletion plugins/outputs/azure_data_explorer/azure_data_explorer.go
@@ -323,7 +323,10 @@ func (adx *AzureDataExplorer) Init() error {
return fmt.Errorf("unknown ingestion type %q", adx.IngestionType)
}

serializer, err := json.NewSerializer(time.Nanosecond, time.RFC3339Nano, "")
serializer, err := json.NewSerializer(json.FormatConfig{
TimestampUnits: time.Nanosecond,
TimestampFormat: time.RFC3339Nano,
})
if err != nil {
return err
}
plugins/outputs/azure_data_explorer/azure_data_explorer_test.go
@@ -140,7 +140,10 @@ func TestWrite(t *testing.T) {

for _, tC := range testCases {
t.Run(tC.name, func(t *testing.T) {
serializer, err := telegrafJson.NewSerializer(time.Second, "", "")
serializer, err := telegrafJson.NewSerializer(
telegrafJson.FormatConfig{
TimestampUnits: time.Second,
})
require.NoError(t, err)

plugin := AzureDataExplorer{
@@ -264,7 +267,7 @@ func TestWriteWithType(t *testing.T) {
}
for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
serializer, err := telegrafJson.NewSerializer(time.Second, "", "")
serializer, err := telegrafJson.NewSerializer(telegrafJson.FormatConfig{TimestampUnits: time.Second})
require.NoError(t, err)
for tableName, jsonValue := range testCase.tableNameToExpectedResult {
ingestionType := "queued"
4 changes: 2 additions & 2 deletions plugins/outputs/event_hubs/event_hubs_test.go
@@ -42,7 +42,7 @@ func (eh *mockEventHub) SendBatch(ctx context.Context, iterator eventhub.BatchIt
/* End wrapper interface */

func TestInitAndWrite(t *testing.T) {
serializer, err := json.NewSerializer(time.Second, "", "")
serializer, err := json.NewSerializer(json.FormatConfig{TimestampUnits: time.Second})
require.NoError(t, err)
mockHub := &mockEventHub{}
e := &EventHubs{
@@ -101,7 +101,7 @@ func TestInitAndWriteIntegration(t *testing.T) {
testHubCS := os.Getenv("EVENTHUB_CONNECTION_STRING") + ";EntityPath=" + entity.Name

// Configure the plugin to target the newly created hub
serializer, err := json.NewSerializer(time.Second, "", "")
serializer, err := json.NewSerializer(json.FormatConfig{TimestampUnits: time.Second})
require.NoError(t, err)
e := &EventHubs{
Hub: &eventHub{},
2 changes: 1 addition & 1 deletion plugins/outputs/http/http_test.go
@@ -661,7 +661,7 @@ func TestBatchedUnbatched(t *testing.T) {
Method: defaultMethod,
}

jsonSerializer, err := json.NewSerializer(time.Second, "", "")
jsonSerializer, err := json.NewSerializer(json.FormatConfig{TimestampUnits: time.Second})
require.NoError(t, err)
s := map[string]serializers.Serializer{
"influx": influx.NewSerializer(),
6 changes: 5 additions & 1 deletion plugins/outputs/stomp/stomp_test.go
@@ -28,7 +28,11 @@ func TestConnectAndWrite(t *testing.T) {
require.NoError(t, err, "failed to start container")
defer container.Terminate()
var url = fmt.Sprintf("%s:%s", container.Address, container.Ports[servicePort])
s, err := serializers.NewJSONSerializer(10*time.Second, "yyy-dd-mmThh:mm:ss", "")
s, err := serializers.NewJSONSerializer(
&serializers.Config{
TimestampUnits: 10 * time.Second,
TimestampFormat: "yyy-dd-mmThh:mm:ss",
})
require.NoError(t, err)
st := &STOMP{
Host: url,
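For output plugins that build their serializer through the generic registry (as stomp does above), the new options travel on serializers.Config using the field names registered in config.go. A minimal sketch of that path, assuming NewJSONSerializer forwards the nested-field filters to the JSON serializer:

```go
package main

import (
	"time"

	"github.com/influxdata/telegraf/plugins/serializers"
)

func main() {
	// Sketch only: JSONNestedFieldInclude/Exclude mirror the
	// json_nested_fields_include/exclude options registered in config.go.
	s, err := serializers.NewJSONSerializer(&serializers.Config{
		TimestampUnits:         time.Second,
		TimestampFormat:        time.RFC3339Nano,
		JSONNestedFieldInclude: []string{"id-*"},
		JSONNestedFieldExclude: []string{"id-0000"},
	})
	if err != nil {
		panic(err)
	}
	_ = s // use the serializer in an output plugin's Write path
}
```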
7 changes: 7 additions & 0 deletions plugins/serializers/json/README.md
@@ -33,6 +33,13 @@ The `json` output data format converts metrics into JSON documents.
## This allows generating an arbitrary output form based on the metric(s). Please use
## multiline strings (starting and ending with three single-quotes) if needed.
#json_transformation = ""

## Filter for fields that contain nested JSON data.
## The serializer will try to decode matching STRING fields containing
## valid JSON. This is done BEFORE any JSON transformation. The filters
## can contain wildcards.
#json_nested_fields_include = []
#json_nested_fields_exclude = []
```

## Examples
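To make the README text above concrete, here is a small sketch (not part of the change set) that feeds a metric with a JSON-encoded string field through the new serializer; the field names and values are borrowed from the test cases added further down:

```go
package main

import (
	"fmt"
	"time"

	"github.com/influxdata/telegraf/metric"
	"github.com/influxdata/telegraf/plugins/serializers/json"
)

func main() {
	// Decode string fields matching "id-*" into nested objects before output.
	serializer, err := json.NewSerializer(json.FormatConfig{
		TimestampUnits:      time.Second,
		NestedFieldsInclude: []string{"id-*"},
	})
	if err != nil {
		panic(err)
	}

	m := metric.New(
		"in",
		map[string]string{"host": "myhost"},
		map[string]interface{}{
			"hops":    10,
			"id-1234": `{"address": "AB1A", "status": "online"}`,
		},
		time.Unix(1666006350, 0),
	)

	out, err := serializer.Serialize(m)
	if err != nil {
		panic(err)
	}
	// "id-1234" is emitted as a nested object while "hops" stays numeric, e.g.
	// {"fields":{"hops":10,"id-1234":{"address":"AB1A","status":"online"}},"name":"in",...}
	fmt.Println(string(out))
}
```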
45 changes: 38 additions & 7 deletions plugins/serializers/json/json.go
@@ -10,29 +10,47 @@ import (
jsonata "github.com/blues/jsonata-go"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/filter"
)

type FormatConfig struct {
TimestampUnits time.Duration
TimestampFormat string
Transformation string
NestedFieldsInclude []string
NestedFieldsExclude []string
}

type Serializer struct {
TimestampUnits time.Duration
TimestampFormat string

transformation *jsonata.Expr
nestedfields filter.Filter
}

func NewSerializer(timestampUnits time.Duration, timestampFormat, transform string) (*Serializer, error) {
func NewSerializer(cfg FormatConfig) (*Serializer, error) {
s := &Serializer{
TimestampUnits: truncateDuration(timestampUnits),
TimestampFormat: timestampFormat,
TimestampUnits: truncateDuration(cfg.TimestampUnits),
TimestampFormat: cfg.TimestampFormat,
}

if transform != "" {
e, err := jsonata.Compile(transform)
if cfg.Transformation != "" {
e, err := jsonata.Compile(cfg.Transformation)
if err != nil {
return nil, err
}
s.transformation = e
}

if len(cfg.NestedFieldsInclude) > 0 || len(cfg.NestedFieldsExclude) > 0 {
f, err := filter.NewIncludeExcludeFilter(cfg.NestedFieldsInclude, cfg.NestedFieldsExclude)
if err != nil {
return nil, err
}
s.nestedfields = f
}

return s, nil
}

@@ -99,13 +117,26 @@ func (s *Serializer) createObject(metric telegraf.Metric) map[string]interface{}

fields := make(map[string]interface{}, len(metric.FieldList()))
for _, field := range metric.FieldList() {
if fv, ok := field.Value.(float64); ok {
val := field.Value
switch fv := field.Value.(type) {
case float64:
// JSON does not support these special values
if math.IsNaN(fv) || math.IsInf(fv, 0) {
continue
}
case string:
// Check for nested fields if any
if s.nestedfields != nil && s.nestedfields.Match(field.Key) {
bv := []byte(fv)
if json.Valid(bv) {
var nested interface{}
if err := json.Unmarshal(bv, &nested); err == nil {
val = nested
}
}
}
}
fields[field.Key] = field.Value
fields[field.Key] = val
}
m["fields"] = fields

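The include/exclude matching above relies on Telegraf's filter package; roughly (a sketch, assuming the usual glob semantics of filter.NewIncludeExcludeFilter, where exclude patterns win over include patterns):

```go
package main

import (
	"fmt"

	"github.com/influxdata/telegraf/filter"
)

func main() {
	// Build the same kind of filter the serializer stores in s.nestedfields.
	f, err := filter.NewIncludeExcludeFilter([]string{"id-*"}, []string{"id-0000"})
	if err != nil {
		panic(err)
	}
	fmt.Println(f.Match("id-1234")) // true: matches the include pattern
	fmt.Println(f.Match("id-0000")) // false: explicitly excluded
	fmt.Println(f.Match("latency")) // false: not covered by the include list
}
```

Only fields that pass this filter are probed with json.Valid and json.Unmarshal; everything else is copied through unchanged.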
103 changes: 87 additions & 16 deletions plugins/serializers/json/json_test.go
@@ -29,7 +29,7 @@ func TestSerializeMetricFloat(t *testing.T) {
}
m := metric.New("cpu", tags, fields, now)

s, err := NewSerializer(0, "", "")
s, err := NewSerializer(FormatConfig{})
require.NoError(t, err)
buf, err := s.Serialize(m)
require.NoError(t, err)
@@ -90,7 +90,10 @@ func TestSerialize_TimestampUnits(t *testing.T) {
},
time.Unix(1525478795, 123456789),
)
s, err := NewSerializer(tt.timestampUnits, tt.timestampFormat, "")
s, err := NewSerializer(FormatConfig{
TimestampUnits: tt.timestampUnits,
TimestampFormat: tt.timestampFormat,
})
require.NoError(t, err)
actual, err := s.Serialize(m)
require.NoError(t, err)
@@ -109,7 +112,7 @@ func TestSerializeMetricInt(t *testing.T) {
}
m := metric.New("cpu", tags, fields, now)

s, err := NewSerializer(0, "", "")
s, err := NewSerializer(FormatConfig{})
require.NoError(t, err)
buf, err := s.Serialize(m)
require.NoError(t, err)
@@ -128,7 +131,7 @@ func TestSerializeMetricString(t *testing.T) {
}
m := metric.New("cpu", tags, fields, now)

s, err := NewSerializer(0, "", "")
s, err := NewSerializer(FormatConfig{})
require.NoError(t, err)
buf, err := s.Serialize(m)
require.NoError(t, err)
@@ -148,7 +151,7 @@ func TestSerializeMultiFields(t *testing.T) {
}
m := metric.New("cpu", tags, fields, now)

s, err := NewSerializer(0, "", "")
s, err := NewSerializer(FormatConfig{})
require.NoError(t, err)
buf, err := s.Serialize(m)
require.NoError(t, err)
@@ -167,7 +170,7 @@ func TestSerializeMetricWithEscapes(t *testing.T) {
}
m := metric.New("My CPU", tags, fields, now)

s, err := NewSerializer(0, "", "")
s, err := NewSerializer(FormatConfig{})
require.NoError(t, err)
buf, err := s.Serialize(m)
require.NoError(t, err)
@@ -187,7 +190,7 @@ func TestSerializeBatch(t *testing.T) {
)

metrics := []telegraf.Metric{m, m}
s, err := NewSerializer(0, "", "")
s, err := NewSerializer(FormatConfig{})
require.NoError(t, err)
buf, err := s.SerializeBatch(metrics)
require.NoError(t, err)
@@ -211,7 +214,7 @@ func TestSerializeBatchSkipInf(t *testing.T) {
),
}

s, err := NewSerializer(0, "", "")
s, err := NewSerializer(FormatConfig{})
require.NoError(t, err)
buf, err := s.SerializeBatch(metrics)
require.NoError(t, err)
@@ -230,7 +233,7 @@ func TestSerializeBatchSkipInfAllFields(t *testing.T) {
),
}

s, err := NewSerializer(0, "", "")
s, err := NewSerializer(FormatConfig{})
require.NoError(t, err)
buf, err := s.SerializeBatch(metrics)
require.NoError(t, err)
@@ -266,7 +269,12 @@ func TestSerializeTransformationNonBatch(t *testing.T) {
expected := expectedArray.([]interface{})

// Serialize
serializer, err := NewSerializer(cfg.TimestampUnits, cfg.TimestampFormat, cfg.Transformation)
serializer, err := NewSerializer(
FormatConfig{
TimestampUnits: cfg.TimestampUnits,
TimestampFormat: cfg.TimestampFormat,
Transformation: cfg.Transformation,
})
require.NoError(t, err)
for i, m := range metrics {
buf, err := serializer.Serialize(m)
@@ -275,8 +283,6 @@
// Compare
var actual interface{}
require.NoError(t, json.Unmarshal(buf, &actual))
fmt.Printf("actual: %v\n", actual)
fmt.Printf("expected: %v\n", expected[i])
require.EqualValuesf(t, expected[i], actual, "mismatch in %d", i)
}
})
@@ -311,7 +317,12 @@ func TestSerializeTransformationBatch(t *testing.T) {
require.NoError(t, err)

// Serialize
serializer, err := NewSerializer(cfg.TimestampUnits, cfg.TimestampFormat, cfg.Transformation)
serializer, err := NewSerializer(
FormatConfig{
TimestampUnits: cfg.TimestampUnits,
TimestampFormat: cfg.TimestampFormat,
Transformation: cfg.Transformation,
})
require.NoError(t, err)
buf, err := serializer.SerializeBatch(metrics)
require.NoError(t, err)
@@ -324,10 +335,70 @@
}
}

func TestSerializeNesting(t *testing.T) {
var tests = []struct {
name string
filename string
out string
}{
{
name: "nested fields include",
filename: "testcases/nested_fields_include.conf",
out: "testcases/nested_fields_out.json",
},
{
name: "nested fields exclude",
filename: "testcases/nested_fields_exclude.conf",
out: "testcases/nested_fields_out.json",
},
}
parser := &influx.Parser{}
require.NoError(t, parser.Init())

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
filename := filepath.FromSlash(tt.filename)
cfg, header, err := loadTestConfiguration(filename)
require.NoError(t, err)

// Get the input metrics
metrics, err := testutil.ParseMetricsFrom(header, "Input:", parser)
require.NoError(t, err)
require.Len(t, metrics, 1)

// Get the expectations
expectedArray, err := loadJSON(tt.out)
require.NoError(t, err)
expected := expectedArray.(map[string]interface{})

// Serialize
serializer, err := NewSerializer(
FormatConfig{
TimestampUnits: cfg.TimestampUnits,
TimestampFormat: cfg.TimestampFormat,
Transformation: cfg.Transformation,
NestedFieldsInclude: cfg.JSONNestedFieldsInclude,
NestedFieldsExclude: cfg.JSONNestedFieldsExclude,
})
require.NoError(t, err)

buf, err := serializer.Serialize(metrics[0])
require.NoError(t, err)

// Compare
var actual interface{}
require.NoError(t, json.Unmarshal(buf, &actual))
require.EqualValues(t, expected, actual)
})
}
}

type Config struct {
TimestampUnits time.Duration `toml:"json_timestamp_units"`
TimestampFormat string `toml:"json_timestamp_format"`
Transformation string `toml:"json_transformation"`
TimestampUnits time.Duration `toml:"json_timestamp_units"`
TimestampFormat string `toml:"json_timestamp_format"`
Transformation string `toml:"json_transformation"`
JSONNestedFieldsInclude []string `toml:"json_nested_fields_include"`
JSONNestedFieldsExclude []string `toml:"json_nested_fields_exclude"`
}

func loadTestConfiguration(filename string) (*Config, []string, error) {
6 changes: 6 additions & 0 deletions plugins/serializers/json/testcases/nested_fields_exclude.conf
@@ -0,0 +1,6 @@
# Example for decoding fields that contain nested JSON structures.
#
# Input:
# in,host=myhost,type=diagnostic hops=10,latency=1.23,id-1234="{\"address\": \"AB1A\", \"status\": \"online\"}",id-0000="{\"status\": \"offline\"}",id-5678="{\"address\": \"0000\", \"status\": \"online\"}" 1666006350000000000

json_nested_fields_exclude = ["hops", "latency"]
6 changes: 6 additions & 0 deletions plugins/serializers/json/testcases/nested_fields_include.conf
@@ -0,0 +1,6 @@
# Example for decoding fields that contain nested JSON structures.
#
# Input:
# in,host=myhost,type=diagnostic hops=10,latency=1.23,id-1234="{\"address\": \"AB1A\", \"status\": \"online\"}",id-0000="{\"status\": \"offline\"}",id-5678="{\"address\": \"0000\", \"status\": \"online\"}" 1666006350000000000

json_nested_fields_include = ["id-*"]