Skip to content

Commit

Permalink
[Filebeat] Select output index based on the source input (elastic#14010)
Browse files Browse the repository at this point in the history
(cherry picked from commit 6a03478)
  • Loading branch information
faec committed Nov 20, 2019
1 parent c67ff19 commit 9d64b3d
Show file tree
Hide file tree
Showing 12 changed files with 574 additions and 49 deletions.
41 changes: 41 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,47 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
*Filebeat*

- `container` and `docker` inputs now support reading of labels and env vars written by docker JSON file logging driver. {issue}8358[8358]
- Add specific date processor to convert timezones so the same pipeline can be used when convert_timezone is enabled or disabled. {pull}12253[12253]
- Add MSSQL module {pull}12079[12079]
- Add ISO8601 date parsing support for system module. {pull}12568[12568] {pull}12578[12579]
- Update Kubernetes deployment manifest to use `container` input. {pull}12632[12632]
- Use correct OS path separator in `add_kubernetes_metadata` to support Windows nodes. {pull}9205[9205]
- Add support for virtual host in Apache access logs {pull}12778[12778]
- Add support for client addresses with port in Apache error logs {pull}12695[12695]
- Add `google-pubsub` input type for consuming messages from a Google Cloud Pub/Sub topic subscription. {pull}12746[12746]
- Add module for ingesting Cisco IOS logs over syslog. {pull}12748[12748]
- Add module for ingesting Google Cloud VPC flow logs. {pull}12747[12747]
- Report host metadata for Filebeat logs in Kubernetes. {pull}12790[12790]
- Add netflow dashboards based on Logstash netflow. {pull}12857[12857]
- Parse more fields from Elasticsearch slowlogs. {pull}11939[11939]
- Update module pipelines to enrich events with autonomous system fields. {pull}13036[13036]
- Add module for ingesting IBM MQ logs. {pull}8782[8782]
- Add S3 input to retrieve logs from AWS S3 buckets. {pull}12640[12640] {issue}12582[12582]
- Add aws module s3access metricset. {pull}13170[13170] {issue}12880[12880]
- Update Suricata module to populate ECS DNS fields and handle EVE DNS version 2. {issue}13320[13320] {pull}13329[13329]
- Update PAN-OS fileset to use the ECS NAT fields. {issue}13320[13320] {pull}13330[13330]
- Add fields to the Zeek DNS fileset for ECS DNS. {issue}13320[13320] {pull}13324[13324]
- Add container image in Kubernetes metadata {pull}13356[13356] {issue}12688[12688]
- Add timezone information to apache error fileset. {issue}12772[12772] {pull}13304[13304]
- Add module for ingesting Cisco FTD logs over syslog. {pull}13286[13286]
- Update CoreDNS module to populate ECS DNS fields. {issue}13320[13320] {pull}13505[13505]
- Parse query steps in PostgreSQL slowlogs. {issue}13496[13496] {pull}13701[13701]
- Add filebeat azure module with activitylogs, auditlogs, signinlogs filesets. {pull}13776[13776] {pull}14033[14033]
- Add support to set the document id in the json reader. {pull}5844[5844]
- Add input httpjson. {issue}13545[13545] {pull}13546[13546]
- Filebeat Netflow input: Remove beta label. {pull}13858[13858]
- Remove `event.timezone` from events that don't need it in some modules that support log formats with and without timezones. {pull}13918[13918]
- Add ExpandEventListFromField config option in the kafka input. {pull}13965[13965]
- Add ELB fileset to AWS module. {pull}14020[14020]
- Add module for MISP (Malware Information Sharing Platform). {pull}13805[13805]
- Add `source.bytes` and `source.packets` for uni-directional netflow events. {pull}14111[14111]
- Add support for gzipped files in S3 input. {pull}13980[13980]
- Add support for all the ObjectCreated events in S3 input. {pull}14077[14077]
- Add Kibana Dashboard for MISP module. {pull}14147[14147]
- Add JSON options to autodiscover hints {pull}14208[14208]
- Add more filesets to Zeek module. {pull}14150[14150]
- Add `index` option to all inputs to directly set a per-input index value. {pull}14010[14010]
- Remove beta flag for some filebeat modules. {pull}14374[14374]

*Heartbeat*

Expand Down
2 changes: 1 addition & 1 deletion filebeat/beater/filebeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ func (fb *Filebeat) Run(b *beat.Beat) error {

outDone := make(chan struct{}) // outDone closes down all active pipeline connections
crawler, err := crawler.New(
channel.NewOutletFactory(outDone, wgEvents).Create,
channel.NewOutletFactory(outDone, wgEvents, b.Info).Create,
config.Inputs,
b.Info.Version,
fb.done,
Expand Down
89 changes: 74 additions & 15 deletions filebeat/channel/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@
package channel

import (
"fmt"

"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/fmtstr"
"github.com/elastic/beats/libbeat/processors"
)

Expand All @@ -31,6 +34,14 @@ type pipelineConnector struct {
pipeline beat.Pipeline
}

// addFormattedIndex is a Processor to set an event's "raw_index" metadata field
// with a given TimestampFormatString. The elasticsearch output interprets
// that field as specifying the (raw string) index the event should be sent to;
// in other outputs it is just included in the metadata.
type addFormattedIndex struct {
	// formatString is evaluated against each event's timestamp to produce the
	// index name that is stored in the event metadata.
	formatString *fmtstr.TimestampFormatString
}

// Connect passes the cfg and the zero value of beat.ClientConfig to the underlying function.
func (fn ConnectorFunc) Connect(cfg *common.Config) (Outleter, error) {
return fn(cfg, beat.ClientConfig{})
Expand All @@ -51,24 +62,11 @@ func (c *pipelineConnector) ConnectWith(cfg *common.Config, clientCfg beat.Clien
return nil, err
}

var err error
var userProcessors beat.ProcessorList

userProcessors, err = processors.New(config.Processors)
procs, err := processorsForConfig(c.parent.beatInfo, config, clientCfg)
if err != nil {
return nil, err
}

if lst := clientCfg.Processing.Processor; lst != nil {
if len(userProcessors.All()) == 0 {
userProcessors = lst
} else if orig := lst.All(); len(orig) > 0 {
newLst := processors.NewList(nil)
newLst.List = append(newLst.List, lst, userProcessors)
userProcessors = newLst
}
}

setOptional := func(to common.MapStr, key string, value string) {
if value != "" {
to.Put(key, value)
Expand Down Expand Up @@ -105,7 +103,7 @@ func (c *pipelineConnector) ConnectWith(cfg *common.Config, clientCfg beat.Clien
clientCfg.Processing.EventMetadata = config.EventMetadata
clientCfg.Processing.Meta = meta
clientCfg.Processing.Fields = fields
clientCfg.Processing.Processor = userProcessors
clientCfg.Processing.Processor = procs
clientCfg.Processing.KeepNull = config.KeepNull
client, err := c.pipeline.ConnectWith(clientCfg)
if err != nil {
Expand All @@ -118,3 +116,64 @@ func (c *pipelineConnector) ConnectWith(cfg *common.Config, clientCfg beat.Clien
}
return outlet, nil
}

// processorsForConfig builds the processor list used by a pipelineConnector
// from the input's outlet configuration and the caller-supplied ClientConfig.
//
// The resulting list is ordered, and the order matters because later
// processors can overwrite what earlier ones set:
//  1. the index processor derived from the input's `index` setting (if any),
//  2. the processor list carried by clientCfg (if any), as a single entry,
//  3. the processors declared in the input configuration, flattened.
//
// An error is returned if the index format string or the configured
// processors cannot be built.
func processorsForConfig(
	beatInfo beat.Info, config inputOutletConfig, clientCfg beat.ClientConfig,
) (*processors.Processors, error) {
	result := processors.NewList(nil)

	// Step 1: per-input index. The format string may reference static beat
	// fields (name, version) as well as the event timestamp.
	if hasIndex := !config.Index.IsEmpty(); hasIndex {
		beatFields := fmtstr.FieldsForBeat(beatInfo.Beat, beatInfo.Version)
		indexFormat, err := fmtstr.NewTimestampFormatString(&config.Index, beatFields)
		if err != nil {
			return nil, err
		}
		result.List = append(result.List, &addFormattedIndex{formatString: indexFormat})
	}

	// Step 2: processors handed in through the ClientConfig.
	if clientProcs := clientCfg.Processing.Processor; clientProcs != nil {
		result.List = append(result.List, clientProcs)
	}

	// Step 3: processors from the input configuration.
	fromInput, err := processors.New(config.Processors)
	if err != nil {
		return nil, err
	}

	// The input processors are deliberately spliced in one by one instead of
	// being appended as a single nested processors.Processors value (which
	// would also satisfy processors.Processor). The returned list is later
	// unpacked into a processing.group, and the two containers treat errors
	// differently: processors.Processors stops at any error, while
	// processing.group stops only on fatal ones. Keeping the list flat
	// preserves the latter, historically expected behavior until the
	// inconsistency is resolved at a higher level.
	result.List = append(result.List, fromInput.List...)
	return result, nil
}

// Run renders the index name from the event's timestamp and stores it under
// the "raw_index" key of the event metadata, creating the metadata map when
// the event does not have one yet. If the format string cannot be evaluated,
// the error is returned together with a nil event.
func (p *addFormattedIndex) Run(event *beat.Event) (*beat.Event, error) {
	formatted, err := p.formatString.Run(event.Timestamp)
	if err != nil {
		return nil, err
	}

	meta := event.Meta
	if meta == nil {
		meta = common.MapStr{}
		event.Meta = meta
	}
	meta["raw_index"] = formatted
	return event, nil
}

// String returns a short description of the processor: the fixed label
// "add_index_pattern=" followed by the default formatting of its format string.
func (p *addFormattedIndex) String() string {
	const label = "add_index_pattern="
	return label + fmt.Sprint(p.formatString)
}
213 changes: 213 additions & 0 deletions filebeat/channel/connector_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package channel

import (
"fmt"
"testing"
"time"

"github.com/stretchr/testify/assert"

"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/processors"
"github.com/elastic/beats/libbeat/processors/actions"
)

// TestProcessorsForConfig checks the processors assembled by
// processorsForConfig by running them against an event and comparing selected
// fields of the result. The cases cover the per-input index processor,
// processors supplied through the ClientConfig, processors from the input
// configuration, and the relative order in which these three groups run.
func TestProcessorsForConfig(t *testing.T) {
	testCases := map[string]struct {
		beatInfo       beat.Info
		configStr      string
		clientCfg      beat.ClientConfig
		event          beat.Event
		expectedFields map[string]string
	}{
		"Simple static index": {
			configStr: "index: 'test'",
			expectedFields: map[string]string{
				"@metadata.raw_index": "test",
			},
		},
		"Index with agent info + timestamp": {
			beatInfo:  beat.Info{Beat: "TestBeat", Version: "3.9.27"},
			configStr: "index: 'beat-%{[agent.name]}-%{[agent.version]}-%{+yyyy.MM.dd}'",
			event:     beat.Event{Timestamp: time.Date(1999, time.December, 31, 23, 0, 0, 0, time.UTC)},
			expectedFields: map[string]string{
				"@metadata.raw_index": "beat-TestBeat-3.9.27-1999.12.31",
			},
		},
		"Set index in ClientConfig": {
			clientCfg: beat.ClientConfig{
				Processing: beat.ProcessingConfig{
					Processor: makeProcessors(&setRawIndex{"clientCfgIndex"}),
				},
			},
			expectedFields: map[string]string{
				"@metadata.raw_index": "clientCfgIndex",
			},
		},
		"ClientConfig processor runs after beat input Index": {
			configStr: "index: 'test'",
			clientCfg: beat.ClientConfig{
				Processing: beat.ProcessingConfig{
					Processor: makeProcessors(&setRawIndex{"clientCfgIndex"}),
				},
			},
			expectedFields: map[string]string{
				"@metadata.raw_index": "clientCfgIndex",
			},
		},
		"Set field in input config": {
			configStr: `processors: [add_fields: {fields: {testField: inputConfig}}]`,
			expectedFields: map[string]string{
				"fields.testField": "inputConfig",
			},
		},
		"Set field in ClientConfig": {
			clientCfg: beat.ClientConfig{
				Processing: beat.ProcessingConfig{
					Processor: makeProcessors(actions.NewAddFields(common.MapStr{
						"fields": common.MapStr{"testField": "clientConfig"},
					}, false)),
				},
			},
			expectedFields: map[string]string{
				"fields.testField": "clientConfig",
			},
		},
		"Input config processors run after ClientConfig": {
			configStr: `processors: [add_fields: {fields: {testField: inputConfig}}]`,
			clientCfg: beat.ClientConfig{
				Processing: beat.ProcessingConfig{
					Processor: makeProcessors(actions.NewAddFields(common.MapStr{
						"fields": common.MapStr{"testField": "clientConfig"},
					}, false)),
				},
			},
			expectedFields: map[string]string{
				"fields.testField": "inputConfig",
			},
		},
	}
	for description, test := range testCases {
		// The add_fields processor writes into event.Fields, so make sure the
		// map exists even for cases that start from the zero-value event.
		if test.event.Fields == nil {
			test.event.Fields = common.MapStr{}
		}
		config, err := outletConfigFromString(test.configStr)
		if err != nil {
			t.Errorf("[%s] %v", description, err)
			continue
		}
		procs, err := processorsForConfig(test.beatInfo, config, test.clientCfg)
		if err != nil {
			t.Errorf("[%s] %v", description, err)
			continue
		}
		processedEvent, err := procs.Run(&test.event)
		// We don't check if err != nil, because we are testing the final outcome
		// of running the processors, including when some of them fail.
		if processedEvent == nil {
			t.Errorf("[%s] Unexpected fatal error running processors: %v\n",
				description, err)
			// Without an event there is nothing to inspect; moving on avoids
			// a nil-pointer panic that would abort the remaining cases.
			continue
		}
		for key, value := range test.expectedFields {
			field, err := processedEvent.GetValue(key)
			if err != nil {
				t.Errorf("[%s] Couldn't get field %s from event: %v", description, key, err)
				continue
			}
			// Expected value goes first so failure output is labelled
			// correctly. Comparing against a string also fails for
			// non-string fields; requiring strings is only a simplification
			// of this test, not a requirement of the underlying api.
			assert.Equal(t, value, field, "[%s] Event field [%s]", description, key)
		}
	}
}

// TestProcessorsForConfigIsFlat pins down that processors declared in the
// input configuration end up as direct children of the processors.Processors
// returned by processorsForConfig, rather than nested inside another list.
//
// The test is a workaround for the differing processor handling of
// processors.Processors and processing.group (which implements
// beat.ProcessorList); processorsForConfig explains the background. It should
// be deleted once processors are collected and run in one consistent way.
func TestProcessorsForConfigIsFlat(t *testing.T) {
	const yamlConfig = `processors:
- add_fields: {fields: {testField: value}}
- add_fields: {fields: {testField2: stuff}}`

	outletCfg, err := outletConfigFromString(yamlConfig)
	if err != nil {
		t.Fatal(err)
	}

	procs, err := processorsForConfig(beat.Info{}, outletCfg, beat.ClientConfig{})
	if err != nil {
		t.Fatal(err)
	}

	// Two configured processors must yield exactly two top-level entries.
	assert.Equal(t, 2, len(procs.List))
}

// setRawIndex is a minimal test processor that writes a fixed string into the
// "raw_index" key of an event's metadata. The tests use it to observe the
// order in which processorsForConfig arranges processors.
type setRawIndex struct {
	indexStr string
}

// Run stores the configured index string in the event metadata, allocating
// the metadata map first when the event has none. It never fails.
func (p *setRawIndex) Run(event *beat.Event) (*beat.Event, error) {
	meta := event.Meta
	if meta == nil {
		meta = common.MapStr{}
		event.Meta = meta
	}
	meta["raw_index"] = p.indexStr
	return event, nil
}

// String describes the processor as "set_raw_index=" followed by its index string.
func (p *setRawIndex) String() string {
	const label = "set_raw_index="
	return label + fmt.Sprint(p.indexStr)
}

// outletConfigFromString is a test helper that parses a YAML snippet and
// unpacks it into an inputOutletConfig. On a parse or unpack failure the
// error is returned together with whatever the config holds at that point.
func outletConfigFromString(s string) (inputOutletConfig, error) {
	var unpacked inputOutletConfig

	parsed, err := common.NewConfigFrom(s)
	if err == nil {
		err = parsed.Unpack(&unpacked)
	}
	return unpacked, err
}

// makeProcessors bundles the given bare Processor values into a single
// processors.Processors whose List is exactly the supplied slice.
func makeProcessors(procs ...processors.Processor) *processors.Processors {
	wrapped := processors.NewList(nil)
	wrapped.List = procs
	return wrapped
}
Loading

0 comments on commit 9d64b3d

Please sign in to comment.