Skip to content

Commit

Permalink
Port add operator (#272)
Browse files Browse the repository at this point in the history
* Add 'Add' operator and the doc for it

* Update Changelog

* Remove license

* Force Update

* Update init_common

* Update init_common

* Update Changelog

* Force Ci
  • Loading branch information
Mrod1598 authored May 7, 2021
1 parent 012b857 commit 160fc99
Show file tree
Hide file tree
Showing 5 changed files with 636 additions and 1 deletion.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Fixed
- Fixed TCP Input Operator panic [PR296](https://github.com/observIQ/stanza/pull/296)

## [0.13.20] - 2021-05-06
## [0.13.20] - 2021-05-06

### Added
- Added flatten Operator [PR 286](https://github.com/observIQ/stanza/pull/286)
Expand Down
1 change: 1 addition & 0 deletions cmd/stanza/init_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
_ "github.com/observiq/stanza/operator/builtin/parser/time"
_ "github.com/observiq/stanza/operator/builtin/parser/uri"

_ "github.com/observiq/stanza/operator/builtin/transformer/add"
_ "github.com/observiq/stanza/operator/builtin/transformer/copy"
_ "github.com/observiq/stanza/operator/builtin/transformer/filter"
_ "github.com/observiq/stanza/operator/builtin/transformer/hostmetadata"
Expand Down
274 changes: 274 additions & 0 deletions docs/operators/add.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,274 @@
## `add` operator

The `add` operator adds a value to an `entry`'s `record`, `labels`, or `resource`.

### Configuration Fields

| Field | Default | Description |
| --- | --- | --- |
| `id` | `add` | A unique identifier for the operator |
| `output` | Next in pipeline | The connected operator(s) that will receive all outbound entries |
| `field` | required | The [field](/docs/types/field.md) to be added.
| `value` | required | `value` is either a static value or an [expression](https://github.com/open-telemetry/opentelemetry-log-collection/blob/main/docs/types/expression.md). If a value is specified, it will be added to each entry at the field defined by `field`. If an expression is specified, it will be evaluated for each entry and added at the field defined by `field`
| `on_error` | `send` | The behavior of the operator if it encounters an error. See [on_error](/docs/types/on_error.md) |
| `if` | | An [expression](/docs/types/expression.md) that, when set, will be evaluated to determine whether this operator should be used for the given entry. This allows you to do easy conditional parsing without branching logic with routers. |


Example usage:

<hr>
Add a string to the record

```yaml
- type: add
field: key2
value: val2
```
<table>
<tr><td> Input entry </td> <td> Output entry </td></tr>
<tr>
<td>
```json
{
"resource": { },
"labels": { },
"record": {
"key1": "val1",
}
}
```

</td>
<td>

```json
{
"resource": { },
"labels": { },
"record": {
"key1": "val1",
"key2": "val2"
}
}
```

</td>
</tr>
</table>

<hr>
Add a value to the record using an expression

```yaml
- type: add
field: key2
value: EXPR($.key1 + "_suffix")
```
<table>
<tr><td> Input entry </td> <td> Output entry </td></tr>
<tr>
<td>
```json
{
"resource": { },
"labels": { },
"record": {
"key1": "val1",
}
}
```

</td>
<td>

```json
{
"resource": { },
"labels": { },
"record": {
"key1": "val1",
"key2": "val1_suffix"
}
}
```

</td>
</tr>
</table>

<hr>
Add an object to the record

```yaml
- type: add
field: key2
value:
nestedkey: nestedvalue
```
<table>
<tr><td> Input entry </td> <td> Output entry </td></tr>
<tr>
<td>
```json
{
"resource": { },
"labels": { },
"record": {
"key1": "val1",
}
}
```

</td>
<td>

```json
{
"resource": { },
"labels": { },
"record": {
"key1": "val1",
"key2": {
"nestedkey":"nested value"
}
}
}
```

</td>
</tr>
</table>

<hr>
Add a value to labels

```yaml
- type: add
field: $labels.key2
value: val2
```
<table>
<tr><td> Input entry </td> <td> Output entry </td></tr>
<tr>
<td>
```json
{
"resource": { },
"labels": { },
"record": {
"key1": "val1",
}
}
```

</td>
<td>

```json
{
"resource": { },
"labels": {
"key2": "val2"
},
"record": {
"key1": "val1"
}
}
```

</td>
</tr>
</table>

<hr>
Add a value to resource

```yaml
- type: add
field: $resource.key2
value: val2
```
<table>
<tr><td> Input entry </td> <td> Output entry </td></tr>
<tr>
<td>
```json
{
"resource": { },
"labels": { },
"record": {
"key1": "val1",
}
}
```

</td>
<td>

```json
{
"resource": {
"key2": "val2"
},
"labels": { },
"record": {
"key1": "val1"
}
}
```

</td>
</tr>
</table>

Add a value to resource using an expression

```yaml
- type: add
field: $resource.key2
value: EXPR($.key1 + "_suffix")
```
<table>
<tr><td> Input entry </td> <td> Output entry </td></tr>
<tr>
<td>
```json
{
"resource": { },
"labels": { },
"record": {
"key1": "val1",
}
}
```

</td>
<td>

```json
{
"resource": {
"key2": "val_suffix"
},
"labels": { },
"record": {
"key1": "val1",
}
}
```

</td>
</tr>
</table>
96 changes: 96 additions & 0 deletions operator/builtin/transformer/add/add.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package add

import (
"context"
"fmt"
"strings"

"github.com/antonmedv/expr"
"github.com/antonmedv/expr/vm"

"github.com/observiq/stanza/entry"
"github.com/observiq/stanza/operator"
"github.com/observiq/stanza/operator/helper"
)

func init() {
operator.Register("add", func() operator.Builder { return NewAddOperatorConfig("") })
}

// NewAddOperatorConfig creates a new add operator config with default values
func NewAddOperatorConfig(operatorID string) *AddOperatorConfig {
return &AddOperatorConfig{
TransformerConfig: helper.NewTransformerConfig(operatorID, "add"),
}
}

// AddOperatorConfig is the configuration of an add operator
type AddOperatorConfig struct {
helper.TransformerConfig `mapstructure:",squash" yaml:",inline"`
Field entry.Field `mapstructure:"field" json:"field" yaml:"field"`
Value interface{} `mapstructure:"value,omitempty" json:"value,omitempty" yaml:"value,omitempty"`
}

// Build will build an add operator from the supplied configuration
func (c AddOperatorConfig) Build(context operator.BuildContext) ([]operator.Operator, error) {
transformerOperator, err := c.TransformerConfig.Build(context)
if err != nil {
return nil, err
}

addOperator := &AddOperator{
TransformerOperator: transformerOperator,
Field: c.Field,
}
strVal, ok := c.Value.(string)
if !ok || !isExpr(strVal) {
addOperator.Value = c.Value
return []operator.Operator{addOperator}, nil
}
exprStr := strings.TrimPrefix(strVal, "EXPR(")
exprStr = strings.TrimSuffix(exprStr, ")")

compiled, err := expr.Compile(exprStr, expr.AllowUndefinedVariables())
if err != nil {
return nil, fmt.Errorf("failed to compile expression '%s': %w", c.IfExpr, err)
}

addOperator.program = compiled
return []operator.Operator{addOperator}, nil
}

// AddOperator is an operator that adds a string value or an expression value
type AddOperator struct {
helper.TransformerOperator

Field entry.Field
Value interface{}
program *vm.Program
}

// Process will process an entry with a add transformation.
func (p *AddOperator) Process(ctx context.Context, entry *entry.Entry) error {
return p.ProcessWith(ctx, entry, p.Transform)
}

// Transform will apply the add operations to an entry
func (p *AddOperator) Transform(e *entry.Entry) error {
if p.Value != nil {
return e.Set(p.Field, p.Value)
}
if p.program != nil {
env := helper.GetExprEnv(e)
defer helper.PutExprEnv(env)

result, err := vm.Run(p.program, env)
if err != nil {
return fmt.Errorf("evaluate value_expr: %s", err)
}
return e.Set(p.Field, result)
}
return fmt.Errorf("add: missing required field 'value'")
}

func isExpr(str string) bool {
return strings.HasPrefix(str, "EXPR(") && strings.HasSuffix(str, ")")
}
Loading

0 comments on commit 160fc99

Please sign in to comment.