Cherry-pick #5509 to 6.0: Fix ML jobs setup for dynamic modules #5518

Merged (2 commits) on Nov 6, 2017
2 changes: 2 additions & 0 deletions CHANGELOG.asciidoc
@@ -34,6 +34,8 @@ https://github.com/elastic/beats/compare/v6.0.0-rc2...master[Check the HEAD diff

*Filebeat*

- Fix machine learning jobs setup for dynamic modules. {pull}5509[5509]

*Heartbeat*

*Metricbeat*
42 changes: 37 additions & 5 deletions filebeat/beater/filebeat.go
@@ -4,9 +4,11 @@ import (
"flag"
"fmt"

"github.com/joeshaw/multierror"
"github.com/pkg/errors"

"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/cfgfile"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/cfgwarn"
"github.com/elastic/beats/libbeat/logp"
@@ -99,10 +101,8 @@ func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) {
}

// register `setup` callback for ML jobs
if !moduleRegistry.Empty() {
b.SetupMLCallback = func(b *beat.Beat) error {
return fb.loadModulesML(b)
}
}
b.SetupMLCallback = func(b *beat.Beat) error {
return fb.loadModulesML(b)
}
return fb, nil
}
@@ -127,6 +127,7 @@ func (fb *Filebeat) loadModulesPipelines(b *beat.Beat) error {

func (fb *Filebeat) loadModulesML(b *beat.Beat) error {
logp.Debug("machine-learning", "Setting up ML jobs for modules")
var errs multierror.Errors

if b.Config.Output.Name() != "elasticsearch" {
logp.Warn("Filebeat is unable to load the Xpack Machine Learning configurations for the" +
@@ -139,8 +140,39 @@ func (fb *Filebeat) loadModulesML(b *beat.Beat) error {
if err != nil {
return errors.Errorf("Error creating Elasticsearch client: %v", err)
}
if err := fb.moduleRegistry.LoadML(esClient); err != nil {
errs = append(errs, err)
}

// Add dynamic modules.d
if fb.config.ConfigModules.Enabled() {
config := cfgfile.DefaultDynamicConfig
fb.config.ConfigModules.Unpack(&config)

modulesManager, err := cfgfile.NewGlobManager(config.Path, ".yml", ".disabled")
if err != nil {
return errors.Wrap(err, "initialization error")
}

for _, file := range modulesManager.ListEnabled() {
confs, err := cfgfile.LoadList(file.Path)
if err != nil {
errs = append(errs, errors.Wrap(err, "error loading config file"))
continue
}
set, err := fileset.NewModuleRegistry(confs, "", false)
if err != nil {
errs = append(errs, err)
continue
}

if err := set.LoadML(esClient); err != nil {
errs = append(errs, err)
}
}
}

return fb.moduleRegistry.LoadML(esClient)
return errs.Err()
}

// Run allows the beater to be run as a beat.
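The loadModulesML change above stops returning on the first failure and instead collects errors with github.com/joeshaw/multierror, so one module with a broken ML configuration no longer prevents the remaining modules.d entries from being set up. A minimal sketch of that pattern follows; the names setupAll and setupOne are illustrative only and are not part of Filebeat:

package main

import (
	"fmt"

	"github.com/joeshaw/multierror"
)

// setupAll mirrors the error handling above: try every item, collect the
// failures, and only report them once the whole loop has run.
func setupAll(modules []string, setupOne func(string) error) error {
	var errs multierror.Errors
	for _, m := range modules {
		if err := setupOne(m); err != nil {
			errs = append(errs, err) // remember the failure, keep going
		}
	}
	return errs.Err() // nil when nothing was appended
}

func main() {
	err := setupAll([]string{"nginx", "apache2"}, func(name string) error {
		if name == "apache2" {
			return fmt.Errorf("no ML configuration found for module %s", name)
		}
		return nil
	})
	fmt.Println(err) // prints the collected error(s); nil if none failed
}

Because errs.Err() returns nil when the slice is empty, the happy path still reports success while a partial failure surfaces every problem at once.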
9 changes: 9 additions & 0 deletions filebeat/docker-compose.yml
@@ -7,6 +7,9 @@ services:
env_file:
- ${PWD}/build/test.env
- ${PWD}/prospector/redis/_meta/env
environment:
- KIBANA_HOST=kibana
- KIBANA_PORT=5601
working_dir: /go/src/github.com/elastic/beats/filebeat
volumes:
- ${PWD}/..:/go/src/github.com/elastic/beats/
@@ -18,12 +21,18 @@ services:
image: busybox
depends_on:
elasticsearch: { condition: service_healthy }
kibana: { condition: service_healthy }
redis: { condition: service_healthy }

elasticsearch:
extends:
file: ../testing/environments/${TESTING_ENVIRONMENT}.yml
service: elasticsearch

kibana:
extends:
file: ../testing/environments/${TESTING_ENVIRONMENT}.yml
service: kibana

redis:
build: ${PWD}/prospector/redis/_meta
9 changes: 9 additions & 0 deletions filebeat/tests/system/config/filebeat_modules.yml.j2
@@ -1,7 +1,16 @@
filebeat.registry_file: {{ beat.working_dir + '/' }}{{ registryFile|default("registry")}}

filebeat.config.modules:
path: {{ beat.working_dir + '/modules.d/*.yml' }}

output.elasticsearch.hosts: ["{{ elasticsearch_url }}"]
output.elasticsearch.index: {{ index_name }}

setup.template.name: {{ index_name }}
setup.template.pattern: {{ index_name }}*

setup.kibana.host: {{ kibana_url }}

{% if kibana_path %}
setup.dashboards.directory: {{ kibana_path }}
{% endif %}
76 changes: 58 additions & 18 deletions filebeat/tests/system/test_modules.py
@@ -3,6 +3,7 @@
import os
import unittest
import glob
import shutil
import subprocess
from elasticsearch import Elasticsearch
import json
@@ -13,6 +14,7 @@ class Test(BaseTest):

def init(self):
self.elasticsearch_url = self.get_elasticsearch_url()
self.kibana_url = self.get_kibana_url()
print("Using elasticsearch: {}".format(self.elasticsearch_url))
self.es = Elasticsearch([self.elasticsearch_url])
logging.getLogger("urllib3").setLevel(logging.WARNING)
@@ -21,6 +23,9 @@ def init(self):
self.modules_path = os.path.abspath(self.working_dir +
"/../../../../module")

self.kibana_path = os.path.abspath(self.working_dir +
"/../../../../_meta/kibana")

self.filebeat = os.path.abspath(self.working_dir +
"/../../../../filebeat.test")

@@ -197,35 +202,70 @@ def search_objects():
@unittest.skipIf(not INTEGRATION_TESTS or
os.getenv("TESTING_ENVIRONMENT") == "2x",
"integration test not available on 2.x")
def test_setup_machine_learning_nginx(self):
"""
Tests that setup works and loads nginx dashboards.
"""
def test_ml_setup(self):
""" Test ML are installed in all possible ways """
for setup_flag in (True, False):
for modules_flag in (True, False):
self._run_ml_test(setup_flag, modules_flag)

def _run_ml_test(self, setup_flag, modules_flag):
self.init()

# Clean any previous state
for df in self.es.transport.perform_request("GET", "/_xpack/ml/datafeeds/")["datafeeds"]:
if df["datafeed_id"] == 'filebeat-nginx-access-response_code':
self.es.transport.perform_request("DELETE", "/_xpack/ml/datafeeds/" + df["datafeed_id"])

for df in self.es.transport.perform_request("GET", "/_xpack/ml/anomaly_detectors/")["jobs"]:
if df["job_id"] == 'datafeed-filebeat-nginx-access-response_code':
self.es.transport.perform_request("DELETE", "/_xpack/ml/anomaly_detectors/" + df["job_id"])

shutil.rmtree(os.path.join(self.working_dir, "modules.d"), ignore_errors=True)

# generate a minimal configuration
cfgfile = os.path.join(self.working_dir, "filebeat.yml")
self.render_config_template(
template_name="filebeat_modules",
output=cfgfile,
index_name=self.index_name,
elasticsearch_url=self.elasticsearch_url)
elasticsearch_url=self.elasticsearch_url,
kibana_url=self.kibana_url,
kibana_path=self.kibana_path)

if not modules_flag:
# Enable nginx
os.mkdir(os.path.join(self.working_dir, "modules.d"))
with open(os.path.join(self.working_dir, "modules.d/nginx.yml"), "wb") as nginx:
nginx.write("- module: nginx")

cmd = [
self.filebeat, "-systemTest",
"-e", "-d", "*",
"-c", cfgfile,
"setup", "--modules=nginx", "--machine-learning"]
"-c", cfgfile
]

output = open(os.path.join(self.working_dir, "output.log"), "ab")
output.write(" ".join(cmd) + "\n")
subprocess.Popen(cmd,
stdin=None,
stdout=output,
stderr=subprocess.STDOUT,
bufsize=0).wait()
if setup_flag:
cmd += ["--setup"]
else:
cmd += ["setup", "--machine-learning"]

jobs = self.es.transport.perform_request("GET", "/_xpack/ml/anomaly_detectors/")
assert "filebeat-nginx-access-response_code" in (job["job_id"] for job in jobs["jobs"])
if modules_flag:
cmd += ["--modules=nginx"]

datafeeds = self.es.transport.perform_request("GET", "/_xpack/ml/datafeeds/")
assert "filebeat-nginx-access-response_code" in (df["job_id"] for df in datafeeds["datafeeds"])
output = open(os.path.join(self.working_dir, "output.log"), "ab")
output.write(" ".join(cmd) + "\n")
beat = subprocess.Popen(cmd,
stdin=None,
stdout=output,
stderr=output,
bufsize=0)

# Check result
self.wait_until(lambda: "filebeat-nginx-access-response_code" in
(df["job_id"] for df in self.es.transport.perform_request(
"GET", "/_xpack/ml/anomaly_detectors/")["jobs"]),
max_timeout=30)
self.wait_until(lambda: "datafeed-filebeat-nginx-access-response_code" in
(df["datafeed_id"] for df in self.es.transport.perform_request("GET", "/_xpack/ml/datafeeds/")["datafeeds"]))

beat.kill()
9 changes: 9 additions & 0 deletions libbeat/tests/system/beat/beat.py
@@ -530,3 +530,12 @@ def get_elasticsearch_url(self):
host=os.getenv("ES_HOST", "localhost"),
port=os.getenv("ES_PORT", "9200"),
)

def get_kibana_url(self):
"""
Returns the Kibana host URL
"""
return "http://{host}:{port}".format(
host=os.getenv("KIBANA_HOST", "localhost"),
port=os.getenv("KIBANA_PORT", "5601"),
)