diff --git a/CHANGELOG.md b/CHANGELOG.md index b14322db4..fbee188b4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,13 +9,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Added doublestar support ## [0.13.22] - Unreleased -### Added -- Added retain operator - -## [0.13.22] - 2021-05-07 - -### Added -- Added remove operator ## [0.13.21] - 2021-05-07 diff --git a/cmd/stanza/init_common.go b/cmd/stanza/init_common.go index 6909f16c7..e20242d1a 100644 --- a/cmd/stanza/init_common.go +++ b/cmd/stanza/init_common.go @@ -21,6 +21,7 @@ import ( _ "github.com/observiq/stanza/operator/builtin/parser/time" _ "github.com/observiq/stanza/operator/builtin/parser/uri" + _ "github.com/observiq/stanza/operator/builtin/transformer/copy" _ "github.com/observiq/stanza/operator/builtin/transformer/filter" _ "github.com/observiq/stanza/operator/builtin/transformer/hostmetadata" _ "github.com/observiq/stanza/operator/builtin/transformer/k8smetadata" diff --git a/docs/operators/copy.md b/docs/operators/copy.md new file mode 100644 index 000000000..bebf387ab --- /dev/null +++ b/docs/operators/copy.md @@ -0,0 +1,198 @@ +## `copy` operator + +The `copy` operator copies a value from one [field](/docs/types/field.md) to another. + +### Configuration Fields + +| Field | Default | Description | +| --- | --- | --- | +| `id` | `copy` | A unique identifier for the operator | +| `output` | Next in pipeline | The connected operator(s) that will receive all outbound entries | +| `from` | required | The [field](/docs/types/field.md) to copy the value of. +| `to` | required | The [field](/docs/types/field.md) to copy the value into. +| `on_error` | `send` | The behavior of the operator if it encounters an error. See [on_error](/docs/types/on_error.md) | +| `if` | | An [expression](/docs/types/expression.md) that, when set, will be evaluated to determine whether this operator should be used for the given entry. This allows you to do easy conditional parsing without branching logic with routers. | + +Example usage: + +
+Copy a value from the record to resource + +```yaml +- type: copy + from: key + to: $resource.newkey +``` + + + + + + + +
Input Entry Output Entry
+ +```json +{ + "resource": { }, + "labels": { }, + "record": { + "key":"value" + } +} +``` + + + +```json +{ + "resource": { + "newkey":"value" + }, + "labels": { }, + "record": { + "key":"value" + } +} +``` + +
+ +
+ +Copy a value from the record to labels +```yaml +- type: copy + from: key2 + to: $labels.newkey +``` + + + + + + + +
Input Entry Output Entry
+ +```json +{ + "resource": { }, + "labels": { }, + "record": { + "key1": "val1", + "key2": "val2" + } +} +``` + + + +```json +{ + "resource": { }, + "labels": { + "newkey": "val2" + }, + "record": { + "key3": "val1", + "key2": "val2" + } +} +``` + +
+ +
+ +Copy a value from labels to the record +```yaml +- type: copy + from: $labels.key + to: newkey +``` + + + + + + + +
Input Entry Output Entry
+ +```json +{ + "resource": { }, + "labels": { + "key": "newval" + }, + "record": { + "key1": "val1", + "key2": "val2" + } +} +``` + + + +```json +{ + "resource": { }, + "labels": { + "key": "newval" + }, + "record": { + "key3": "val1", + "key2": "val2", + "newkey": "newval" + } +} +``` + +
+ +
+ +Copy a value within the record +```yaml +- type: copy + from: obj.nested + to: newkey +``` + + + + + + + +
Input Entry Output Entry
+ +```json +{ + "resource": { }, + "labels": { }, + "record": { + "obj": { + "nested":"nestedvalue" + } + } +} +``` + + + +```json +{ + "resource": { }, + "labels": { }, + "record": { + "obj": { + "nested":"nestedvalue" + }, + "newkey":"nestedvalue" + } +} +``` + +
\ No newline at end of file diff --git a/operator/builtin/transformer/copy/copy.go b/operator/builtin/transformer/copy/copy.go new file mode 100644 index 000000000..66d3cac91 --- /dev/null +++ b/operator/builtin/transformer/copy/copy.go @@ -0,0 +1,87 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package copy + +import ( + "context" + "fmt" + + "github.com/observiq/stanza/entry" + "github.com/observiq/stanza/operator" + "github.com/observiq/stanza/operator/helper" +) + +func init() { + operator.Register("copy", func() operator.Builder { return NewCopyOperatorConfig("") }) +} + +// NewCopyOperatorConfig creates a new copy operator config with default values +func NewCopyOperatorConfig(operatorID string) *CopyOperatorConfig { + return &CopyOperatorConfig{ + TransformerConfig: helper.NewTransformerConfig(operatorID, "copy"), + } +} + +// CopyOperatorConfig is the configuration of a copy operator +type CopyOperatorConfig struct { + helper.TransformerConfig `mapstructure:",squash" yaml:",inline"` + From entry.Field `mapstructure:"from" json:"from" yaml:"from"` + To entry.Field `mapstructure:"to" json:"to" yaml:"to"` +} + +// Build will build a copy operator from the supplied configuration +func (c CopyOperatorConfig) Build(context operator.BuildContext) ([]operator.Operator, error) { + transformerOperator, err := c.TransformerConfig.Build(context) + if err != nil { + return nil, err + } + + if c.From == entry.NewNilField() { + return nil, fmt.Errorf("copy: missing from field") + } + + if c.To == entry.NewNilField() { + return nil, fmt.Errorf("copy: missing to field") + } + + copyOp := &CopyOperator{ + TransformerOperator: transformerOperator, + From: c.From, + To: c.To, + } + + return []operator.Operator{copyOp}, nil +} + +// CopyOperator copies a value from one field and creates a new field with that value +type CopyOperator struct { + helper.TransformerOperator + From entry.Field + To entry.Field +} + +// Process will process an entry with a copy transformation. +func (p *CopyOperator) Process(ctx context.Context, entry *entry.Entry) error { + return p.ProcessWith(ctx, entry, p.Transform) +} + +// Transform will apply the copy operation to an entry +func (p *CopyOperator) Transform(e *entry.Entry) error { + val, exist := p.From.Get(e) + if !exist { + return fmt.Errorf("copy: from field does not exist in this entry: %s", p.From.String()) + } + return p.To.Set(e, val) +} diff --git a/operator/builtin/transformer/copy/copy_test.go b/operator/builtin/transformer/copy/copy_test.go new file mode 100644 index 000000000..cf56fc7e9 --- /dev/null +++ b/operator/builtin/transformer/copy/copy_test.go @@ -0,0 +1,270 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package copy + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/observiq/stanza/entry" + "github.com/observiq/stanza/operator" + "github.com/observiq/stanza/testutil" +) + +type testCase struct { + name string + expectErr bool + op *CopyOperatorConfig + input func() *entry.Entry + output func() *entry.Entry +} + +// Test building and processing a CopyOperatorConfig +func TestBuildAndProcess(t *testing.T) { + newTestEntry := func() *entry.Entry { + e := entry.New() + e.Timestamp = time.Unix(1586632809, 0) + e.Record = map[string]interface{}{ + "key": "val", + "nested": map[string]interface{}{ + "nestedkey": "nestedval", + }, + } + return e + } + + cases := []testCase{ + { + "body_to_body", + false, + func() *CopyOperatorConfig { + cfg := defaultCfg() + cfg.From = entry.NewRecordField("key") + cfg.To = entry.NewRecordField("key2") + return cfg + }(), + newTestEntry, + func() *entry.Entry { + e := newTestEntry() + e.Record = map[string]interface{}{ + "key": "val", + "nested": map[string]interface{}{ + "nestedkey": "nestedval", + }, + "key2": "val", + } + return e + }, + }, + { + "nested_to_body", + false, + func() *CopyOperatorConfig { + cfg := defaultCfg() + cfg.From = entry.NewRecordField("nested", "nestedkey") + cfg.To = entry.NewRecordField("key2") + return cfg + }(), + newTestEntry, + func() *entry.Entry { + e := newTestEntry() + e.Record = map[string]interface{}{ + "key": "val", + "nested": map[string]interface{}{ + "nestedkey": "nestedval", + }, + "key2": "nestedval", + } + return e + }, + }, + { + "body_to_nested", + false, + func() *CopyOperatorConfig { + cfg := defaultCfg() + cfg.From = entry.NewRecordField("key") + cfg.To = entry.NewRecordField("nested", "key2") + return cfg + }(), + newTestEntry, + func() *entry.Entry { + e := newTestEntry() + e.Record = map[string]interface{}{ + "key": "val", + "nested": map[string]interface{}{ + "nestedkey": "nestedval", + "key2": "val", + }, + } + return e + }, + }, + { + "body_to_attribute", + false, + func() *CopyOperatorConfig { + cfg := defaultCfg() + cfg.From = entry.NewRecordField("key") + cfg.To = entry.NewLabelField("key2") + return cfg + }(), + newTestEntry, + func() *entry.Entry { + e := newTestEntry() + e.Record = map[string]interface{}{ + "key": "val", + "nested": map[string]interface{}{ + "nestedkey": "nestedval", + }, + } + e.Labels = map[string]string{"key2": "val"} + return e + }, + }, + { + "attribute_to_body", + false, + func() *CopyOperatorConfig { + cfg := defaultCfg() + cfg.From = entry.NewLabelField("key") + cfg.To = entry.NewRecordField("key2") + return cfg + }(), + func() *entry.Entry { + e := newTestEntry() + e.Labels = map[string]string{"key": "val"} + return e + }, + func() *entry.Entry { + e := newTestEntry() + e.Record = map[string]interface{}{ + "key": "val", + "nested": map[string]interface{}{ + "nestedkey": "nestedval", + }, + "key2": "val", + } + e.Labels = map[string]string{"key": "val"} + return e + }, + }, + { + "attribute_to_resource", + false, + func() *CopyOperatorConfig { + cfg := defaultCfg() + cfg.From = entry.NewLabelField("key") + cfg.To = entry.NewResourceField("key2") + return cfg + }(), + func() *entry.Entry { + e := newTestEntry() + e.Labels = map[string]string{"key": "val"} + return e + }, + func() *entry.Entry { + e := newTestEntry() + e.Labels = map[string]string{"key": "val"} + e.Resource = map[string]string{"key2": "val"} + return e + }, + }, + { + "overwrite", + false, + func() *CopyOperatorConfig { + cfg := defaultCfg() + cfg.From = entry.NewRecordField("key") + cfg.To = entry.NewRecordField("nested") + return cfg + }(), + newTestEntry, + func() *entry.Entry { + e := newTestEntry() + e.Record = map[string]interface{}{ + "key": "val", + "nested": "val", + } + return e + }, + }, + { + "invalid_copy_obj_to_resource", + true, + func() *CopyOperatorConfig { + cfg := defaultCfg() + cfg.From = entry.NewRecordField("nested") + cfg.To = entry.NewResourceField("invalid") + return cfg + }(), + newTestEntry, + nil, + }, + { + "invalid_copy_obj_to_attributes", + true, + func() *CopyOperatorConfig { + cfg := defaultCfg() + cfg.From = entry.NewRecordField("nested") + cfg.To = entry.NewLabelField("invalid") + return cfg + }(), + newTestEntry, + nil, + }, + { + "invalid_key", + true, + func() *CopyOperatorConfig { + cfg := defaultCfg() + cfg.From = entry.NewLabelField("nonexistentkey") + cfg.To = entry.NewResourceField("key2") + return cfg + }(), + newTestEntry, + nil, + }, + } + + for _, tc := range cases { + t.Run("BuildAndProcess/"+tc.name, func(t *testing.T) { + cfg := tc.op + cfg.OutputIDs = []string{"fake"} + cfg.OnError = "drop" + ops, err := cfg.Build(testutil.NewBuildContext(t)) + require.NoError(t, err) + op := ops[0] + + copy := op.(*CopyOperator) + fake := testutil.NewFakeOutput(t) + copy.SetOutputs([]operator.Operator{fake}) + val := tc.input() + err = copy.Process(context.Background(), val) + if tc.expectErr { + require.Error(t, err) + } else { + require.NoError(t, err) + fake.ExpectEntry(t, tc.output()) + } + }) + } +} + +func defaultCfg() *CopyOperatorConfig { + return NewCopyOperatorConfig("copy") +}