diff --git a/CHANGELOG.md b/CHANGELOG.md index d4af91002..028f7dd58 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,11 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [0.13.22] - 2021-05-07 + +### Added +- Added remove operator + ## [0.13.21] - 2021-05-07 ### Changed diff --git a/cmd/stanza/init_common.go b/cmd/stanza/init_common.go index a28a29323..b356eb486 100644 --- a/cmd/stanza/init_common.go +++ b/cmd/stanza/init_common.go @@ -28,6 +28,7 @@ import ( _ "github.com/observiq/stanza/operator/builtin/transformer/noop" _ "github.com/observiq/stanza/operator/builtin/transformer/ratelimit" _ "github.com/observiq/stanza/operator/builtin/transformer/recombine" + _ "github.com/observiq/stanza/operator/builtin/transformer/remove" _ "github.com/observiq/stanza/operator/builtin/transformer/restructure" _ "github.com/observiq/stanza/operator/builtin/transformer/router" diff --git a/docs/operators/remove.md b/docs/operators/remove.md new file mode 100644 index 000000000..528a3d00b --- /dev/null +++ b/docs/operators/remove.md @@ -0,0 +1,181 @@ +## `remove` operator + +The `remove` operator removes a field from a record. + +### Configuration Fields + +| Field | Default | Description | +| --- | --- | --- | +| `id` | `remove` | A unique identifier for the operator | +| `output` | Next in pipeline | The connected operator(s) that will receive all outbound entries | +| `field` | required | The [field](/docs/types/field.md) to remove. +| `on_error` | `send` | The behavior of the operator if it encounters an error. See [on_error](/docs/types/on_error.md) | +| `if` | | An [expression](/docs/types/expression.md) that, when set, will be evaluated to determine whether this operator should be used for the given entry. This allows you to do easy conditional parsing without branching logic with routers. | + +Example usage: + +
+ +Remove a value from the record +```yaml +- type: remove + field: key1 +``` + + + + + + + +
Input Entry Output Entry
+ +```json +{ + "resource": { }, + "labels": { }, + "record": { + "key1": "val1", + } +} +``` + + + +```json +{ + "resource": { }, + "labels": { }, + "record": { } +} +``` + +
+ +
+ +Remove an object from the record +```yaml +- type: remove + field: object +``` + + + + + + + +
Input Entry Output Entry
+ +```json +{ + "resource": { }, + "labels": { }, + "record": { + "object": { + "nestedkey": "nestedval" + }, + "key": "val" + }, +} +``` + + + +```json +{ + "resource": { }, + "labels": { }, + "record": { + "key": "val" + } +} +``` + +
+ +
+ +Remove a value from labels +```yaml +- type: remove + field: $labels.otherkey +``` + + + + + + + +
Input Entry Output Entry
+ +```json +{ + "resource": { }, + "labels": { + "otherkey": "val" + }, + "record": { + "key": "val" + }, +} +``` + + + +```json +{ + "resource": { }, + "labels": { }, + "record": { + "key": "val" + } +} +``` + +
+ +
+ +Remove a value from resource + +```yaml +- type: remove + field: $resource.otherkey +``` + + + + + + + +
Input Entry Output Entry
+ +```json +{ + "resource": { + "otherkey": "val" + }, + "labels": { }, + "record": { + "key": "val" + }, +} +``` + + + +```json +{ + "resource": { }, + "labels": { }, + "record": { + "key": "val" + } +} +``` + +
\ No newline at end of file diff --git a/operator/builtin/transformer/remove/remove.go b/operator/builtin/transformer/remove/remove.go new file mode 100644 index 000000000..65f218403 --- /dev/null +++ b/operator/builtin/transformer/remove/remove.go @@ -0,0 +1,66 @@ +package remove + +import ( + "context" + "fmt" + + "github.com/observiq/stanza/entry" + "github.com/observiq/stanza/operator" + "github.com/observiq/stanza/operator/helper" +) + +func init() { + operator.Register("remove", func() operator.Builder { return NewRemoveOperatorConfig("") }) +} + +// NewRemoveOperatorConfig creates a new restructure operator config with default values +func NewRemoveOperatorConfig(operatorID string) *RemoveOperatorConfig { + return &RemoveOperatorConfig{ + TransformerConfig: helper.NewTransformerConfig(operatorID, "remove"), + } +} + +// RemoveOperatorConfig is the configuration of a restructure operator +type RemoveOperatorConfig struct { + helper.TransformerConfig `mapstructure:",squash" yaml:",inline"` + + Field entry.Field `mapstructure:"field" json:"field" yaml:"field"` +} + +// Build will build a Remove operator from the supplied configuration +func (c RemoveOperatorConfig) Build(context operator.BuildContext) ([]operator.Operator, error) { + transformerOperator, err := c.TransformerConfig.Build(context) + if err != nil { + return nil, err + } + if c.Field == entry.NewNilField() { + return nil, fmt.Errorf("remove: field is empty") + } + + removeOperator := &RemoveOperator{ + TransformerOperator: transformerOperator, + Field: c.Field, + } + + return []operator.Operator{removeOperator}, nil +} + +// RemoveOperator is an operator that deletes a field +type RemoveOperator struct { + helper.TransformerOperator + Field entry.Field +} + +// Process will process an entry with a restructure transformation. +func (p *RemoveOperator) Process(ctx context.Context, entry *entry.Entry) error { + return p.ProcessWith(ctx, entry, p.Transform) +} + +// Transform will apply the restructure operations to an entry +func (p *RemoveOperator) Transform(entry *entry.Entry) error { + _, exist := entry.Delete(p.Field) + if !exist { + return fmt.Errorf("remove: field does not exist") + } + return nil +} diff --git a/operator/builtin/transformer/remove/remove_test.go b/operator/builtin/transformer/remove/remove_test.go new file mode 100644 index 000000000..85724e546 --- /dev/null +++ b/operator/builtin/transformer/remove/remove_test.go @@ -0,0 +1,160 @@ +package remove + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/observiq/stanza/entry" + "github.com/observiq/stanza/operator" + "github.com/observiq/stanza/testutil" +) + +type testCase struct { + name string + op *RemoveOperatorConfig + input func() *entry.Entry + output func() *entry.Entry + expectErr bool +} + +func TestProcessAndBuild(t *testing.T) { + newTestEntry := func() *entry.Entry { + e := entry.New() + e.Timestamp = time.Unix(1586632809, 0) + e.Record = map[string]interface{}{ + "key": "val", + "nested": map[string]interface{}{ + "nestedkey": "nestedval", + }, + } + return e + } + + cases := []testCase{ + { + "remove_one", + func() *RemoveOperatorConfig { + cfg := defaultCfg() + cfg.Field = entry.NewRecordField("key") + return cfg + }(), + newTestEntry, + func() *entry.Entry { + e := newTestEntry() + e.Record = map[string]interface{}{ + "nested": map[string]interface{}{ + "nestedkey": "nestedval", + }, + } + return e + }, + false, + }, + { + "remove_nestedkey", + func() *RemoveOperatorConfig { + cfg := defaultCfg() + cfg.Field = entry.NewRecordField("nested", "nestedkey") + return cfg + }(), + newTestEntry, + func() *entry.Entry { + e := newTestEntry() + e.Record = map[string]interface{}{ + "key": "val", + "nested": map[string]interface{}{}, + } + return e + }, + false, + }, + { + "remove_obj", + func() *RemoveOperatorConfig { + cfg := defaultCfg() + cfg.Field = entry.NewRecordField("nested") + return cfg + }(), + newTestEntry, + func() *entry.Entry { + e := newTestEntry() + e.Record = map[string]interface{}{ + "key": "val", + } + return e + }, + false, + }, + { + "remove_single_attribute", + func() *RemoveOperatorConfig { + cfg := defaultCfg() + cfg.Field = entry.NewLabelField("key") + return cfg + }(), + func() *entry.Entry { + e := newTestEntry() + e.Labels = map[string]string{ + "key": "val", + } + return e + }, + func() *entry.Entry { + e := newTestEntry() + e.Labels = map[string]string{} + return e + }, + false, + }, + { + "remove_single_resource", + func() *RemoveOperatorConfig { + cfg := defaultCfg() + cfg.Field = entry.NewResourceField("key") + return cfg + }(), + func() *entry.Entry { + e := newTestEntry() + e.Resource = map[string]string{ + "key": "val", + } + return e + }, + func() *entry.Entry { + e := newTestEntry() + e.Resource = map[string]string{} + return e + }, + false, + }, + } + for _, tc := range cases { + t.Run("BuildandProcess/"+tc.name, func(t *testing.T) { + cfg := tc.op + cfg.OutputIDs = []string{"fake"} + cfg.OnError = "drop" + ops, err := cfg.Build(testutil.NewBuildContext(t)) + require.NoError(t, err) + op := ops[0] + + remove := op.(*RemoveOperator) + fake := testutil.NewFakeOutput(t) + remove.SetOutputs([]operator.Operator{fake}) + val := tc.input() + err = remove.Process(context.Background(), val) + if tc.expectErr { + require.Error(t, err) + } else { + require.NoError(t, err) + fake.ExpectEntry(t, tc.output()) + } + }) + } +} + +func defaultCfg() *RemoveOperatorConfig { + return NewRemoveOperatorConfig("remove") +}