From 89f93e37e738e114fe5855224ebe90b6d85e5a18 Mon Sep 17 00:00:00 2001 From: Adrian Serrano Date: Thu, 9 May 2019 11:21:06 +0200 Subject: [PATCH] New processor extract_array (#11761) This adds a new processor, extract_array, that allows accessing values inside arrays and copying them to target fields. --- CHANGELOG.next.asciidoc | 1 + libbeat/cmd/instance/imports.go | 1 + libbeat/docs/processors-using.asciidoc | 54 +++- .../processors/extract_array/extract_array.go | 189 ++++++++++++ .../extract_array/extract_array_test.go | 270 ++++++++++++++++++ .../javascript/module/processor/processor.go | 2 + 6 files changed, 512 insertions(+), 5 deletions(-) create mode 100644 libbeat/processors/extract_array/extract_array.go create mode 100644 libbeat/processors/extract_array/extract_array_test.go diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 19b0657c8ee4..4b8f0c108ce4 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -143,6 +143,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Add `add_observer_metadata` processor. {pull}11394[11394] - Add `decode_csv_fields` processor. {pull}11753[11753] - Add `convert` processor for converting data types of fields. {issue}8124[8124] {pull}11686[11686] +- New `extract_array` processor. {pull}11761[11761] *Auditbeat* diff --git a/libbeat/cmd/instance/imports.go b/libbeat/cmd/instance/imports.go index 0d1dac5513f4..356c4f041fe0 100644 --- a/libbeat/cmd/instance/imports.go +++ b/libbeat/cmd/instance/imports.go @@ -35,5 +35,6 @@ import ( _ "github.com/elastic/beats/libbeat/processors/convert" _ "github.com/elastic/beats/libbeat/processors/dissect" _ "github.com/elastic/beats/libbeat/processors/dns" + _ "github.com/elastic/beats/libbeat/processors/extract_array" _ "github.com/elastic/beats/libbeat/publisher/includes" // Register publisher pipeline modules ) diff --git a/libbeat/docs/processors-using.asciidoc b/libbeat/docs/processors-using.asciidoc index 045a3f41f9fa..81d9b56dac61 100644 --- a/libbeat/docs/processors-using.asciidoc +++ b/libbeat/docs/processors-using.asciidoc @@ -211,10 +211,11 @@ The supported processors are: * <> * <> ifdef::has_decode_csv_fields_processor[] -* <> + * <> endif::[] * <> * <> + * <> * <> * <> * <> @@ -815,16 +816,16 @@ The `decode_csv_fields` has the following settings: The default is the comma character. For using a TAB character you must set it to "\t". `ignore_missing`:: (Optional) Whether to ignore events which lack the source - field. The default is false, which will fail processing of an - event if a field is missing. + field. The default is `false`, which will fail processing of + an event if a field is missing. `overwrite_keys`:: Whether the target field is overwritten if it already exists. The default is false, which will fail processing of an event when `target` already exists. `trim_leading_space`:: Whether extra space after the separator is trimmed from values. This works even if the separator is also a space. - The default is false. + The default is `false`. `fail_on_error`:: (Optional) If set to true, in case of an error the changes to -the event are reverted and the original event is returned. If set to false, +the event are reverted, and the original event is returned. If set to `false`, processing continues also if an error happens. Default is `true`. endif::[] @@ -1746,3 +1747,46 @@ thrown. *Example*: `event.AppendTo("error.message", "invalid file hash");` |=== endif::[] + +[[extract-array]] +=== Extract array + +experimental[] + +The `extract_array` processor populates fields with values read from an array +field. The following example will populate `source.ip` with the first element of +the `my_array` field, `destination.ip` with the second element, and +`network.transport` with the third. + +[source,yaml] +----------------------------------------------------- +processors: + - extract_array: + field: my_array + mappings: + source.ip: 0 + destination.ip: 1 + network.transport: 2 +----------------------------------------------------- + +The following settings are supported: + +`field`:: The array field whose elements are to be extracted. +`mappings`:: Maps each field name to an array index. Use 0 for the first element in + the array. Multiple fields can be mapped to the same array element. +`ignore_missing`:: (Optional) Whether to ignore events where the array field is + missing. The default is `false`, which will fail processing + of an event if the specified field does not exist. Set it to + `true` to ignore this condition. +`overwrite_keys`:: Whether the target fields specified in the mapping are + overwritten if they already exist. The default is `false`, + which will fail processing if a target field already exists. +`fail_on_error`:: (Optional) If set to `true` and an error happens, changes to + the event are reverted, and the original event is returned. If + set to `false`, processing continues despite errors. + Default is `true`. +`omit_empty`:: (Optional) Whether empty values are extracted from the array. If + set to `true`, instead of the target field being set to an + empty value, it is left unset. The empty string (`""`), an + empty array (`[]`) or an empty object (`{}`) are considered + empty values. Default is `false`. diff --git a/libbeat/processors/extract_array/extract_array.go b/libbeat/processors/extract_array/extract_array.go new file mode 100644 index 000000000000..2bb473589cd9 --- /dev/null +++ b/libbeat/processors/extract_array/extract_array.go @@ -0,0 +1,189 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package extract_array + +import ( + "fmt" + "reflect" + "sort" + + "github.com/pkg/errors" + + "github.com/elastic/beats/libbeat/beat" + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/processors" + "github.com/elastic/beats/libbeat/processors/checks" +) + +type config struct { + Field string `config:"field"` + Mappings common.MapStr `config:"mappings"` + IgnoreMissing bool `config:"ignore_missing"` + OmitEmpty bool `config:"omit_empty"` + OverwriteKeys bool `config:"overwrite_keys"` + FailOnError bool `config:"fail_on_error"` +} + +type fieldMapping struct { + from int + to string +} + +type extractArrayProcessor struct { + config + mappings []fieldMapping +} + +var ( + defaultConfig = config{ + FailOnError: true, + } + errNoMappings = errors.New("no mappings defined in extract_array processor") +) + +func init() { + processors.RegisterPlugin("extract_array", + checks.ConfigChecked(New, + checks.RequireFields("field", "mappings"), + checks.AllowedFields("field", "mappings", "ignore_missing", "overwrite_keys", "fail_on_error", "when", "omit_empty"))) +} + +// Unpack unpacks the processor's configuration. +func (f *extractArrayProcessor) Unpack(from *common.Config) error { + tmp := defaultConfig + err := from.Unpack(&tmp) + if err != nil { + return fmt.Errorf("failed to unpack the extract_array configuration: %s", err) + } + f.config = tmp + for field, column := range f.Mappings.Flatten() { + colIdx, ok := common.TryToInt(column) + if !ok || colIdx < 0 { + return fmt.Errorf("bad extract_array mapping for field %s: %+v is not a positive integer", field, column) + } + f.mappings = append(f.mappings, fieldMapping{from: colIdx, to: field}) + } + sort.Slice(f.mappings, func(i, j int) bool { + return f.mappings[i].from < f.mappings[j].from + }) + return nil +} + +// New builds a new extract_array processor. +func New(c *common.Config) (processors.Processor, error) { + p := &extractArrayProcessor{} + err := c.Unpack(p) + if err != nil { + return nil, err + } + if len(p.mappings) == 0 { + return nil, errNoMappings + } + return p, nil +} + +func isEmpty(v reflect.Value) bool { + switch v.Kind() { + case reflect.String: + return v.Len() == 0 + case reflect.Slice, reflect.Map: + return v.IsNil() || v.Len() == 0 + case reflect.Interface: + return v.IsNil() || isEmpty(v.Elem()) + } + return false +} + +func (f *extractArrayProcessor) Run(event *beat.Event) (*beat.Event, error) { + iValue, err := event.GetValue(f.config.Field) + if err != nil { + if f.config.IgnoreMissing && errors.Cause(err) == common.ErrKeyNotFound { + return event, nil + } + return event, errors.Wrapf(err, "could not fetch value for field %s", f.config.Field) + } + + array := reflect.ValueOf(iValue) + if t := array.Type(); t.Kind() != reflect.Slice { + if !f.config.FailOnError { + return event, nil + } + return event, errors.Wrapf(err, "unsupported type for field %s: got: %s needed: array", f.config.Field, t.String()) + } + + saved := *event + if f.config.FailOnError { + saved.Fields = event.Fields.Clone() + saved.Meta = event.Meta.Clone() + } + + n := array.Len() + for _, mapping := range f.mappings { + if mapping.from >= n { + if !f.config.FailOnError { + continue + } + return &saved, errors.Errorf("index %d exceeds length of %d when processing mapping for field %s", mapping.from, n, mapping.to) + } + cell := array.Index(mapping.from) + // checking for CanInterface() here is done to prevent .Interface() from + // panicking, but it can only happen when value points to a private + // field inside a struct. + if !cell.IsValid() || !cell.CanInterface() || (f.config.OmitEmpty && isEmpty(cell)) { + continue + } + if !f.config.OverwriteKeys { + if _, err = event.GetValue(mapping.to); err == nil { + if !f.config.FailOnError { + continue + } + return &saved, errors.Errorf("target field %s already has a value. Set the overwrite_keys flag or drop/rename the field first", mapping.to) + } + } + if _, err = event.PutValue(mapping.to, clone(cell.Interface())); err != nil { + if !f.config.FailOnError { + continue + } + return &saved, errors.Wrapf(err, "failed setting field %s", mapping.to) + } + } + return event, nil +} + +func (f *extractArrayProcessor) String() (r string) { + return fmt.Sprintf("extract_array={field=%s, mappings=%v}", f.config.Field, f.mappings) +} + +func clone(value interface{}) interface{} { + // TODO: This is dangerous but done by most processors. + // Otherwise need to reflect value and deep copy lists / map types. + switch v := value.(type) { + case common.MapStr: + return v.Clone() + case map[string]interface{}: + return common.MapStr(v).Clone() + case []interface{}: + len := len(v) + newArr := make([]interface{}, len) + for idx, val := range v { + newArr[idx] = clone(val) + } + return newArr + } + return value +} diff --git a/libbeat/processors/extract_array/extract_array_test.go b/libbeat/processors/extract_array/extract_array_test.go new file mode 100644 index 000000000000..8dbde97a50a8 --- /dev/null +++ b/libbeat/processors/extract_array/extract_array_test.go @@ -0,0 +1,270 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package extract_array + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/elastic/beats/libbeat/beat" + "github.com/elastic/beats/libbeat/common" +) + +func TestExtractArrayProcessor_String(t *testing.T) { + p, err := New(common.MustNewConfigFrom(common.MapStr{ + "field": "csv", + "mappings": common.MapStr{ + "source.ip": 0, + "network.transport": 2, + "destination.ip": 99, + }, + })) + if err != nil { + t.Fatal(err) + } + assert.Equal(t, "extract_array={field=csv, mappings=[{0 source.ip} {2 network.transport} {99 destination.ip}]}", p.String()) +} + +func TestExtractArrayProcessor_Run(t *testing.T) { + tests := map[string]struct { + config common.MapStr + input beat.Event + expected beat.Event + fail bool + afterFn func(e *beat.Event) + }{ + "sample": { + config: common.MapStr{ + "field": "array", + "mappings": common.MapStr{ + "dest.one": 1, + "dest.two": 2, + }, + }, + input: beat.Event{ + Fields: common.MapStr{ + "array": []interface{}{"zero", 1, common.MapStr{"two": 2}}, + }, + }, + expected: beat.Event{ + Fields: common.MapStr{ + "array": []interface{}{"zero", 1, common.MapStr{"two": 2}}, + "dest.one": 1, + "dest.two": common.MapStr{"two": 2}, + }, + }, + }, + + "modified elements": { + config: common.MapStr{ + "field": "array", + "mappings": common.MapStr{ + "dest.one": 1, + "dest.two": 2, + }, + }, + input: beat.Event{ + Fields: common.MapStr{ + "array": []interface{}{"zero", 1, common.MapStr{"two": 2}}, + }, + }, + expected: beat.Event{ + Fields: common.MapStr{ + "array": []interface{}{"zero", 1, common.MapStr{"two": 2}}, + "dest.one": 1, + "dest.two": common.MapStr{"two": 3}, + }, + }, + afterFn: func(e *beat.Event) { + e.PutValue("dest.two.two", 3) + }, + }, + + "modified array": { + config: common.MapStr{ + "field": "array", + "mappings": common.MapStr{ + "dest.one": 1, + "dest.two": 2, + }, + }, + input: beat.Event{ + Fields: common.MapStr{ + "array": []interface{}{"zero", 1, []interface{}{"a", "b"}}, + }, + }, + expected: beat.Event{ + Fields: common.MapStr{ + "array": []interface{}{"zero", 1, []interface{}{"a", "b"}}, + "dest.one": 1, + "dest.two": []interface{}{"a", "c"}, + }, + }, + afterFn: func(e *beat.Event) { + val, _ := e.GetValue("dest.two") + val.([]interface{})[1] = "c" + }, + }, + + "out of range mapping": { + config: common.MapStr{ + "field": "array", + "mappings": common.MapStr{ + "source.ip": 0, + "destination.ip": 999, + }, + }, + input: beat.Event{ + Fields: common.MapStr{ + "array": []interface{}{"127.0.0.1"}, + }, + }, + expected: beat.Event{ + Fields: common.MapStr{ + "array": []interface{}{"127.0.0.1"}, + }, + }, + fail: true, + }, + + "ignore errors": { + config: common.MapStr{ + "field": "array", + "mappings": common.MapStr{ + "a": 0, + "b.c": 1, + }, + "fail_on_error": false, + }, + input: beat.Event{ + Fields: common.MapStr{ + "array": []interface{}{3.14, 9000.0}, + "b": true, + }, + }, + expected: beat.Event{ + Fields: common.MapStr{ + "array": []interface{}{3.14, 9000.0}, + "a": 3.14, + "b": true, + }, + }, + }, + + "multicopy": { + config: common.MapStr{ + "field": "array", + "mappings": common.MapStr{ + "a": 1, + "b": 1, + "c": 1, + }, + }, + input: beat.Event{ + Fields: common.MapStr{ + "array": []interface{}{0, 42}, + }, + }, + expected: beat.Event{ + Fields: common.MapStr{ + "array": []interface{}{0, 42}, + "a": 42, + "b": 42, + "c": 42, + }, + }, + }, + + "omit_empty": { + config: common.MapStr{ + "field": "array", + "mappings": common.MapStr{ + "a": 0, + "b": 1, + "c": 2, + "d": 3, + "e": 4, + }, + "omit_empty": true, + }, + input: beat.Event{ + Fields: common.MapStr{ + "array": []interface{}{0, "", []interface{}(nil), make(map[string]string), 0.0}, + }, + }, + expected: beat.Event{ + Fields: common.MapStr{ + "array": []interface{}{0, "", []interface{}(nil), make(map[string]string), 0.0}, + "a": 0, + "e": 0.0, + }, + }, + }, + + "nil values": { + config: common.MapStr{ + "field": "array", + "mappings": common.MapStr{ + "a": 0, + "b": 1, + "c": 2, + "d": 3, + "e": 4, + }, + }, + input: beat.Event{ + Fields: common.MapStr{ + "array": []interface{}{nil, "", []interface{}(nil), map[string]string(nil), (*int)(nil)}, + }, + }, + expected: beat.Event{ + Fields: common.MapStr{ + "array": []interface{}{nil, "", []interface{}(nil), map[string]string(nil), (*int)(nil)}, + "a": nil, + "b": "", + "c": []interface{}{}, + "d": map[string]string(nil), + "e": (*int)(nil), + }, + }, + }, + } + for title, tt := range tests { + t.Run(title, func(t *testing.T) { + cfg := common.MustNewConfigFrom(tt.config) + processor, err := New(cfg) + if err != nil { + t.Fatal(err) + } + result, err := processor.Run(&tt.input) + if tt.afterFn != nil { + tt.afterFn(result) + } + if tt.fail { + assert.Error(t, err) + t.Log("got expected error", err) + return + } + assert.NoError(t, err) + assert.Equal(t, tt.expected.Fields.Flatten(), result.Fields.Flatten()) + assert.Equal(t, tt.expected.Timestamp, result.Timestamp) + t.Log(result) + }) + } +} diff --git a/libbeat/processors/script/javascript/module/processor/processor.go b/libbeat/processors/script/javascript/module/processor/processor.go index 97b8af138308..096cadadf0fd 100644 --- a/libbeat/processors/script/javascript/module/processor/processor.go +++ b/libbeat/processors/script/javascript/module/processor/processor.go @@ -36,6 +36,7 @@ import ( "github.com/elastic/beats/libbeat/processors/decode_csv_fields" "github.com/elastic/beats/libbeat/processors/dissect" "github.com/elastic/beats/libbeat/processors/dns" + "github.com/elastic/beats/libbeat/processors/extract_array" "github.com/elastic/beats/libbeat/processors/script/javascript" ) @@ -57,6 +58,7 @@ var constructors = map[string]processors.Constructor{ "DecodeJSONFields": actions.NewDecodeJSONFields, "Dissect": dissect.NewProcessor, "DNS": dns.New, + "ExtractArray": extract_array.New, "Rename": actions.NewRenameFields, "TruncateFields": actions.NewTruncateFields, }