Skip to content

Commit

Permalink
[winlogbeat] Add formatted index setting to event log configs (#15198)
Browse files Browse the repository at this point in the history
  • Loading branch information
faec authored Dec 19, 2019
1 parent 3c0dc41 commit ac5f014
Show file tree
Hide file tree
Showing 7 changed files with 161 additions and 6 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,10 @@ https://github.com/elastic/beats/compare/v7.0.0-beta1...v7.0.0-rc1[Check the HEA

- Add filters and pie chart for AWS EC2 dashboard. {pull}10596[10596]

*Winlogbeat*

- Add an `index` option to all event logs to specify the output index for events from that source. {pull}15062[15062]


==== Known Issue

Expand Down
39 changes: 36 additions & 3 deletions winlogbeat/beater/eventlogger.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@ import (

"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/fmtstr"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/processors"
"github.com/elastic/beats/libbeat/processors/add_formatted_index"

"github.com/elastic/beats/winlogbeat/checkpoint"
"github.com/elastic/beats/winlogbeat/eventlog"
Expand All @@ -38,14 +40,17 @@ type eventLogger struct {
}

type eventLoggerConfig struct {
common.EventMetadata `config:",inline"` // Fields and tags to add to events.
Processors processors.PluginConfig `config:"processors"`
common.EventMetadata `config:",inline"` // Fields and tags to add to events.

Processors processors.PluginConfig `config:"processors"`
Index fmtstr.EventFormatString `config:"index"`

// KeepNull determines whether published events will keep null values or omit them.
KeepNull bool `config:"keep_null"`
}

func newEventLogger(
beatInfo beat.Info,
source eventlog.EventLog,
options *common.Config,
) (*eventLogger, error) {
Expand All @@ -54,7 +59,7 @@ func newEventLogger(
return nil, err
}

processors, err := processors.New(config.Processors)
processors, err := processorsForConfig(beatInfo, config)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -156,3 +161,31 @@ func (e *eventLogger) run(
}
}
}

// processorsForConfig assembles the Processors for an eventLogger.
func processorsForConfig(
beatInfo beat.Info, config eventLoggerConfig,
) (*processors.Processors, error) {
procs := processors.NewList(nil)

// Processor order is important! The index processor, if present, must be
// added before the user processors.
if !config.Index.IsEmpty() {
staticFields := fmtstr.FieldsForBeat(beatInfo.Beat, beatInfo.Version)
timestampFormat, err :=
fmtstr.NewTimestampFormatString(&config.Index, staticFields)
if err != nil {
return nil, err
}
indexProcessor := add_formatted_index.New(timestampFormat)
procs.AddProcessor(indexProcessor)
}

userProcs, err := processors.New(config.Processors)
if err != nil {
return nil, err
}
procs.AddProcessors(*userProcs)

return procs, nil
}
106 changes: 106 additions & 0 deletions winlogbeat/beater/eventlogger_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package beater

import (
"testing"
"time"

"github.com/stretchr/testify/assert"

"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
)

func TestProcessorsForConfig(t *testing.T) {
testCases := map[string]struct {
beatInfo beat.Info
configStr string
event beat.Event
expectedFields map[string]string
}{
"Simple static index": {
configStr: "index: 'test'",
expectedFields: map[string]string{
"@metadata.raw_index": "test",
},
},
"Index with agent info + timestamp": {
beatInfo: beat.Info{Beat: "TestBeat", Version: "3.9.27"},
configStr: "index: 'beat-%{[agent.name]}-%{[agent.version]}-%{+yyyy.MM.dd}'",
event: beat.Event{Timestamp: time.Date(1999, time.December, 31, 23, 0, 0, 0, time.UTC)},
expectedFields: map[string]string{
"@metadata.raw_index": "beat-TestBeat-3.9.27-1999.12.31",
},
},
}
for description, test := range testCases {
if test.event.Fields == nil {
test.event.Fields = common.MapStr{}
}
config, err := eventLoggerConfigFromString(test.configStr)
if err != nil {
t.Errorf("[%s] %v", description, err)
continue
}
processors, err := processorsForConfig(test.beatInfo, config)
if err != nil {
t.Errorf("[%s] %v", description, err)
continue
}
processedEvent, err := processors.Run(&test.event)
// We don't check if err != nil, because we are testing the final outcome
// of running the processors, including when some of them fail.
if processedEvent == nil {
t.Errorf("[%s] Unexpected fatal error running processors: %v\n",
description, err)
}
for key, value := range test.expectedFields {
field, err := processedEvent.GetValue(key)
if err != nil {
t.Errorf("[%s] Couldn't get field %s from event: %v", description, key, err)
continue
}
assert.Equal(t, field, value)
fieldStr, ok := field.(string)
if !ok {
// Note that requiring a string here is just to simplify the test setup,
// not a requirement of the underlying api.
t.Errorf("[%s] Field [%s] should be a string", description, key)
continue
}
if fieldStr != value {
t.Errorf("[%s] Event field [%s]: expected [%s], got [%s]", description, key, value, fieldStr)
}
}
}
}

// Helper function to convert from YML input string to an unpacked
// eventLoggerConfig
func eventLoggerConfigFromString(s string) (eventLoggerConfig, error) {
config := eventLoggerConfig{}
cfg, err := common.NewConfigFrom(s)
if err != nil {
return config, err
}
if err := cfg.Unpack(&config); err != nil {
return config, err
}
return config, nil
}
2 changes: 1 addition & 1 deletion winlogbeat/beater/winlogbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func (eb *Winlogbeat) init(b *beat.Beat) error {
}
debugf("Initialized EventLog[%s]", eventLog.Name())

logger, err := newEventLogger(eventLog, config)
logger, err := newEventLogger(b.Info, eventLog, config)
if err != nil {
return fmt.Errorf("Failed to create new event log. %v", err)
}
Expand Down
12 changes: 12 additions & 0 deletions winlogbeat/docs/winlogbeat-options.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,18 @@ A list of processors to apply to the data generated by the event log.
See <<filtering-and-enhancing-data>> for information about specifying
processors in your config.

[float]
==== `event_logs.index`

If present, this formatted string overrides the index for events from this
event log (for elasticsearch outputs), or sets the `raw_index` field of the event's
metadata (for other outputs). This string can only refer to the agent name and
version and the event timestamp; for access to dynamic fields, use
`output.elasticsearch.index` or a processor.

Example value: `"%{[agent.name]}-myindex-%{+yyyy.MM.dd}"` might
expand to `"winlogbeat-myindex-2019.12.13"`.

[float]
==== `event_logs.keep_null`

Expand Down
2 changes: 1 addition & 1 deletion winlogbeat/eventlog/eventlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ type Record struct {
Offset checkpoint.EventLogState // Position of the record within its source stream.
}

// ToMapStr returns a new MapStr containing the data from this Record.
// ToEvent returns a new beat.Event containing the data from this Record.
func (e Record) ToEvent() beat.Event {
// Windows Log Specific data
win := common.MapStr{
Expand Down
2 changes: 1 addition & 1 deletion winlogbeat/eventlog/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
)

var commonConfigKeys = []string{"api", "name", "fields", "fields_under_root",
"tags", "processors"}
"tags", "processors", "index"}

// ConfigCommon is the common configuration data used to instantiate a new
// EventLog. Each implementation is free to support additional configuration
Expand Down

0 comments on commit ac5f014

Please sign in to comment.