From bc43115423d80528254bc18836e3a4559ad74000 Mon Sep 17 00:00:00 2001 From: Joseph Sirianni Date: Wed, 4 Aug 2021 14:58:01 -0400 Subject: [PATCH] Csv header delimiter (#370) * support optionally setting the header delimiter * perform type assertion once * changelog for csv parser: support optionally setting the header delimiter * update error test cases. add HeaderDelimiter test cases --- CHANGELOG.md | 1 + docs/operators/csv_parser.md | 45 +++++++++++++++++++++++++ operator/builtin/parser/csv/csv.go | 43 +++++++++++++---------- operator/builtin/parser/csv/csv_test.go | 36 ++++++++++++++++++-- 4 files changed, 105 insertions(+), 20 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 732e3c10c..fba843194 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added - File input: Added optional labels for resolved symlink file name and path [PR 364](https://github.com/observIQ/stanza/pull/364) +- CSV Parser: Added optional configuration field `header_delimiter` [PR 370](https://github.com/observIQ/stanza/pull/370) ## 1.1.5 - 2021-07-15 diff --git a/docs/operators/csv_parser.md b/docs/operators/csv_parser.md index 2f44643b7..d1949a9bc 100644 --- a/docs/operators/csv_parser.md +++ b/docs/operators/csv_parser.md @@ -9,6 +9,7 @@ The `csv_parser` operator parses the string-type field selected by `parse_from` | `id` | `csv_parser` | A unique identifier for the operator | | `output` | Next in pipeline | The connected operator(s) that will receive all outbound entries | | `header` | required | A string of delimited field names. The values in the delimited header will be used as keys | +| `header_delimiter` | value of delimiter | A character that will be used as a delimiter for the header. Values `\r` and `\n` cannot be used as a delimiter | | `delimiter` | `,` | A character that will be used as a delimiter. Values `\r` and `\n` cannot be used as a delimiter | | `parse_from` | $ | A [field](/docs/types/field.md) that indicates the field to be parsed | | `parse_to` | $ | A [field](/docs/types/field.md) that indicates the field to be parsed | @@ -144,6 +145,50 @@ Configuration: } ``` + + + + +#### Parse the field `message` with differing delimiters for header and fields + +Configuration: + +```yaml +- type: csv_parser + parse_from: message + delimiter: "+" + header_delimiter: "," + header: 'id,severity,message' +``` + + + + + +
Input record Output record
+ +```json +{ + "timestamp": "", + "record": { + "message": "1+debug+\"\"Debug Message\"\"" + } +} +``` + + + +```json +{ + "timestamp": "", + "record": { + "id": "1", + "severity": "debug", + "message": "\"Debug Message\"" + } +} +``` +
\ No newline at end of file diff --git a/operator/builtin/parser/csv/csv.go b/operator/builtin/parser/csv/csv.go index fa0da6d81..9b53926a1 100644 --- a/operator/builtin/parser/csv/csv.go +++ b/operator/builtin/parser/csv/csv.go @@ -27,8 +27,9 @@ func NewCSVParserConfig(operatorID string) *CSVParserConfig { type CSVParserConfig struct { helper.ParserConfig `yaml:",inline"` - Header string `json:"header" yaml:"header"` - FieldDelimiter string `json:"delimiter,omitempty" yaml:"delimiter,omitempty"` + Header string `json:"header" yaml:"header"` + HeaderDelimiter string `json:"header_delimiter,omitempty" yaml:"header_delimiter,omitempty"` + FieldDelimiter string `json:"delimiter,omitempty" yaml:"delimiter,omitempty"` } // Build will build a csv parser operator. @@ -52,17 +53,24 @@ func (c CSVParserConfig) Build(context operator.BuildContext) ([]operator.Operat fieldDelimiter := []rune(c.FieldDelimiter)[0] - if !strings.Contains(c.Header, c.FieldDelimiter) { - return nil, fmt.Errorf("missing field delimiter in header") + if c.HeaderDelimiter == "" { + c.HeaderDelimiter = c.FieldDelimiter } - numFields := len(strings.Split(c.Header, c.FieldDelimiter)) + headerDelimiter := []rune(c.HeaderDelimiter)[0] + + if !strings.Contains(c.Header, c.HeaderDelimiter) { + return nil, fmt.Errorf("missing header delimiter in header") + } + + numFields := len(strings.Split(c.Header, c.HeaderDelimiter)) csvParser := &CSVParser{ - ParserOperator: parserOperator, - header: c.Header, - fieldDelimiter: fieldDelimiter, - numFields: numFields, + ParserOperator: parserOperator, + header: c.Header, + headerDelimiter: headerDelimiter, + fieldDelimiter: fieldDelimiter, + numFields: numFields, } return []operator.Operator{csvParser}, nil @@ -71,9 +79,10 @@ func (c CSVParserConfig) Build(context operator.BuildContext) ([]operator.Operat // CSVParser is an operator that parses csv in an entry. type CSVParser struct { helper.ParserOperator - header string - fieldDelimiter rune - numFields int + header string + headerDelimiter rune + fieldDelimiter rune + numFields int } // Process will parse an entry for csv. @@ -84,17 +93,15 @@ func (r *CSVParser) Process(ctx context.Context, entry *entry.Entry) error { // parse will parse a value using the supplied csv header. func (r *CSVParser) parse(value interface{}) (interface{}, error) { var csvLine string - switch value.(type) { + switch t := value.(type) { case string: - csvLine += value.(string) + csvLine += t case []byte: - csvLine += string(value.([]byte)) + csvLine += string(t) default: return nil, fmt.Errorf("type '%T' cannot be parsed as csv", value) } - delimiterStr := string([]rune{r.fieldDelimiter}) - reader := csvparser.NewReader(strings.NewReader(csvLine)) reader.Comma = r.fieldDelimiter reader.FieldsPerRecord = r.numFields @@ -110,7 +117,7 @@ func (r *CSVParser) parse(value interface{}) (interface{}, error) { return nil, err } - for i, key := range strings.Split(r.header, delimiterStr) { + for i, key := range strings.Split(r.header, string([]rune{r.headerDelimiter})) { parsedValues[key] = record[i] } } diff --git a/operator/builtin/parser/csv/csv_test.go b/operator/builtin/parser/csv/csv_test.go index 8d21e08af..03a5955f9 100644 --- a/operator/builtin/parser/csv/csv_test.go +++ b/operator/builtin/parser/csv/csv_test.go @@ -169,6 +169,19 @@ func TestParserCSV(t *testing.T) { "position": "agent", }, }, + { + "header-delimiter", + func(p *CSVParserConfig) { + p.Header = "name+sev+msg" + p.HeaderDelimiter = "+" + }, + "stanza,INFO,started agent", + map[string]interface{}{ + "name": "stanza", + "sev": "INFO", + "msg": "started agent", + }, + }, } for _, tc := range cases { @@ -270,7 +283,7 @@ func TestBuildParserCSV(t *testing.T) { c.Header = "name" _, err := c.Build(testutil.NewBuildContext(t)) require.Error(t, err) - require.Contains(t, err.Error(), "missing field delimiter in header") + require.Contains(t, err.Error(), "missing header delimiter in header") }) t.Run("InvalidHeaderFieldWrongDelimiter", func(t *testing.T) { @@ -286,6 +299,25 @@ func TestBuildParserCSV(t *testing.T) { c.FieldDelimiter = ":" _, err := c.Build(testutil.NewBuildContext(t)) require.Error(t, err) - require.Contains(t, err.Error(), "missing field delimiter in header") + require.Contains(t, err.Error(), "missing header delimiter in header") + }) + + t.Run("HeaderDelimiter", func(t *testing.T) { + c := newBasicCSVParser() + c.Header = "name+position+number" + c.HeaderDelimiter = "+" + c.FieldDelimiter = ":" + _, err := c.Build(testutil.NewBuildContext(t)) + require.NoError(t, err) + }) + + t.Run("InvalidHeaderDelimiter", func(t *testing.T) { + c := newBasicCSVParser() + c.Header = "name,position,number" + c.HeaderDelimiter = "+" + c.FieldDelimiter = ":" + _, err := c.Build(testutil.NewBuildContext(t)) + require.Error(t, err) + require.Contains(t, err.Error(), "missing header delimiter in header") }) }