From adcd3d0d06781f05f6ca9dccbc78190157a66f43 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carlos=20P=C3=A9rez-Aradros=20Herce?= Date: Mon, 6 Nov 2017 14:06:31 +0100 Subject: [PATCH] Fix ML jobs setup for dynamic modules (#5509) * Fix ML jobs setup for dynamic modules Modules from `modules.d` were ignored by both `setup` command and `--setup` flag. Fixes #5504 * Fix test --- filebeat/beater/filebeat.go | 42 ++++++++-- filebeat/docker-compose.yml | 9 +++ .../system/config/filebeat_modules.yml.j2 | 9 +++ filebeat/tests/system/test_modules.py | 76 ++++++++++++++----- libbeat/tests/system/beat/beat.py | 9 +++ 5 files changed, 122 insertions(+), 23 deletions(-) diff --git a/filebeat/beater/filebeat.go b/filebeat/beater/filebeat.go index bba19a3300d8..da3829f79369 100644 --- a/filebeat/beater/filebeat.go +++ b/filebeat/beater/filebeat.go @@ -4,9 +4,11 @@ import ( "flag" "fmt" + "github.com/joeshaw/multierror" "github.com/pkg/errors" "github.com/elastic/beats/libbeat/beat" + "github.com/elastic/beats/libbeat/cfgfile" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/common/cfgwarn" "github.com/elastic/beats/libbeat/logp" @@ -99,10 +101,8 @@ func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) { } // register `setup` callback for ML jobs - if !moduleRegistry.Empty() { - b.SetupMLCallback = func(b *beat.Beat) error { - return fb.loadModulesML(b) - } + b.SetupMLCallback = func(b *beat.Beat) error { + return fb.loadModulesML(b) } return fb, nil } @@ -127,6 +127,7 @@ func (fb *Filebeat) loadModulesPipelines(b *beat.Beat) error { func (fb *Filebeat) loadModulesML(b *beat.Beat) error { logp.Debug("machine-learning", "Setting up ML jobs for modules") + var errs multierror.Errors if b.Config.Output.Name() != "elasticsearch" { logp.Warn("Filebeat is unable to load the Xpack Machine Learning configurations for the" + @@ -139,8 +140,39 @@ func (fb *Filebeat) loadModulesML(b *beat.Beat) error { if err != nil { return errors.Errorf("Error creating Elasticsearch client: %v", err) } + if err := fb.moduleRegistry.LoadML(esClient); err != nil { + errs = append(errs, err) + } + + // Add dynamic modules.d + if fb.config.ConfigModules.Enabled() { + config := cfgfile.DefaultDynamicConfig + fb.config.ConfigModules.Unpack(&config) + + modulesManager, err := cfgfile.NewGlobManager(config.Path, ".yml", ".disabled") + if err != nil { + return errors.Wrap(err, "initialization error") + } + + for _, file := range modulesManager.ListEnabled() { + confs, err := cfgfile.LoadList(file.Path) + if err != nil { + errs = append(errs, errors.Wrap(err, "error loading config file")) + continue + } + set, err := fileset.NewModuleRegistry(confs, "", false) + if err != nil { + errs = append(errs, err) + continue + } + + if err := set.LoadML(esClient); err != nil { + errs = append(errs, err) + } + } + } - return fb.moduleRegistry.LoadML(esClient) + return errs.Err() } // Run allows the beater to be run as a beat. diff --git a/filebeat/docker-compose.yml b/filebeat/docker-compose.yml index cefff33663a0..6559fffe8b23 100644 --- a/filebeat/docker-compose.yml +++ b/filebeat/docker-compose.yml @@ -7,6 +7,9 @@ services: env_file: - ${PWD}/build/test.env - ${PWD}/prospector/redis/_meta/env + environment: + - KIBANA_HOST=kibana + - KIBANA_PORT=5601 working_dir: /go/src/github.com/elastic/beats/filebeat volumes: - ${PWD}/..:/go/src/github.com/elastic/beats/ @@ -18,6 +21,7 @@ services: image: busybox depends_on: elasticsearch: { condition: service_healthy } + kibana: { condition: service_healthy } redis: { condition: service_healthy } elasticsearch: @@ -25,5 +29,10 @@ services: file: ../testing/environments/${TESTING_ENVIRONMENT}.yml service: elasticsearch + kibana: + extends: + file: ../testing/environments/${TESTING_ENVIRONMENT}.yml + service: kibana + redis: build: ${PWD}/prospector/redis/_meta diff --git a/filebeat/tests/system/config/filebeat_modules.yml.j2 b/filebeat/tests/system/config/filebeat_modules.yml.j2 index b62c9c293c98..6f003525289a 100644 --- a/filebeat/tests/system/config/filebeat_modules.yml.j2 +++ b/filebeat/tests/system/config/filebeat_modules.yml.j2 @@ -1,7 +1,16 @@ filebeat.registry_file: {{ beat.working_dir + '/' }}{{ registryFile|default("registry")}} +filebeat.config.modules: + path: {{ beat.working_dir + '/modules.d/*.yml' }} + output.elasticsearch.hosts: ["{{ elasticsearch_url }}"] output.elasticsearch.index: {{ index_name }} setup.template.name: {{ index_name }} setup.template.pattern: {{ index_name }}* + +setup.kibana.host: {{ kibana_url }} + +{% if kibana_path %} +setup.dashboards.directory: {{ kibana_path }} +{% endif %} diff --git a/filebeat/tests/system/test_modules.py b/filebeat/tests/system/test_modules.py index 582a80b444e8..ac6c2bfaf0c5 100644 --- a/filebeat/tests/system/test_modules.py +++ b/filebeat/tests/system/test_modules.py @@ -3,6 +3,7 @@ import os import unittest import glob +import shutil import subprocess from elasticsearch import Elasticsearch import json @@ -13,6 +14,7 @@ class Test(BaseTest): def init(self): self.elasticsearch_url = self.get_elasticsearch_url() + self.kibana_url = self.get_kibana_url() print("Using elasticsearch: {}".format(self.elasticsearch_url)) self.es = Elasticsearch([self.elasticsearch_url]) logging.getLogger("urllib3").setLevel(logging.WARNING) @@ -21,6 +23,9 @@ def init(self): self.modules_path = os.path.abspath(self.working_dir + "/../../../../module") + self.kibana_path = os.path.abspath(self.working_dir + + "/../../../../_meta/kibana") + self.filebeat = os.path.abspath(self.working_dir + "/../../../../filebeat.test") @@ -206,35 +211,70 @@ def search_objects(): @unittest.skipIf(not INTEGRATION_TESTS or os.getenv("TESTING_ENVIRONMENT") == "2x", "integration test not available on 2.x") - def test_setup_machine_learning_nginx(self): - """ - Tests that setup works and loads nginx dashboards. - """ + def test_ml_setup(self): + """ Test ML are installed in all possible ways """ + for setup_flag in (True, False): + for modules_flag in (True, False): + self._run_ml_test(setup_flag, modules_flag) + + def _run_ml_test(self, setup_flag, modules_flag): self.init() + + # Clean any previous state + for df in self.es.transport.perform_request("GET", "/_xpack/ml/datafeeds/")["datafeeds"]: + if df["datafeed_id"] == 'filebeat-nginx-access-response_code': + self.es.transport.perform_request("DELETE", "/_xpack/ml/datafeeds/" + df["datafeed_id"]) + + for df in self.es.transport.perform_request("GET", "/_xpack/ml/anomaly_detectors/")["jobs"]: + if df["job_id"] == 'datafeed-filebeat-nginx-access-response_code': + self.es.transport.perform_request("DELETE", "/_xpack/ml/anomaly_detectors/" + df["job_id"]) + + shutil.rmtree(os.path.join(self.working_dir, "modules.d"), ignore_errors=True) + # generate a minimal configuration cfgfile = os.path.join(self.working_dir, "filebeat.yml") self.render_config_template( template_name="filebeat_modules", output=cfgfile, index_name=self.index_name, - elasticsearch_url=self.elasticsearch_url) + elasticsearch_url=self.elasticsearch_url, + kibana_url=self.kibana_url, + kibana_path=self.kibana_path) + + if not modules_flag: + # Enable nginx + os.mkdir(os.path.join(self.working_dir, "modules.d")) + with open(os.path.join(self.working_dir, "modules.d/nginx.yml"), "wb") as nginx: + nginx.write("- module: nginx") cmd = [ self.filebeat, "-systemTest", "-e", "-d", "*", - "-c", cfgfile, - "setup", "--modules=nginx", "--machine-learning"] + "-c", cfgfile + ] - output = open(os.path.join(self.working_dir, "output.log"), "ab") - output.write(" ".join(cmd) + "\n") - subprocess.Popen(cmd, - stdin=None, - stdout=output, - stderr=subprocess.STDOUT, - bufsize=0).wait() + if setup_flag: + cmd += ["--setup"] + else: + cmd += ["setup", "--machine-learning"] - jobs = self.es.transport.perform_request("GET", "/_xpack/ml/anomaly_detectors/") - assert "filebeat-nginx-access-response_code" in (job["job_id"] for job in jobs["jobs"]) + if modules_flag: + cmd += ["--modules=nginx"] - datafeeds = self.es.transport.perform_request("GET", "/_xpack/ml/datafeeds/") - assert "filebeat-nginx-access-response_code" in (df["job_id"] for df in datafeeds["datafeeds"]) + output = open(os.path.join(self.working_dir, "output.log"), "ab") + output.write(" ".join(cmd) + "\n") + beat = subprocess.Popen(cmd, + stdin=None, + stdout=output, + stderr=output, + bufsize=0) + + # Check result + self.wait_until(lambda: "filebeat-nginx-access-response_code" in + (df["job_id"] for df in self.es.transport.perform_request( + "GET", "/_xpack/ml/anomaly_detectors/")["jobs"]), + max_timeout=30) + self.wait_until(lambda: "datafeed-filebeat-nginx-access-response_code" in + (df["datafeed_id"] for df in self.es.transport.perform_request("GET", "/_xpack/ml/datafeeds/")["datafeeds"])) + + beat.kill() diff --git a/libbeat/tests/system/beat/beat.py b/libbeat/tests/system/beat/beat.py index 8fd617b20a09..f3260130e918 100644 --- a/libbeat/tests/system/beat/beat.py +++ b/libbeat/tests/system/beat/beat.py @@ -542,3 +542,12 @@ def get_elasticsearch_url(self): host=os.getenv("ES_HOST", "localhost"), port=os.getenv("ES_PORT", "9200"), ) + + def get_kibana_url(self): + """ + Returns kibana host URL + """ + return "http://{host}:{port}".format( + host=os.getenv("KIBANA_HOST", "localhost"), + port=os.getenv("KIBANA_PORT", "5601"), + )