[Filebeat] allow ingest pipelines in YAML format (#11209)
This updates the ingest pipeline loading to look at the
pipeline definition file extension and use a JSON or YAML
decoder depending on the type.
adriansr authored Mar 13, 2019
1 parent 822b95a commit a93a09e
Showing 3 changed files with 67 additions and 6 deletions.
1 change: 1 addition & 0 deletions CHANGELOG-developer.next.asciidoc
@@ -35,3 +35,4 @@ The list below covers the major changes between 7.0.0-beta1 and master only.
- Assertion for documented fields in tests fails if any of the fields in the tested event is documented as an alias. {pull}10921[10921]
- Support for Logger in the Metricset base instance. {pull}11106[11106]
- Introduce processing.Support to instance.Setting. This allows Beats to fully modify the event processing. {pull}10801[10801]
+- Filebeat modules can now use ingest pipelines in YAML format. {pull}11209[11209]
16 changes: 14 additions & 2 deletions docs/devguide/modules-dev-guide.asciidoc
@@ -308,7 +308,7 @@ The `ingest/` folder contains Elasticsearch
Node pipelines are responsible for parsing the log lines and doing other
manipulations on the data.

-The files in this folder are JSON documents representing
+The files in this folder are JSON or YAML documents representing
{ref}/pipeline.html[pipeline definitions]. Just like with the `config/`
folder, you can define multiple pipelines, but a single one is loaded at runtime
based on the information from `manifest.yml`.
@@ -330,6 +330,18 @@ The generator creates a JSON object similar to this one:
}
----

+Alternatively, you can use YAML-formatted pipelines, which use a simpler syntax:
+
+[source,yaml]
+----
+description: "Pipeline for parsing {module} {fileset} logs"
+processors:
+on_failure:
+  - set:
+      field: error.message
+      value: "{{ _ingest.on_failure_message }}"
+----
+
From here, you would typically add processors to the `processors` array to do
the actual parsing. For details on how to use ingest node processors, see the
{ref}/ingest-processors.html[ingest node documentation]. In
@@ -407,7 +419,7 @@ file. The first pipeline in the list is considered to be the entry point pipeline.
----
ingest_pipeline:
- ingest/main.json
-  - ingest/plain_logs.json
+  - ingest/plain_logs.yml
- ingest/json_logs.json
----

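To make the `ingest_pipeline` example above concrete: the devguide text notes that the actual parsing logic goes into the `processors` array. For illustration only (this snippet is not part of the commit, and the grok pattern and field names are invented), a YAML pipeline with one processor might look like:

[source,yaml]
----
description: "Pipeline for parsing {module} {fileset} logs"
processors:
  - grok:
      field: message
      patterns:
        - "%{IPORHOST:source.address} %{GREEDYDATA:event.original}"
on_failure:
  - set:
      field: error.message
      value: "{{ _ingest.on_failure_message }}"
----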
56 changes: 52 additions & 4 deletions filebeat/fileset/fileset.go
@@ -35,6 +35,7 @@ import (
"text/template"

errw "github.com/pkg/errors"
"gopkg.in/yaml.v2"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/cfgwarn"
@@ -421,15 +422,28 @@ func (fs *Fileset) GetPipelines(esVersion common.Version) (pipelines []pipeline,
return nil, fmt.Errorf("Error reading pipeline file %s: %v", path, err)
}

-jsonString, err := applyTemplate(vars, string(strContents), true)
+encodedString, err := applyTemplate(vars, string(strContents), true)
if err != nil {
return nil, fmt.Errorf("Error interpreting the template of the ingest pipeline: %v", err)
}

var content map[string]interface{}
-err = json.Unmarshal([]byte(jsonString), &content)
-if err != nil {
-	return nil, fmt.Errorf("Error JSON decoding the pipeline file: %s: %v", path, err)
+switch extension := strings.ToLower(filepath.Ext(path)); extension {
+case ".json":
+	if err = json.Unmarshal([]byte(encodedString), &content); err != nil {
+		return nil, fmt.Errorf("Error JSON decoding the pipeline file: %s: %v", path, err)
+	}
+case ".yaml", ".yml":
+	if err = yaml.Unmarshal([]byte(encodedString), &content); err != nil {
+		return nil, fmt.Errorf("Error YAML decoding the pipeline file: %s: %v", path, err)
+	}
+	newContent, err := fixYAMLMaps(content)
+	if err != nil {
+		return nil, fmt.Errorf("Failed to sanitize the YAML pipeline file: %s: %v", path, err)
+	}
+	content = newContent.(map[string]interface{})
+default:
+	return nil, fmt.Errorf("Unsupported extension '%s' for pipeline file: %s", extension, path)
}

pipelineID := fs.pipelineIDs[idx]
@@ -444,6 +458,40 @@ func (fs *Fileset) GetPipelines(esVersion common.Version) (pipelines []pipeline,
return pipelines, nil
}

+// This function recursively converts maps with interface{} keys, as returned by
+// yaml.Unmarshal, to maps of string keys, as expected by the json encoder
+// that will be used when delivering the pipeline to Elasticsearch.
+// Will return an error when something other than a string is used as a key.
+func fixYAMLMaps(elem interface{}) (_ interface{}, err error) {
+	switch v := elem.(type) {
+	case map[interface{}]interface{}:
+		result := make(map[string]interface{}, len(v))
+		for key, value := range v {
+			keyS, ok := key.(string)
+			if !ok {
+				return nil, fmt.Errorf("key '%v' is not string but %T", key, key)
+			}
+			if result[keyS], err = fixYAMLMaps(value); err != nil {
+				return nil, err
+			}
+		}
+		return result, nil
+	case map[string]interface{}:
+		for key, value := range v {
+			if v[key], err = fixYAMLMaps(value); err != nil {
+				return nil, err
+			}
+		}
+	case []interface{}:
+		for idx, value := range v {
+			if v[idx], err = fixYAMLMaps(value); err != nil {
+				return nil, err
+			}
+		}
+	}
+	return elem, nil
+}

// formatPipelineID generates the ID to be used for the pipeline ID in Elasticsearch
func formatPipelineID(module, fileset, path, beatVersion string) string {
return fmt.Sprintf("filebeat-%s-%s-%s-%s", beatVersion, module, fileset, removeExt(filepath.Base(path)))
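The comment on `fixYAMLMaps` above captures the key subtlety of this commit: `gopkg.in/yaml.v2` decodes nested mappings into `map[interface{}]interface{}`, and the JSON encoder that later ships the pipeline body to Elasticsearch rejects maps with non-string keys. A minimal standalone sketch of that failure mode and the fix (this demo is not part of the commit; it reuses `fixYAMLMaps`, copied verbatim from the diff above):

[source,go]
----
package main

import (
	"encoding/json"
	"fmt"

	"gopkg.in/yaml.v2"
)

// fixYAMLMaps is copied from the diff above: it rewrites interface{}-keyed
// maps produced by yaml.Unmarshal into string-keyed maps that encoding/json
// accepts.
func fixYAMLMaps(elem interface{}) (_ interface{}, err error) {
	switch v := elem.(type) {
	case map[interface{}]interface{}:
		result := make(map[string]interface{}, len(v))
		for key, value := range v {
			keyS, ok := key.(string)
			if !ok {
				return nil, fmt.Errorf("key '%v' is not string but %T", key, key)
			}
			if result[keyS], err = fixYAMLMaps(value); err != nil {
				return nil, err
			}
		}
		return result, nil
	case map[string]interface{}:
		for key, value := range v {
			if v[key], err = fixYAMLMaps(value); err != nil {
				return nil, err
			}
		}
	case []interface{}:
		for idx, value := range v {
			if v[idx], err = fixYAMLMaps(value); err != nil {
				return nil, err
			}
		}
	}
	return elem, nil
}

func main() {
	src := []byte(`
description: demo
on_failure:
  - set:
      field: error.message
      value: "{{ _ingest.on_failure_message }}"
`)

	var content map[string]interface{}
	if err := yaml.Unmarshal(src, &content); err != nil {
		panic(err)
	}

	// Nested mappings come back keyed by interface{}; json.Marshal fails with
	// "json: unsupported type: map[interface {}]interface {}".
	if _, err := json.Marshal(content); err != nil {
		fmt.Println("before fixYAMLMaps:", err)
	}

	fixed, err := fixYAMLMaps(content)
	if err != nil {
		panic(err)
	}
	body, _ := json.Marshal(fixed)
	fmt.Println("after fixYAMLMaps:", string(body))
}
----

Running it prints the unsupported-type error first, then the same pipeline serialized as valid JSON, which is why the YAML branch of `GetPipelines` sanitizes the decoded map before use.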
