From 5a81205580370708e2cf631a635d62ec73ee85dc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?No=C3=A9mi=20V=C3=A1nyi?= Date: Tue, 10 Apr 2018 14:19:46 +0200 Subject: [PATCH 1/8] make --update-pipelines a config option --- filebeat/_meta/common.reference.p2.yml | 6 +++++- filebeat/beater/filebeat.go | 9 ++++----- filebeat/cmd/root.go | 1 - filebeat/config/config.go | 2 ++ filebeat/filebeat.reference.yml | 6 +++++- 5 files changed, 16 insertions(+), 8 deletions(-) diff --git a/filebeat/_meta/common.reference.p2.yml b/filebeat/_meta/common.reference.p2.yml index 94c03723a9c..9b5ef211044 100644 --- a/filebeat/_meta/common.reference.p2.yml +++ b/filebeat/_meta/common.reference.p2.yml @@ -1,4 +1,4 @@ -#=========================== Filebeat inputs ============================= + new#=========================== Filebeat inputs ============================= # List of inputs to fetch data. filebeat.inputs: @@ -286,6 +286,10 @@ filebeat.inputs: # This option is not supported on Windows. #filebeat.registry_file_permissions: 0600 +# By default Ingest pipelines are not updated. If this option is enabled Filebeat updates +# pipelines everytime a new Elasticsearch connection is established. +#filebeat.update_pipelines: false + # These config files must have the full filebeat config part inside, but only # the input part is processed. All global options like spool_size are ignored. # The config_dir MUST point to a different directory then where the main filebeat config file is in. diff --git a/filebeat/beater/filebeat.go b/filebeat/beater/filebeat.go index 20e253f43a8..6e4f4124d50 100644 --- a/filebeat/beater/filebeat.go +++ b/filebeat/beater/filebeat.go @@ -35,8 +35,7 @@ const pipelinesWarning = "Filebeat is unable to load the Ingest Node pipelines f " can ignore this warning." 
var ( - once = flag.Bool("once", false, "Run filebeat only once until all harvesters reach EOF") - updatePipelines = flag.Bool("update-pipelines", false, "Update Ingest pipelines") + once = flag.Bool("once", false, "Run filebeat only once until all harvesters reach EOF") ) // Filebeat is a beater object. Contains all objects needed to run the beat @@ -141,7 +140,7 @@ func (fb *Filebeat) loadModulesPipelines(b *beat.Beat) error { // register pipeline loading to happen every time a new ES connection is // established callback := func(esClient *elasticsearch.Client) error { - return fb.moduleRegistry.LoadPipelines(esClient, *updatePipelines) + return fb.moduleRegistry.LoadPipelines(esClient, updatePipelines) } elasticsearch.RegisterConnectCallback(callback) @@ -314,11 +313,11 @@ func (fb *Filebeat) Run(b *beat.Beat) error { logp.Warn(pipelinesWarning) } - if *updatePipelines { + if config.UpdatePipelines { logp.Debug("modules", "Existing Ingest pipelines will be updated") } - err = crawler.Start(registrar, config.ConfigInput, config.ConfigModules, pipelineLoaderFactory, *updatePipelines) + err = crawler.Start(registrar, config.ConfigInput, config.ConfigModules, pipelineLoaderFactory, config.UpdatePipelines) if err != nil { crawler.Stop() return err diff --git a/filebeat/cmd/root.go b/filebeat/cmd/root.go index 52cbac1ab61..250f183913c 100644 --- a/filebeat/cmd/root.go +++ b/filebeat/cmd/root.go @@ -20,7 +20,6 @@ func init() { var runFlags = pflag.NewFlagSet(Name, pflag.ExitOnError) runFlags.AddGoFlag(flag.CommandLine.Lookup("once")) runFlags.AddGoFlag(flag.CommandLine.Lookup("modules")) - runFlags.AddGoFlag(flag.CommandLine.Lookup("update-pipelines")) RootCmd = cmd.GenRootCmdWithRunFlags(Name, "", beater.New, runFlags) RootCmd.PersistentFlags().AddGoFlag(flag.CommandLine.Lookup("M")) diff --git a/filebeat/config/config.go b/filebeat/config/config.go index 2ebeb7a8a69..6425ae43bda 100644 --- a/filebeat/config/config.go +++ b/filebeat/config/config.go @@ -34,6 +34,7 @@ 
type Config struct { ConfigProspector *common.Config `config:"config.prospectors"` ConfigModules *common.Config `config:"config.modules"` Autodiscover *autodiscover.Config `config:"autodiscover"` + UpdatePipelines bool `config:"update_pipelines"` } var ( @@ -41,6 +42,7 @@ var ( RegistryFile: "registry", RegistryFilePermissions: 0600, ShutdownTimeout: 0, + UpdatePipelines: false, } ) diff --git a/filebeat/filebeat.reference.yml b/filebeat/filebeat.reference.yml index 2b6e86f73ac..e65d9cb0839 100644 --- a/filebeat/filebeat.reference.yml +++ b/filebeat/filebeat.reference.yml @@ -307,7 +307,7 @@ filebeat.modules: #input: -#=========================== Filebeat inputs ============================= + new#=========================== Filebeat inputs ============================= # List of inputs to fetch data. filebeat.inputs: @@ -595,6 +595,10 @@ filebeat.inputs: # This option is not supported on Windows. #filebeat.registry_file_permissions: 0600 +# By default Ingest pipelines are not updated. If this option is enabled Filebeat updates +# pipelines everytime a new Elasticsearch connection is established. +#filebeat.update_pipelines: false + # These config files must have the full filebeat config part inside, but only # the input part is processed. All global options like spool_size are ignored. # The config_dir MUST point to a different directory then where the main filebeat config file is in. 
From 3b858d47083fe6c07874e0291f0999029dd2f6b4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?No=C3=A9mi=20V=C3=A1nyi?= Date: Tue, 10 Apr 2018 14:29:41 +0200 Subject: [PATCH 2/8] move pipeline loading to a separate file --- filebeat/fileset/modules.go | 119 ------------------------------- filebeat/fileset/pipelines.go | 129 ++++++++++++++++++++++++++++++++++ 2 files changed, 129 insertions(+), 119 deletions(-) create mode 100644 filebeat/fileset/pipelines.go diff --git a/filebeat/fileset/modules.go b/filebeat/fileset/modules.go index 666ee0e45eb..212865fa65e 100644 --- a/filebeat/fileset/modules.go +++ b/filebeat/fileset/modules.go @@ -257,44 +257,6 @@ func (reg *ModuleRegistry) GetInputConfigs() ([]*common.Config, error) { return result, nil } -// PipelineLoader factory builds and returns a PipelineLoader -type PipelineLoaderFactory func() (PipelineLoader, error) - -// PipelineLoader is a subset of the Elasticsearch client API capable of loading -// the pipelines. -type PipelineLoader interface { - LoadJSON(path string, json map[string]interface{}) ([]byte, error) - Request(method, path string, pipeline string, params map[string]string, body interface{}) (int, []byte, error) - GetVersion() string -} - -// LoadPipelines loads the pipelines for each configured fileset. 
-func (reg *ModuleRegistry) LoadPipelines(esClient PipelineLoader, forceUpdate bool) error { - for module, filesets := range reg.registry { - for name, fileset := range filesets { - // check that all the required Ingest Node plugins are available - requiredProcessors := fileset.GetRequiredProcessors() - logp.Debug("modules", "Required processors: %s", requiredProcessors) - if len(requiredProcessors) > 0 { - err := checkAvailableProcessors(esClient, requiredProcessors) - if err != nil { - return fmt.Errorf("Error loading pipeline for fileset %s/%s: %v", module, name, err) - } - } - - pipelineID, content, err := fileset.GetPipeline(esClient.GetVersion()) - if err != nil { - return fmt.Errorf("Error getting pipeline for fileset %s/%s: %v", module, name, err) - } - err = loadPipeline(esClient, pipelineID, content, forceUpdate) - if err != nil { - return fmt.Errorf("Error loading pipeline for fileset %s/%s: %v", module, name, err) - } - } - } - return nil -} - // InfoString returns the enabled modules and filesets in a single string, ready to // be shown to the user func (reg *ModuleRegistry) InfoString() string { @@ -374,87 +336,6 @@ func checkAvailableProcessors(esClient PipelineLoader, requiredProcessors []Proc return nil } -func loadPipeline(esClient PipelineLoader, pipelineID string, content map[string]interface{}, forceUpdate bool) error { - path := "/_ingest/pipeline/" + pipelineID - if !forceUpdate { - status, _, _ := esClient.Request("GET", path, "", nil, nil) - if status == 200 { - logp.Debug("modules", "Pipeline %s already loaded", pipelineID) - return nil - } - } - body, err := esClient.LoadJSON(path, content) - if err != nil { - return interpretError(err, body) - } - logp.Info("Elasticsearch pipeline with ID '%s' loaded", pipelineID) - return nil -} - -func interpretError(initialErr error, body []byte) error { - var response struct { - Error struct { - RootCause []struct { - Type string `json:"type"` - Reason string `json:"reason"` - Header struct { - 
ProcessorType string `json:"processor_type"` - } `json:"header"` - Index string `json:"index"` - } `json:"root_cause"` - } `json:"error"` - } - err := json.Unmarshal(body, &response) - if err != nil { - // this might be ES < 2.0. Do a best effort to check for ES 1.x - var response1x struct { - Error string `json:"error"` - } - err1x := json.Unmarshal(body, &response1x) - if err1x == nil && response1x.Error != "" { - return fmt.Errorf("The Filebeat modules require Elasticsearch >= 5.0. "+ - "This is the response I got from Elasticsearch: %s", body) - } - - return fmt.Errorf("couldn't load pipeline: %v. Additionally, error decoding response body: %s", - initialErr, body) - } - - // missing plugins? - if len(response.Error.RootCause) > 0 && - response.Error.RootCause[0].Type == "parse_exception" && - strings.HasPrefix(response.Error.RootCause[0].Reason, "No processor type exists with name") && - response.Error.RootCause[0].Header.ProcessorType != "" { - - plugins := map[string]string{ - "geoip": "ingest-geoip", - "user_agent": "ingest-user-agent", - } - plugin, ok := plugins[response.Error.RootCause[0].Header.ProcessorType] - if !ok { - return fmt.Errorf("This module requires an Elasticsearch plugin that provides the %s processor. "+ - "Please visit the Elasticsearch documentation for instructions on how to install this plugin. "+ - "Response body: %s", response.Error.RootCause[0].Header.ProcessorType, body) - } - - return fmt.Errorf("This module requires the %s plugin to be installed in Elasticsearch. "+ - "You can install it using the following command in the Elasticsearch home directory:\n"+ - " sudo bin/elasticsearch-plugin install %s", plugin, plugin) - } - - // older ES version? - if len(response.Error.RootCause) > 0 && - response.Error.RootCause[0].Type == "invalid_index_name_exception" && - response.Error.RootCause[0].Index == "_ingest" { - - return fmt.Errorf("The Ingest Node functionality seems to be missing from Elasticsearch. 
"+ - "The Filebeat modules require Elasticsearch >= 5.0. "+ - "This is the response I got from Elasticsearch: %s", body) - } - - return fmt.Errorf("couldn't load pipeline: %v. Response body: %s", initialErr, body) -} - // LoadML loads the machine-learning configurations into Elasticsearch, if X-Pack is available func (reg *ModuleRegistry) LoadML(esClient PipelineLoader) error { haveXpack, err := mlimporter.HaveXpackML(esClient) diff --git a/filebeat/fileset/pipelines.go b/filebeat/fileset/pipelines.go new file mode 100644 index 00000000000..75c351c145d --- /dev/null +++ b/filebeat/fileset/pipelines.go @@ -0,0 +1,129 @@ +package fileset + +import ( + "encoding/json" + "fmt" + "strings" + + "github.com/elastic/beats/libbeat/logp" +) + +// PipelineLoader factory builds and returns a PipelineLoader +type PipelineLoaderFactory func() (PipelineLoader, error) + +// PipelineLoader is a subset of the Elasticsearch client API capable of loading +// the pipelines. +type PipelineLoader interface { + LoadJSON(path string, json map[string]interface{}) ([]byte, error) + Request(method, path string, pipeline string, params map[string]string, body interface{}) (int, []byte, error) + GetVersion() string +} + +// LoadPipelines loads the pipelines for each configured fileset. 
+func (reg *ModuleRegistry) LoadPipelines(esClient PipelineLoader, forceUpdate bool) error { + fmt.Println("itten") + for module, filesets := range reg.registry { + for name, fileset := range filesets { + // check that all the required Ingest Node plugins are available + requiredProcessors := fileset.GetRequiredProcessors() + logp.Debug("modules", "Required processors: %s", requiredProcessors) + if len(requiredProcessors) > 0 { + err := checkAvailableProcessors(esClient, requiredProcessors) + if err != nil { + return fmt.Errorf("Error loading pipeline for fileset %s/%s: %v", module, name, err) + } + } + + pipelineID, content, err := fileset.GetPipeline(esClient.GetVersion()) + if err != nil { + return fmt.Errorf("Error getting pipeline for fileset %s/%s: %v", module, name, err) + } + err = loadPipeline(esClient, pipelineID, content, forceUpdate) + if err != nil { + return fmt.Errorf("Error loading pipeline for fileset %s/%s: %v", module, name, err) + } + } + } + return nil +} + +func loadPipeline(esClient PipelineLoader, pipelineID string, content map[string]interface{}, forceUpdate bool) error { + path := "/_ingest/pipeline/" + pipelineID + if !forceUpdate { + status, _, _ := esClient.Request("GET", path, "", nil, nil) + if status == 200 { + logp.Debug("modules", "Pipeline %s already loaded", pipelineID) + return nil + } + } + body, err := esClient.LoadJSON(path, content) + if err != nil { + return interpretError(err, body) + } + logp.Info("Elasticsearch pipeline with ID '%s' loaded", pipelineID) + return nil +} + +func interpretError(initialErr error, body []byte) error { + var response struct { + Error struct { + RootCause []struct { + Type string `json:"type"` + Reason string `json:"reason"` + Header struct { + ProcessorType string `json:"processor_type"` + } `json:"header"` + Index string `json:"index"` + } `json:"root_cause"` + } `json:"error"` + } + err := json.Unmarshal(body, &response) + if err != nil { + // this might be ES < 2.0. 
Do a best effort to check for ES 1.x + var response1x struct { + Error string `json:"error"` + } + err1x := json.Unmarshal(body, &response1x) + if err1x == nil && response1x.Error != "" { + return fmt.Errorf("The Filebeat modules require Elasticsearch >= 5.0. "+ + "This is the response I got from Elasticsearch: %s", body) + } + + return fmt.Errorf("couldn't load pipeline: %v. Additionally, error decoding response body: %s", + initialErr, body) + } + + // missing plugins? + if len(response.Error.RootCause) > 0 && + response.Error.RootCause[0].Type == "parse_exception" && + strings.HasPrefix(response.Error.RootCause[0].Reason, "No processor type exists with name") && + response.Error.RootCause[0].Header.ProcessorType != "" { + + plugins := map[string]string{ + "geoip": "ingest-geoip", + "user_agent": "ingest-user-agent", + } + plugin, ok := plugins[response.Error.RootCause[0].Header.ProcessorType] + if !ok { + return fmt.Errorf("This module requires an Elasticsearch plugin that provides the %s processor. "+ + "Please visit the Elasticsearch documentation for instructions on how to install this plugin. "+ + "Response body: %s", response.Error.RootCause[0].Header.ProcessorType, body) + } + + return fmt.Errorf("This module requires the %s plugin to be installed in Elasticsearch. "+ + "You can install it using the following command in the Elasticsearch home directory:\n"+ + " sudo bin/elasticsearch-plugin install %s", plugin, plugin) + } + + // older ES version? + if len(response.Error.RootCause) > 0 && + response.Error.RootCause[0].Type == "invalid_index_name_exception" && + response.Error.RootCause[0].Index == "_ingest" { + + return fmt.Errorf("The Ingest Node functionality seems to be missing from Elasticsearch. "+ + "The Filebeat modules require Elasticsearch >= 5.0. "+ + "This is the response I got from Elasticsearch: %s", body) + } + + return fmt.Errorf("couldn't load pipeline: %v. 
Response body: %s", initialErr, body) +} From a4a4293884e02bfe1cefb4bc8a1de73529de4db5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?No=C3=A9mi=20V=C3=A1nyi?= Date: Tue, 10 Apr 2018 14:36:42 +0200 Subject: [PATCH 3/8] add ingest pipeline loading to setup command --- filebeat/beater/filebeat.go | 29 +++++++++++++++++++++++++++++ libbeat/beat/beat.go | 5 +++++ libbeat/cmd/instance/beat.go | 12 +++++++++++- libbeat/cmd/setup.go | 7 +++++-- 4 files changed, 50 insertions(+), 3 deletions(-) diff --git a/filebeat/beater/filebeat.go b/filebeat/beater/filebeat.go index 6e4f4124d50..32c4056bda7 100644 --- a/filebeat/beater/filebeat.go +++ b/filebeat/beater/filebeat.go @@ -126,9 +126,33 @@ func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) { b.SetupMLCallback = func(b *beat.Beat, kibanaConfig *common.Config) error { return fb.loadModulesML(b, kibanaConfig) } + + err = fb.setupPipelineLoaderCallback(b) + if err != nil { + return nil, err + } + return fb, nil } +func (fb *Filebeat) setupPipelineLoaderCallback(b *beat.Beat) error { + if !fb.moduleRegistry.Empty() { + updatePipelines := fb.config.UpdatePipelines + if b.InSetupCmd { + updatePipelines = true + } + + b.UpdatePipelinesCallback = func(esConfig *common.Config) error { + esClient, err := elasticsearch.NewConnectedClient(esConfig) + if err != nil { + return err + } + return fb.moduleRegistry.LoadPipelines(esClient, updatePipelines) + } + } + return nil +} + // loadModulesPipelines is called when modules are configured to do the initial // setup. 
func (fb *Filebeat) loadModulesPipelines(b *beat.Beat) error { @@ -137,6 +161,11 @@ func (fb *Filebeat) loadModulesPipelines(b *beat.Beat) error { return nil } + updatePipelines := fb.config.UpdatePipelines + if b.InSetupCmd { + updatePipelines = true + } + // register pipeline loading to happen every time a new ES connection is // established callback := func(esClient *elasticsearch.Client) error { diff --git a/libbeat/beat/beat.go b/libbeat/beat/beat.go index 60e2783483c..d7df8fa4d7c 100644 --- a/libbeat/beat/beat.go +++ b/libbeat/beat/beat.go @@ -38,6 +38,7 @@ type Beat struct { SetupMLCallback SetupMLCallback // setup callback for ML job configs InSetupCmd bool // this is set to true when the `setup` command is called + UpdatePipelinesCallback UpdatePipelinesCallback // ingest pipeline loader callback // XXX: remove Config from public interface. // It's currently used by filebeat modules to setup the Ingest Node // pipeline and ML jobs. @@ -55,3 +56,7 @@ type BeatConfig struct { // SetupMLCallback can be used by the Beat to register MachineLearning configurations // for the enabled modules. type SetupMLCallback func(*Beat, *common.Config) error + +// UpdatePipelinesCallback can be used by the Beat to register Ingest pipeline loader +// for the enabled modules. 
+type UpdatePipelinesCallback func(*common.Config) error diff --git a/libbeat/cmd/instance/beat.go b/libbeat/cmd/instance/beat.go index 7d6a292eaf7..6854d192857 100644 --- a/libbeat/cmd/instance/beat.go +++ b/libbeat/cmd/instance/beat.go @@ -341,7 +341,7 @@ func (b *Beat) TestConfig(bt beat.Creator) error { } // Setup registers ES index template and kibana dashboards -func (b *Beat) Setup(bt beat.Creator, template, dashboards, machineLearning bool) error { +func (b *Beat) Setup(bt beat.Creator, template, dashboards, machineLearning, pipelines bool) error { return handleError(func() error { err := b.Init() if err != nil { @@ -404,6 +404,16 @@ func (b *Beat) Setup(bt beat.Creator, template, dashboards, machineLearning bool fmt.Println("Loaded machine learning job configurations") } + if pipelines && b.UpdatePipelinesCallback != nil { + esConfig := b.Config.Output.Config() + err = b.UpdatePipelinesCallback(esConfig) + if err != nil { + return err + } + + fmt.Println("Loaded Ingest pipelines") + } + return nil }()) } diff --git a/libbeat/cmd/setup.go b/libbeat/cmd/setup.go index b1b0306950a..fa04e97b3c7 100644 --- a/libbeat/cmd/setup.go +++ b/libbeat/cmd/setup.go @@ -19,6 +19,7 @@ func genSetupCmd(name, idxPrefix, version string, beatCreator beat.Creator) *cob * Index mapping template in Elasticsearch to ensure fields are mapped. * Kibana dashboards (where available). * ML jobs (where available). + * Ingest pipelines (where available). 
`, Run: func(cmd *cobra.Command, args []string) { beat, err := instance.NewBeat(name, idxPrefix, version) @@ -30,15 +31,16 @@ func genSetupCmd(name, idxPrefix, version string, beatCreator beat.Creator) *cob template, _ := cmd.Flags().GetBool("template") dashboards, _ := cmd.Flags().GetBool("dashboards") machineLearning, _ := cmd.Flags().GetBool("machine-learning") + pipelines, _ := cmd.Flags().GetBool("pipelines") // No flags: setup all - if !template && !dashboards && !machineLearning { + if !template && !dashboards && !machineLearning && !pipelines { template = true dashboards = true machineLearning = true } - if err = beat.Setup(beatCreator, template, dashboards, machineLearning); err != nil { + if err = beat.Setup(beatCreator, template, dashboards, machineLearning, pipelines); err != nil { os.Exit(1) } }, @@ -47,6 +49,7 @@ func genSetupCmd(name, idxPrefix, version string, beatCreator beat.Creator) *cob setup.Flags().Bool("template", false, "Setup index template only") setup.Flags().Bool("dashboards", false, "Setup dashboards only") setup.Flags().Bool("machine-learning", false, "Setup machine learning job configurations only") + setup.Flags().Bool("pipelines", false, "Setup Ingest pipelines only") return &setup } From 8e5cb64a831536c1daf36a19530f4b10dcb3821f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?No=C3=A9mi=20V=C3=A1nyi?= Date: Tue, 10 Apr 2018 15:08:18 +0200 Subject: [PATCH 4/8] feed the hound --- filebeat/fileset/modules.go | 2 +- filebeat/fileset/pipelines.go | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/filebeat/fileset/modules.go b/filebeat/fileset/modules.go index 212865fa65e..6c2cc897c22 100644 --- a/filebeat/fileset/modules.go +++ b/filebeat/fileset/modules.go @@ -263,7 +263,7 @@ func (reg *ModuleRegistry) InfoString() string { var result string for module, filesets := range reg.registry { var filesetNames string - for name, _ := range filesets { + for name := range filesets { if filesetNames != "" { filesetNames += ", " } diff 
--git a/filebeat/fileset/pipelines.go b/filebeat/fileset/pipelines.go index 75c351c145d..20e18e6a4e7 100644 --- a/filebeat/fileset/pipelines.go +++ b/filebeat/fileset/pipelines.go @@ -8,7 +8,7 @@ import ( "github.com/elastic/beats/libbeat/logp" ) -// PipelineLoader factory builds and returns a PipelineLoader +// PipelineLoaderFactory builds and returns a PipelineLoader type PipelineLoaderFactory func() (PipelineLoader, error) // PipelineLoader is a subset of the Elasticsearch client API capable of loading @@ -21,7 +21,6 @@ type PipelineLoader interface { // LoadPipelines loads the pipelines for each configured fileset. func (reg *ModuleRegistry) LoadPipelines(esClient PipelineLoader, forceUpdate bool) error { - fmt.Println("itten") for module, filesets := range reg.registry { for name, fileset := range filesets { // check that all the required Ingest Node plugins are available From af1c3ec7a5a15538063ae4fbedad7061e58ad97b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?No=C3=A9mi=20V=C3=A1nyi?= Date: Tue, 10 Apr 2018 15:21:23 +0200 Subject: [PATCH 5/8] remove config error --- filebeat/_meta/common.reference.p2.yml | 2 +- filebeat/filebeat.reference.yml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/filebeat/_meta/common.reference.p2.yml b/filebeat/_meta/common.reference.p2.yml index 9b5ef211044..81e6dcc6698 100644 --- a/filebeat/_meta/common.reference.p2.yml +++ b/filebeat/_meta/common.reference.p2.yml @@ -1,4 +1,4 @@ - new#=========================== Filebeat inputs ============================= +#=========================== Filebeat inputs ============================= # List of inputs to fetch data. 
filebeat.inputs: diff --git a/filebeat/filebeat.reference.yml b/filebeat/filebeat.reference.yml index e65d9cb0839..ef7bc8a713a 100644 --- a/filebeat/filebeat.reference.yml +++ b/filebeat/filebeat.reference.yml @@ -307,7 +307,7 @@ filebeat.modules: #input: - new#=========================== Filebeat inputs ============================= +#=========================== Filebeat inputs ============================= # List of inputs to fetch data. filebeat.inputs: From 42c30e23374e51d9c758a696991bc27bfb363f77 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?No=C3=A9mi=20V=C3=A1nyi?= Date: Tue, 10 Apr 2018 15:24:56 +0200 Subject: [PATCH 6/8] add changelog entry --- CHANGELOG.asciidoc | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 1826feff7d7..a398ef0d3c6 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -117,6 +117,7 @@ https://github.com/elastic/beats/compare/v6.0.0-beta2...master[Check the HEAD di - Add json.ignore_decoding_error config to not log json decoding erors. {issue}6547[6547] - Make registry file permission configurable. {pull}6455[6455] - Add MongoDB module. {pull}6283[6238] +- Add Ingest pipeline loading to setup. {pull}6814[6814] *Heartbeat* From f63fb10328f305c2b676b9d837310e7990d80e9d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?No=C3=A9mi=20V=C3=A1nyi?= Date: Tue, 10 Apr 2018 15:45:46 +0200 Subject: [PATCH 7/8] add more context to config help --- filebeat/_meta/common.reference.p2.yml | 5 +++-- filebeat/filebeat.reference.yml | 5 +++-- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/filebeat/_meta/common.reference.p2.yml b/filebeat/_meta/common.reference.p2.yml index 81e6dcc6698..1a07ebcc0b5 100644 --- a/filebeat/_meta/common.reference.p2.yml +++ b/filebeat/_meta/common.reference.p2.yml @@ -286,8 +286,9 @@ filebeat.inputs: # This option is not supported on Windows. #filebeat.registry_file_permissions: 0600 -# By default Ingest pipelines are not updated. 
If this option is enabled Filebeat updates -# pipelines everytime a new Elasticsearch connection is established. +# By default Ingest pipelines are not updated if a pipeline with the same ID +# already exists. If this option is enabled Filebeat updates pipelines everytime +# a new Elasticsearch connection is established. #filebeat.update_pipelines: false # These config files must have the full filebeat config part inside, but only diff --git a/filebeat/filebeat.reference.yml b/filebeat/filebeat.reference.yml index ef7bc8a713a..3c97ac45ae0 100644 --- a/filebeat/filebeat.reference.yml +++ b/filebeat/filebeat.reference.yml @@ -595,8 +595,9 @@ filebeat.inputs: # This option is not supported on Windows. #filebeat.registry_file_permissions: 0600 -# By default Ingest pipelines are not updated. If this option is enabled Filebeat updates -# pipelines everytime a new Elasticsearch connection is established. +# By default Ingest pipelines are not updated if a pipeline with the same ID +# already exists. If this option is enabled Filebeat updates pipelines everytime +# a new Elasticsearch connection is established. 
#filebeat.update_pipelines: false # These config files must have the full filebeat config part inside, but only From 6a151b5a519b4b093993d1b15f5e5541b7bec435 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?No=C3=A9mi=20V=C3=A1nyi?= Date: Tue, 10 Apr 2018 16:43:40 +0200 Subject: [PATCH 8/8] rename update to overwrite both in config and code --- filebeat/_meta/common.reference.p2.yml | 6 +++--- filebeat/beater/filebeat.go | 18 +++++++++--------- filebeat/config/config.go | 4 ++-- filebeat/crawler/crawler.go | 4 ++-- filebeat/filebeat.reference.yml | 6 +++--- filebeat/fileset/factory.go | 14 +++++++------- filebeat/fileset/pipelines.go | 8 ++++---- libbeat/beat/beat.go | 6 +++--- libbeat/cmd/instance/beat.go | 4 ++-- 9 files changed, 35 insertions(+), 35 deletions(-) diff --git a/filebeat/_meta/common.reference.p2.yml b/filebeat/_meta/common.reference.p2.yml index 1a07ebcc0b5..16f30c5f820 100644 --- a/filebeat/_meta/common.reference.p2.yml +++ b/filebeat/_meta/common.reference.p2.yml @@ -287,9 +287,9 @@ filebeat.inputs: #filebeat.registry_file_permissions: 0600 # By default Ingest pipelines are not updated if a pipeline with the same ID -# already exists. If this option is enabled Filebeat updates pipelines everytime -# a new Elasticsearch connection is established. -#filebeat.update_pipelines: false +# already exists. If this option is enabled Filebeat overwrites pipelines +# every time a new Elasticsearch connection is established. +#filebeat.overwrite_pipelines: false # These config files must have the full filebeat config part inside, but only # the input part is processed. All global options like spool_size are ignored. 
diff --git a/filebeat/beater/filebeat.go b/filebeat/beater/filebeat.go index 32c4056bda7..39596849c4b 100644 --- a/filebeat/beater/filebeat.go +++ b/filebeat/beater/filebeat.go @@ -137,17 +137,17 @@ func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) { func (fb *Filebeat) setupPipelineLoaderCallback(b *beat.Beat) error { if !fb.moduleRegistry.Empty() { - updatePipelines := fb.config.UpdatePipelines + overwritePipelines := fb.config.OverwritePipelines if b.InSetupCmd { - updatePipelines = true + overwritePipelines = true } - b.UpdatePipelinesCallback = func(esConfig *common.Config) error { + b.OverwritePipelinesCallback = func(esConfig *common.Config) error { esClient, err := elasticsearch.NewConnectedClient(esConfig) if err != nil { return err } - return fb.moduleRegistry.LoadPipelines(esClient, updatePipelines) + return fb.moduleRegistry.LoadPipelines(esClient, overwritePipelines) } } return nil @@ -161,15 +161,15 @@ func (fb *Filebeat) loadModulesPipelines(b *beat.Beat) error { return nil } - updatePipelines := fb.config.UpdatePipelines + overwritePipelines := fb.config.OverwritePipelines if b.InSetupCmd { - updatePipelines = true + overwritePipelines = true } // register pipeline loading to happen every time a new ES connection is // established callback := func(esClient *elasticsearch.Client) error { - return fb.moduleRegistry.LoadPipelines(esClient, updatePipelines) + return fb.moduleRegistry.LoadPipelines(esClient, overwritePipelines) } elasticsearch.RegisterConnectCallback(callback) @@ -342,11 +342,11 @@ func (fb *Filebeat) Run(b *beat.Beat) error { logp.Warn(pipelinesWarning) } - if config.UpdatePipelines { + if config.OverwritePipelines { logp.Debug("modules", "Existing Ingest pipelines will be updated") } - err = crawler.Start(registrar, config.ConfigInput, config.ConfigModules, pipelineLoaderFactory, config.UpdatePipelines) + err = crawler.Start(registrar, config.ConfigInput, config.ConfigModules, pipelineLoaderFactory, 
config.OverwritePipelines) if err != nil { crawler.Stop() return err diff --git a/filebeat/config/config.go b/filebeat/config/config.go index 6425ae43bda..2a1aa3baf27 100644 --- a/filebeat/config/config.go +++ b/filebeat/config/config.go @@ -34,7 +34,7 @@ type Config struct { ConfigProspector *common.Config `config:"config.prospectors"` ConfigModules *common.Config `config:"config.modules"` Autodiscover *autodiscover.Config `config:"autodiscover"` - UpdatePipelines bool `config:"update_pipelines"` + OverwritePipelines bool `config:"overwrite_pipelines"` } var ( @@ -42,7 +42,7 @@ var ( RegistryFile: "registry", RegistryFilePermissions: 0600, ShutdownTimeout: 0, - UpdatePipelines: false, + OverwritePipelines: false, } ) diff --git a/filebeat/crawler/crawler.go b/filebeat/crawler/crawler.go index 183a8dd688b..a1f92005625 100644 --- a/filebeat/crawler/crawler.go +++ b/filebeat/crawler/crawler.go @@ -43,7 +43,7 @@ func New(out channel.Factory, inputConfigs []*common.Config, beatVersion string, // Start starts the crawler with all inputs func (c *Crawler) Start(r *registrar.Registrar, configInputs *common.Config, - configModules *common.Config, pipelineLoaderFactory fileset.PipelineLoaderFactory, updatePipelines bool) error { + configModules *common.Config, pipelineLoaderFactory fileset.PipelineLoaderFactory, overwritePipelines bool) error { logp.Info("Loading Inputs: %v", len(c.inputConfigs)) @@ -67,7 +67,7 @@ func (c *Crawler) Start(r *registrar.Registrar, configInputs *common.Config, }() } - c.ModulesFactory = fileset.NewFactory(c.out, r, c.beatVersion, pipelineLoaderFactory, updatePipelines, c.beatDone) + c.ModulesFactory = fileset.NewFactory(c.out, r, c.beatVersion, pipelineLoaderFactory, overwritePipelines, c.beatDone) if configModules.Enabled() { c.modulesReloader = cfgfile.NewReloader(configModules) if err := c.modulesReloader.Check(c.ModulesFactory); err != nil { diff --git a/filebeat/filebeat.reference.yml b/filebeat/filebeat.reference.yml index 
3c97ac45ae0..1ade1cf2d65 100644 --- a/filebeat/filebeat.reference.yml +++ b/filebeat/filebeat.reference.yml @@ -596,9 +596,9 @@ filebeat.inputs: #filebeat.registry_file_permissions: 0600 # By default Ingest pipelines are not updated if a pipeline with the same ID -# already exists. If this option is enabled Filebeat updates pipelines everytime -# a new Elasticsearch connection is established. -#filebeat.update_pipelines: false +# already exists. If this option is enabled Filebeat overwrites pipelines +# every time a new Elasticsearch connection is established. +#filebeat.overwrite_pipelines: false # These config files must have the full filebeat config part inside, but only # the input part is processed. All global options like spool_size are ignored. diff --git a/filebeat/fileset/factory.go b/filebeat/fileset/factory.go index 0c8fe39e3c3..473877fbf4c 100644 --- a/filebeat/fileset/factory.go +++ b/filebeat/fileset/factory.go @@ -18,7 +18,7 @@ type Factory struct { registrar *registrar.Registrar beatVersion string pipelineLoaderFactory PipelineLoaderFactory - updatePipelines bool + overwritePipelines bool beatDone chan struct{} } @@ -28,19 +28,19 @@ type inputsRunner struct { moduleRegistry *ModuleRegistry inputs []*input.Runner pipelineLoaderFactory PipelineLoaderFactory - updatePipelines bool + overwritePipelines bool } // NewFactory instantiates a new Factory func NewFactory(outlet channel.Factory, registrar *registrar.Registrar, beatVersion string, - pipelineLoaderFactory PipelineLoaderFactory, updatePipelines bool, beatDone chan struct{}) *Factory { + pipelineLoaderFactory PipelineLoaderFactory, overwritePipelines bool, beatDone chan struct{}) *Factory { return &Factory{ outlet: outlet, registrar: registrar, beatVersion: beatVersion, beatDone: beatDone, pipelineLoaderFactory: pipelineLoaderFactory, - updatePipelines: updatePipelines, + overwritePipelines: overwritePipelines, } } @@ -79,7 +79,7 @@ func (f *Factory) Create(c *common.Config, meta 
*common.MapStrPointer) (cfgfile. moduleRegistry: m, inputs: inputs, pipelineLoaderFactory: f.pipelineLoaderFactory, - updatePipelines: f.updatePipelines, + overwritePipelines: f.overwritePipelines, }, nil } @@ -91,7 +91,7 @@ func (p *inputsRunner) Start() { if err != nil { logp.Err("Error loading pipeline: %s", err) } else { - err := p.moduleRegistry.LoadPipelines(pipelineLoader, p.updatePipelines) + err := p.moduleRegistry.LoadPipelines(pipelineLoader, p.overwritePipelines) if err != nil { // Log error and continue logp.Err("Error loading pipeline: %s", err) @@ -100,7 +100,7 @@ func (p *inputsRunner) Start() { // Callback: callback := func(esClient *elasticsearch.Client) error { - return p.moduleRegistry.LoadPipelines(esClient, p.updatePipelines) + return p.moduleRegistry.LoadPipelines(esClient, p.overwritePipelines) } elasticsearch.RegisterConnectCallback(callback) } diff --git a/filebeat/fileset/pipelines.go b/filebeat/fileset/pipelines.go index 20e18e6a4e7..e56d6824e7d 100644 --- a/filebeat/fileset/pipelines.go +++ b/filebeat/fileset/pipelines.go @@ -20,7 +20,7 @@ type PipelineLoader interface { } // LoadPipelines loads the pipelines for each configured fileset. 
-func (reg *ModuleRegistry) LoadPipelines(esClient PipelineLoader, forceUpdate bool) error { +func (reg *ModuleRegistry) LoadPipelines(esClient PipelineLoader, overwrite bool) error { for module, filesets := range reg.registry { for name, fileset := range filesets { // check that all the required Ingest Node plugins are available @@ -37,7 +37,7 @@ func (reg *ModuleRegistry) LoadPipelines(esClient PipelineLoader, forceUpdate bo if err != nil { return fmt.Errorf("Error getting pipeline for fileset %s/%s: %v", module, name, err) } - err = loadPipeline(esClient, pipelineID, content, forceUpdate) + err = loadPipeline(esClient, pipelineID, content, overwrite) if err != nil { return fmt.Errorf("Error loading pipeline for fileset %s/%s: %v", module, name, err) } @@ -46,9 +46,9 @@ func (reg *ModuleRegistry) LoadPipelines(esClient PipelineLoader, forceUpdate bo return nil } -func loadPipeline(esClient PipelineLoader, pipelineID string, content map[string]interface{}, forceUpdate bool) error { +func loadPipeline(esClient PipelineLoader, pipelineID string, content map[string]interface{}, overwrite bool) error { path := "/_ingest/pipeline/" + pipelineID - if !forceUpdate { + if !overwrite { status, _, _ := esClient.Request("GET", path, "", nil, nil) if status == 200 { logp.Debug("modules", "Pipeline %s already loaded", pipelineID) diff --git a/libbeat/beat/beat.go b/libbeat/beat/beat.go index d7df8fa4d7c..f30f474818e 100644 --- a/libbeat/beat/beat.go +++ b/libbeat/beat/beat.go @@ -38,7 +38,7 @@ type Beat struct { SetupMLCallback SetupMLCallback // setup callback for ML job configs InSetupCmd bool // this is set to true when the `setup` command is called - UpdatePipelinesCallback UpdatePipelinesCallback // ingest pipeline loader callback + OverwritePipelinesCallback OverwritePipelinesCallback // ingest pipeline loader callback // XXX: remove Config from public interface. // It's currently used by filebeat modules to setup the Ingest Node // pipeline and ML jobs. 
@@ -57,6 +57,6 @@ type BeatConfig struct { // for the enabled modules. type SetupMLCallback func(*Beat, *common.Config) error -// UpdatePipelinesCallback can be used by the Beat to register Ingest pipeline loader +// OverwritePipelinesCallback can be used by the Beat to register Ingest pipeline loader // for the enabled modules. -type UpdatePipelinesCallback func(*common.Config) error +type OverwritePipelinesCallback func(*common.Config) error diff --git a/libbeat/cmd/instance/beat.go b/libbeat/cmd/instance/beat.go index 6854d192857..e908054ae0a 100644 --- a/libbeat/cmd/instance/beat.go +++ b/libbeat/cmd/instance/beat.go @@ -404,9 +404,9 @@ func (b *Beat) Setup(bt beat.Creator, template, dashboards, machineLearning, pip fmt.Println("Loaded machine learning job configurations") } - if pipelines && b.UpdatePipelinesCallback != nil { + if pipelines && b.OverwritePipelinesCallback != nil { esConfig := b.Config.Output.Config() - err = b.UpdatePipelinesCallback(esConfig) + err = b.OverwritePipelinesCallback(esConfig) if err != nil { return err }