Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Ingest pipeline loading to setup #6814

Merged
merged 8 commits into from
Apr 11, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ https://github.com/elastic/beats/compare/v6.0.0-beta2...master[Check the HEAD di
- Add json.ignore_decoding_error config to not log json decoding erors. {issue}6547[6547]
- Make registry file permission configurable. {pull}6455[6455]
- Add MongoDB module. {pull}6283[6238]
- Add Ingest pipeline loading to setup. {pull}6814[6814]

*Heartbeat*

Expand Down
5 changes: 5 additions & 0 deletions filebeat/_meta/common.reference.p2.yml
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,11 @@ filebeat.inputs:
# This option is not supported on Windows.
#filebeat.registry_file_permissions: 0600

# By default Ingest pipelines are not updated if a pipeline with the same ID
# already exists. If this option is enabled Filebeat overwrites pipelines
# everytime a new Elasticsearch connection is established.
#filebeat.overwrite_pipelines: false

# These config files must have the full filebeat config part inside, but only
# the input part is processed. All global options like spool_size are ignored.
# The config_dir MUST point to a different directory then where the main filebeat config file is in.
Expand Down
38 changes: 33 additions & 5 deletions filebeat/beater/filebeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,7 @@ const pipelinesWarning = "Filebeat is unable to load the Ingest Node pipelines f
" can ignore this warning."

var (
once = flag.Bool("once", false, "Run filebeat only once until all harvesters reach EOF")
updatePipelines = flag.Bool("update-pipelines", false, "Update Ingest pipelines")
once = flag.Bool("once", false, "Run filebeat only once until all harvesters reach EOF")
)

// Filebeat is a beater object. Contains all objects needed to run the beat
Expand Down Expand Up @@ -127,9 +126,33 @@ func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) {
b.SetupMLCallback = func(b *beat.Beat, kibanaConfig *common.Config) error {
return fb.loadModulesML(b, kibanaConfig)
}

err = fb.setupPipelineLoaderCallback(b)
if err != nil {
return nil, err
}

return fb, nil
}

func (fb *Filebeat) setupPipelineLoaderCallback(b *beat.Beat) error {
if !fb.moduleRegistry.Empty() {
overwritePipelines := fb.config.OverwritePipelines
if b.InSetupCmd {
overwritePipelines = true
}

b.OverwritePipelinesCallback = func(esConfig *common.Config) error {
esClient, err := elasticsearch.NewConnectedClient(esConfig)
if err != nil {
return err
}
return fb.moduleRegistry.LoadPipelines(esClient, overwritePipelines)
}
}
return nil
}

// loadModulesPipelines is called when modules are configured to do the initial
// setup.
func (fb *Filebeat) loadModulesPipelines(b *beat.Beat) error {
Expand All @@ -138,10 +161,15 @@ func (fb *Filebeat) loadModulesPipelines(b *beat.Beat) error {
return nil
}

overwritePipelines := fb.config.OverwritePipelines
if b.InSetupCmd {
overwritePipelines = true
}

// register pipeline loading to happen every time a new ES connection is
// established
callback := func(esClient *elasticsearch.Client) error {
return fb.moduleRegistry.LoadPipelines(esClient, *updatePipelines)
return fb.moduleRegistry.LoadPipelines(esClient, overwritePipelines)
}
elasticsearch.RegisterConnectCallback(callback)

Expand Down Expand Up @@ -314,11 +342,11 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
logp.Warn(pipelinesWarning)
}

if *updatePipelines {
if config.OverwritePipelines {
logp.Debug("modules", "Existing Ingest pipelines will be updated")
}

err = crawler.Start(registrar, config.ConfigInput, config.ConfigModules, pipelineLoaderFactory, *updatePipelines)
err = crawler.Start(registrar, config.ConfigInput, config.ConfigModules, pipelineLoaderFactory, config.OverwritePipelines)
if err != nil {
crawler.Stop()
return err
Expand Down
1 change: 0 additions & 1 deletion filebeat/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ func init() {
var runFlags = pflag.NewFlagSet(Name, pflag.ExitOnError)
runFlags.AddGoFlag(flag.CommandLine.Lookup("once"))
runFlags.AddGoFlag(flag.CommandLine.Lookup("modules"))
runFlags.AddGoFlag(flag.CommandLine.Lookup("update-pipelines"))

RootCmd = cmd.GenRootCmdWithRunFlags(Name, "", beater.New, runFlags)
RootCmd.PersistentFlags().AddGoFlag(flag.CommandLine.Lookup("M"))
Expand Down
2 changes: 2 additions & 0 deletions filebeat/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,15 @@ type Config struct {
ConfigProspector *common.Config `config:"config.prospectors"`
ConfigModules *common.Config `config:"config.modules"`
Autodiscover *autodiscover.Config `config:"autodiscover"`
OverwritePipelines bool `config:"overwrite_pipelines"`
}

var (
DefaultConfig = Config{
RegistryFile: "registry",
RegistryFilePermissions: 0600,
ShutdownTimeout: 0,
OverwritePipelines: false,
}
)

Expand Down
4 changes: 2 additions & 2 deletions filebeat/crawler/crawler.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func New(out channel.Factory, inputConfigs []*common.Config, beatVersion string,

// Start starts the crawler with all inputs
func (c *Crawler) Start(r *registrar.Registrar, configInputs *common.Config,
configModules *common.Config, pipelineLoaderFactory fileset.PipelineLoaderFactory, updatePipelines bool) error {
configModules *common.Config, pipelineLoaderFactory fileset.PipelineLoaderFactory, overwritePipelines bool) error {

logp.Info("Loading Inputs: %v", len(c.inputConfigs))

Expand All @@ -67,7 +67,7 @@ func (c *Crawler) Start(r *registrar.Registrar, configInputs *common.Config,
}()
}

c.ModulesFactory = fileset.NewFactory(c.out, r, c.beatVersion, pipelineLoaderFactory, updatePipelines, c.beatDone)
c.ModulesFactory = fileset.NewFactory(c.out, r, c.beatVersion, pipelineLoaderFactory, overwritePipelines, c.beatDone)
if configModules.Enabled() {
c.modulesReloader = cfgfile.NewReloader(configModules)
if err := c.modulesReloader.Check(c.ModulesFactory); err != nil {
Expand Down
5 changes: 5 additions & 0 deletions filebeat/filebeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -595,6 +595,11 @@ filebeat.inputs:
# This option is not supported on Windows.
#filebeat.registry_file_permissions: 0600

# By default Ingest pipelines are not updated if a pipeline with the same ID
# already exists. If this option is enabled Filebeat overwrites pipelines
# everytime a new Elasticsearch connection is established.
#filebeat.overwrite_pipelines: false

# These config files must have the full filebeat config part inside, but only
# the input part is processed. All global options like spool_size are ignored.
# The config_dir MUST point to a different directory then where the main filebeat config file is in.
Expand Down
14 changes: 7 additions & 7 deletions filebeat/fileset/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type Factory struct {
registrar *registrar.Registrar
beatVersion string
pipelineLoaderFactory PipelineLoaderFactory
updatePipelines bool
overwritePipelines bool
beatDone chan struct{}
}

Expand All @@ -28,19 +28,19 @@ type inputsRunner struct {
moduleRegistry *ModuleRegistry
inputs []*input.Runner
pipelineLoaderFactory PipelineLoaderFactory
updatePipelines bool
overwritePipelines bool
}

// NewFactory instantiates a new Factory
func NewFactory(outlet channel.Factory, registrar *registrar.Registrar, beatVersion string,
pipelineLoaderFactory PipelineLoaderFactory, updatePipelines bool, beatDone chan struct{}) *Factory {
pipelineLoaderFactory PipelineLoaderFactory, overwritePipelines bool, beatDone chan struct{}) *Factory {
return &Factory{
outlet: outlet,
registrar: registrar,
beatVersion: beatVersion,
beatDone: beatDone,
pipelineLoaderFactory: pipelineLoaderFactory,
updatePipelines: updatePipelines,
overwritePipelines: overwritePipelines,
}
}

Expand Down Expand Up @@ -79,7 +79,7 @@ func (f *Factory) Create(c *common.Config, meta *common.MapStrPointer) (cfgfile.
moduleRegistry: m,
inputs: inputs,
pipelineLoaderFactory: f.pipelineLoaderFactory,
updatePipelines: f.updatePipelines,
overwritePipelines: f.overwritePipelines,
}, nil
}

Expand All @@ -91,7 +91,7 @@ func (p *inputsRunner) Start() {
if err != nil {
logp.Err("Error loading pipeline: %s", err)
} else {
err := p.moduleRegistry.LoadPipelines(pipelineLoader, p.updatePipelines)
err := p.moduleRegistry.LoadPipelines(pipelineLoader, p.overwritePipelines)
if err != nil {
// Log error and continue
logp.Err("Error loading pipeline: %s", err)
Expand All @@ -100,7 +100,7 @@ func (p *inputsRunner) Start() {

// Callback:
callback := func(esClient *elasticsearch.Client) error {
return p.moduleRegistry.LoadPipelines(esClient, p.updatePipelines)
return p.moduleRegistry.LoadPipelines(esClient, p.overwritePipelines)
}
elasticsearch.RegisterConnectCallback(callback)
}
Expand Down
121 changes: 1 addition & 120 deletions filebeat/fileset/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,51 +257,13 @@ func (reg *ModuleRegistry) GetInputConfigs() ([]*common.Config, error) {
return result, nil
}

// PipelineLoader factory builds and returns a PipelineLoader
type PipelineLoaderFactory func() (PipelineLoader, error)

// PipelineLoader is a subset of the Elasticsearch client API capable of loading
// the pipelines.
type PipelineLoader interface {
LoadJSON(path string, json map[string]interface{}) ([]byte, error)
Request(method, path string, pipeline string, params map[string]string, body interface{}) (int, []byte, error)
GetVersion() string
}

// LoadPipelines loads the pipelines for each configured fileset.
func (reg *ModuleRegistry) LoadPipelines(esClient PipelineLoader, forceUpdate bool) error {
for module, filesets := range reg.registry {
for name, fileset := range filesets {
// check that all the required Ingest Node plugins are available
requiredProcessors := fileset.GetRequiredProcessors()
logp.Debug("modules", "Required processors: %s", requiredProcessors)
if len(requiredProcessors) > 0 {
err := checkAvailableProcessors(esClient, requiredProcessors)
if err != nil {
return fmt.Errorf("Error loading pipeline for fileset %s/%s: %v", module, name, err)
}
}

pipelineID, content, err := fileset.GetPipeline(esClient.GetVersion())
if err != nil {
return fmt.Errorf("Error getting pipeline for fileset %s/%s: %v", module, name, err)
}
err = loadPipeline(esClient, pipelineID, content, forceUpdate)
if err != nil {
return fmt.Errorf("Error loading pipeline for fileset %s/%s: %v", module, name, err)
}
}
}
return nil
}

// InfoString returns the enabled modules and filesets in a single string, ready to
// be shown to the user
func (reg *ModuleRegistry) InfoString() string {
var result string
for module, filesets := range reg.registry {
var filesetNames string
for name, _ := range filesets {
for name := range filesets {
if filesetNames != "" {
filesetNames += ", "
}
Expand Down Expand Up @@ -374,87 +336,6 @@ func checkAvailableProcessors(esClient PipelineLoader, requiredProcessors []Proc
return nil
}

func loadPipeline(esClient PipelineLoader, pipelineID string, content map[string]interface{}, forceUpdate bool) error {
path := "/_ingest/pipeline/" + pipelineID
if !forceUpdate {
status, _, _ := esClient.Request("GET", path, "", nil, nil)
if status == 200 {
logp.Debug("modules", "Pipeline %s already loaded", pipelineID)
return nil
}
}
body, err := esClient.LoadJSON(path, content)
if err != nil {
return interpretError(err, body)
}
logp.Info("Elasticsearch pipeline with ID '%s' loaded", pipelineID)
return nil
}

func interpretError(initialErr error, body []byte) error {
var response struct {
Error struct {
RootCause []struct {
Type string `json:"type"`
Reason string `json:"reason"`
Header struct {
ProcessorType string `json:"processor_type"`
} `json:"header"`
Index string `json:"index"`
} `json:"root_cause"`
} `json:"error"`
}
err := json.Unmarshal(body, &response)
if err != nil {
// this might be ES < 2.0. Do a best effort to check for ES 1.x
var response1x struct {
Error string `json:"error"`
}
err1x := json.Unmarshal(body, &response1x)
if err1x == nil && response1x.Error != "" {
return fmt.Errorf("The Filebeat modules require Elasticsearch >= 5.0. "+
"This is the response I got from Elasticsearch: %s", body)
}

return fmt.Errorf("couldn't load pipeline: %v. Additionally, error decoding response body: %s",
initialErr, body)
}

// missing plugins?
if len(response.Error.RootCause) > 0 &&
response.Error.RootCause[0].Type == "parse_exception" &&
strings.HasPrefix(response.Error.RootCause[0].Reason, "No processor type exists with name") &&
response.Error.RootCause[0].Header.ProcessorType != "" {

plugins := map[string]string{
"geoip": "ingest-geoip",
"user_agent": "ingest-user-agent",
}
plugin, ok := plugins[response.Error.RootCause[0].Header.ProcessorType]
if !ok {
return fmt.Errorf("This module requires an Elasticsearch plugin that provides the %s processor. "+
"Please visit the Elasticsearch documentation for instructions on how to install this plugin. "+
"Response body: %s", response.Error.RootCause[0].Header.ProcessorType, body)
}

return fmt.Errorf("This module requires the %s plugin to be installed in Elasticsearch. "+
"You can install it using the following command in the Elasticsearch home directory:\n"+
" sudo bin/elasticsearch-plugin install %s", plugin, plugin)
}

// older ES version?
if len(response.Error.RootCause) > 0 &&
response.Error.RootCause[0].Type == "invalid_index_name_exception" &&
response.Error.RootCause[0].Index == "_ingest" {

return fmt.Errorf("The Ingest Node functionality seems to be missing from Elasticsearch. "+
"The Filebeat modules require Elasticsearch >= 5.0. "+
"This is the response I got from Elasticsearch: %s", body)
}

return fmt.Errorf("couldn't load pipeline: %v. Response body: %s", initialErr, body)
}

// LoadML loads the machine-learning configurations into Elasticsearch, if X-Pack is available
func (reg *ModuleRegistry) LoadML(esClient PipelineLoader) error {
haveXpack, err := mlimporter.HaveXpackML(esClient)
Expand Down
Loading