Skip to content

Commit

Permalink
Add Ingest pipeline loading to setup (#6814)
Browse files Browse the repository at this point in the history
* make --update-pipelines a config option

* move pipeline loading to a separate file

* add ingest pipeline loading to setup command

* feed the hound

* remove config error

* add changelog entry

* add more context to config help

* rename update to overwrite both in config and code
  • Loading branch information
kvch authored and Steffen Siering committed Apr 11, 2018
1 parent d0bc8fa commit 2619e13
Show file tree
Hide file tree
Showing 13 changed files with 205 additions and 138 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ https://github.com/elastic/beats/compare/v6.0.0-beta2...master[Check the HEAD di
- Add json.ignore_decoding_error config to not log json decoding errors. {issue}6547[6547]
- Make registry file permission configurable. {pull}6455[6455]
- Add MongoDB module. {pull}6283[6283]
- Add Ingest pipeline loading to setup. {pull}6814[6814]

*Heartbeat*

Expand Down
5 changes: 5 additions & 0 deletions filebeat/_meta/common.reference.p2.yml
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,11 @@ filebeat.inputs:
# This option is not supported on Windows.
#filebeat.registry_file_permissions: 0600

# By default Ingest pipelines are not updated if a pipeline with the same ID
# already exists. If this option is enabled Filebeat overwrites pipelines
# every time a new Elasticsearch connection is established.
#filebeat.overwrite_pipelines: false

# These config files must have the full filebeat config part inside, but only
# the input part is processed. All global options like spool_size are ignored.
# The config_dir MUST point to a different directory than the one where the main filebeat config file is in.
Expand Down
38 changes: 33 additions & 5 deletions filebeat/beater/filebeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,7 @@ const pipelinesWarning = "Filebeat is unable to load the Ingest Node pipelines f
" can ignore this warning."

var (
once = flag.Bool("once", false, "Run filebeat only once until all harvesters reach EOF")
updatePipelines = flag.Bool("update-pipelines", false, "Update Ingest pipelines")
once = flag.Bool("once", false, "Run filebeat only once until all harvesters reach EOF")
)

// Filebeat is a beater object. Contains all objects needed to run the beat
Expand Down Expand Up @@ -127,9 +126,33 @@ func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) {
b.SetupMLCallback = func(b *beat.Beat, kibanaConfig *common.Config) error {
return fb.loadModulesML(b, kibanaConfig)
}

err = fb.setupPipelineLoaderCallback(b)
if err != nil {
return nil, err
}

return fb, nil
}

// setupPipelineLoaderCallback registers a callback on the beat that loads the
// Ingest Node pipelines of the configured modules whenever a new Elasticsearch
// connection is established. Existing pipelines are overwritten only when the
// overwrite_pipelines option is enabled or when running the setup command.
func (fb *Filebeat) setupPipelineLoaderCallback(b *beat.Beat) error {
	// Nothing to register when no modules are configured.
	if fb.moduleRegistry.Empty() {
		return nil
	}

	// The setup command always forces an overwrite of existing pipelines.
	overwritePipelines := fb.config.OverwritePipelines || b.InSetupCmd

	b.OverwritePipelinesCallback = func(esConfig *common.Config) error {
		esClient, err := elasticsearch.NewConnectedClient(esConfig)
		if err != nil {
			return err
		}
		return fb.moduleRegistry.LoadPipelines(esClient, overwritePipelines)
	}
	return nil
}

// loadModulesPipelines is called when modules are configured to do the initial
// setup.
func (fb *Filebeat) loadModulesPipelines(b *beat.Beat) error {
Expand All @@ -138,10 +161,15 @@ func (fb *Filebeat) loadModulesPipelines(b *beat.Beat) error {
return nil
}

overwritePipelines := fb.config.OverwritePipelines
if b.InSetupCmd {
overwritePipelines = true
}

// register pipeline loading to happen every time a new ES connection is
// established
callback := func(esClient *elasticsearch.Client) error {
return fb.moduleRegistry.LoadPipelines(esClient, *updatePipelines)
return fb.moduleRegistry.LoadPipelines(esClient, overwritePipelines)
}
elasticsearch.RegisterConnectCallback(callback)

Expand Down Expand Up @@ -314,11 +342,11 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
logp.Warn(pipelinesWarning)
}

if *updatePipelines {
if config.OverwritePipelines {
logp.Debug("modules", "Existing Ingest pipelines will be updated")
}

err = crawler.Start(registrar, config.ConfigInput, config.ConfigModules, pipelineLoaderFactory, *updatePipelines)
err = crawler.Start(registrar, config.ConfigInput, config.ConfigModules, pipelineLoaderFactory, config.OverwritePipelines)
if err != nil {
crawler.Stop()
return err
Expand Down
1 change: 0 additions & 1 deletion filebeat/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ func init() {
var runFlags = pflag.NewFlagSet(Name, pflag.ExitOnError)
runFlags.AddGoFlag(flag.CommandLine.Lookup("once"))
runFlags.AddGoFlag(flag.CommandLine.Lookup("modules"))
runFlags.AddGoFlag(flag.CommandLine.Lookup("update-pipelines"))

RootCmd = cmd.GenRootCmdWithRunFlags(Name, "", beater.New, runFlags)
RootCmd.PersistentFlags().AddGoFlag(flag.CommandLine.Lookup("M"))
Expand Down
2 changes: 2 additions & 0 deletions filebeat/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,15 @@ type Config struct {
ConfigProspector *common.Config `config:"config.prospectors"`
ConfigModules *common.Config `config:"config.modules"`
Autodiscover *autodiscover.Config `config:"autodiscover"`
OverwritePipelines bool `config:"overwrite_pipelines"`
}

var (
DefaultConfig = Config{
RegistryFile: "registry",
RegistryFilePermissions: 0600,
ShutdownTimeout: 0,
OverwritePipelines: false,
}
)

Expand Down
4 changes: 2 additions & 2 deletions filebeat/crawler/crawler.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func New(out channel.Factory, inputConfigs []*common.Config, beatVersion string,

// Start starts the crawler with all inputs
func (c *Crawler) Start(r *registrar.Registrar, configInputs *common.Config,
configModules *common.Config, pipelineLoaderFactory fileset.PipelineLoaderFactory, updatePipelines bool) error {
configModules *common.Config, pipelineLoaderFactory fileset.PipelineLoaderFactory, overwritePipelines bool) error {

logp.Info("Loading Inputs: %v", len(c.inputConfigs))

Expand All @@ -67,7 +67,7 @@ func (c *Crawler) Start(r *registrar.Registrar, configInputs *common.Config,
}()
}

c.ModulesFactory = fileset.NewFactory(c.out, r, c.beatVersion, pipelineLoaderFactory, updatePipelines, c.beatDone)
c.ModulesFactory = fileset.NewFactory(c.out, r, c.beatVersion, pipelineLoaderFactory, overwritePipelines, c.beatDone)
if configModules.Enabled() {
c.modulesReloader = cfgfile.NewReloader(configModules)
if err := c.modulesReloader.Check(c.ModulesFactory); err != nil {
Expand Down
5 changes: 5 additions & 0 deletions filebeat/filebeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -595,6 +595,11 @@ filebeat.inputs:
# This option is not supported on Windows.
#filebeat.registry_file_permissions: 0600

# By default Ingest pipelines are not updated if a pipeline with the same ID
# already exists. If this option is enabled Filebeat overwrites pipelines
# every time a new Elasticsearch connection is established.
#filebeat.overwrite_pipelines: false

# These config files must have the full filebeat config part inside, but only
# the input part is processed. All global options like spool_size are ignored.
# The config_dir MUST point to a different directory than the one where the main filebeat config file is in.
Expand Down
14 changes: 7 additions & 7 deletions filebeat/fileset/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type Factory struct {
registrar *registrar.Registrar
beatVersion string
pipelineLoaderFactory PipelineLoaderFactory
updatePipelines bool
overwritePipelines bool
beatDone chan struct{}
}

Expand All @@ -28,19 +28,19 @@ type inputsRunner struct {
moduleRegistry *ModuleRegistry
inputs []*input.Runner
pipelineLoaderFactory PipelineLoaderFactory
updatePipelines bool
overwritePipelines bool
}

// NewFactory instantiates a new Factory
func NewFactory(outlet channel.Factory, registrar *registrar.Registrar, beatVersion string,
pipelineLoaderFactory PipelineLoaderFactory, updatePipelines bool, beatDone chan struct{}) *Factory {
pipelineLoaderFactory PipelineLoaderFactory, overwritePipelines bool, beatDone chan struct{}) *Factory {
return &Factory{
outlet: outlet,
registrar: registrar,
beatVersion: beatVersion,
beatDone: beatDone,
pipelineLoaderFactory: pipelineLoaderFactory,
updatePipelines: updatePipelines,
overwritePipelines: overwritePipelines,
}
}

Expand Down Expand Up @@ -79,7 +79,7 @@ func (f *Factory) Create(c *common.Config, meta *common.MapStrPointer) (cfgfile.
moduleRegistry: m,
inputs: inputs,
pipelineLoaderFactory: f.pipelineLoaderFactory,
updatePipelines: f.updatePipelines,
overwritePipelines: f.overwritePipelines,
}, nil
}

Expand All @@ -91,7 +91,7 @@ func (p *inputsRunner) Start() {
if err != nil {
logp.Err("Error loading pipeline: %s", err)
} else {
err := p.moduleRegistry.LoadPipelines(pipelineLoader, p.updatePipelines)
err := p.moduleRegistry.LoadPipelines(pipelineLoader, p.overwritePipelines)
if err != nil {
// Log error and continue
logp.Err("Error loading pipeline: %s", err)
Expand All @@ -100,7 +100,7 @@ func (p *inputsRunner) Start() {

// Callback:
callback := func(esClient *elasticsearch.Client) error {
return p.moduleRegistry.LoadPipelines(esClient, p.updatePipelines)
return p.moduleRegistry.LoadPipelines(esClient, p.overwritePipelines)
}
elasticsearch.RegisterConnectCallback(callback)
}
Expand Down
121 changes: 1 addition & 120 deletions filebeat/fileset/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,51 +257,13 @@ func (reg *ModuleRegistry) GetInputConfigs() ([]*common.Config, error) {
return result, nil
}

// PipelineLoaderFactory builds and returns a PipelineLoader.
type PipelineLoaderFactory func() (PipelineLoader, error)

// PipelineLoader is a subset of the Elasticsearch client API capable of loading
// the pipelines.
type PipelineLoader interface {
	LoadJSON(path string, json map[string]interface{}) ([]byte, error)
	Request(method, path string, pipeline string, params map[string]string, body interface{}) (int, []byte, error)
	GetVersion() string
}

// LoadPipelines installs the Ingest Node pipeline of every registered fileset
// into Elasticsearch. When forceUpdate is false, pipelines that already exist
// under the same ID are left untouched.
func (reg *ModuleRegistry) LoadPipelines(esClient PipelineLoader, forceUpdate bool) error {
	for module, filesets := range reg.registry {
		// NOTE: loop variable renamed from `fileset` to avoid shadowing the package name.
		for name, fs := range filesets {
			// Fail early when a required Ingest Node plugin is missing.
			required := fs.GetRequiredProcessors()
			logp.Debug("modules", "Required processors: %s", required)
			if len(required) > 0 {
				if err := checkAvailableProcessors(esClient, required); err != nil {
					return fmt.Errorf("Error loading pipeline for fileset %s/%s: %v", module, name, err)
				}
			}

			pipelineID, content, err := fs.GetPipeline(esClient.GetVersion())
			if err != nil {
				return fmt.Errorf("Error getting pipeline for fileset %s/%s: %v", module, name, err)
			}
			if err = loadPipeline(esClient, pipelineID, content, forceUpdate); err != nil {
				return fmt.Errorf("Error loading pipeline for fileset %s/%s: %v", module, name, err)
			}
		}
	}
	return nil
}

// InfoString returns the enabled modules and filesets in a single string, ready to
// be shown to the user
func (reg *ModuleRegistry) InfoString() string {
var result string
for module, filesets := range reg.registry {
var filesetNames string
for name, _ := range filesets {
for name := range filesets {
if filesetNames != "" {
filesetNames += ", "
}
Expand Down Expand Up @@ -374,87 +336,6 @@ func checkAvailableProcessors(esClient PipelineLoader, requiredProcessors []Proc
return nil
}

// loadPipeline installs a single Ingest pipeline under the given ID. Unless
// forceUpdate is set, an existing pipeline with the same ID is kept as-is.
func loadPipeline(esClient PipelineLoader, pipelineID string, content map[string]interface{}, forceUpdate bool) error {
	path := "/_ingest/pipeline/" + pipelineID
	if !forceUpdate {
		// Best-effort existence check: any error on the GET simply means we
		// go ahead and (re)load the pipeline.
		if status, _, _ := esClient.Request("GET", path, "", nil, nil); status == 200 {
			logp.Debug("modules", "Pipeline %s already loaded", pipelineID)
			return nil
		}
	}

	body, err := esClient.LoadJSON(path, content)
	if err != nil {
		return interpretError(err, body)
	}
	logp.Info("Elasticsearch pipeline with ID '%s' loaded", pipelineID)
	return nil
}

// interpretError translates a pipeline-loading failure into a user-actionable
// error by inspecting the Elasticsearch response body. It recognizes missing
// Ingest Node plugins, Elasticsearch versions without Ingest Node support
// (pre-5.0, including the 1.x string-error format), and falls back to
// reporting the initial error together with the raw response.
func interpretError(initialErr error, body []byte) error {
	var parsed struct {
		Error struct {
			RootCause []struct {
				Type   string `json:"type"`
				Reason string `json:"reason"`
				Header struct {
					ProcessorType string `json:"processor_type"`
				} `json:"header"`
				Index string `json:"index"`
			} `json:"root_cause"`
		} `json:"error"`
	}
	if err := json.Unmarshal(body, &parsed); err != nil {
		// The body did not decode as a modern error document. Best effort:
		// Elasticsearch 1.x reported errors as a plain string field.
		var legacy struct {
			Error string `json:"error"`
		}
		if json.Unmarshal(body, &legacy) == nil && legacy.Error != "" {
			return fmt.Errorf("The Filebeat modules require Elasticsearch >= 5.0. "+
				"This is the response I got from Elasticsearch: %s", body)
		}

		return fmt.Errorf("couldn't load pipeline: %v. Additionally, error decoding response body: %s",
			initialErr, body)
	}

	if causes := parsed.Error.RootCause; len(causes) > 0 {
		first := causes[0]

		// A parse_exception naming a processor type points at a missing
		// Ingest Node plugin.
		if first.Type == "parse_exception" &&
			strings.HasPrefix(first.Reason, "No processor type exists with name") &&
			first.Header.ProcessorType != "" {

			plugins := map[string]string{
				"geoip":      "ingest-geoip",
				"user_agent": "ingest-user-agent",
			}
			plugin, known := plugins[first.Header.ProcessorType]
			if !known {
				return fmt.Errorf("This module requires an Elasticsearch plugin that provides the %s processor. "+
					"Please visit the Elasticsearch documentation for instructions on how to install this plugin. "+
					"Response body: %s", first.Header.ProcessorType, body)
			}

			return fmt.Errorf("This module requires the %s plugin to be installed in Elasticsearch. "+
				"You can install it using the following command in the Elasticsearch home directory:\n"+
				" sudo bin/elasticsearch-plugin install %s", plugin, plugin)
		}

		// Rejecting "_ingest" as an index name means the Ingest Node
		// functionality is absent (Elasticsearch older than 5.0).
		if first.Type == "invalid_index_name_exception" && first.Index == "_ingest" {
			return fmt.Errorf("The Ingest Node functionality seems to be missing from Elasticsearch. "+
				"The Filebeat modules require Elasticsearch >= 5.0. "+
				"This is the response I got from Elasticsearch: %s", body)
		}
	}

	return fmt.Errorf("couldn't load pipeline: %v. Response body: %s", initialErr, body)
}

// LoadML loads the machine-learning configurations into Elasticsearch, if X-Pack is available
func (reg *ModuleRegistry) LoadML(esClient PipelineLoader) error {
haveXpack, err := mlimporter.HaveXpackML(esClient)
Expand Down
Loading

0 comments on commit 2619e13

Please sign in to comment.