diff --git a/CHANGELOG.md b/CHANGELOG.md
index 732e3c10c..fba843194 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -8,6 +8,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added
- File input: Added optional labels for resolved symlink file name and path [PR 364](https://github.com/observIQ/stanza/pull/364)
+- CSV Parser: Added optional configuration field `header_delimiter` [PR 370](https://github.com/observIQ/stanza/pull/370)
## 1.1.5 - 2021-07-15
diff --git a/docs/operators/csv_parser.md b/docs/operators/csv_parser.md
index 2f44643b7..d1949a9bc 100644
--- a/docs/operators/csv_parser.md
+++ b/docs/operators/csv_parser.md
@@ -9,6 +9,7 @@ The `csv_parser` operator parses the string-type field selected by `parse_from`
| `id` | `csv_parser` | A unique identifier for the operator |
| `output` | Next in pipeline | The connected operator(s) that will receive all outbound entries |
| `header` | required | A string of delimited field names. The values in the delimited header will be used as keys |
+| `header_delimiter` | value of delimiter | A character that will be used as a delimiter for the header. Values `\r` and `\n` cannot be used as a delimiter |
| `delimiter` | `,` | A character that will be used as a delimiter. Values `\r` and `\n` cannot be used as a delimiter |
| `parse_from` | $ | A [field](/docs/types/field.md) that indicates the field to be parsed |
| `parse_to` | $ | A [field](/docs/types/field.md) that indicates the field to be parsed |
@@ -144,6 +145,50 @@ Configuration:
}
```
+
+
+
+
+#### Parse the field `message` with differing delimiters for header and fields
+
+Configuration:
+
+```yaml
+- type: csv_parser
+ parse_from: message
+ delimiter: "+"
+ header_delimiter: ","
+ header: 'id,severity,message'
+```
+
+
+ Input record | Output record |
+
+
+
+```json
+{
+ "timestamp": "",
+ "record": {
+ "message": "1+debug+\"\"Debug Message\"\""
+ }
+}
+```
+
+ |
+
+
+```json
+{
+ "timestamp": "",
+ "record": {
+ "id": "1",
+ "severity": "debug",
+ "message": "\"Debug Message\""
+ }
+}
+```
+
|
\ No newline at end of file
diff --git a/operator/builtin/parser/csv/csv.go b/operator/builtin/parser/csv/csv.go
index fa0da6d81..9b53926a1 100644
--- a/operator/builtin/parser/csv/csv.go
+++ b/operator/builtin/parser/csv/csv.go
@@ -27,8 +27,9 @@ func NewCSVParserConfig(operatorID string) *CSVParserConfig {
type CSVParserConfig struct {
helper.ParserConfig `yaml:",inline"`
- Header string `json:"header" yaml:"header"`
- FieldDelimiter string `json:"delimiter,omitempty" yaml:"delimiter,omitempty"`
+ Header string `json:"header" yaml:"header"`
+ HeaderDelimiter string `json:"header_delimiter,omitempty" yaml:"header_delimiter,omitempty"`
+ FieldDelimiter string `json:"delimiter,omitempty" yaml:"delimiter,omitempty"`
}
// Build will build a csv parser operator.
@@ -52,17 +53,24 @@ func (c CSVParserConfig) Build(context operator.BuildContext) ([]operator.Operat
fieldDelimiter := []rune(c.FieldDelimiter)[0]
- if !strings.Contains(c.Header, c.FieldDelimiter) {
- return nil, fmt.Errorf("missing field delimiter in header")
+ if c.HeaderDelimiter == "" {
+ c.HeaderDelimiter = c.FieldDelimiter
}
- numFields := len(strings.Split(c.Header, c.FieldDelimiter))
+ headerDelimiter := []rune(c.HeaderDelimiter)[0]
+
+ if !strings.Contains(c.Header, c.HeaderDelimiter) {
+ return nil, fmt.Errorf("missing header delimiter in header")
+ }
+
+ numFields := len(strings.Split(c.Header, c.HeaderDelimiter))
csvParser := &CSVParser{
- ParserOperator: parserOperator,
- header: c.Header,
- fieldDelimiter: fieldDelimiter,
- numFields: numFields,
+ ParserOperator: parserOperator,
+ header: c.Header,
+ headerDelimiter: headerDelimiter,
+ fieldDelimiter: fieldDelimiter,
+ numFields: numFields,
}
return []operator.Operator{csvParser}, nil
@@ -71,9 +79,10 @@ func (c CSVParserConfig) Build(context operator.BuildContext) ([]operator.Operat
// CSVParser is an operator that parses csv in an entry.
type CSVParser struct {
helper.ParserOperator
- header string
- fieldDelimiter rune
- numFields int
+ header string
+ headerDelimiter rune
+ fieldDelimiter rune
+ numFields int
}
// Process will parse an entry for csv.
@@ -84,17 +93,15 @@ func (r *CSVParser) Process(ctx context.Context, entry *entry.Entry) error {
// parse will parse a value using the supplied csv header.
func (r *CSVParser) parse(value interface{}) (interface{}, error) {
var csvLine string
- switch value.(type) {
+ switch t := value.(type) {
case string:
- csvLine += value.(string)
+ csvLine += t
case []byte:
- csvLine += string(value.([]byte))
+ csvLine += string(t)
default:
return nil, fmt.Errorf("type '%T' cannot be parsed as csv", value)
}
- delimiterStr := string([]rune{r.fieldDelimiter})
-
reader := csvparser.NewReader(strings.NewReader(csvLine))
reader.Comma = r.fieldDelimiter
reader.FieldsPerRecord = r.numFields
@@ -110,7 +117,7 @@ func (r *CSVParser) parse(value interface{}) (interface{}, error) {
return nil, err
}
- for i, key := range strings.Split(r.header, delimiterStr) {
+ for i, key := range strings.Split(r.header, string([]rune{r.headerDelimiter})) {
parsedValues[key] = record[i]
}
}
diff --git a/operator/builtin/parser/csv/csv_test.go b/operator/builtin/parser/csv/csv_test.go
index 8d21e08af..03a5955f9 100644
--- a/operator/builtin/parser/csv/csv_test.go
+++ b/operator/builtin/parser/csv/csv_test.go
@@ -169,6 +169,19 @@ func TestParserCSV(t *testing.T) {
"position": "agent",
},
},
+ {
+ "header-delimiter",
+ func(p *CSVParserConfig) {
+ p.Header = "name+sev+msg"
+ p.HeaderDelimiter = "+"
+ },
+ "stanza,INFO,started agent",
+ map[string]interface{}{
+ "name": "stanza",
+ "sev": "INFO",
+ "msg": "started agent",
+ },
+ },
}
for _, tc := range cases {
@@ -270,7 +283,7 @@ func TestBuildParserCSV(t *testing.T) {
c.Header = "name"
_, err := c.Build(testutil.NewBuildContext(t))
require.Error(t, err)
- require.Contains(t, err.Error(), "missing field delimiter in header")
+ require.Contains(t, err.Error(), "missing header delimiter in header")
})
t.Run("InvalidHeaderFieldWrongDelimiter", func(t *testing.T) {
@@ -286,6 +299,25 @@ func TestBuildParserCSV(t *testing.T) {
c.FieldDelimiter = ":"
_, err := c.Build(testutil.NewBuildContext(t))
require.Error(t, err)
- require.Contains(t, err.Error(), "missing field delimiter in header")
+ require.Contains(t, err.Error(), "missing header delimiter in header")
+ })
+
+ t.Run("HeaderDelimiter", func(t *testing.T) {
+ c := newBasicCSVParser()
+ c.Header = "name+position+number"
+ c.HeaderDelimiter = "+"
+ c.FieldDelimiter = ":"
+ _, err := c.Build(testutil.NewBuildContext(t))
+ require.NoError(t, err)
+ })
+
+ t.Run("InvalidHeaderDelimiter", func(t *testing.T) {
+ c := newBasicCSVParser()
+ c.Header = "name,position,number"
+ c.HeaderDelimiter = "+"
+ c.FieldDelimiter = ":"
+ _, err := c.Build(testutil.NewBuildContext(t))
+ require.Error(t, err)
+ require.Contains(t, err.Error(), "missing header delimiter in header")
})
}