diff --git a/CHANGELOG-developer.next.asciidoc b/CHANGELOG-developer.next.asciidoc
index e8ab8e7f82d7..ccbe92355ae4 100644
--- a/CHANGELOG-developer.next.asciidoc
+++ b/CHANGELOG-developer.next.asciidoc
@@ -35,3 +35,4 @@ The list below covers the major changes between 7.0.0-beta1 and master only.
 - Assertion for documented fields in tests fails if any of the fields in the tested event is documented as an alias. {pull}10921[10921]
 - Support for Logger in the Metricset base instance. {pull}11106[11106]
 - Introduce processing.Support to instance.Setting. This allows Beats to fully modify the event processing. {pull}10801[10801]
+- Filebeat modules can now use ingest pipelines in YAML format. {pull}11209[11209]
diff --git a/docs/devguide/modules-dev-guide.asciidoc b/docs/devguide/modules-dev-guide.asciidoc
index ddb1932a403e..eb15ae54d185 100644
--- a/docs/devguide/modules-dev-guide.asciidoc
+++ b/docs/devguide/modules-dev-guide.asciidoc
@@ -308,7 +308,7 @@ The `ingest/` folder contains Elasticsearch
 Node pipelines are responsible for parsing the log lines and doing other
 manipulations on the data.
 
-The files in this folder are JSON documents representing
+The files in this folder are JSON or YAML documents representing
 {ref}/pipeline.html[pipeline definitions]. Just like with the `config/`
 folder, you can define multiple pipelines, but a single one is loaded at runtime
 based on the information from `manifest.yml`.
@@ -330,6 +330,18 @@ The generator creates a JSON object similar to this one:
 }
 ----
 
+Alternatively, you can use YAML-formatted pipelines, which use a simpler syntax:
+
+[source,yaml]
+----
+description: "Pipeline for parsing {module} {fileset} logs"
+processors:
+on_failure:
+  - set:
+      field: error.message
+      value: "{{ _ingest.on_failure_message }}"
+----
+
 From here, you would typically add processors to the `processors` array to do
 the actual parsing. For details on how to use ingest node processors, see the
 {ref}/ingest-processors.html[ingest node documentation]. In
@@ -407,7 +419,7 @@ file. The first pipeline in the list is considered to be the entry point pipeline.
 ----
 ingest_pipeline:
   - ingest/main.json
-  - ingest/plain_logs.json
+  - ingest/plain_logs.yml
   - ingest/json_logs.json
 ----
 
diff --git a/filebeat/fileset/fileset.go b/filebeat/fileset/fileset.go
index e4f50407ac28..d1867b64b8eb 100644
--- a/filebeat/fileset/fileset.go
+++ b/filebeat/fileset/fileset.go
@@ -35,6 +35,7 @@ import (
 	"text/template"
 
 	errw "github.com/pkg/errors"
+	"gopkg.in/yaml.v2"
 
 	"github.com/elastic/beats/libbeat/common"
 	"github.com/elastic/beats/libbeat/common/cfgwarn"
@@ -421,15 +422,28 @@ func (fs *Fileset) GetPipelines(esVersion common.Version) (pipelines []pipeline,
 		return nil, fmt.Errorf("Error reading pipeline file %s: %v", path, err)
 	}
 
-	jsonString, err := applyTemplate(vars, string(strContents), true)
+	encodedString, err := applyTemplate(vars, string(strContents), true)
 	if err != nil {
 		return nil, fmt.Errorf("Error interpreting the template of the ingest pipeline: %v", err)
 	}
 
 	var content map[string]interface{}
-	err = json.Unmarshal([]byte(jsonString), &content)
-	if err != nil {
-		return nil, fmt.Errorf("Error JSON decoding the pipeline file: %s: %v", path, err)
+	switch extension := strings.ToLower(filepath.Ext(path)); extension {
+	case ".json":
+		if err = json.Unmarshal([]byte(encodedString), &content); err != nil {
+			return nil, fmt.Errorf("Error JSON decoding the pipeline file: %s: %v", path, err)
+		}
+	case ".yaml", ".yml":
+		if err = yaml.Unmarshal([]byte(encodedString), &content); err != nil {
+			return nil, fmt.Errorf("Error YAML decoding the pipeline file: %s: %v", path, err)
+		}
+		newContent, err := fixYAMLMaps(content)
+		if err != nil {
+			return nil, fmt.Errorf("Failed to sanitize the YAML pipeline file: %s: %v", path, err)
+		}
+		content = newContent.(map[string]interface{})
+	default:
+		return nil, fmt.Errorf("Unsupported extension '%s' for pipeline file: %s", extension, path)
 	}
 
 	pipelineID := fs.pipelineIDs[idx]
@@ -444,6 +458,40 @@ func (fs *Fileset) GetPipelines(esVersion common.Version) (pipelines []pipeline,
 	return pipelines, nil
 }
 
+// fixYAMLMaps recursively converts maps with interface{} keys, as returned by
+// yaml.Unmarshal, to maps with string keys, as expected by the JSON encoder
+// that will be used when delivering the pipeline to Elasticsearch.
+// It returns an error when something other than a string is used as a key.
+func fixYAMLMaps(elem interface{}) (_ interface{}, err error) {
+	switch v := elem.(type) {
+	case map[interface{}]interface{}:
+		result := make(map[string]interface{}, len(v))
+		for key, value := range v {
+			keyS, ok := key.(string)
+			if !ok {
+				return nil, fmt.Errorf("key '%v' is not a string but %T", key, key)
+			}
+			if result[keyS], err = fixYAMLMaps(value); err != nil {
+				return nil, err
+			}
+		}
+		return result, nil
+	case map[string]interface{}:
+		for key, value := range v {
+			if v[key], err = fixYAMLMaps(value); err != nil {
+				return nil, err
+			}
+		}
+	case []interface{}:
+		for idx, value := range v {
+			if v[idx], err = fixYAMLMaps(value); err != nil {
+				return nil, err
+			}
+		}
+	}
+	return elem, nil
+}
+
 // formatPipelineID generates the ID to be used for the pipeline ID in Elasticsearch
 func formatPipelineID(module, fileset, path, beatVersion string) string {
 	return fmt.Sprintf("filebeat-%s-%s-%s-%s", beatVersion, module, fileset, removeExt(filepath.Base(path)))
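
For context on the `fixYAMLMaps` helper above: `gopkg.in/yaml.v2` decodes nested mappings into `map[interface{}]interface{}`, which the `encoding/json` encoder used when shipping pipelines to Elasticsearch cannot marshal. The following is a minimal standalone sketch (illustrative only, not part of this change) demonstrating the failure mode:

[source,go]
----
package main

import (
	"encoding/json"
	"fmt"

	"gopkg.in/yaml.v2"
)

func main() {
	const pipeline = `
on_failure:
  - set:
      field: error.message
      value: "{{ _ingest.on_failure_message }}"
`
	var content map[string]interface{}
	if err := yaml.Unmarshal([]byte(pipeline), &content); err != nil {
		panic(err)
	}

	// The nested mapping under "on_failure" is decoded with interface{} keys.
	fmt.Printf("%T\n", content["on_failure"].([]interface{})[0]) // map[interface {}]interface {}

	// encoding/json rejects interface{}-keyed maps, so delivering this
	// structure to Elasticsearch would fail without the conversion.
	if _, err := json.Marshal(content); err != nil {
		fmt.Println(err) // json: unsupported type: map[interface {}]interface {}
	}
}
----

Running the decoded document through `fixYAMLMaps` first rewrites every nested map to `map[string]interface{}` (rejecting non-string keys), after which `json.Marshal` succeeds.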