Skip to content

Commit

Permalink
Port copy operator (#278)
Browse files Browse the repository at this point in the history
* port copy operator
  • Loading branch information
Mrod1598 authored May 7, 2021
1 parent dbd2f09 commit 012b857
Show file tree
Hide file tree
Showing 5 changed files with 556 additions and 7 deletions.
7 changes: 0 additions & 7 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Added doublestar support

## [0.13.22] - Unreleased
### Added
- Added retain operator

## [0.13.22] - 2021-05-07

### Added
- Added remove operator

## [0.13.21] - 2021-05-07

Expand Down
1 change: 1 addition & 0 deletions cmd/stanza/init_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
_ "github.com/observiq/stanza/operator/builtin/parser/time"
_ "github.com/observiq/stanza/operator/builtin/parser/uri"

_ "github.com/observiq/stanza/operator/builtin/transformer/copy"
_ "github.com/observiq/stanza/operator/builtin/transformer/filter"
_ "github.com/observiq/stanza/operator/builtin/transformer/hostmetadata"
_ "github.com/observiq/stanza/operator/builtin/transformer/k8smetadata"
Expand Down
198 changes: 198 additions & 0 deletions docs/operators/copy.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
## `copy` operator

The `copy` operator copies a value from one [field](/docs/types/field.md) to another.

### Configuration Fields

| Field | Default | Description |
| --- | --- | --- |
| `id` | `copy` | A unique identifier for the operator |
| `output` | Next in pipeline | The connected operator(s) that will receive all outbound entries |
| `from` | required | The [field](/docs/types/field.md) to copy the value of.
| `to` | required | The [field](/docs/types/field.md) to copy the value into.
| `on_error` | `send` | The behavior of the operator if it encounters an error. See [on_error](/docs/types/on_error.md) |
| `if` | | An [expression](/docs/types/expression.md) that, when set, will be evaluated to determine whether this operator should be used for the given entry. This allows you to do easy conditional parsing without branching logic with routers. |

Example usage:

<hr>
Copy a value from the record to resource

```yaml
- type: copy
from: key
to: $resource.newkey
```
<table>
<tr><td> Input Entry</td> <td> Output Entry </td></tr>
<tr>
<td>
```json
{
"resource": { },
"labels": { },
"record": {
"key":"value"
}
}
```

</td>
<td>

```json
{
"resource": {
"newkey":"value"
},
"labels": { },
"record": {
"key":"value"
}
}
```

</td>
</tr>
</table>

<hr>

Copy a value from the record to labels
```yaml
- type: copy
from: key2
to: $labels.newkey
```
<table>
<tr><td> Input Entry</td> <td> Output Entry </td></tr>
<tr>
<td>
```json
{
"resource": { },
"labels": { },
"record": {
"key1": "val1",
"key2": "val2"
}
}
```

</td>
<td>

```json
{
"resource": { },
"labels": {
"newkey": "val2"
},
"record": {
"key3": "val1",
"key2": "val2"
}
}
```

</td>
</tr>
</table>

<hr>

Copy a value from labels to the record
```yaml
- type: copy
from: $labels.key
to: newkey
```
<table>
<tr><td> Input Entry</td> <td> Output Entry </td></tr>
<tr>
<td>
```json
{
"resource": { },
"labels": {
"key": "newval"
},
"record": {
"key1": "val1",
"key2": "val2"
}
}
```

</td>
<td>

```json
{
"resource": { },
"labels": {
"key": "newval"
},
"record": {
"key3": "val1",
"key2": "val2",
"newkey": "newval"
}
}
```

</td>
</tr>
</table>

<hr>

Copy a value within the record
```yaml
- type: copy
from: obj.nested
to: newkey
```
<table>
<tr><td> Input Entry</td> <td> Output Entry </td></tr>
<tr>
<td>
```json
{
"resource": { },
"labels": { },
"record": {
"obj": {
"nested":"nestedvalue"
}
}
}
```

</td>
<td>

```json
{
"resource": { },
"labels": { },
"record": {
"obj": {
"nested":"nestedvalue"
},
"newkey":"nestedvalue"
}
}
```

</td>
</tr>
</table>
87 changes: 87 additions & 0 deletions operator/builtin/transformer/copy/copy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package copy

import (
"context"
"fmt"

"github.com/observiq/stanza/entry"
"github.com/observiq/stanza/operator"
"github.com/observiq/stanza/operator/helper"
)

func init() {
operator.Register("copy", func() operator.Builder { return NewCopyOperatorConfig("") })
}

// NewCopyOperatorConfig creates a new copy operator config with default values
func NewCopyOperatorConfig(operatorID string) *CopyOperatorConfig {
return &CopyOperatorConfig{
TransformerConfig: helper.NewTransformerConfig(operatorID, "copy"),
}
}

// CopyOperatorConfig is the configuration of a copy operator
type CopyOperatorConfig struct {
helper.TransformerConfig `mapstructure:",squash" yaml:",inline"`
From entry.Field `mapstructure:"from" json:"from" yaml:"from"`
To entry.Field `mapstructure:"to" json:"to" yaml:"to"`
}

// Build will build a copy operator from the supplied configuration
func (c CopyOperatorConfig) Build(context operator.BuildContext) ([]operator.Operator, error) {
transformerOperator, err := c.TransformerConfig.Build(context)
if err != nil {
return nil, err
}

if c.From == entry.NewNilField() {
return nil, fmt.Errorf("copy: missing from field")
}

if c.To == entry.NewNilField() {
return nil, fmt.Errorf("copy: missing to field")
}

copyOp := &CopyOperator{
TransformerOperator: transformerOperator,
From: c.From,
To: c.To,
}

return []operator.Operator{copyOp}, nil
}

// CopyOperator copies a value from one field and creates a new field with that value
type CopyOperator struct {
helper.TransformerOperator
From entry.Field
To entry.Field
}

// Process will process an entry with a copy transformation.
func (p *CopyOperator) Process(ctx context.Context, entry *entry.Entry) error {
return p.ProcessWith(ctx, entry, p.Transform)
}

// Transform will apply the copy operation to an entry
func (p *CopyOperator) Transform(e *entry.Entry) error {
val, exist := p.From.Get(e)
if !exist {
return fmt.Errorf("copy: from field does not exist in this entry: %s", p.From.String())
}
return p.To.Set(e, val)
}
Loading

0 comments on commit 012b857

Please sign in to comment.