Skip to content

Commit

Permalink
Journalbeat: add index option to input (#15071)
Browse files Browse the repository at this point in the history
* Refactoring: renaming var to avoid collision

* Refactoring: extract function

* Refactoring: moving AddFormattedIndex processor into libbeat

* Add constructor for addFormattedIndex processor

* Export processor struct

* Add comment for exported constructor

* Adding comment for exported method

* Adding index option to doc

* Refactoring: extracting common code

* Adding unit tests

* Adding godoc for exported methods
  • Loading branch information
ycombinator authored Dec 13, 2019
1 parent 6a22ee8 commit fb4dd37
Show file tree
Hide file tree
Showing 8 changed files with 298 additions and 50 deletions.
48 changes: 6 additions & 42 deletions filebeat/channel/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,11 @@
package channel

import (
"fmt"

"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/fmtstr"
"github.com/elastic/beats/libbeat/processors"
"github.com/elastic/beats/libbeat/processors/add_formatted_index"
)

// ConnectorFunc is an adapter for using ordinary functions as Connector.
Expand All @@ -34,14 +33,6 @@ type pipelineConnector struct {
pipeline beat.Pipeline
}

// addFormattedIndex is a Processor to set an event's "raw_index" metadata field
// with a given TimestampFormatString. The elasticsearch output interprets
// that field as specifying the (raw string) index the event should be sent to;
// in other outputs it is just included in the metadata.
type addFormattedIndex struct {
formatString *fmtstr.TimestampFormatString
}

// Connect passes the cfg and the zero value of beat.ClientConfig to the underlying function.
func (fn ConnectorFunc) Connect(cfg *common.Config) (Outleter, error) {
return fn(cfg, beat.ClientConfig{})
Expand Down Expand Up @@ -132,48 +123,21 @@ func processorsForConfig(
if err != nil {
return nil, err
}
indexProcessor := &addFormattedIndex{timestampFormat}
procs.List = append(procs.List, indexProcessor)
indexProcessor := add_formatted_index.New(timestampFormat)
procs.AddProcessor(indexProcessor)
}

// 2. ClientConfig processors
if lst := clientCfg.Processing.Processor; lst != nil {
procs.List = append(procs.List, lst)
procs.AddProcessor(lst)
}

// 3. User processors
userProcessors, err := processors.New(config.Processors)
if err != nil {
return nil, err
}
// Subtlety: it is important here that we append the individual elements of
// userProcessors, rather than userProcessors itself, even though
// userProcessors implements the processors.Processor interface. This is
// because the contents of what we return are later pulled out into a
// processing.group rather than a processors.Processors, and the two have
// different error semantics: processors.Processors aborts processing on
// any error, whereas processing.group only aborts on fatal errors. The
// latter is the most common behavior, and the one we are preserving here for
// backwards compatibility.
// We are unhappy about this and have plans to fix this inconsistency at a
// higher level, but for now we need to respect the existing semantics.
procs.List = append(procs.List, userProcessors.List...)
return procs, nil
}
procs.AddProcessors(*userProcessors)

func (p *addFormattedIndex) Run(event *beat.Event) (*beat.Event, error) {
index, err := p.formatString.Run(event.Timestamp)
if err != nil {
return nil, err
}

if event.Meta == nil {
event.Meta = common.MapStr{}
}
event.Meta["raw_index"] = index
return event, nil
}

func (p *addFormattedIndex) String() string {
return fmt.Sprintf("add_index_pattern=%v", p.formatString)
return procs, nil
}
2 changes: 1 addition & 1 deletion journalbeat/beater/journalbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func New(b *beat.Beat, cfg *common.Config) (beat.Beater, error) {

var inputs []*input.Input
for _, c := range config.Inputs {
i, err := input.New(c, b.Publisher, done, cp.States())
i, err := input.New(c, b, done, cp.States())
if err != nil {
return nil, err
}
Expand Down
13 changes: 13 additions & 0 deletions journalbeat/docs/config-options.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -215,3 +215,16 @@ available:
`CONTAINER_NAME`:: `container.name`
`CONTAINER_PARTIAL_MESSAGE`:: `container.partial`
`CONTAINER_TAG`:: `container.image.tag`

[float]
[id="index"]
==== `index`

If present, this formatted string overrides the index for events from this input
(for elasticsearch outputs), or sets the `raw_index` field of the event's
metadata (for other outputs). This string can only refer to the agent name and
version and the event timestamp; for access to dynamic fields, use
`output.elasticsearch.index` or a processor.

Example value: `"%{[agent.name]}-myindex-%{+yyyy.MM.dd}"` might
expand to `"journalbeat-myindex-2019.12.13"`.
3 changes: 3 additions & 0 deletions journalbeat/input/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"github.com/elastic/beats/journalbeat/config"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/fmtstr"
"github.com/elastic/beats/libbeat/processors"
)

Expand All @@ -47,6 +48,8 @@ type Config struct {
common.EventMetadata `config:",inline"`
// Processors to run on events.
Processors processors.PluginConfig `config:"processors"`
// ES output index pattern
Index fmtstr.EventFormatString `config:"index"`
}

var (
Expand Down
38 changes: 34 additions & 4 deletions journalbeat/input/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ import (
"fmt"
"sync"

"github.com/elastic/beats/libbeat/processors/add_formatted_index"

"github.com/elastic/beats/libbeat/common/fmtstr"

"github.com/gofrs/uuid"

"github.com/elastic/beats/journalbeat/checkpoint"
Expand Down Expand Up @@ -48,7 +52,7 @@ type Input struct {
// New returns a new Inout
func New(
c *common.Config,
pipeline beat.Pipeline,
b *beat.Beat,
done chan struct{},
states map[string]checkpoint.JournalState,
) (*Input, error) {
Expand Down Expand Up @@ -102,7 +106,7 @@ func New(
readers = append(readers, r)
}

processors, err := processors.New(config.Processors)
inputProcessors, err := processorsForInput(b.Info, config)
if err != nil {
return nil, err
}
Expand All @@ -113,12 +117,12 @@ func New(
readers: readers,
done: done,
config: config,
pipeline: pipeline,
pipeline: b.Publisher,
states: states,
id: id,
logger: logger,
eventMeta: config.EventMetadata,
processors: processors,
processors: inputProcessors,
}, nil
}

Expand Down Expand Up @@ -203,3 +207,29 @@ func (i *Input) Stop() {
func (i *Input) Wait() {
i.Stop()
}

func processorsForInput(beatInfo beat.Info, config Config) (*processors.Processors, error) {
procs := processors.NewList(nil)

// Processor ordering is important:
// 1. Index configuration
if !config.Index.IsEmpty() {
staticFields := fmtstr.FieldsForBeat(beatInfo.Beat, beatInfo.Version)
timestampFormat, err :=
fmtstr.NewTimestampFormatString(&config.Index, staticFields)
if err != nil {
return nil, err
}
indexProcessor := add_formatted_index.New(timestampFormat)
procs.AddProcessor(indexProcessor)
}

// 2. User processors
userProcessors, err := processors.New(config.Processors)
if err != nil {
return nil, err
}
procs.AddProcessors(*userProcessors)

return procs, nil
}
164 changes: 164 additions & 0 deletions journalbeat/input/input_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package input

import (
"fmt"
"testing"
"time"

"github.com/stretchr/testify/assert"

"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/processors"
_ "github.com/elastic/beats/libbeat/processors/actions"
)

func TestProcessorsForInput(t *testing.T) {
testCases := map[string]struct {
beatInfo beat.Info
configStr string
event beat.Event
expectedFields map[string]string
}{
"Simple static index": {
configStr: "index: 'test'",
expectedFields: map[string]string{
"@metadata.raw_index": "test",
},
},
"Index with agent info + timestamp": {
beatInfo: beat.Info{Beat: "TestBeat", Version: "3.9.27"},
configStr: "index: 'beat-%{[agent.name]}-%{[agent.version]}-%{+yyyy.MM.dd}'",
event: beat.Event{Timestamp: time.Date(1999, time.December, 31, 23, 0, 0, 0, time.UTC)},
expectedFields: map[string]string{
"@metadata.raw_index": "beat-TestBeat-3.9.27-1999.12.31",
},
},
"Set field in input config": {
configStr: `processors: [add_fields: {fields: {testField: inputConfig}}]`,
expectedFields: map[string]string{
"fields.testField": "inputConfig",
},
},
}
for description, test := range testCases {
if test.event.Fields == nil {
test.event.Fields = common.MapStr{}
}
config, err := inputConfigFromString(test.configStr)
if err != nil {
t.Errorf("[%s] %v", description, err)
continue
}
processors, err := processorsForInput(test.beatInfo, config)
if err != nil {
t.Errorf("[%s] %v", description, err)
continue
}
processedEvent, err := processors.Run(&test.event)
// We don't check if err != nil, because we are testing the final outcome
// of running the processors, including when some of them fail.
if processedEvent == nil {
t.Errorf("[%s] Unexpected fatal error running processors: %v\n",
description, err)
}
for key, value := range test.expectedFields {
field, err := processedEvent.GetValue(key)
if err != nil {
t.Errorf("[%s] Couldn't get field %s from event: %v", description, key, err)
continue
}
assert.Equal(t, field, value)
fieldStr, ok := field.(string)
if !ok {
// Note that requiring a string here is just to simplify the test setup,
// not a requirement of the underlying api.
t.Errorf("[%s] Field [%s] should be a string", description, key)
continue
}
if fieldStr != value {
t.Errorf("[%s] Event field [%s]: expected [%s], got [%s]", description, key, value, fieldStr)
}
}
}
}

func TestProcessorsForInputIsFlat(t *testing.T) {
// This test is regrettable, and exists because of inconsistencies in
// processor handling between processors.Processors and processing.group
// (which implements beat.ProcessorList) -- see processorsForConfig for
// details. The upshot is that, for now, if the input configuration specifies
// processors, they must be returned as direct children of the resulting
// processors.Processors (rather than being collected in additional tree
// structure).
// This test should be removed once we have a more consistent mechanism for
// collecting and running processors.
configStr := `processors:
- add_fields: {fields: {testField: value}}
- add_fields: {fields: {testField2: stuff}}`
config, err := inputConfigFromString(configStr)
if err != nil {
t.Fatal(err)
}
processors, err := processorsForInput(
beat.Info{}, config)
if err != nil {
t.Fatal(err)
}
assert.Equal(t, 2, len(processors.List))
}

// setRawIndex is a bare-bones processor to set the raw_index field to a
// constant string in the event metadata. It is used to test order of operations
// for processorsForConfig.
type setRawIndex struct {
indexStr string
}

func (p *setRawIndex) Run(event *beat.Event) (*beat.Event, error) {
if event.Meta == nil {
event.Meta = common.MapStr{}
}
event.Meta["raw_index"] = p.indexStr
return event, nil
}

func (p *setRawIndex) String() string {
return fmt.Sprintf("set_raw_index=%v", p.indexStr)
}

// Helper function to convert from YML input string to an unpacked
// Config
func inputConfigFromString(s string) (Config, error) {
config := Config{}
cfg, err := common.NewConfigFrom(s)
if err != nil {
return config, err
}
err = cfg.Unpack(&config)
return config, err
}

// makeProcessors wraps one or more bare Processor objects in Processors.
func makeProcessors(procs ...processors.Processor) *processors.Processors {
procList := processors.NewList(nil)
procList.List = procs
return procList
}
Loading

0 comments on commit fb4dd37

Please sign in to comment.