From 58468cc178804883fd2ffb17d4e2dbf4c9c66cfa Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Wed, 11 Dec 2019 15:38:57 -0800 Subject: [PATCH 01/11] Refactoring: renaming var to avoid collision --- journalbeat/input/input.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/journalbeat/input/input.go b/journalbeat/input/input.go index d45c69e08128..66ae2aeb2eca 100644 --- a/journalbeat/input/input.go +++ b/journalbeat/input/input.go @@ -102,7 +102,7 @@ func New( readers = append(readers, r) } - processors, err := processors.New(config.Processors) + inputProcessors, err := processors.New(config.Processors) if err != nil { return nil, err } @@ -118,7 +118,7 @@ func New( id: id, logger: logger, eventMeta: config.EventMetadata, - processors: processors, + processors: inputProcessors, }, nil } From 468e954074c33b2b5b323ae6fe8f6c1f47adf49a Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Wed, 11 Dec 2019 15:39:37 -0800 Subject: [PATCH 02/11] Refactoring: extract function --- journalbeat/input/input.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/journalbeat/input/input.go b/journalbeat/input/input.go index 66ae2aeb2eca..23a46f84af75 100644 --- a/journalbeat/input/input.go +++ b/journalbeat/input/input.go @@ -102,7 +102,7 @@ func New( readers = append(readers, r) } - inputProcessors, err := processors.New(config.Processors) + inputProcessors, err := processorsForInput(config) if err != nil { return nil, err } @@ -203,3 +203,7 @@ func (i *Input) Stop() { func (i *Input) Wait() { i.Stop() } + +func processorsForInput(config Config) (*processors.Processors, error) { + return processors.New(config.Processors) +} From d80fe156f58ca98f6284aa6a88d2f7f20df97861 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Wed, 11 Dec 2019 15:48:33 -0800 Subject: [PATCH 03/11] Refactoring: moving AddFormattedIndex processor into libbeat --- filebeat/channel/connector.go | 30 +---------- .../add_formatted_index.go | 51 
+++++++++++++++++++ 2 files changed, 53 insertions(+), 28 deletions(-) create mode 100644 libbeat/processors/add_formatted_index/add_formatted_index.go diff --git a/filebeat/channel/connector.go b/filebeat/channel/connector.go index ebd5983a3f7b..346f1fe80670 100644 --- a/filebeat/channel/connector.go +++ b/filebeat/channel/connector.go @@ -18,12 +18,11 @@ package channel import ( - "fmt" - "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/common/fmtstr" "github.com/elastic/beats/libbeat/processors" + "github.com/elastic/beats/libbeat/processors/add_formatted_index" ) // ConnectorFunc is an adapter for using ordinary functions as Connector. @@ -34,14 +33,6 @@ type pipelineConnector struct { pipeline beat.Pipeline } -// addFormattedIndex is a Processor to set an event's "raw_index" metadata field -// with a given TimestampFormatString. The elasticsearch output interprets -// that field as specifying the (raw string) index the event should be sent to; -// in other outputs it is just included in the metadata. -type addFormattedIndex struct { - formatString *fmtstr.TimestampFormatString -} - // Connect passes the cfg and the zero value of beat.ClientConfig to the underlying function. func (fn ConnectorFunc) Connect(cfg *common.Config) (Outleter, error) { return fn(cfg, beat.ClientConfig{}) @@ -132,7 +123,7 @@ func processorsForConfig( if err != nil { return nil, err } - indexProcessor := &addFormattedIndex{timestampFormat} + indexProcessor := &add_formatted_index.AddFormattedIndex{timestampFormat} procs.List = append(procs.List, indexProcessor) } @@ -160,20 +151,3 @@ func processorsForConfig( procs.List = append(procs.List, userProcessors.List...) 
return procs, nil } - -func (p *addFormattedIndex) Run(event *beat.Event) (*beat.Event, error) { - index, err := p.formatString.Run(event.Timestamp) - if err != nil { - return nil, err - } - - if event.Meta == nil { - event.Meta = common.MapStr{} - } - event.Meta["raw_index"] = index - return event, nil -} - -func (p *addFormattedIndex) String() string { - return fmt.Sprintf("add_index_pattern=%v", p.formatString) -} diff --git a/libbeat/processors/add_formatted_index/add_formatted_index.go b/libbeat/processors/add_formatted_index/add_formatted_index.go new file mode 100644 index 000000000000..23801cd6bb7b --- /dev/null +++ b/libbeat/processors/add_formatted_index/add_formatted_index.go @@ -0,0 +1,51 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package add_formatted_index + +import ( + "fmt" + + "github.com/elastic/beats/libbeat/beat" + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/common/fmtstr" +) + +// AddFormattedIndex is a Processor to set an event's "raw_index" metadata field +// with a given TimestampFormatString. 
The elasticsearch output interprets +// that field as specifying the (raw string) index the event should be sent to; +// in other outputs it is just included in the metadata. +type AddFormattedIndex struct { + formatString *fmtstr.TimestampFormatString +} + +func (p *AddFormattedIndex) Run(event *beat.Event) (*beat.Event, error) { + index, err := p.formatString.Run(event.Timestamp) + if err != nil { + return nil, err + } + + if event.Meta == nil { + event.Meta = common.MapStr{} + } + event.Meta["raw_index"] = index + return event, nil +} + +func (p *AddFormattedIndex) String() string { + return fmt.Sprintf("add_index_pattern=%v", p.formatString) +} From d2ced940abaf4a845d79438aa2facf9e3301dcfb Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Wed, 11 Dec 2019 15:56:41 -0800 Subject: [PATCH 04/11] Add constructor for addFormattedIndex processor --- filebeat/channel/connector.go | 2 +- journalbeat/beater/journalbeat.go | 2 +- journalbeat/input/config.go | 3 ++ journalbeat/input/input.go | 47 +++++++++++++++++-- .../add_formatted_index.go | 12 +++-- 5 files changed, 55 insertions(+), 11 deletions(-) diff --git a/filebeat/channel/connector.go b/filebeat/channel/connector.go index 346f1fe80670..e5ab0172d58d 100644 --- a/filebeat/channel/connector.go +++ b/filebeat/channel/connector.go @@ -123,7 +123,7 @@ func processorsForConfig( if err != nil { return nil, err } - indexProcessor := &add_formatted_index.AddFormattedIndex{timestampFormat} + indexProcessor := add_formatted_index.New(timestampFormat) procs.List = append(procs.List, indexProcessor) } diff --git a/journalbeat/beater/journalbeat.go b/journalbeat/beater/journalbeat.go index aefe6f8abe1f..7a54163f832e 100644 --- a/journalbeat/beater/journalbeat.go +++ b/journalbeat/beater/journalbeat.go @@ -67,7 +67,7 @@ func New(b *beat.Beat, cfg *common.Config) (beat.Beater, error) { var inputs []*input.Input for _, c := range config.Inputs { - i, err := input.New(c, b.Publisher, done, cp.States()) + i, err := input.New(c, 
b, done, cp.States()) if err != nil { return nil, err } diff --git a/journalbeat/input/config.go b/journalbeat/input/config.go index 5ed5f49101f3..3916f9ad515b 100644 --- a/journalbeat/input/config.go +++ b/journalbeat/input/config.go @@ -22,6 +22,7 @@ import ( "github.com/elastic/beats/journalbeat/config" "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/common/fmtstr" "github.com/elastic/beats/libbeat/processors" ) @@ -47,6 +48,8 @@ type Config struct { common.EventMetadata `config:",inline"` // Processors to run on events. Processors processors.PluginConfig `config:"processors"` + // ES output index pattern + Index fmtstr.EventFormatString `config:"index"` } var ( diff --git a/journalbeat/input/input.go b/journalbeat/input/input.go index 23a46f84af75..c9fb2797a73c 100644 --- a/journalbeat/input/input.go +++ b/journalbeat/input/input.go @@ -21,6 +21,10 @@ import ( "fmt" "sync" + "github.com/elastic/beats/libbeat/processors/add_formatted_index" + + "github.com/elastic/beats/libbeat/common/fmtstr" + "github.com/gofrs/uuid" "github.com/elastic/beats/journalbeat/checkpoint" @@ -48,7 +52,7 @@ type Input struct { // New returns a new Inout func New( c *common.Config, - pipeline beat.Pipeline, + b *beat.Beat, done chan struct{}, states map[string]checkpoint.JournalState, ) (*Input, error) { @@ -102,7 +106,7 @@ func New( readers = append(readers, r) } - inputProcessors, err := processorsForInput(config) + inputProcessors, err := processorsForInput(b.Info, config) if err != nil { return nil, err } @@ -113,7 +117,7 @@ func New( readers: readers, done: done, config: config, - pipeline: pipeline, + pipeline: b.Publisher, states: states, id: id, logger: logger, @@ -204,6 +208,39 @@ func (i *Input) Wait() { i.Stop() } -func processorsForInput(config Config) (*processors.Processors, error) { - return processors.New(config.Processors) +func processorsForInput(beatInfo beat.Info, config Config) (*processors.Processors, error) { + procs := 
processors.NewList(nil) + + // Processor ordering is important: + // 1. Index configuration + if !config.Index.IsEmpty() { + staticFields := fmtstr.FieldsForBeat(beatInfo.Beat, beatInfo.Version) + timestampFormat, err := + fmtstr.NewTimestampFormatString(&config.Index, staticFields) + if err != nil { + return nil, err + } + indexProcessor := add_formatted_index.New(timestampFormat) + procs.List = append(procs.List, indexProcessor) + } + + // 2. User processors + userProcessors, err := processors.New(config.Processors) + if err != nil { + return nil, err + } + // Subtlety: it is important here that we append the individual elements of + // userProcessors, rather than userProcessors itself, even though + // userProcessors implements the processors.Processor interface. This is + // because the contents of what we return are later pulled out into a + // processing.group rather than a processors.Processors, and the two have + // different error semantics: processors.Processors aborts processing on + // any error, whereas processing.group only aborts on fatal errors. The + // latter is the most common behavior, and the one we are preserving here for + // backwards compatibility. + // We are unhappy about this and have plans to fix this inconsistency at a + // higher level, but for now we need to respect the existing semantics. + procs.List = append(procs.List, userProcessors.List...) 
+ + return procs, nil } diff --git a/libbeat/processors/add_formatted_index/add_formatted_index.go b/libbeat/processors/add_formatted_index/add_formatted_index.go index 23801cd6bb7b..67f29fc78a9a 100644 --- a/libbeat/processors/add_formatted_index/add_formatted_index.go +++ b/libbeat/processors/add_formatted_index/add_formatted_index.go @@ -25,15 +25,19 @@ import ( "github.com/elastic/beats/libbeat/common/fmtstr" ) -// AddFormattedIndex is a Processor to set an event's "raw_index" metadata field +// addFormattedIndex is a Processor to set an event's "raw_index" metadata field // with a given TimestampFormatString. The elasticsearch output interprets // that field as specifying the (raw string) index the event should be sent to; // in other outputs it is just included in the metadata. -type AddFormattedIndex struct { +type addFormattedIndex struct { formatString *fmtstr.TimestampFormatString } -func (p *AddFormattedIndex) Run(event *beat.Event) (*beat.Event, error) { +func New(formatString *fmtstr.TimestampFormatString) *addFormattedIndex { + return &addFormattedIndex{formatString} +} + +func (p *addFormattedIndex) Run(event *beat.Event) (*beat.Event, error) { index, err := p.formatString.Run(event.Timestamp) if err != nil { return nil, err @@ -46,6 +50,6 @@ func (p *AddFormattedIndex) Run(event *beat.Event) (*beat.Event, error) { return event, nil } -func (p *AddFormattedIndex) String() string { +func (p *addFormattedIndex) String() string { return fmt.Sprintf("add_index_pattern=%v", p.formatString) } From 5cc789e98bf28e1ad15c6138edbc32505e3a9c6a Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Wed, 11 Dec 2019 16:09:24 -0800 Subject: [PATCH 05/11] Export processor struct --- .../add_formatted_index/add_formatted_index.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/libbeat/processors/add_formatted_index/add_formatted_index.go b/libbeat/processors/add_formatted_index/add_formatted_index.go index 67f29fc78a9a..8e588dd369a1 
100644 --- a/libbeat/processors/add_formatted_index/add_formatted_index.go +++ b/libbeat/processors/add_formatted_index/add_formatted_index.go @@ -25,19 +25,19 @@ import ( "github.com/elastic/beats/libbeat/common/fmtstr" ) -// addFormattedIndex is a Processor to set an event's "raw_index" metadata field +// AddFormattedIndex is a Processor to set an event's "raw_index" metadata field // with a given TimestampFormatString. The elasticsearch output interprets // that field as specifying the (raw string) index the event should be sent to; // in other outputs it is just included in the metadata. -type addFormattedIndex struct { +type AddFormattedIndex struct { formatString *fmtstr.TimestampFormatString } -func New(formatString *fmtstr.TimestampFormatString) *addFormattedIndex { - return &addFormattedIndex{formatString} +func New(formatString *fmtstr.TimestampFormatString) *AddFormattedIndex { + return &AddFormattedIndex{formatString} } -func (p *addFormattedIndex) Run(event *beat.Event) (*beat.Event, error) { +func (p *AddFormattedIndex) Run(event *beat.Event) (*beat.Event, error) { index, err := p.formatString.Run(event.Timestamp) if err != nil { return nil, err @@ -50,6 +50,6 @@ func (p *addFormattedIndex) Run(event *beat.Event) (*beat.Event, error) { return event, nil } -func (p *addFormattedIndex) String() string { +func (p *AddFormattedIndex) String() string { return fmt.Sprintf("add_index_pattern=%v", p.formatString) } From 37aae09f61f9f7b33b8758e029deee959791a52b Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Wed, 11 Dec 2019 16:10:31 -0800 Subject: [PATCH 06/11] Add comment for exported constructor --- libbeat/processors/add_formatted_index/add_formatted_index.go | 1 + 1 file changed, 1 insertion(+) diff --git a/libbeat/processors/add_formatted_index/add_formatted_index.go b/libbeat/processors/add_formatted_index/add_formatted_index.go index 8e588dd369a1..61735b230fa6 100644 --- a/libbeat/processors/add_formatted_index/add_formatted_index.go +++ 
b/libbeat/processors/add_formatted_index/add_formatted_index.go @@ -33,6 +33,7 @@ type AddFormattedIndex struct { formatString *fmtstr.TimestampFormatString } +// New returns a new AddFormattedIndex processor. func New(formatString *fmtstr.TimestampFormatString) *AddFormattedIndex { return &AddFormattedIndex{formatString} } From c1e3bbf9713263dfdaa28cd0e67c5aef973eff4d Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Wed, 11 Dec 2019 16:11:33 -0800 Subject: [PATCH 07/11] Adding comment for exported method --- libbeat/processors/add_formatted_index/add_formatted_index.go | 1 + 1 file changed, 1 insertion(+) diff --git a/libbeat/processors/add_formatted_index/add_formatted_index.go b/libbeat/processors/add_formatted_index/add_formatted_index.go index 61735b230fa6..ed947be73491 100644 --- a/libbeat/processors/add_formatted_index/add_formatted_index.go +++ b/libbeat/processors/add_formatted_index/add_formatted_index.go @@ -38,6 +38,7 @@ func New(formatString *fmtstr.TimestampFormatString) *AddFormattedIndex { return &AddFormattedIndex{formatString} } +// Run runs the processor. 
func (p *AddFormattedIndex) Run(event *beat.Event) (*beat.Event, error) { index, err := p.formatString.Run(event.Timestamp) if err != nil { From 6a377dac7f1d2036d22500b62ffd2eb25f9721bc Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Wed, 11 Dec 2019 16:50:35 -0800 Subject: [PATCH 08/11] Adding index option to doc --- journalbeat/docs/config-options.asciidoc | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/journalbeat/docs/config-options.asciidoc b/journalbeat/docs/config-options.asciidoc index a52f741c0c8c..37eac9153d9d 100644 --- a/journalbeat/docs/config-options.asciidoc +++ b/journalbeat/docs/config-options.asciidoc @@ -215,3 +215,16 @@ available: `CONTAINER_NAME`:: `container.name` `CONTAINER_PARTIAL_MESSAGE`:: `container.partial` `CONTAINER_TAG`:: `container.image.tag` + +[float] +[id="index"] +==== `index` + +If present, this formatted string overrides the index for events from this input +(for elasticsearch outputs), or sets the `raw_index` field of the event's +metadata (for other outputs). This string can only refer to the agent name and +version and the event timestamp; for access to dynamic fields, use +`output.elasticsearch.index` or a processor. + +Example value: `"%{[agent.name]}-myindex-%{+yyyy.MM.dd}"` might +expand to `"journalbeat-myindex-2019.12.13"`. 
From e21f673d404b730aade98a6769b16f3541552b31 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Thu, 12 Dec 2019 11:19:49 -0800 Subject: [PATCH 09/11] Refactoring: extracting common code --- filebeat/channel/connector.go | 18 ++++-------------- journalbeat/input/input.go | 15 ++------------- libbeat/processors/processor.go | 21 ++++++++++++++++++--- 3 files changed, 24 insertions(+), 30 deletions(-) diff --git a/filebeat/channel/connector.go b/filebeat/channel/connector.go index e5ab0172d58d..d2a20dd54a2c 100644 --- a/filebeat/channel/connector.go +++ b/filebeat/channel/connector.go @@ -124,12 +124,12 @@ func processorsForConfig( return nil, err } indexProcessor := add_formatted_index.New(timestampFormat) - procs.List = append(procs.List, indexProcessor) + procs.AddProcessor(indexProcessor) } // 2. ClientConfig processors if lst := clientCfg.Processing.Processor; lst != nil { - procs.List = append(procs.List, lst) + procs.AddProcessor(lst) } // 3. User processors @@ -137,17 +137,7 @@ func processorsForConfig( if err != nil { return nil, err } - // Subtlety: it is important here that we append the individual elements of - // userProcessors, rather than userProcessors itself, even though - // userProcessors implements the processors.Processor interface. This is - // because the contents of what we return are later pulled out into a - // processing.group rather than a processors.Processors, and the two have - // different error semantics: processors.Processors aborts processing on - // any error, whereas processing.group only aborts on fatal errors. The - // latter is the most common behavior, and the one we are preserving here for - // backwards compatibility. - // We are unhappy about this and have plans to fix this inconsistency at a - // higher level, but for now we need to respect the existing semantics. - procs.List = append(procs.List, userProcessors.List...) 
+ procs.AddProcessors(*userProcessors) + return procs, nil } diff --git a/journalbeat/input/input.go b/journalbeat/input/input.go index c9fb2797a73c..b3a74dd46b89 100644 --- a/journalbeat/input/input.go +++ b/journalbeat/input/input.go @@ -221,7 +221,7 @@ func processorsForInput(beatInfo beat.Info, config Config) (*processors.Processo return nil, err } indexProcessor := add_formatted_index.New(timestampFormat) - procs.List = append(procs.List, indexProcessor) + procs.AddProcessor(indexProcessor) } // 2. User processors @@ -229,18 +229,7 @@ func processorsForInput(beatInfo beat.Info, config Config) (*processors.Processo if err != nil { return nil, err } - // Subtlety: it is important here that we append the individual elements of - // userProcessors, rather than userProcessors itself, even though - // userProcessors implements the processors.Processor interface. This is - // because the contents of what we return are later pulled out into a - // processing.group rather than a processors.Processors, and the two have - // different error semantics: processors.Processors aborts processing on - // any error, whereas processing.group only aborts on fatal errors. The - // latter is the most common behavior, and the one we are preserving here for - // backwards compatibility. - // We are unhappy about this and have plans to fix this inconsistency at a - // higher level, but for now we need to respect the existing semantics. - procs.List = append(procs.List, userProcessors.List...) 
+ procs.AddProcessors(*userProcessors) return procs, nil } diff --git a/libbeat/processors/processor.go b/libbeat/processors/processor.go index 56900a70d9d2..17455dad4382 100644 --- a/libbeat/processors/processor.go +++ b/libbeat/processors/processor.go @@ -60,7 +60,7 @@ func New(config PluginConfig) (*Processors, error) { if err != nil { return nil, errors.Wrap(err, "failed to make if/then/else processor") } - procs.add(p) + procs.AddProcessor(p) continue } @@ -94,7 +94,7 @@ func New(config PluginConfig) (*Processors, error) { return nil, err } - procs.add(plugin) + procs.AddProcessor(plugin) } if len(procs.List) > 0 { @@ -103,10 +103,25 @@ func New(config PluginConfig) (*Processors, error) { return procs, nil } -func (procs *Processors) add(p Processor) { +func (procs *Processors) AddProcessor(p Processor) { procs.List = append(procs.List, p) } +func (procs *Processors) AddProcessors(p Processors) { + // Subtlety: it is important here that we append the individual elements of + // p, rather than p itself, even though + // p implements the processors.Processor interface. This is + // because the contents of what we return are later pulled out into a + // processing.group rather than a processors.Processors, and the two have + // different error semantics: processors.Processors aborts processing on + // any error, whereas processing.group only aborts on fatal errors. The + // latter is the most common behavior, and the one we are preserving here for + // backwards compatibility. + // We are unhappy about this and have plans to fix this inconsistency at a + // higher level, but for now we need to respect the existing semantics. + procs.List = append(procs.List, p.List...) +} + // RunBC (run backwards-compatible) applies the processors, by providing the // old interface based on common.MapStr. // The event us temporarily converted to beat.Event. 
By this 'conversion' the From c5d054518db101f60d018c22c9b82e44203c71bc Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Thu, 12 Dec 2019 11:32:03 -0800 Subject: [PATCH 10/11] Adding unit tests --- journalbeat/input/input_test.go | 164 ++++++++++++++++++++++++++++++++ 1 file changed, 164 insertions(+) create mode 100644 journalbeat/input/input_test.go diff --git a/journalbeat/input/input_test.go b/journalbeat/input/input_test.go new file mode 100644 index 000000000000..3cbc416133ec --- /dev/null +++ b/journalbeat/input/input_test.go @@ -0,0 +1,164 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package input + +import ( + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "github.com/elastic/beats/libbeat/beat" + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/processors" + _ "github.com/elastic/beats/libbeat/processors/actions" +) + +func TestProcessorsForInput(t *testing.T) { + testCases := map[string]struct { + beatInfo beat.Info + configStr string + event beat.Event + expectedFields map[string]string + }{ + "Simple static index": { + configStr: "index: 'test'", + expectedFields: map[string]string{ + "@metadata.raw_index": "test", + }, + }, + "Index with agent info + timestamp": { + beatInfo: beat.Info{Beat: "TestBeat", Version: "3.9.27"}, + configStr: "index: 'beat-%{[agent.name]}-%{[agent.version]}-%{+yyyy.MM.dd}'", + event: beat.Event{Timestamp: time.Date(1999, time.December, 31, 23, 0, 0, 0, time.UTC)}, + expectedFields: map[string]string{ + "@metadata.raw_index": "beat-TestBeat-3.9.27-1999.12.31", + }, + }, + "Set field in input config": { + configStr: `processors: [add_fields: {fields: {testField: inputConfig}}]`, + expectedFields: map[string]string{ + "fields.testField": "inputConfig", + }, + }, + } + for description, test := range testCases { + if test.event.Fields == nil { + test.event.Fields = common.MapStr{} + } + config, err := inputConfigFromString(test.configStr) + if err != nil { + t.Errorf("[%s] %v", description, err) + continue + } + processors, err := processorsForInput(test.beatInfo, config) + if err != nil { + t.Errorf("[%s] %v", description, err) + continue + } + processedEvent, err := processors.Run(&test.event) + // We don't check if err != nil, because we are testing the final outcome + // of running the processors, including when some of them fail. 
+ if processedEvent == nil { + t.Errorf("[%s] Unexpected fatal error running processors: %v\n", + description, err) + } + for key, value := range test.expectedFields { + field, err := processedEvent.GetValue(key) + if err != nil { + t.Errorf("[%s] Couldn't get field %s from event: %v", description, key, err) + continue + } + assert.Equal(t, field, value) + fieldStr, ok := field.(string) + if !ok { + // Note that requiring a string here is just to simplify the test setup, + // not a requirement of the underlying api. + t.Errorf("[%s] Field [%s] should be a string", description, key) + continue + } + if fieldStr != value { + t.Errorf("[%s] Event field [%s]: expected [%s], got [%s]", description, key, value, fieldStr) + } + } + } +} + +func TestProcessorsForInputIsFlat(t *testing.T) { + // This test is regrettable, and exists because of inconsistencies in + // processor handling between processors.Processors and processing.group + // (which implements beat.ProcessorList) -- see Processors.AddProcessors for + // details. The upshot is that, for now, if the input configuration specifies + // processors, they must be returned as direct children of the resulting + // processors.Processors (rather than being collected in additional tree + // structure). + // This test should be removed once we have a more consistent mechanism for + // collecting and running processors. + configStr := `processors: +- add_fields: {fields: {testField: value}} +- add_fields: {fields: {testField2: stuff}}` + config, err := inputConfigFromString(configStr) + if err != nil { + t.Fatal(err) + } + processors, err := processorsForInput( + beat.Info{}, config) + if err != nil { + t.Fatal(err) + } + assert.Equal(t, 2, len(processors.List)) +} + +// setRawIndex is a bare-bones processor to set the raw_index field to a +// constant string in the event metadata. It is used to test order of operations +// for processorsForInput. 
+type setRawIndex struct { + indexStr string +} + +func (p *setRawIndex) Run(event *beat.Event) (*beat.Event, error) { + if event.Meta == nil { + event.Meta = common.MapStr{} + } + event.Meta["raw_index"] = p.indexStr + return event, nil +} + +func (p *setRawIndex) String() string { + return fmt.Sprintf("set_raw_index=%v", p.indexStr) +} + +// inputConfigFromString converts a YAML input string into an unpacked +// Config. +func inputConfigFromString(s string) (Config, error) { + config := Config{} + cfg, err := common.NewConfigFrom(s) + if err != nil { + return config, err + } + err = cfg.Unpack(&config) + return config, err +} + +// makeProcessors wraps one or more bare Processor objects in Processors. +func makeProcessors(procs ...processors.Processor) *processors.Processors { + procList := processors.NewList(nil) + procList.List = procs + return procList +} From 1dc8d72a3a9ea30fa5ef20c30d63755dd7f2b634 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Thu, 12 Dec 2019 11:33:32 -0800 Subject: [PATCH 11/11] Adding godoc for exported methods --- libbeat/processors/processor.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/libbeat/processors/processor.go b/libbeat/processors/processor.go index 17455dad4382..5eaa6dd2fb4c 100644 --- a/libbeat/processors/processor.go +++ b/libbeat/processors/processor.go @@ -103,10 +103,12 @@ func New(config PluginConfig) (*Processors, error) { return procs, nil } +// AddProcessor adds a single Processor to Processors. func (procs *Processors) AddProcessor(p Processor) { procs.List = append(procs.List, p) } +// AddProcessors adds more Processors to Processors. func (procs *Processors) AddProcessors(p Processors) { // Subtlety: it is important here that we append the individual elements of // p, rather than p itself, even though