diff --git a/CHANGELOG.md b/CHANGELOG.md
index 6a423409d..fbee188b4 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -10,7 +10,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [0.13.22] - Unreleased
-
## [0.13.21] - 2021-05-07
### Changed
diff --git a/cmd/stanza/init_common.go b/cmd/stanza/init_common.go
index 5132067be..51aa6d39f 100644
--- a/cmd/stanza/init_common.go
+++ b/cmd/stanza/init_common.go
@@ -21,6 +21,7 @@ import (
_ "github.com/observiq/stanza/operator/builtin/parser/time"
_ "github.com/observiq/stanza/operator/builtin/parser/uri"
+ _ "github.com/observiq/stanza/operator/builtin/transformer/copy"
_ "github.com/observiq/stanza/operator/builtin/transformer/filter"
_ "github.com/observiq/stanza/operator/builtin/transformer/hostmetadata"
_ "github.com/observiq/stanza/operator/builtin/transformer/k8smetadata"
diff --git a/docs/operators/copy.md b/docs/operators/copy.md
new file mode 100644
index 000000000..bebf387ab
--- /dev/null
+++ b/docs/operators/copy.md
@@ -0,0 +1,198 @@
+## `copy` operator
+
+The `copy` operator copies a value from one [field](/docs/types/field.md) to another.
+
+### Configuration Fields
+
+| Field | Default | Description |
+| --- | --- | --- |
+| `id` | `copy` | A unique identifier for the operator |
+| `output` | Next in pipeline | The connected operator(s) that will receive all outbound entries |
+| `from` | required | The [field](/docs/types/field.md) to copy the value of.
+| `to` | required | The [field](/docs/types/field.md) to copy the value into.
+| `on_error` | `send` | The behavior of the operator if it encounters an error. See [on_error](/docs/types/on_error.md) |
+| `if` | | An [expression](/docs/types/expression.md) that, when set, will be evaluated to determine whether this operator should be used for the given entry. This allows you to do easy conditional parsing without branching logic with routers. |
+
+Example usage:
+
+
+Copy a value from the record to resource
+
+```yaml
+- type: copy
+ from: key
+ to: $resource.newkey
+```
+
+
+ Input Entry | Output Entry |
+
+
+
+```json
+{
+ "resource": { },
+ "labels": { },
+ "record": {
+ "key":"value"
+ }
+}
+```
+
+ |
+
+
+```json
+{
+ "resource": {
+ "newkey":"value"
+ },
+ "labels": { },
+ "record": {
+ "key":"value"
+ }
+}
+```
+
+ |
+
+
+
+
+
+Copy a value from the record to labels
+```yaml
+- type: copy
+ from: key2
+ to: $labels.newkey
+```
+
+
+ Input Entry | Output Entry |
+
+
+
+```json
+{
+ "resource": { },
+ "labels": { },
+ "record": {
+ "key1": "val1",
+ "key2": "val2"
+ }
+}
+```
+
+ |
+
+
+```json
+{
+ "resource": { },
+ "labels": {
+ "newkey": "val2"
+ },
+ "record": {
+ "key3": "val1",
+ "key2": "val2"
+ }
+}
+```
+
+ |
+
+
+
+
+
+Copy a value from labels to the record
+```yaml
+- type: copy
+ from: $labels.key
+ to: newkey
+```
+
+
+ Input Entry | Output Entry |
+
+
+
+```json
+{
+ "resource": { },
+ "labels": {
+ "key": "newval"
+ },
+ "record": {
+ "key1": "val1",
+ "key2": "val2"
+ }
+}
+```
+
+ |
+
+
+```json
+{
+ "resource": { },
+ "labels": {
+ "key": "newval"
+ },
+ "record": {
+ "key3": "val1",
+ "key2": "val2",
+ "newkey": "newval"
+ }
+}
+```
+
+ |
+
+
+
+
+
+Copy a value within the record
+```yaml
+- type: copy
+ from: obj.nested
+ to: newkey
+```
+
+
+ Input Entry | Output Entry |
+
+
+
+```json
+{
+ "resource": { },
+ "labels": { },
+ "record": {
+ "obj": {
+ "nested":"nestedvalue"
+ }
+ }
+}
+```
+
+ |
+
+
+```json
+{
+ "resource": { },
+ "labels": { },
+ "record": {
+ "obj": {
+ "nested":"nestedvalue"
+ },
+ "newkey":"nestedvalue"
+ }
+}
+```
+
+ |
+
+
\ No newline at end of file
diff --git a/operator/builtin/transformer/copy/copy.go b/operator/builtin/transformer/copy/copy.go
new file mode 100644
index 000000000..66d3cac91
--- /dev/null
+++ b/operator/builtin/transformer/copy/copy.go
@@ -0,0 +1,87 @@
+// Copyright The OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package copy
+
+import (
+ "context"
+ "fmt"
+
+ "github.com/observiq/stanza/entry"
+ "github.com/observiq/stanza/operator"
+ "github.com/observiq/stanza/operator/helper"
+)
+
+func init() {
+ operator.Register("copy", func() operator.Builder { return NewCopyOperatorConfig("") })
+}
+
+// NewCopyOperatorConfig creates a new copy operator config with default values
+func NewCopyOperatorConfig(operatorID string) *CopyOperatorConfig {
+ return &CopyOperatorConfig{
+ TransformerConfig: helper.NewTransformerConfig(operatorID, "copy"),
+ }
+}
+
+// CopyOperatorConfig is the configuration of a copy operator
+type CopyOperatorConfig struct {
+ helper.TransformerConfig `mapstructure:",squash" yaml:",inline"`
+ From entry.Field `mapstructure:"from" json:"from" yaml:"from"`
+ To entry.Field `mapstructure:"to" json:"to" yaml:"to"`
+}
+
+// Build will build a copy operator from the supplied configuration
+func (c CopyOperatorConfig) Build(context operator.BuildContext) ([]operator.Operator, error) {
+ transformerOperator, err := c.TransformerConfig.Build(context)
+ if err != nil {
+ return nil, err
+ }
+
+ if c.From == entry.NewNilField() {
+ return nil, fmt.Errorf("copy: missing from field")
+ }
+
+ if c.To == entry.NewNilField() {
+ return nil, fmt.Errorf("copy: missing to field")
+ }
+
+ copyOp := &CopyOperator{
+ TransformerOperator: transformerOperator,
+ From: c.From,
+ To: c.To,
+ }
+
+ return []operator.Operator{copyOp}, nil
+}
+
+// CopyOperator copies a value from one field and creates a new field with that value
+type CopyOperator struct {
+ helper.TransformerOperator
+ From entry.Field
+ To entry.Field
+}
+
+// Process will process an entry with a copy transformation.
+func (p *CopyOperator) Process(ctx context.Context, entry *entry.Entry) error {
+ return p.ProcessWith(ctx, entry, p.Transform)
+}
+
+// Transform will apply the copy operation to an entry
+func (p *CopyOperator) Transform(e *entry.Entry) error {
+ val, exist := p.From.Get(e)
+ if !exist {
+ return fmt.Errorf("copy: from field does not exist in this entry: %s", p.From.String())
+ }
+ return p.To.Set(e, val)
+}
diff --git a/operator/builtin/transformer/copy/copy_test.go b/operator/builtin/transformer/copy/copy_test.go
new file mode 100644
index 000000000..cf56fc7e9
--- /dev/null
+++ b/operator/builtin/transformer/copy/copy_test.go
@@ -0,0 +1,270 @@
+// Copyright The OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package copy
+
+import (
+ "context"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/require"
+
+ "github.com/observiq/stanza/entry"
+ "github.com/observiq/stanza/operator"
+ "github.com/observiq/stanza/testutil"
+)
+
+type testCase struct {
+ name string
+ expectErr bool
+ op *CopyOperatorConfig
+ input func() *entry.Entry
+ output func() *entry.Entry
+}
+
+// Test building and processing a CopyOperatorConfig
+func TestBuildAndProcess(t *testing.T) {
+ newTestEntry := func() *entry.Entry {
+ e := entry.New()
+ e.Timestamp = time.Unix(1586632809, 0)
+ e.Record = map[string]interface{}{
+ "key": "val",
+ "nested": map[string]interface{}{
+ "nestedkey": "nestedval",
+ },
+ }
+ return e
+ }
+
+ cases := []testCase{
+ {
+ "body_to_body",
+ false,
+ func() *CopyOperatorConfig {
+ cfg := defaultCfg()
+ cfg.From = entry.NewRecordField("key")
+ cfg.To = entry.NewRecordField("key2")
+ return cfg
+ }(),
+ newTestEntry,
+ func() *entry.Entry {
+ e := newTestEntry()
+ e.Record = map[string]interface{}{
+ "key": "val",
+ "nested": map[string]interface{}{
+ "nestedkey": "nestedval",
+ },
+ "key2": "val",
+ }
+ return e
+ },
+ },
+ {
+ "nested_to_body",
+ false,
+ func() *CopyOperatorConfig {
+ cfg := defaultCfg()
+ cfg.From = entry.NewRecordField("nested", "nestedkey")
+ cfg.To = entry.NewRecordField("key2")
+ return cfg
+ }(),
+ newTestEntry,
+ func() *entry.Entry {
+ e := newTestEntry()
+ e.Record = map[string]interface{}{
+ "key": "val",
+ "nested": map[string]interface{}{
+ "nestedkey": "nestedval",
+ },
+ "key2": "nestedval",
+ }
+ return e
+ },
+ },
+ {
+ "body_to_nested",
+ false,
+ func() *CopyOperatorConfig {
+ cfg := defaultCfg()
+ cfg.From = entry.NewRecordField("key")
+ cfg.To = entry.NewRecordField("nested", "key2")
+ return cfg
+ }(),
+ newTestEntry,
+ func() *entry.Entry {
+ e := newTestEntry()
+ e.Record = map[string]interface{}{
+ "key": "val",
+ "nested": map[string]interface{}{
+ "nestedkey": "nestedval",
+ "key2": "val",
+ },
+ }
+ return e
+ },
+ },
+ {
+ "body_to_attribute",
+ false,
+ func() *CopyOperatorConfig {
+ cfg := defaultCfg()
+ cfg.From = entry.NewRecordField("key")
+ cfg.To = entry.NewLabelField("key2")
+ return cfg
+ }(),
+ newTestEntry,
+ func() *entry.Entry {
+ e := newTestEntry()
+ e.Record = map[string]interface{}{
+ "key": "val",
+ "nested": map[string]interface{}{
+ "nestedkey": "nestedval",
+ },
+ }
+ e.Labels = map[string]string{"key2": "val"}
+ return e
+ },
+ },
+ {
+ "attribute_to_body",
+ false,
+ func() *CopyOperatorConfig {
+ cfg := defaultCfg()
+ cfg.From = entry.NewLabelField("key")
+ cfg.To = entry.NewRecordField("key2")
+ return cfg
+ }(),
+ func() *entry.Entry {
+ e := newTestEntry()
+ e.Labels = map[string]string{"key": "val"}
+ return e
+ },
+ func() *entry.Entry {
+ e := newTestEntry()
+ e.Record = map[string]interface{}{
+ "key": "val",
+ "nested": map[string]interface{}{
+ "nestedkey": "nestedval",
+ },
+ "key2": "val",
+ }
+ e.Labels = map[string]string{"key": "val"}
+ return e
+ },
+ },
+ {
+ "attribute_to_resource",
+ false,
+ func() *CopyOperatorConfig {
+ cfg := defaultCfg()
+ cfg.From = entry.NewLabelField("key")
+ cfg.To = entry.NewResourceField("key2")
+ return cfg
+ }(),
+ func() *entry.Entry {
+ e := newTestEntry()
+ e.Labels = map[string]string{"key": "val"}
+ return e
+ },
+ func() *entry.Entry {
+ e := newTestEntry()
+ e.Labels = map[string]string{"key": "val"}
+ e.Resource = map[string]string{"key2": "val"}
+ return e
+ },
+ },
+ {
+ "overwrite",
+ false,
+ func() *CopyOperatorConfig {
+ cfg := defaultCfg()
+ cfg.From = entry.NewRecordField("key")
+ cfg.To = entry.NewRecordField("nested")
+ return cfg
+ }(),
+ newTestEntry,
+ func() *entry.Entry {
+ e := newTestEntry()
+ e.Record = map[string]interface{}{
+ "key": "val",
+ "nested": "val",
+ }
+ return e
+ },
+ },
+ {
+ "invalid_copy_obj_to_resource",
+ true,
+ func() *CopyOperatorConfig {
+ cfg := defaultCfg()
+ cfg.From = entry.NewRecordField("nested")
+ cfg.To = entry.NewResourceField("invalid")
+ return cfg
+ }(),
+ newTestEntry,
+ nil,
+ },
+ {
+ "invalid_copy_obj_to_attributes",
+ true,
+ func() *CopyOperatorConfig {
+ cfg := defaultCfg()
+ cfg.From = entry.NewRecordField("nested")
+ cfg.To = entry.NewLabelField("invalid")
+ return cfg
+ }(),
+ newTestEntry,
+ nil,
+ },
+ {
+ "invalid_key",
+ true,
+ func() *CopyOperatorConfig {
+ cfg := defaultCfg()
+ cfg.From = entry.NewLabelField("nonexistentkey")
+ cfg.To = entry.NewResourceField("key2")
+ return cfg
+ }(),
+ newTestEntry,
+ nil,
+ },
+ }
+
+ for _, tc := range cases {
+ t.Run("BuildAndProcess/"+tc.name, func(t *testing.T) {
+ cfg := tc.op
+ cfg.OutputIDs = []string{"fake"}
+ cfg.OnError = "drop"
+ ops, err := cfg.Build(testutil.NewBuildContext(t))
+ require.NoError(t, err)
+ op := ops[0]
+
+ copy := op.(*CopyOperator)
+ fake := testutil.NewFakeOutput(t)
+ copy.SetOutputs([]operator.Operator{fake})
+ val := tc.input()
+ err = copy.Process(context.Background(), val)
+ if tc.expectErr {
+ require.Error(t, err)
+ } else {
+ require.NoError(t, err)
+ fake.ExpectEntry(t, tc.output())
+ }
+ })
+ }
+}
+
+func defaultCfg() *CopyOperatorConfig {
+ return NewCopyOperatorConfig("copy")
+}