[Filebeat] allow ingest pipelines in YAML format #11209

Merged 4 commits on Mar 13, 2019
1 change: 1 addition & 0 deletions CHANGELOG-developer.next.asciidoc
@@ -35,3 +35,4 @@ The list below covers the major changes between 7.0.0-beta1 and master only.
- Assertion for documented fields in tests fails if any of the fields in the tested event is documented as an alias. {pull}10921[10921]
- Support for Logger in the Metricset base instance. {pull}11106[11106]
- Introduce processing.Support to instance.Setting. This allows Beats to fully modify the event processing. {pull}10801[10801]
- Filebeat modules can now use ingest pipelines in YAML format. {pull}11209[11209]
16 changes: 14 additions & 2 deletions docs/devguide/modules-dev-guide.asciidoc
@@ -308,7 +308,7 @@ The `ingest/` folder contains Elasticsearch
Node pipelines are responsible for parsing the log lines and doing other
manipulations on the data.

The files in this folder are JSON or YAML documents representing
{ref}/pipeline.html[pipeline definitions]. Just like with the `config/`
folder, you can define multiple pipelines, but a single one is loaded at runtime
based on the information from `manifest.yml`.
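For example, a fileset's `manifest.yml` can select the pipeline to load like this (a minimal sketch; the file name is illustrative):

[source,yaml]
----
ingest_pipeline: ingest/pipeline.yml
----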
@@ -330,6 +330,18 @@ The generator creates a JSON object similar to this one:
}
----

Alternatively, you can use YAML-formatted pipelines, which use a simpler syntax:

[source,yaml]
----
description: "Pipeline for parsing {module} {fileset} logs"
processors:
on_failure:
  - set:
      field: error.message
      value: "{{ _ingest.on_failure_message }}"
----
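
To make this concrete, a hypothetical grok processor added under `processors` could look like the following sketch (the pattern and target fields here are assumptions, not part of any actual module):

[source,yaml]
----
description: "Pipeline for parsing {module} {fileset} logs"
processors:
  - grok:
      field: message
      patterns:
        - "%{IPORHOST:source.address} %{GREEDYDATA:event.original}"
on_failure:
  - set:
      field: error.message
      value: "{{ _ingest.on_failure_message }}"
----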

From here, you would typically add processors to the `processors` array to do
the actual parsing. For details on how to use ingest node processors, see the
{ref}/ingest-processors.html[ingest node documentation]. In
@@ -407,7 +419,7 @@ file. The first pipeline in the list is considered to be the entry point pipeline.
----
ingest_pipeline:
- ingest/main.json
- ingest/plain_logs.yml
- ingest/json_logs.json
----
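
The entry point pipeline can then hand documents off to the secondary pipelines with Elasticsearch's `pipeline` processor. A minimal sketch, assuming a literal pipeline name (Filebeat generates the real, versioned pipeline IDs at load time):

[source,yaml]
----
processors:
  - pipeline:
      # Hypothetical literal name; the module system resolves actual IDs.
      name: "filebeat-7.0.0-mymodule-myfileset-json_logs"
----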

56 changes: 52 additions & 4 deletions filebeat/fileset/fileset.go
@@ -35,6 +35,7 @@ import (
"text/template"

errw "github.com/pkg/errors"
"gopkg.in/yaml.v2"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/cfgwarn"
@@ -421,15 +422,28 @@ func (fs *Fileset) GetPipelines(esVersion common.Version) (pipelines []pipeline, err error) {
return nil, fmt.Errorf("Error reading pipeline file %s: %v", path, err)
}

encodedString, err := applyTemplate(vars, string(strContents), true)
if err != nil {
return nil, fmt.Errorf("Error interpreting the template of the ingest pipeline: %v", err)
}

var content map[string]interface{}
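// Pick a decoder based on the pipeline file's extension.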
switch extension := strings.ToLower(filepath.Ext(path)); extension {
case ".json":
if err = json.Unmarshal([]byte(encodedString), &content); err != nil {
return nil, fmt.Errorf("Error JSON decoding the pipeline file: %s: %v", path, err)
}
case ".yaml", ".yml":
if err = yaml.Unmarshal([]byte(encodedString), &content); err != nil {
return nil, fmt.Errorf("Error YAML decoding the pipeline file: %s: %v", path, err)
}
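// yaml.v2 decodes nested mappings with interface{} keys; normalize them
// so the pipeline can be JSON-encoded when it is sent to Elasticsearch.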
newContent, err := fixYAMLMaps(content)
if err != nil {
return nil, fmt.Errorf("Failed to sanitize the YAML pipeline file: %s: %v", path, err)
}
content = newContent.(map[string]interface{})
default:
return nil, fmt.Errorf("Unsupported extension '%s' for pipeline file: %s", extension, path)
}

pipelineID := fs.pipelineIDs[idx]
@@ -444,6 +458,40 @@
return pipelines, nil
}

// fixYAMLMaps recursively converts maps with interface{} keys, as returned
// by yaml.Unmarshal, to maps with string keys, as expected by the JSON
// encoder that is used when delivering the pipeline to Elasticsearch.
// It returns an error when anything other than a string is used as a map key.
func fixYAMLMaps(elem interface{}) (_ interface{}, err error) {
switch v := elem.(type) {
case map[interface{}]interface{}:
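// Rebuild the map with string keys, converting nested values recursively.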
result := make(map[string]interface{}, len(v))
for key, value := range v {
keyS, ok := key.(string)
if !ok {
return nil, fmt.Errorf("key '%v' is not string but %T", key, key)
}
if result[keyS], err = fixYAMLMaps(value); err != nil {
return nil, err
}
}
return result, nil
case map[string]interface{}:
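// Keys are already strings; normalize the values in place.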
for key, value := range v {
if v[key], err = fixYAMLMaps(value); err != nil {
return nil, err
}
}
case []interface{}:
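// Normalize each element of the sequence in place.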
for idx, value := range v {
if v[idx], err = fixYAMLMaps(value); err != nil {
return nil, err
}
}
}
return elem, nil
}
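
As a standalone illustration (not part of this change) of why the normalization is needed: `gopkg.in/yaml.v2` decodes nested mappings into `map[interface{}]interface{}`, which `encoding/json` refuses to marshal:

[source,go]
----
package main

import (
	"encoding/json"
	"fmt"

	"gopkg.in/yaml.v2"
)

func main() {
	var content map[string]interface{}
	// The nested mapping under "set" decodes as map[interface{}]interface{}.
	if err := yaml.Unmarshal([]byte("set:\n  field: error.message\n"), &content); err != nil {
		panic(err)
	}
	// Fails with "json: unsupported type: map[interface {}]interface {}"
	// until something like fixYAMLMaps converts the keys to strings.
	if _, err := json.Marshal(content); err != nil {
		fmt.Println(err)
	}
}
----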

// formatPipelineID generates the ID to be used for the pipeline ID in Elasticsearch
func formatPipelineID(module, fileset, path, beatVersion string) string {
return fmt.Sprintf("filebeat-%s-%s-%s-%s", beatVersion, module, fileset, removeExt(filepath.Base(path)))
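For example (values assumed for illustration), `formatPipelineID("nginx", "access", "ingest/pipeline.yml", "7.0.0")` would return `filebeat-7.0.0-nginx-access-pipeline`.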