Skip to content

Commit

Permalink
[Filebeat] Select output index based on the source input (elastic#14010)
Browse files Browse the repository at this point in the history
  • Loading branch information
faec authored Nov 7, 2019
1 parent 847bd2a commit 6a03478
Show file tree
Hide file tree
Showing 12 changed files with 534 additions and 49 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Add Kibana Dashboard for MISP module. {pull}14147[14147]
- Add JSON options to autodiscover hints {pull}14208[14208]
- Add more filesets to Zeek module. {pull}14150[14150]
- Add `index` option to all inputs to directly set a per-input index value. {pull}14010[14010]
- Remove beta flag for some filebeat modules. {pull}14374[14374]

*Heartbeat*
Expand Down
2 changes: 1 addition & 1 deletion filebeat/beater/filebeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ func (fb *Filebeat) Run(b *beat.Beat) error {

outDone := make(chan struct{}) // outDone closes down all active pipeline connections
crawler, err := crawler.New(
channel.NewOutletFactory(outDone, wgEvents).Create,
channel.NewOutletFactory(outDone, wgEvents, b.Info).Create,
config.Inputs,
b.Info.Version,
fb.done,
Expand Down
89 changes: 74 additions & 15 deletions filebeat/channel/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@
package channel

import (
"fmt"

"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/fmtstr"
"github.com/elastic/beats/libbeat/processors"
)

Expand All @@ -31,6 +34,14 @@ type pipelineConnector struct {
pipeline beat.Pipeline
}

// addFormattedIndex is a Processor to set an event's "raw_index" metadata field
// with a given TimestampFormatString. The elasticsearch output interprets
// that field as specifying the (raw string) index the event should be sent to;
// in other outputs it is just included in the metadata.
type addFormattedIndex struct {
formatString *fmtstr.TimestampFormatString
}

// Connect passes the cfg and the zero value of beat.ClientConfig to the underlying function.
func (fn ConnectorFunc) Connect(cfg *common.Config) (Outleter, error) {
return fn(cfg, beat.ClientConfig{})
Expand All @@ -51,24 +62,11 @@ func (c *pipelineConnector) ConnectWith(cfg *common.Config, clientCfg beat.Clien
return nil, err
}

var err error
var userProcessors beat.ProcessorList

userProcessors, err = processors.New(config.Processors)
procs, err := processorsForConfig(c.parent.beatInfo, config, clientCfg)
if err != nil {
return nil, err
}

if lst := clientCfg.Processing.Processor; lst != nil {
if len(userProcessors.All()) == 0 {
userProcessors = lst
} else if orig := lst.All(); len(orig) > 0 {
newLst := processors.NewList(nil)
newLst.List = append(newLst.List, lst, userProcessors)
userProcessors = newLst
}
}

setOptional := func(to common.MapStr, key string, value string) {
if value != "" {
to.Put(key, value)
Expand Down Expand Up @@ -105,7 +103,7 @@ func (c *pipelineConnector) ConnectWith(cfg *common.Config, clientCfg beat.Clien
clientCfg.Processing.EventMetadata = config.EventMetadata
clientCfg.Processing.Meta = meta
clientCfg.Processing.Fields = fields
clientCfg.Processing.Processor = userProcessors
clientCfg.Processing.Processor = procs
clientCfg.Processing.KeepNull = config.KeepNull
client, err := c.pipeline.ConnectWith(clientCfg)
if err != nil {
Expand All @@ -118,3 +116,64 @@ func (c *pipelineConnector) ConnectWith(cfg *common.Config, clientCfg beat.Clien
}
return outlet, nil
}

// processorsForConfig assembles the Processors for a pipelineConnector.
func processorsForConfig(
beatInfo beat.Info, config inputOutletConfig, clientCfg beat.ClientConfig,
) (*processors.Processors, error) {
procs := processors.NewList(nil)

// Processor ordering is important:
// 1. Index configuration
if !config.Index.IsEmpty() {
staticFields := fmtstr.FieldsForBeat(beatInfo.Beat, beatInfo.Version)
timestampFormat, err :=
fmtstr.NewTimestampFormatString(&config.Index, staticFields)
if err != nil {
return nil, err
}
indexProcessor := &addFormattedIndex{timestampFormat}
procs.List = append(procs.List, indexProcessor)
}

// 2. ClientConfig processors
if lst := clientCfg.Processing.Processor; lst != nil {
procs.List = append(procs.List, lst)
}

// 3. User processors
userProcessors, err := processors.New(config.Processors)
if err != nil {
return nil, err
}
// Subtlety: it is important here that we append the individual elements of
// userProcessors, rather than userProcessors itself, even though
// userProcessors implements the processors.Processor interface. This is
// because the contents of what we return are later pulled out into a
// processing.group rather than a processors.Processors, and the two have
// different error semantics: processors.Processors aborts processing on
// any error, whereas processing.group only aborts on fatal errors. The
// latter is the most common behavior, and the one we are preserving here for
// backwards compatibility.
// We are unhappy about this and have plans to fix this inconsistency at a
// higher level, but for now we need to respect the existing semantics.
procs.List = append(procs.List, userProcessors.List...)
return procs, nil
}

func (p *addFormattedIndex) Run(event *beat.Event) (*beat.Event, error) {
index, err := p.formatString.Run(event.Timestamp)
if err != nil {
return nil, err
}

if event.Meta == nil {
event.Meta = common.MapStr{}
}
event.Meta["raw_index"] = index
return event, nil
}

func (p *addFormattedIndex) String() string {
return fmt.Sprintf("add_index_pattern=%v", p.formatString)
}
213 changes: 213 additions & 0 deletions filebeat/channel/connector_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package channel

import (
"fmt"
"testing"
"time"

"github.com/stretchr/testify/assert"

"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/processors"
"github.com/elastic/beats/libbeat/processors/actions"
)

func TestProcessorsForConfig(t *testing.T) {
testCases := map[string]struct {
beatInfo beat.Info
configStr string
clientCfg beat.ClientConfig
event beat.Event
expectedFields map[string]string
}{
"Simple static index": {
configStr: "index: 'test'",
expectedFields: map[string]string{
"@metadata.raw_index": "test",
},
},
"Index with agent info + timestamp": {
beatInfo: beat.Info{Beat: "TestBeat", Version: "3.9.27"},
configStr: "index: 'beat-%{[agent.name]}-%{[agent.version]}-%{+yyyy.MM.dd}'",
event: beat.Event{Timestamp: time.Date(1999, time.December, 31, 23, 0, 0, 0, time.UTC)},
expectedFields: map[string]string{
"@metadata.raw_index": "beat-TestBeat-3.9.27-1999.12.31",
},
},
"Set index in ClientConfig": {
clientCfg: beat.ClientConfig{
Processing: beat.ProcessingConfig{
Processor: makeProcessors(&setRawIndex{"clientCfgIndex"}),
},
},
expectedFields: map[string]string{
"@metadata.raw_index": "clientCfgIndex",
},
},
"ClientConfig processor runs after beat input Index": {
configStr: "index: 'test'",
clientCfg: beat.ClientConfig{
Processing: beat.ProcessingConfig{
Processor: makeProcessors(&setRawIndex{"clientCfgIndex"}),
},
},
expectedFields: map[string]string{
"@metadata.raw_index": "clientCfgIndex",
},
},
"Set field in input config": {
configStr: `processors: [add_fields: {fields: {testField: inputConfig}}]`,
expectedFields: map[string]string{
"fields.testField": "inputConfig",
},
},
"Set field in ClientConfig": {
clientCfg: beat.ClientConfig{
Processing: beat.ProcessingConfig{
Processor: makeProcessors(actions.NewAddFields(common.MapStr{
"fields": common.MapStr{"testField": "clientConfig"},
}, false)),
},
},
expectedFields: map[string]string{
"fields.testField": "clientConfig",
},
},
"Input config processors run after ClientConfig": {
configStr: `processors: [add_fields: {fields: {testField: inputConfig}}]`,
clientCfg: beat.ClientConfig{
Processing: beat.ProcessingConfig{
Processor: makeProcessors(actions.NewAddFields(common.MapStr{
"fields": common.MapStr{"testField": "clientConfig"},
}, false)),
},
},
expectedFields: map[string]string{
"fields.testField": "inputConfig",
},
},
}
for description, test := range testCases {
if test.event.Fields == nil {
test.event.Fields = common.MapStr{}
}
config, err := outletConfigFromString(test.configStr)
if err != nil {
t.Errorf("[%s] %v", description, err)
continue
}
processors, err := processorsForConfig(test.beatInfo, config, test.clientCfg)
if err != nil {
t.Errorf("[%s] %v", description, err)
continue
}
processedEvent, err := processors.Run(&test.event)
// We don't check if err != nil, because we are testing the final outcome
// of running the processors, including when some of them fail.
if processedEvent == nil {
t.Errorf("[%s] Unexpected fatal error running processors: %v\n",
description, err)
}
for key, value := range test.expectedFields {
field, err := processedEvent.GetValue(key)
if err != nil {
t.Errorf("[%s] Couldn't get field %s from event: %v", description, key, err)
continue
}
assert.Equal(t, field, value)
fieldStr, ok := field.(string)
if !ok {
// Note that requiring a string here is just to simplify the test setup,
// not a requirement of the underlying api.
t.Errorf("[%s] Field [%s] should be a string", description, key)
continue
}
if fieldStr != value {
t.Errorf("[%s] Event field [%s]: expected [%s], got [%s]", description, key, value, fieldStr)
}
}
}
}

func TestProcessorsForConfigIsFlat(t *testing.T) {
// This test is regrettable, and exists because of inconsistencies in
// processor handling between processors.Processors and processing.group
// (which implements beat.ProcessorList) -- see processorsForConfig for
// details. The upshot is that, for now, if the input configuration specifies
// processors, they must be returned as direct children of the resulting
// processors.Processors (rather than being collected in additional tree
// structure).
// This test should be removed once we have a more consistent mechanism for
// collecting and running processors.
configStr := `processors:
- add_fields: {fields: {testField: value}}
- add_fields: {fields: {testField2: stuff}}`
config, err := outletConfigFromString(configStr)
if err != nil {
t.Fatal(err)
}
processors, err := processorsForConfig(
beat.Info{}, config, beat.ClientConfig{})
if err != nil {
t.Fatal(err)
}
assert.Equal(t, 2, len(processors.List))
}

// setRawIndex is a bare-bones processor to set the raw_index field to a
// constant string in the event metadata. It is used to test order of operations
// for processorsForConfig.
type setRawIndex struct {
indexStr string
}

func (p *setRawIndex) Run(event *beat.Event) (*beat.Event, error) {
if event.Meta == nil {
event.Meta = common.MapStr{}
}
event.Meta["raw_index"] = p.indexStr
return event, nil
}

func (p *setRawIndex) String() string {
return fmt.Sprintf("set_raw_index=%v", p.indexStr)
}

// Helper function to convert from YML input string to an unpacked
// inputOutletConfig
func outletConfigFromString(s string) (inputOutletConfig, error) {
config := inputOutletConfig{}
cfg, err := common.NewConfigFrom(s)
if err != nil {
return config, err
}
if err := cfg.Unpack(&config); err != nil {
return config, err
}
return config, nil
}

// makeProcessors wraps one or more bare Processor objects in Processors.
func makeProcessors(procs ...processors.Processor) *processors.Processors {
procList := processors.NewList(nil)
procList.List = procs
return procList
}
8 changes: 6 additions & 2 deletions filebeat/channel/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package channel
import (
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/fmtstr"
"github.com/elastic/beats/libbeat/processors"
)

Expand All @@ -28,6 +29,7 @@ type OutletFactory struct {

eventer beat.ClientEventer
wgEvents eventCounter
beatInfo beat.Info
}

type eventCounter interface {
Expand Down Expand Up @@ -57,19 +59,21 @@ type inputOutletConfig struct {
Fileset string `config:"_fileset_name"` // hidden setting

// Output meta data settings
Pipeline string `config:"pipeline"` // ES Ingest pipeline name

Pipeline string `config:"pipeline"` // ES Ingest pipeline name
Index fmtstr.EventFormatString `config:"index"` // ES output index pattern
}

// NewOutletFactory creates a new outlet factory for
// connecting an input to the publisher pipeline.
func NewOutletFactory(
done <-chan struct{},
wgEvents eventCounter,
beatInfo beat.Info,
) *OutletFactory {
o := &OutletFactory{
done: done,
wgEvents: wgEvents,
beatInfo: beatInfo,
}

if wgEvents != nil {
Expand Down
Loading

0 comments on commit 6a03478

Please sign in to comment.