From be728cc16a78d596c5f8ff62ce4b5dcaea2b4d04 Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Thu, 7 Nov 2019 16:38:09 -0500 Subject: [PATCH] [Filebeat] Select output index based on the source input (#14010) (cherry picked from commit 6a034782a2aa363c539000e3b307b00178d29080) --- CHANGELOG.next.asciidoc | 41 ++++ filebeat/beater/filebeat.go | 2 +- filebeat/channel/connector.go | 89 ++++++-- filebeat/channel/connector_test.go | 213 ++++++++++++++++++ filebeat/channel/factory.go | 8 +- .../docs/inputs/input-common-options.asciidoc | 14 +- filebeat/input/input.go | 4 +- filebeat/kafka.yml | 8 + libbeat/common/fmtstr/formattimestamp.go | 80 +++++++ libbeat/common/fmtstr/formattimestamp_test.go | 108 +++++++++ libbeat/idxmgmt/ilm/ilm.go | 24 +- libbeat/idxmgmt/std.go | 32 ++- 12 files changed, 574 insertions(+), 49 deletions(-) create mode 100644 filebeat/channel/connector_test.go create mode 100644 filebeat/kafka.yml create mode 100644 libbeat/common/fmtstr/formattimestamp.go create mode 100644 libbeat/common/fmtstr/formattimestamp_test.go diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 7731450332db..dc3013645039 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -89,6 +89,47 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d *Filebeat* - `container` and `docker` inputs now support reading of labels and env vars written by docker JSON file logging driver. {issue}8358[8358] +- Add specific date processor to convert timezones so same pipeline can be used when convert_timezone is enabled or disabled. {pull}12253[12253] +- Add MSSQL module {pull}12079[12079] +- Add ISO8601 date parsing support for system module. {pull}12568[12568] {pull}12578[12579] +- Update Kubernetes deployment manifest to use `container` input. {pull}12632[12632] +- Use correct OS path separator in `add_kubernetes_metadata` to support Windows nodes. {pull}9205[9205] +- Add support for virtual host in Apache access logs {pull}12778[12778] +- Add support for client addresses with port in Apache error logs {pull}12695[12695] +- Add `google-pubsub` input type for consuming messages from a Google Cloud Pub/Sub topic subscription. {pull}12746[12746] +- Add module for ingesting Cisco IOS logs over syslog. {pull}12748[12748] +- Add module for ingesting Google Cloud VPC flow logs. {pull}12747[12747] +- Report host metadata for Filebeat logs in Kubernetes. {pull}12790[12790] +- Add netflow dashboards based on Logstash netflow. {pull}12857[12857] +- Parse more fields from Elasticsearch slowlogs. {pull}11939[11939] +- Update module pipelines to enrich events with autonomous system fields. {pull}13036[13036] +- Add module for ingesting IBM MQ logs. {pull}8782[8782] +- Add S3 input to retrieve logs from AWS S3 buckets. {pull}12640[12640] {issue}12582[12582] +- Add aws module s3access metricset. {pull}13170[13170] {issue}12880[12880] +- Update Suricata module to populate ECS DNS fields and handle EVE DNS version 2. {issue}13320[13320] {pull}13329[13329] +- Update PAN-OS fileset to use the ECS NAT fields. {issue}13320[13320] {pull}13330[13330] +- Add fields to the Zeek DNS fileset for ECS DNS. {issue}13320[13320] {pull}13324[13324] +- Add container image in Kubernetes metadata {pull}13356[13356] {issue}12688[12688] +- Add timezone information to apache error fileset. {issue}12772[12772] {pull}13304[13304] +- Add module for ingesting Cisco FTD logs over syslog. {pull}13286[13286] +- Update CoreDNS module to populate ECS DNS fields. {issue}13320[13320] {pull}13505[13505] +- Parse query steps in PostgreSQL slowlogs. {issue}13496[13496] {pull}13701[13701] +- Add filebeat azure module with activitylogs, auditlogs, signinlogs filesets. {pull}13776[13776] {pull}14033[14033] +- Add support to set the document id in the json reader. {pull}5844[5844] +- Add input httpjson. {issue}13545[13545] {pull}13546[13546] +- Filebeat Netflow input: Remove beta label. {pull}13858[13858] +- Remove `event.timezone` from events that don't need it in some modules that support log formats with and without timezones. {pull}13918[13918] +- Add ExpandEventListFromField config option in the kafka input. {pull}13965[13965] +- Add ELB fileset to AWS module. {pull}14020[14020] +- Add module for MISP (Malware Information Sharing Platform). {pull}13805[13805] +- Add `source.bytes` and `source.packets` for uni-directional netflow events. {pull}14111[14111] +- Add support for gzipped files in S3 input. {pull}13980[13980] +- Add support for all the ObjectCreated events in S3 input. {pull}14077[14077] +- Add Kibana Dashboard for MISP module. {pull}14147[14147] +- Add JSON options to autodiscover hints {pull}14208[14208] +- Add more filesets to Zeek module. {pull}14150[14150] +- Add `index` option to all inputs to directly set a per-input index value. {pull}14010[14010] +- Remove beta flag for some filebeat modules. {pull}14374[14374] *Heartbeat* diff --git a/filebeat/beater/filebeat.go b/filebeat/beater/filebeat.go index c30203dea16b..76e7e8b12f80 100644 --- a/filebeat/beater/filebeat.go +++ b/filebeat/beater/filebeat.go @@ -326,7 +326,7 @@ func (fb *Filebeat) Run(b *beat.Beat) error { outDone := make(chan struct{}) // outDone closes down all active pipeline connections crawler, err := crawler.New( - channel.NewOutletFactory(outDone, wgEvents).Create, + channel.NewOutletFactory(outDone, wgEvents, b.Info).Create, config.Inputs, b.Info.Version, fb.done, diff --git a/filebeat/channel/connector.go b/filebeat/channel/connector.go index 3d584df38f5b..ebd5983a3f7b 100644 --- a/filebeat/channel/connector.go +++ b/filebeat/channel/connector.go @@ -18,8 +18,11 @@ package channel import ( + "fmt" + "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/common/fmtstr" "github.com/elastic/beats/libbeat/processors" ) @@ -31,6 +34,14 @@ type pipelineConnector struct { pipeline beat.Pipeline } +// addFormattedIndex is a Processor to set an event's "raw_index" metadata field +// with a given TimestampFormatString. The elasticsearch output interprets +// that field as specifying the (raw string) index the event should be sent to; +// in other outputs it is just included in the metadata. +type addFormattedIndex struct { + formatString *fmtstr.TimestampFormatString +} + // Connect passes the cfg and the zero value of beat.ClientConfig to the underlying function. func (fn ConnectorFunc) Connect(cfg *common.Config) (Outleter, error) { return fn(cfg, beat.ClientConfig{}) @@ -51,24 +62,11 @@ func (c *pipelineConnector) ConnectWith(cfg *common.Config, clientCfg beat.Clien return nil, err } - var err error - var userProcessors beat.ProcessorList - - userProcessors, err = processors.New(config.Processors) + procs, err := processorsForConfig(c.parent.beatInfo, config, clientCfg) if err != nil { return nil, err } - if lst := clientCfg.Processing.Processor; lst != nil { - if len(userProcessors.All()) == 0 { - userProcessors = lst - } else if orig := lst.All(); len(orig) > 0 { - newLst := processors.NewList(nil) - newLst.List = append(newLst.List, lst, userProcessors) - userProcessors = newLst - } - } - setOptional := func(to common.MapStr, key string, value string) { if value != "" { to.Put(key, value) @@ -105,7 +103,7 @@ func (c *pipelineConnector) ConnectWith(cfg *common.Config, clientCfg beat.Clien clientCfg.Processing.EventMetadata = config.EventMetadata clientCfg.Processing.Meta = meta clientCfg.Processing.Fields = fields - clientCfg.Processing.Processor = userProcessors + clientCfg.Processing.Processor = procs clientCfg.Processing.KeepNull = config.KeepNull client, err := c.pipeline.ConnectWith(clientCfg) if err != nil { @@ -118,3 +116,64 @@ func (c *pipelineConnector) ConnectWith(cfg *common.Config, clientCfg beat.Clien } return outlet, nil } + +// processorsForConfig assembles the Processors for a pipelineConnector. +func processorsForConfig( + beatInfo beat.Info, config inputOutletConfig, clientCfg beat.ClientConfig, +) (*processors.Processors, error) { + procs := processors.NewList(nil) + + // Processor ordering is important: + // 1. Index configuration + if !config.Index.IsEmpty() { + staticFields := fmtstr.FieldsForBeat(beatInfo.Beat, beatInfo.Version) + timestampFormat, err := + fmtstr.NewTimestampFormatString(&config.Index, staticFields) + if err != nil { + return nil, err + } + indexProcessor := &addFormattedIndex{timestampFormat} + procs.List = append(procs.List, indexProcessor) + } + + // 2. ClientConfig processors + if lst := clientCfg.Processing.Processor; lst != nil { + procs.List = append(procs.List, lst) + } + + // 3. User processors + userProcessors, err := processors.New(config.Processors) + if err != nil { + return nil, err + } + // Subtlety: it is important here that we append the individual elements of + // userProcessors, rather than userProcessors itself, even though + // userProcessors implements the processors.Processor interface. This is + // because the contents of what we return are later pulled out into a + // processing.group rather than a processors.Processors, and the two have + // different error semantics: processors.Processors aborts processing on + // any error, whereas processing.group only aborts on fatal errors. The + // latter is the most common behavior, and the one we are preserving here for + // backwards compatibility. + // We are unhappy about this and have plans to fix this inconsistency at a + // higher level, but for now we need to respect the existing semantics. + procs.List = append(procs.List, userProcessors.List...) + return procs, nil +} + +func (p *addFormattedIndex) Run(event *beat.Event) (*beat.Event, error) { + index, err := p.formatString.Run(event.Timestamp) + if err != nil { + return nil, err + } + + if event.Meta == nil { + event.Meta = common.MapStr{} + } + event.Meta["raw_index"] = index + return event, nil +} + +func (p *addFormattedIndex) String() string { + return fmt.Sprintf("add_index_pattern=%v", p.formatString) +} diff --git a/filebeat/channel/connector_test.go b/filebeat/channel/connector_test.go new file mode 100644 index 000000000000..4708a7e45a5a --- /dev/null +++ b/filebeat/channel/connector_test.go @@ -0,0 +1,213 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package channel + +import ( + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "github.com/elastic/beats/libbeat/beat" + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/processors" + "github.com/elastic/beats/libbeat/processors/actions" +) + +func TestProcessorsForConfig(t *testing.T) { + testCases := map[string]struct { + beatInfo beat.Info + configStr string + clientCfg beat.ClientConfig + event beat.Event + expectedFields map[string]string + }{ + "Simple static index": { + configStr: "index: 'test'", + expectedFields: map[string]string{ + "@metadata.raw_index": "test", + }, + }, + "Index with agent info + timestamp": { + beatInfo: beat.Info{Beat: "TestBeat", Version: "3.9.27"}, + configStr: "index: 'beat-%{[agent.name]}-%{[agent.version]}-%{+yyyy.MM.dd}'", + event: beat.Event{Timestamp: time.Date(1999, time.December, 31, 23, 0, 0, 0, time.UTC)}, + expectedFields: map[string]string{ + "@metadata.raw_index": "beat-TestBeat-3.9.27-1999.12.31", + }, + }, + "Set index in ClientConfig": { + clientCfg: beat.ClientConfig{ + Processing: beat.ProcessingConfig{ + Processor: makeProcessors(&setRawIndex{"clientCfgIndex"}), + }, + }, + expectedFields: map[string]string{ + "@metadata.raw_index": "clientCfgIndex", + }, + }, + "ClientConfig processor runs after beat input Index": { + configStr: "index: 'test'", + clientCfg: beat.ClientConfig{ + Processing: beat.ProcessingConfig{ + Processor: makeProcessors(&setRawIndex{"clientCfgIndex"}), + }, + }, + expectedFields: map[string]string{ + "@metadata.raw_index": "clientCfgIndex", + }, + }, + "Set field in input config": { + configStr: `processors: [add_fields: {fields: {testField: inputConfig}}]`, + expectedFields: map[string]string{ + "fields.testField": "inputConfig", + }, + }, + "Set field in ClientConfig": { + clientCfg: beat.ClientConfig{ + Processing: beat.ProcessingConfig{ + Processor: makeProcessors(actions.NewAddFields(common.MapStr{ + "fields": common.MapStr{"testField": "clientConfig"}, + }, false)), + }, + }, + expectedFields: map[string]string{ + "fields.testField": "clientConfig", + }, + }, + "Input config processors run after ClientConfig": { + configStr: `processors: [add_fields: {fields: {testField: inputConfig}}]`, + clientCfg: beat.ClientConfig{ + Processing: beat.ProcessingConfig{ + Processor: makeProcessors(actions.NewAddFields(common.MapStr{ + "fields": common.MapStr{"testField": "clientConfig"}, + }, false)), + }, + }, + expectedFields: map[string]string{ + "fields.testField": "inputConfig", + }, + }, + } + for description, test := range testCases { + if test.event.Fields == nil { + test.event.Fields = common.MapStr{} + } + config, err := outletConfigFromString(test.configStr) + if err != nil { + t.Errorf("[%s] %v", description, err) + continue + } + processors, err := processorsForConfig(test.beatInfo, config, test.clientCfg) + if err != nil { + t.Errorf("[%s] %v", description, err) + continue + } + processedEvent, err := processors.Run(&test.event) + // We don't check if err != nil, because we are testing the final outcome + // of running the processors, including when some of them fail. + if processedEvent == nil { + t.Errorf("[%s] Unexpected fatal error running processors: %v\n", + description, err) + } + for key, value := range test.expectedFields { + field, err := processedEvent.GetValue(key) + if err != nil { + t.Errorf("[%s] Couldn't get field %s from event: %v", description, key, err) + continue + } + assert.Equal(t, field, value) + fieldStr, ok := field.(string) + if !ok { + // Note that requiring a string here is just to simplify the test setup, + // not a requirement of the underlying api. + t.Errorf("[%s] Field [%s] should be a string", description, key) + continue + } + if fieldStr != value { + t.Errorf("[%s] Event field [%s]: expected [%s], got [%s]", description, key, value, fieldStr) + } + } + } +} + +func TestProcessorsForConfigIsFlat(t *testing.T) { + // This test is regrettable, and exists because of inconsistencies in + // processor handling between processors.Processors and processing.group + // (which implements beat.ProcessorList) -- see processorsForConfig for + // details. The upshot is that, for now, if the input configuration specifies + // processors, they must be returned as direct children of the resulting + // processors.Processors (rather than being collected in additional tree + // structure). + // This test should be removed once we have a more consistent mechanism for + // collecting and running processors. + configStr := `processors: +- add_fields: {fields: {testField: value}} +- add_fields: {fields: {testField2: stuff}}` + config, err := outletConfigFromString(configStr) + if err != nil { + t.Fatal(err) + } + processors, err := processorsForConfig( + beat.Info{}, config, beat.ClientConfig{}) + if err != nil { + t.Fatal(err) + } + assert.Equal(t, 2, len(processors.List)) +} + +// setRawIndex is a bare-bones processor to set the raw_index field to a +// constant string in the event metadata. It is used to test order of operations +// for processorsForConfig. +type setRawIndex struct { + indexStr string +} + +func (p *setRawIndex) Run(event *beat.Event) (*beat.Event, error) { + if event.Meta == nil { + event.Meta = common.MapStr{} + } + event.Meta["raw_index"] = p.indexStr + return event, nil +} + +func (p *setRawIndex) String() string { + return fmt.Sprintf("set_raw_index=%v", p.indexStr) +} + +// Helper function to convert from YML input string to an unpacked +// inputOutletConfig +func outletConfigFromString(s string) (inputOutletConfig, error) { + config := inputOutletConfig{} + cfg, err := common.NewConfigFrom(s) + if err != nil { + return config, err + } + if err := cfg.Unpack(&config); err != nil { + return config, err + } + return config, nil +} + +// makeProcessors wraps one or more bare Processor objects in Processors. +func makeProcessors(procs ...processors.Processor) *processors.Processors { + procList := processors.NewList(nil) + procList.List = procs + return procList +} diff --git a/filebeat/channel/factory.go b/filebeat/channel/factory.go index 1fe09039bbeb..d4373beec19d 100644 --- a/filebeat/channel/factory.go +++ b/filebeat/channel/factory.go @@ -20,6 +20,7 @@ package channel import ( "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/common/fmtstr" "github.com/elastic/beats/libbeat/processors" ) @@ -28,6 +29,7 @@ type OutletFactory struct { eventer beat.ClientEventer wgEvents eventCounter + beatInfo beat.Info } type eventCounter interface { @@ -57,8 +59,8 @@ type inputOutletConfig struct { Fileset string `config:"_fileset_name"` // hidden setting // Output meta data settings - Pipeline string `config:"pipeline"` // ES Ingest pipeline name - + Pipeline string `config:"pipeline"` // ES Ingest pipeline name + Index fmtstr.EventFormatString `config:"index"` // ES output index pattern } // NewOutletFactory creates a new outlet factory for @@ -66,10 +68,12 @@ type inputOutletConfig struct { func NewOutletFactory( done <-chan struct{}, wgEvents eventCounter, + beatInfo beat.Info, ) *OutletFactory { o := &OutletFactory{ done: done, wgEvents: wgEvents, + beatInfo: beatInfo, } if wgEvents != nil { diff --git a/filebeat/docs/inputs/input-common-options.asciidoc b/filebeat/docs/inputs/input-common-options.asciidoc index 6d09ff493b95..8e08a8074e4f 100644 --- a/filebeat/docs/inputs/input-common-options.asciidoc +++ b/filebeat/docs/inputs/input-common-options.asciidoc @@ -64,7 +64,7 @@ If this option is set to true, the custom <<{beatname_lc}-input-{type}-fields,fields>> are stored as top-level fields in the output document instead of being grouped under a `fields` sub-dictionary. If the custom field names conflict with other field names added by {beatname_uc}, -then the custom fields overwrite the other fields. +then the custom fields overwrite the other fields. [float] ===== `processors` @@ -89,3 +89,15 @@ input is used. If this option is set to true, fields with `null` values will be published in the output document. By default, `keep_null` is set to `false`. + +[float] +===== `index` + +If present, this formatted string overrides the index for events from this input +(for elasticsearch outputs), or sets the `raw_index` field of the event's +metadata (for other outputs). This string can only refer to the agent name and +version and the event timestamp; for access to dynamic fields, use +`output.elasticsearch.index` or a processor. + +Example value: `"%{[agent.name]}-myindex-%{+yyyy.MM.dd}"` might +expand to `"filebeat-myindex-2019.11.01"`. diff --git a/filebeat/input/input.go b/filebeat/input/input.go index 98eea51db8a5..e1931d4e4bfa 100644 --- a/filebeat/input/input.go +++ b/filebeat/input/input.go @@ -60,7 +60,7 @@ type Runner struct { // New instantiates a new Runner func New( conf *common.Config, - outlet channel.Connector, + connector channel.Connector, beatDone chan struct{}, states []file.State, dynFields *common.MapStrPointer, @@ -99,7 +99,7 @@ func New( Meta: nil, } var ipt Input - ipt, err = f(conf, outlet, context) + ipt, err = f(conf, connector, context) if err != nil { return input, err } diff --git a/filebeat/kafka.yml b/filebeat/kafka.yml new file mode 100644 index 000000000000..81842758971d --- /dev/null +++ b/filebeat/kafka.yml @@ -0,0 +1,8 @@ +version: '2.3' +services: + kafka: + build: ${ES_BEATS}/testing/environments/docker/kafka + expose: + - 9092 + environment: + - ADVERTISED_HOST=kafka diff --git a/libbeat/common/fmtstr/formattimestamp.go b/libbeat/common/fmtstr/formattimestamp.go new file mode 100644 index 000000000000..68c22d636496 --- /dev/null +++ b/libbeat/common/fmtstr/formattimestamp.go @@ -0,0 +1,80 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package fmtstr + +import ( + "time" + + "github.com/elastic/beats/libbeat/beat" + "github.com/elastic/beats/libbeat/common" +) + +// TimestampFormatString is a wrapper around EventFormatString for the +// common special case where the format expression should only have access to +// shared static fields (typically agent / version) and the event timestamp. +type TimestampFormatString struct { + eventFormatString *EventFormatString + fields common.MapStr +} + +// NewTimestampFormatString creates from the given event format string a +// TimestampFormatString that includes only the given static fields and +// a timestamp. +func NewTimestampFormatString( + eventFormatString *EventFormatString, staticFields common.MapStr, +) (*TimestampFormatString, error) { + return &TimestampFormatString{ + eventFormatString: eventFormatString, + fields: staticFields.Clone(), + }, nil +} + +// FieldsForBeat returns a common.MapStr with the given beat name and +// version assigned to their standard field names. +func FieldsForBeat(beat string, version string) common.MapStr { + return common.MapStr{ + // beat object was left in for backward compatibility reason for older configs. + "beat": common.MapStr{ + "name": beat, + "version": version, + }, + "agent": common.MapStr{ + "name": beat, + "version": version, + }, + // For the Beats that have an observer role + "observer": common.MapStr{ + "name": beat, + "version": version, + }, + } +} + +// Run executes the format string returning a new expanded string or an error +// if execution or event field expansion fails. +func (fs *TimestampFormatString) Run(timestamp time.Time) (string, error) { + placeholderEvent := &beat.Event{ + Fields: fs.fields, + Timestamp: timestamp, + } + return fs.eventFormatString.Run(placeholderEvent) +} + +func (fs *TimestampFormatString) String() string { + return fs.eventFormatString.expression +} diff --git a/libbeat/common/fmtstr/formattimestamp_test.go b/libbeat/common/fmtstr/formattimestamp_test.go new file mode 100644 index 000000000000..d194f597ad5f --- /dev/null +++ b/libbeat/common/fmtstr/formattimestamp_test.go @@ -0,0 +1,108 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package fmtstr + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "github.com/elastic/beats/libbeat/common" +) + +func TestTimestampFormatString(t *testing.T) { + tests := []struct { + title string + format string + staticFields common.MapStr + timestamp time.Time + expected string + }{ + { + "empty string", + "", + nil, + time.Time{}, + "", + }, + { + "no fields configured", + "format string", + nil, + time.Time{}, + "format string", + }, + { + "expand field", + "%{[key]}", + common.MapStr{"key": "value"}, + time.Time{}, + "value", + }, + { + "expand with default", + "%{[key]:default}", + nil, + time.Time{}, + "default", + }, + { + "expand nested field", + "%{[nested.key]}", + common.MapStr{"nested": common.MapStr{"key": "value"}}, + time.Time{}, + "value", + }, + { + "test timestamp formatter", + "%{[key]}: %{+YYYY.MM.dd}", + common.MapStr{"key": "timestamp"}, + time.Date(2015, 5, 1, 20, 12, 34, 0, time.Local), + "timestamp: 2015.05.01", + }, + { + "test timestamp formatter", + "%{[@timestamp]}: %{+YYYY.MM.dd}", + common.MapStr{"key": "timestamp"}, + time.Date(2015, 5, 1, 20, 12, 34, 0, time.Local), + "2015-05-01T20:12:34.000Z: 2015.05.01", + }, + } + + for i, test := range tests { + t.Logf("test(%v): %v", i, test.title) + + efs, err := CompileEvent(test.format) + if err != nil { + t.Error(err) + continue + } + + fs, err := NewTimestampFormatString(efs, test.staticFields) + if err != nil { + t.Error(err) + continue + } + + actual, err := fs.Run(test.timestamp) + + assert.NoError(t, err) + assert.Equal(t, test.expected, actual) + } +} diff --git a/libbeat/idxmgmt/ilm/ilm.go b/libbeat/idxmgmt/ilm/ilm.go index 07c924214ece..84ca92c432e1 100644 --- a/libbeat/idxmgmt/ilm/ilm.go +++ b/libbeat/idxmgmt/ilm/ilm.go @@ -142,23 +142,9 @@ func NoopSupport(_ *logp.Logger, info beat.Info, config *common.Config) (Support } func applyStaticFmtstr(info beat.Info, fmt *fmtstr.EventFormatString) (string, error) { - return fmt.Run(&beat.Event{ - Fields: common.MapStr{ - // beat object was left in for backward compatibility reason for older configs. - "beat": common.MapStr{ - "name": info.Beat, - "version": info.Version, - }, - "agent": common.MapStr{ - "name": info.Beat, - "version": info.Version, - }, - // For the Beats that have an observer role - "observer": common.MapStr{ - "name": info.Beat, - "version": info.Version, - }, - }, - Timestamp: time.Now(), - }) + return fmt.Run( + &beat.Event{ + Fields: fmtstr.FieldsForBeat(info.Beat, info.Version), + Timestamp: time.Now(), + }) } diff --git a/libbeat/idxmgmt/std.go b/libbeat/idxmgmt/std.go index 13d1a201fcf4..f6951d729add 100644 --- a/libbeat/idxmgmt/std.go +++ b/libbeat/idxmgmt/std.go @@ -54,12 +54,16 @@ type indexManager struct { assets Asseter } -type indexSelector outil.Selector +type indexSelector struct { + sel outil.Selector + beatInfo beat.Info +} type ilmIndexSelector struct { - index outil.Selector - alias outil.Selector - st *indexState + index outil.Selector + alias outil.Selector + st *indexState + beatInfo beat.Info } type componentType uint8 @@ -201,7 +205,7 @@ func (s *indexSupport) BuildSelector(cfg *common.Config) (outputs.IndexSelector, } if mode != ilm.ModeAuto { - return indexSelector(indexSel), nil + return indexSelector{indexSel, s.info}, nil } selCfg.SetString("index", -1, alias) @@ -321,7 +325,7 @@ func (m *indexManager) setupWithILM() (bool, error) { } func (s *ilmIndexSelector) Select(evt *beat.Event) (string, error) { - if idx := getEventCustomIndex(evt); idx != "" { + if idx := getEventCustomIndex(evt, s.beatInfo); idx != "" { return idx, nil } @@ -335,13 +339,13 @@ func (s *ilmIndexSelector) Select(evt *beat.Event) (string, error) { } func (s indexSelector) Select(evt *beat.Event) (string, error) { - if idx := getEventCustomIndex(evt); idx != "" { + if idx := getEventCustomIndex(evt, s.beatInfo); idx != "" { return idx, nil } - return outil.Selector(s).Select(evt) + return s.sel.Select(evt) } -func getEventCustomIndex(evt *beat.Event) string { +func getEventCustomIndex(evt *beat.Event, beatInfo beat.Info) string { if len(evt.Meta) == 0 { return "" } @@ -360,6 +364,16 @@ func getEventCustomIndex(evt *beat.Event) string { } } + // This is functionally identical to Meta["alias"], returning the overriding + // metadata as the index name if present. It is currently used by Filebeat + // to send the index for particular inputs to formatted string templates, + // which are then expanded by a processor to the "raw_index" field. + if tmp := evt.Meta["raw_index"]; tmp != nil { + if idx, ok := tmp.(string); ok { + return idx + } + } + return "" }