diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 1826feff7d72..a398ef0d3c6a 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -117,6 +117,7 @@ https://github.com/elastic/beats/compare/v6.0.0-beta2...master[Check the HEAD di - Add json.ignore_decoding_error config to not log json decoding erors. {issue}6547[6547] - Make registry file permission configurable. {pull}6455[6455] - Add MongoDB module. {pull}6283[6238] +- Add Ingest pipeline loading to setup. {pull}6814[6814] *Heartbeat* diff --git a/filebeat/_meta/common.reference.p2.yml b/filebeat/_meta/common.reference.p2.yml index 94c03723a9cc..16f30c5f8208 100644 --- a/filebeat/_meta/common.reference.p2.yml +++ b/filebeat/_meta/common.reference.p2.yml @@ -286,6 +286,11 @@ filebeat.inputs: # This option is not supported on Windows. #filebeat.registry_file_permissions: 0600 +# By default Ingest pipelines are not updated if a pipeline with the same ID +# already exists. If this option is enabled Filebeat overwrites pipelines +# every time a new Elasticsearch connection is established. +#filebeat.overwrite_pipelines: false + # These config files must have the full filebeat config part inside, but only # the input part is processed. All global options like spool_size are ignored. # The config_dir MUST point to a different directory then where the main filebeat config file is in. diff --git a/filebeat/beater/filebeat.go b/filebeat/beater/filebeat.go index 20e253f43a81..39596849c4b2 100644 --- a/filebeat/beater/filebeat.go +++ b/filebeat/beater/filebeat.go @@ -35,8 +35,7 @@ const pipelinesWarning = "Filebeat is unable to load the Ingest Node pipelines f " can ignore this warning." var ( - once = flag.Bool("once", false, "Run filebeat only once until all harvesters reach EOF") - updatePipelines = flag.Bool("update-pipelines", false, "Update Ingest pipelines") + once = flag.Bool("once", false, "Run filebeat only once until all harvesters reach EOF") ) // Filebeat is a beater object. 
Contains all objects needed to run the beat @@ -127,9 +126,33 @@ func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) { b.SetupMLCallback = func(b *beat.Beat, kibanaConfig *common.Config) error { return fb.loadModulesML(b, kibanaConfig) } + + err = fb.setupPipelineLoaderCallback(b) + if err != nil { + return nil, err + } + return fb, nil } +func (fb *Filebeat) setupPipelineLoaderCallback(b *beat.Beat) error { + if !fb.moduleRegistry.Empty() { + overwritePipelines := fb.config.OverwritePipelines + if b.InSetupCmd { + overwritePipelines = true + } + + b.OverwritePipelinesCallback = func(esConfig *common.Config) error { + esClient, err := elasticsearch.NewConnectedClient(esConfig) + if err != nil { + return err + } + return fb.moduleRegistry.LoadPipelines(esClient, overwritePipelines) + } + } + return nil +} + // loadModulesPipelines is called when modules are configured to do the initial // setup. func (fb *Filebeat) loadModulesPipelines(b *beat.Beat) error { @@ -138,10 +161,15 @@ func (fb *Filebeat) loadModulesPipelines(b *beat.Beat) error { return nil } + overwritePipelines := fb.config.OverwritePipelines + if b.InSetupCmd { + overwritePipelines = true + } + // register pipeline loading to happen every time a new ES connection is // established callback := func(esClient *elasticsearch.Client) error { - return fb.moduleRegistry.LoadPipelines(esClient, *updatePipelines) + return fb.moduleRegistry.LoadPipelines(esClient, overwritePipelines) } elasticsearch.RegisterConnectCallback(callback) @@ -314,11 +342,11 @@ func (fb *Filebeat) Run(b *beat.Beat) error { logp.Warn(pipelinesWarning) } - if *updatePipelines { + if config.OverwritePipelines { logp.Debug("modules", "Existing Ingest pipelines will be updated") } - err = crawler.Start(registrar, config.ConfigInput, config.ConfigModules, pipelineLoaderFactory, *updatePipelines) + err = crawler.Start(registrar, config.ConfigInput, config.ConfigModules, pipelineLoaderFactory, config.OverwritePipelines) if 
err != nil { crawler.Stop() return err diff --git a/filebeat/cmd/root.go b/filebeat/cmd/root.go index 52cbac1ab618..250f183913ce 100644 --- a/filebeat/cmd/root.go +++ b/filebeat/cmd/root.go @@ -20,7 +20,6 @@ func init() { var runFlags = pflag.NewFlagSet(Name, pflag.ExitOnError) runFlags.AddGoFlag(flag.CommandLine.Lookup("once")) runFlags.AddGoFlag(flag.CommandLine.Lookup("modules")) - runFlags.AddGoFlag(flag.CommandLine.Lookup("update-pipelines")) RootCmd = cmd.GenRootCmdWithRunFlags(Name, "", beater.New, runFlags) RootCmd.PersistentFlags().AddGoFlag(flag.CommandLine.Lookup("M")) diff --git a/filebeat/config/config.go b/filebeat/config/config.go index 2ebeb7a8a697..2a1aa3baf277 100644 --- a/filebeat/config/config.go +++ b/filebeat/config/config.go @@ -34,6 +34,7 @@ type Config struct { ConfigProspector *common.Config `config:"config.prospectors"` ConfigModules *common.Config `config:"config.modules"` Autodiscover *autodiscover.Config `config:"autodiscover"` + OverwritePipelines bool `config:"overwrite_pipelines"` } var ( @@ -41,6 +42,7 @@ var ( RegistryFile: "registry", RegistryFilePermissions: 0600, ShutdownTimeout: 0, + OverwritePipelines: false, } ) diff --git a/filebeat/crawler/crawler.go b/filebeat/crawler/crawler.go index 183a8dd688bc..a1f92005625d 100644 --- a/filebeat/crawler/crawler.go +++ b/filebeat/crawler/crawler.go @@ -43,7 +43,7 @@ func New(out channel.Factory, inputConfigs []*common.Config, beatVersion string, // Start starts the crawler with all inputs func (c *Crawler) Start(r *registrar.Registrar, configInputs *common.Config, - configModules *common.Config, pipelineLoaderFactory fileset.PipelineLoaderFactory, updatePipelines bool) error { + configModules *common.Config, pipelineLoaderFactory fileset.PipelineLoaderFactory, overwritePipelines bool) error { logp.Info("Loading Inputs: %v", len(c.inputConfigs)) @@ -67,7 +67,7 @@ func (c *Crawler) Start(r *registrar.Registrar, configInputs *common.Config, }() } - c.ModulesFactory = 
fileset.NewFactory(c.out, r, c.beatVersion, pipelineLoaderFactory, updatePipelines, c.beatDone) + c.ModulesFactory = fileset.NewFactory(c.out, r, c.beatVersion, pipelineLoaderFactory, overwritePipelines, c.beatDone) if configModules.Enabled() { c.modulesReloader = cfgfile.NewReloader(configModules) if err := c.modulesReloader.Check(c.ModulesFactory); err != nil { diff --git a/filebeat/filebeat.reference.yml b/filebeat/filebeat.reference.yml index 2b6e86f73aca..1ade1cf2d653 100644 --- a/filebeat/filebeat.reference.yml +++ b/filebeat/filebeat.reference.yml @@ -595,6 +595,11 @@ filebeat.inputs: # This option is not supported on Windows. #filebeat.registry_file_permissions: 0600 +# By default Ingest pipelines are not updated if a pipeline with the same ID +# already exists. If this option is enabled Filebeat overwrites pipelines +# every time a new Elasticsearch connection is established. +#filebeat.overwrite_pipelines: false + # These config files must have the full filebeat config part inside, but only # the input part is processed. All global options like spool_size are ignored. # The config_dir MUST point to a different directory then where the main filebeat config file is in. 
diff --git a/filebeat/fileset/factory.go b/filebeat/fileset/factory.go index 0c8fe39e3c31..473877fbf4c6 100644 --- a/filebeat/fileset/factory.go +++ b/filebeat/fileset/factory.go @@ -18,7 +18,7 @@ type Factory struct { registrar *registrar.Registrar beatVersion string pipelineLoaderFactory PipelineLoaderFactory - updatePipelines bool + overwritePipelines bool beatDone chan struct{} } @@ -28,19 +28,19 @@ type inputsRunner struct { moduleRegistry *ModuleRegistry inputs []*input.Runner pipelineLoaderFactory PipelineLoaderFactory - updatePipelines bool + overwritePipelines bool } // NewFactory instantiates a new Factory func NewFactory(outlet channel.Factory, registrar *registrar.Registrar, beatVersion string, - pipelineLoaderFactory PipelineLoaderFactory, updatePipelines bool, beatDone chan struct{}) *Factory { + pipelineLoaderFactory PipelineLoaderFactory, overwritePipelines bool, beatDone chan struct{}) *Factory { return &Factory{ outlet: outlet, registrar: registrar, beatVersion: beatVersion, beatDone: beatDone, pipelineLoaderFactory: pipelineLoaderFactory, - updatePipelines: updatePipelines, + overwritePipelines: overwritePipelines, } } @@ -79,7 +79,7 @@ func (f *Factory) Create(c *common.Config, meta *common.MapStrPointer) (cfgfile. 
moduleRegistry: m, inputs: inputs, pipelineLoaderFactory: f.pipelineLoaderFactory, - updatePipelines: f.updatePipelines, + overwritePipelines: f.overwritePipelines, }, nil } @@ -91,7 +91,7 @@ func (p *inputsRunner) Start() { if err != nil { logp.Err("Error loading pipeline: %s", err) } else { - err := p.moduleRegistry.LoadPipelines(pipelineLoader, p.updatePipelines) + err := p.moduleRegistry.LoadPipelines(pipelineLoader, p.overwritePipelines) if err != nil { // Log error and continue logp.Err("Error loading pipeline: %s", err) @@ -100,7 +100,7 @@ func (p *inputsRunner) Start() { // Callback: callback := func(esClient *elasticsearch.Client) error { - return p.moduleRegistry.LoadPipelines(esClient, p.updatePipelines) + return p.moduleRegistry.LoadPipelines(esClient, p.overwritePipelines) } elasticsearch.RegisterConnectCallback(callback) } diff --git a/filebeat/fileset/modules.go b/filebeat/fileset/modules.go index 666ee0e45eb8..6c2cc897c22e 100644 --- a/filebeat/fileset/modules.go +++ b/filebeat/fileset/modules.go @@ -257,51 +257,13 @@ func (reg *ModuleRegistry) GetInputConfigs() ([]*common.Config, error) { return result, nil } -// PipelineLoader factory builds and returns a PipelineLoader -type PipelineLoaderFactory func() (PipelineLoader, error) - -// PipelineLoader is a subset of the Elasticsearch client API capable of loading -// the pipelines. -type PipelineLoader interface { - LoadJSON(path string, json map[string]interface{}) ([]byte, error) - Request(method, path string, pipeline string, params map[string]string, body interface{}) (int, []byte, error) - GetVersion() string -} - -// LoadPipelines loads the pipelines for each configured fileset. 
-func (reg *ModuleRegistry) LoadPipelines(esClient PipelineLoader, forceUpdate bool) error { - for module, filesets := range reg.registry { - for name, fileset := range filesets { - // check that all the required Ingest Node plugins are available - requiredProcessors := fileset.GetRequiredProcessors() - logp.Debug("modules", "Required processors: %s", requiredProcessors) - if len(requiredProcessors) > 0 { - err := checkAvailableProcessors(esClient, requiredProcessors) - if err != nil { - return fmt.Errorf("Error loading pipeline for fileset %s/%s: %v", module, name, err) - } - } - - pipelineID, content, err := fileset.GetPipeline(esClient.GetVersion()) - if err != nil { - return fmt.Errorf("Error getting pipeline for fileset %s/%s: %v", module, name, err) - } - err = loadPipeline(esClient, pipelineID, content, forceUpdate) - if err != nil { - return fmt.Errorf("Error loading pipeline for fileset %s/%s: %v", module, name, err) - } - } - } - return nil -} - // InfoString returns the enabled modules and filesets in a single string, ready to // be shown to the user func (reg *ModuleRegistry) InfoString() string { var result string for module, filesets := range reg.registry { var filesetNames string - for name, _ := range filesets { + for name := range filesets { if filesetNames != "" { filesetNames += ", " } @@ -374,87 +336,6 @@ func checkAvailableProcessors(esClient PipelineLoader, requiredProcessors []Proc return nil } -func loadPipeline(esClient PipelineLoader, pipelineID string, content map[string]interface{}, forceUpdate bool) error { - path := "/_ingest/pipeline/" + pipelineID - if !forceUpdate { - status, _, _ := esClient.Request("GET", path, "", nil, nil) - if status == 200 { - logp.Debug("modules", "Pipeline %s already loaded", pipelineID) - return nil - } - } - body, err := esClient.LoadJSON(path, content) - if err != nil { - return interpretError(err, body) - } - logp.Info("Elasticsearch pipeline with ID '%s' loaded", pipelineID) - return nil -} - -func 
interpretError(initialErr error, body []byte) error { - var response struct { - Error struct { - RootCause []struct { - Type string `json:"type"` - Reason string `json:"reason"` - Header struct { - ProcessorType string `json:"processor_type"` - } `json:"header"` - Index string `json:"index"` - } `json:"root_cause"` - } `json:"error"` - } - err := json.Unmarshal(body, &response) - if err != nil { - // this might be ES < 2.0. Do a best effort to check for ES 1.x - var response1x struct { - Error string `json:"error"` - } - err1x := json.Unmarshal(body, &response1x) - if err1x == nil && response1x.Error != "" { - return fmt.Errorf("The Filebeat modules require Elasticsearch >= 5.0. "+ - "This is the response I got from Elasticsearch: %s", body) - } - - return fmt.Errorf("couldn't load pipeline: %v. Additionally, error decoding response body: %s", - initialErr, body) - } - - // missing plugins? - if len(response.Error.RootCause) > 0 && - response.Error.RootCause[0].Type == "parse_exception" && - strings.HasPrefix(response.Error.RootCause[0].Reason, "No processor type exists with name") && - response.Error.RootCause[0].Header.ProcessorType != "" { - - plugins := map[string]string{ - "geoip": "ingest-geoip", - "user_agent": "ingest-user-agent", - } - plugin, ok := plugins[response.Error.RootCause[0].Header.ProcessorType] - if !ok { - return fmt.Errorf("This module requires an Elasticsearch plugin that provides the %s processor. "+ - "Please visit the Elasticsearch documentation for instructions on how to install this plugin. "+ - "Response body: %s", response.Error.RootCause[0].Header.ProcessorType, body) - } - - return fmt.Errorf("This module requires the %s plugin to be installed in Elasticsearch. "+ - "You can install it using the following command in the Elasticsearch home directory:\n"+ - " sudo bin/elasticsearch-plugin install %s", plugin, plugin) - } - - // older ES version? 
- if len(response.Error.RootCause) > 0 && - response.Error.RootCause[0].Type == "invalid_index_name_exception" && - response.Error.RootCause[0].Index == "_ingest" { - - return fmt.Errorf("The Ingest Node functionality seems to be missing from Elasticsearch. "+ - "The Filebeat modules require Elasticsearch >= 5.0. "+ - "This is the response I got from Elasticsearch: %s", body) - } - - return fmt.Errorf("couldn't load pipeline: %v. Response body: %s", initialErr, body) -} - // LoadML loads the machine-learning configurations into Elasticsearch, if X-Pack is available func (reg *ModuleRegistry) LoadML(esClient PipelineLoader) error { haveXpack, err := mlimporter.HaveXpackML(esClient) diff --git a/filebeat/fileset/pipelines.go b/filebeat/fileset/pipelines.go new file mode 100644 index 000000000000..e56d6824e7db --- /dev/null +++ b/filebeat/fileset/pipelines.go @@ -0,0 +1,128 @@ +package fileset + +import ( + "encoding/json" + "fmt" + "strings" + + "github.com/elastic/beats/libbeat/logp" +) + +// PipelineLoaderFactory builds and returns a PipelineLoader +type PipelineLoaderFactory func() (PipelineLoader, error) + +// PipelineLoader is a subset of the Elasticsearch client API capable of loading +// the pipelines. +type PipelineLoader interface { + LoadJSON(path string, json map[string]interface{}) ([]byte, error) + Request(method, path string, pipeline string, params map[string]string, body interface{}) (int, []byte, error) + GetVersion() string +} + +// LoadPipelines loads the pipelines for each configured fileset. 
+func (reg *ModuleRegistry) LoadPipelines(esClient PipelineLoader, overwrite bool) error { + for module, filesets := range reg.registry { + for name, fileset := range filesets { + // check that all the required Ingest Node plugins are available + requiredProcessors := fileset.GetRequiredProcessors() + logp.Debug("modules", "Required processors: %s", requiredProcessors) + if len(requiredProcessors) > 0 { + err := checkAvailableProcessors(esClient, requiredProcessors) + if err != nil { + return fmt.Errorf("Error loading pipeline for fileset %s/%s: %v", module, name, err) + } + } + + pipelineID, content, err := fileset.GetPipeline(esClient.GetVersion()) + if err != nil { + return fmt.Errorf("Error getting pipeline for fileset %s/%s: %v", module, name, err) + } + err = loadPipeline(esClient, pipelineID, content, overwrite) + if err != nil { + return fmt.Errorf("Error loading pipeline for fileset %s/%s: %v", module, name, err) + } + } + } + return nil +} + +func loadPipeline(esClient PipelineLoader, pipelineID string, content map[string]interface{}, overwrite bool) error { + path := "/_ingest/pipeline/" + pipelineID + if !overwrite { + status, _, _ := esClient.Request("GET", path, "", nil, nil) + if status == 200 { + logp.Debug("modules", "Pipeline %s already loaded", pipelineID) + return nil + } + } + body, err := esClient.LoadJSON(path, content) + if err != nil { + return interpretError(err, body) + } + logp.Info("Elasticsearch pipeline with ID '%s' loaded", pipelineID) + return nil +} + +func interpretError(initialErr error, body []byte) error { + var response struct { + Error struct { + RootCause []struct { + Type string `json:"type"` + Reason string `json:"reason"` + Header struct { + ProcessorType string `json:"processor_type"` + } `json:"header"` + Index string `json:"index"` + } `json:"root_cause"` + } `json:"error"` + } + err := json.Unmarshal(body, &response) + if err != nil { + // this might be ES < 2.0. 
Do a best effort to check for ES 1.x + var response1x struct { + Error string `json:"error"` + } + err1x := json.Unmarshal(body, &response1x) + if err1x == nil && response1x.Error != "" { + return fmt.Errorf("The Filebeat modules require Elasticsearch >= 5.0. "+ + "This is the response I got from Elasticsearch: %s", body) + } + + return fmt.Errorf("couldn't load pipeline: %v. Additionally, error decoding response body: %s", + initialErr, body) + } + + // missing plugins? + if len(response.Error.RootCause) > 0 && + response.Error.RootCause[0].Type == "parse_exception" && + strings.HasPrefix(response.Error.RootCause[0].Reason, "No processor type exists with name") && + response.Error.RootCause[0].Header.ProcessorType != "" { + + plugins := map[string]string{ + "geoip": "ingest-geoip", + "user_agent": "ingest-user-agent", + } + plugin, ok := plugins[response.Error.RootCause[0].Header.ProcessorType] + if !ok { + return fmt.Errorf("This module requires an Elasticsearch plugin that provides the %s processor. "+ + "Please visit the Elasticsearch documentation for instructions on how to install this plugin. "+ + "Response body: %s", response.Error.RootCause[0].Header.ProcessorType, body) + } + + return fmt.Errorf("This module requires the %s plugin to be installed in Elasticsearch. "+ + "You can install it using the following command in the Elasticsearch home directory:\n"+ + " sudo bin/elasticsearch-plugin install %s", plugin, plugin) + } + + // older ES version? + if len(response.Error.RootCause) > 0 && + response.Error.RootCause[0].Type == "invalid_index_name_exception" && + response.Error.RootCause[0].Index == "_ingest" { + + return fmt.Errorf("The Ingest Node functionality seems to be missing from Elasticsearch. "+ + "The Filebeat modules require Elasticsearch >= 5.0. "+ + "This is the response I got from Elasticsearch: %s", body) + } + + return fmt.Errorf("couldn't load pipeline: %v. 
Response body: %s", initialErr, body) +} diff --git a/libbeat/beat/beat.go b/libbeat/beat/beat.go index 60e2783483c9..f30f474818ee 100644 --- a/libbeat/beat/beat.go +++ b/libbeat/beat/beat.go @@ -38,6 +38,7 @@ type Beat struct { SetupMLCallback SetupMLCallback // setup callback for ML job configs InSetupCmd bool // this is set to true when the `setup` command is called + OverwritePipelinesCallback OverwritePipelinesCallback // ingest pipeline loader callback // XXX: remove Config from public interface. // It's currently used by filebeat modules to setup the Ingest Node // pipeline and ML jobs. @@ -55,3 +56,7 @@ type BeatConfig struct { // SetupMLCallback can be used by the Beat to register MachineLearning configurations // for the enabled modules. type SetupMLCallback func(*Beat, *common.Config) error + +// OverwritePipelinesCallback can be used by the Beat to register Ingest pipeline loader +// for the enabled modules. +type OverwritePipelinesCallback func(*common.Config) error diff --git a/libbeat/cmd/instance/beat.go b/libbeat/cmd/instance/beat.go index 7d6a292eaf76..e908054ae0ac 100644 --- a/libbeat/cmd/instance/beat.go +++ b/libbeat/cmd/instance/beat.go @@ -341,7 +341,7 @@ func (b *Beat) TestConfig(bt beat.Creator) error { } // Setup registers ES index template and kibana dashboards -func (b *Beat) Setup(bt beat.Creator, template, dashboards, machineLearning bool) error { +func (b *Beat) Setup(bt beat.Creator, template, dashboards, machineLearning, pipelines bool) error { return handleError(func() error { err := b.Init() if err != nil { @@ -404,6 +404,16 @@ func (b *Beat) Setup(bt beat.Creator, template, dashboards, machineLearning bool fmt.Println("Loaded machine learning job configurations") } + if pipelines && b.OverwritePipelinesCallback != nil { + esConfig := b.Config.Output.Config() + err = b.OverwritePipelinesCallback(esConfig) + if err != nil { + return err + } + + fmt.Println("Loaded Ingest pipelines") + } + return nil }()) } diff --git 
a/libbeat/cmd/setup.go b/libbeat/cmd/setup.go index b1b0306950a8..fa04e97b3c79 100644 --- a/libbeat/cmd/setup.go +++ b/libbeat/cmd/setup.go @@ -19,6 +19,7 @@ func genSetupCmd(name, idxPrefix, version string, beatCreator beat.Creator) *cob * Index mapping template in Elasticsearch to ensure fields are mapped. * Kibana dashboards (where available). * ML jobs (where available). + * Ingest pipelines (where available). `, Run: func(cmd *cobra.Command, args []string) { beat, err := instance.NewBeat(name, idxPrefix, version) @@ -30,15 +31,16 @@ func genSetupCmd(name, idxPrefix, version string, beatCreator beat.Creator) *cob template, _ := cmd.Flags().GetBool("template") dashboards, _ := cmd.Flags().GetBool("dashboards") machineLearning, _ := cmd.Flags().GetBool("machine-learning") + pipelines, _ := cmd.Flags().GetBool("pipelines") // No flags: setup all - if !template && !dashboards && !machineLearning { + if !template && !dashboards && !machineLearning && !pipelines { template = true dashboards = true machineLearning = true } - if err = beat.Setup(beatCreator, template, dashboards, machineLearning); err != nil { + if err = beat.Setup(beatCreator, template, dashboards, machineLearning, pipelines); err != nil { os.Exit(1) } }, @@ -47,6 +49,7 @@ func genSetupCmd(name, idxPrefix, version string, beatCreator beat.Creator) *cob setup.Flags().Bool("template", false, "Setup index template only") setup.Flags().Bool("dashboards", false, "Setup dashboards only") setup.Flags().Bool("machine-learning", false, "Setup machine learning job configurations only") + setup.Flags().Bool("pipelines", false, "Setup Ingest pipelines only") return &setup }