Skip to content

Commit

Permalink
Csv header delimiter (#370)
Browse files Browse the repository at this point in the history
* support optionally setting the header delimiter

* perform type assertion once

* changelog for csv parser: support optionally setting the header delimiter

* update error test cases. add HeaderDelimiter test cases
  • Loading branch information
Joseph Sirianni authored Aug 4, 2021
1 parent 291404b commit bc43115
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 20 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Added
- File input: Added optional labels for resolved symlink file name and path [PR 364](https://github.com/observIQ/stanza/pull/364)
- CSV Parser: Added optional configuration field `header_delimiter` [PR 370](https://github.com/observIQ/stanza/pull/370)

## 1.1.5 - 2021-07-15

Expand Down
45 changes: 45 additions & 0 deletions docs/operators/csv_parser.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ The `csv_parser` operator parses the string-type field selected by `parse_from`
| `id` | `csv_parser` | A unique identifier for the operator |
| `output` | Next in pipeline | The connected operator(s) that will receive all outbound entries |
| `header` | required | A string of delimited field names. The values in the delimited header will be used as keys |
| `header_delimiter` | Value of `delimiter` | A character that will be used as a delimiter for the header. If unset, the value of `delimiter` is used. Values `\r` and `\n` cannot be used as a delimiter |
| `delimiter` | `,` | A character that will be used as a delimiter. Values `\r` and `\n` cannot be used as a delimiter |
| `parse_from` | $ | A [field](/docs/types/field.md) that indicates the field to be parsed |
| `parse_to` | $ | A [field](/docs/types/field.md) that indicates where the parsed values will be written |
Expand Down Expand Up @@ -144,6 +145,50 @@ Configuration:
}
```

</td>
</tr>
</table>

#### Parse the field `message` with differing delimiters for header and fields

Configuration:

```yaml
- type: csv_parser
  parse_from: message
  delimiter: "+"
  header_delimiter: ","
  header: 'id,severity,message'
```
<table>
<tr><td> Input record </td> <td> Output record </td></tr>
<tr>
<td>

```json
{
"timestamp": "",
"record": {
"message": "1+debug+\"\"Debug Message\"\""
}
}
```

</td>
<td>

```json
{
"timestamp": "",
"record": {
"id": "1",
"severity": "debug",
"message": "\"Debug Message\""
}
}
```

</td>
</tr>
</table>
43 changes: 25 additions & 18 deletions operator/builtin/parser/csv/csv.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@ func NewCSVParserConfig(operatorID string) *CSVParserConfig {
type CSVParserConfig struct {
helper.ParserConfig `yaml:",inline"`

Header string `json:"header" yaml:"header"`
FieldDelimiter string `json:"delimiter,omitempty" yaml:"delimiter,omitempty"`
Header string `json:"header" yaml:"header"`
HeaderDelimiter string `json:"header_delimiter,omitempty" yaml:"header_delimiter,omitempty"`
FieldDelimiter string `json:"delimiter,omitempty" yaml:"delimiter,omitempty"`
}

// Build will build a csv parser operator.
Expand All @@ -52,17 +53,24 @@ func (c CSVParserConfig) Build(context operator.BuildContext) ([]operator.Operat

fieldDelimiter := []rune(c.FieldDelimiter)[0]

if !strings.Contains(c.Header, c.FieldDelimiter) {
return nil, fmt.Errorf("missing field delimiter in header")
if c.HeaderDelimiter == "" {
c.HeaderDelimiter = c.FieldDelimiter
}

numFields := len(strings.Split(c.Header, c.FieldDelimiter))
headerDelimiter := []rune(c.HeaderDelimiter)[0]

if !strings.Contains(c.Header, c.HeaderDelimiter) {
return nil, fmt.Errorf("missing header delimiter in header")
}

numFields := len(strings.Split(c.Header, c.HeaderDelimiter))

csvParser := &CSVParser{
ParserOperator: parserOperator,
header: c.Header,
fieldDelimiter: fieldDelimiter,
numFields: numFields,
ParserOperator: parserOperator,
header: c.Header,
headerDelimiter: headerDelimiter,
fieldDelimiter: fieldDelimiter,
numFields: numFields,
}

return []operator.Operator{csvParser}, nil
Expand All @@ -71,9 +79,10 @@ func (c CSVParserConfig) Build(context operator.BuildContext) ([]operator.Operat
// CSVParser is an operator that parses csv in an entry.
type CSVParser struct {
helper.ParserOperator
header string
fieldDelimiter rune
numFields int
header string
headerDelimiter rune
fieldDelimiter rune
numFields int
}

// Process will parse an entry for csv.
Expand All @@ -84,17 +93,15 @@ func (r *CSVParser) Process(ctx context.Context, entry *entry.Entry) error {
// parse will parse a value using the supplied csv header.
func (r *CSVParser) parse(value interface{}) (interface{}, error) {
var csvLine string
switch value.(type) {
switch t := value.(type) {
case string:
csvLine += value.(string)
csvLine += t
case []byte:
csvLine += string(value.([]byte))
csvLine += string(t)
default:
return nil, fmt.Errorf("type '%T' cannot be parsed as csv", value)
}

delimiterStr := string([]rune{r.fieldDelimiter})

reader := csvparser.NewReader(strings.NewReader(csvLine))
reader.Comma = r.fieldDelimiter
reader.FieldsPerRecord = r.numFields
Expand All @@ -110,7 +117,7 @@ func (r *CSVParser) parse(value interface{}) (interface{}, error) {
return nil, err
}

for i, key := range strings.Split(r.header, delimiterStr) {
for i, key := range strings.Split(r.header, string([]rune{r.headerDelimiter})) {
parsedValues[key] = record[i]
}
}
Expand Down
36 changes: 34 additions & 2 deletions operator/builtin/parser/csv/csv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,19 @@ func TestParserCSV(t *testing.T) {
"position": "agent",
},
},
{
"header-delimiter",
func(p *CSVParserConfig) {
p.Header = "name+sev+msg"
p.HeaderDelimiter = "+"
},
"stanza,INFO,started agent",
map[string]interface{}{
"name": "stanza",
"sev": "INFO",
"msg": "started agent",
},
},
}

for _, tc := range cases {
Expand Down Expand Up @@ -270,7 +283,7 @@ func TestBuildParserCSV(t *testing.T) {
c.Header = "name"
_, err := c.Build(testutil.NewBuildContext(t))
require.Error(t, err)
require.Contains(t, err.Error(), "missing field delimiter in header")
require.Contains(t, err.Error(), "missing header delimiter in header")
})

t.Run("InvalidHeaderFieldWrongDelimiter", func(t *testing.T) {
Expand All @@ -286,6 +299,25 @@ func TestBuildParserCSV(t *testing.T) {
c.FieldDelimiter = ":"
_, err := c.Build(testutil.NewBuildContext(t))
require.Error(t, err)
require.Contains(t, err.Error(), "missing field delimiter in header")
require.Contains(t, err.Error(), "missing header delimiter in header")
})

t.Run("HeaderDelimiter", func(t *testing.T) {
c := newBasicCSVParser()
c.Header = "name+position+number"
c.HeaderDelimiter = "+"
c.FieldDelimiter = ":"
_, err := c.Build(testutil.NewBuildContext(t))
require.NoError(t, err)
})

t.Run("InvalidHeaderDelimiter", func(t *testing.T) {
c := newBasicCSVParser()
c.Header = "name,position,number"
c.HeaderDelimiter = "+"
c.FieldDelimiter = ":"
_, err := c.Build(testutil.NewBuildContext(t))
require.Error(t, err)
require.Contains(t, err.Error(), "missing header delimiter in header")
})
}

0 comments on commit bc43115

Please sign in to comment.