From d8d48fc9939d55ca2dda258d1d500918bd5aeeea Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Mon, 17 Jun 2019 14:30:54 -0700 Subject: [PATCH] System test for elasticsearch module, xpack code path (#12482) * System test for elasticsearch xpack code path * Fixing formatting * Adding integration test skip annotation --- .../elasticsearch/test_elasticsearch.py | 132 +++++++++++++----- metricbeat/tests/system/metricbeat.py | 6 + 2 files changed, 100 insertions(+), 38 deletions(-) diff --git a/metricbeat/module/elasticsearch/test_elasticsearch.py b/metricbeat/module/elasticsearch/test_elasticsearch.py index 50c337ae91c..af875ececca 100644 --- a/metricbeat/module/elasticsearch/test_elasticsearch.py +++ b/metricbeat/module/elasticsearch/test_elasticsearch.py @@ -2,7 +2,7 @@ import sys import os import unittest -from elasticsearch import Elasticsearch, TransportError +from elasticsearch import Elasticsearch, TransportError, client from parameterized import parameterized from nose.plugins.skip import SkipTest import urllib2 @@ -19,6 +19,28 @@ class Test(metricbeat.BaseTest): COMPOSE_SERVICES = ['elasticsearch'] FIELDS = ["elasticsearch"] + def setUp(self): + super(Test, self).setUp() + self.es = Elasticsearch(self.get_hosts()) + self.ml_es = client.xpack.ml.MlClient(self.es) + + es_version = self.get_version() + if es_version["major"] < 7: + self.license_url = "/_xpack/license" + self.ml_anomaly_detectors_url = "/_xpack/ml/anomaly_detectors" + else: + self.license_url = "/_license" + self.ml_anomaly_detectors_url = "/_ml/anomaly_detectors" + + self.start_trial() + self.es.indices.create(index='test_index', ignore=400) + + def tearDown(self): + self.ccr_unfollow_index() + self.es.indices.delete(index='test_index,pied_piper,rats', ignore_unavailable=True) + self.delete_ml_job() + super(Test, self).tearDown() + @parameterized.expand([ "ccr", "index", @@ -34,28 +56,53 @@ def test_metricsets(self, metricset): """ elasticsearch metricset tests """ - es = Elasticsearch(self.get_hosts()) - self.check_skip(metricset, es) + self.check_skip(metricset) - self.start_trial(es) if metricset == "ml_job": - self.create_ml_job(es) + self.create_ml_job() if metricset == "ccr": - self.create_ccr_stats(es) + self.create_ccr_stats() - es.indices.create(index='test-index', ignore=400) self.check_metricset("elasticsearch", metricset, self.get_hosts(), self.FIELDS + ["service"], extras={"index_recovery.active_only": "false"}) - def create_ml_job(self, es): - es_version = self.get_version(es) - if es_version["major"] < 7: - ml_anomaly_detectors_url = "/_xpack/ml/anomaly_detectors" - else: - ml_anomaly_detectors_url = "/_ml/anomaly_detectors" + @unittest.skipUnless(metricbeat.INTEGRATION_TESTS, "integration test") + def test_xpack(self): + """ + elasticsearch-xpack module tests + """ + es = Elasticsearch(self.get_hosts()) + self.create_ml_job() + self.create_ccr_stats() + + self.render_config_template(modules=[{ + "name": "elasticsearch", + "metricsets": [ + "ccr", + "cluster_stats", + "index", + "index_recovery", + "index_summary", + "ml_job", + "node_stats", + "shard" + ], + "hosts": self.get_hosts(), + "period": "1s", + "extras": { + "xpack.enabled": "true" + } + }]) + + proc = self.start_beat() + self.wait_until(lambda: self.output_lines() > 0) + proc.check_kill_and_wait() + self.assert_no_logged_warnings() + + def create_ml_job(self): # Check if an ml job already exists - response = es.transport.perform_request('GET', ml_anomaly_detectors_url + "/_all") + response = self.ml_es.get_jobs() if response["count"] > 0: return @@ -65,15 +112,21 @@ def create_ml_job(self, es): with open(file, 'r') as f: body = json.load(f) - path = ml_anomaly_detectors_url + "/test" - es.transport.perform_request('PUT', path, body=body) + self.ml_es.put_job(job_id='test', body=body) + + def delete_ml_job(self): + response = self.ml_es.get_jobs() + if response["count"] == 0: + return + + self.ml_es.delete_job(job_id='test') - def create_ccr_stats(self, es): - self.setup_ccr_remote(es) - self.create_ccr_leader_index(es) - self.create_ccr_follower_index(es) + def create_ccr_stats(self): + self.setup_ccr_remote() + self.create_ccr_leader_index() + self.create_ccr_follower_index() - def setup_ccr_remote(self, es): + def setup_ccr_remote(self): file = os.path.join(self.beat_path, "module", "elasticsearch", "ccr", "_meta", "test", "test_remote_settings.json") @@ -82,9 +135,9 @@ def setup_ccr_remote(self, es): body = json.load(f) path = "/_cluster/settings" - es.transport.perform_request('PUT', path, body=body) + self.es.transport.perform_request('PUT', path, body=body) - def create_ccr_leader_index(self, es): + def create_ccr_leader_index(self): file = os.path.join(self.beat_path, "module", "elasticsearch", "ccr", "_meta", "test", "test_leader_index.json") body = {} @@ -92,9 +145,9 @@ def create_ccr_leader_index(self, es): body = json.load(f) path = "/pied_piper" - es.transport.perform_request('PUT', path, body=body) + self.es.transport.perform_request('PUT', path, body=body) - def create_ccr_follower_index(self, es): + def create_ccr_follower_index(self): file = os.path.join(self.beat_path, "module", "elasticsearch", "ccr", "_meta", "test", "test_follower_index.json") @@ -103,37 +156,40 @@ def create_ccr_follower_index(self, es): body = json.load(f) path = "/rats/_ccr/follow" - es.transport.perform_request('PUT', path, body=body) + self.es.transport.perform_request('PUT', path, body=body) - def start_trial(self, es): - es_version = self.get_version(es) - if es_version["major"] < 7: - license_url = "/_xpack/license" - else: - license_url = "/_license" + def ccr_unfollow_index(self): + exists = self.es.indices.exists('rats') + if not exists: + return + + self.es.transport.perform_request('POST', '/rats/_ccr/pause_follow') + self.es.indices.close('rats') + self.es.transport.perform_request('POST', '/rats/_ccr/unfollow') + def start_trial(self): # Check if trial is already enabled - response = es.transport.perform_request('GET', license_url) + response = self.es.transport.perform_request('GET', self.license_url) if response["license"]["type"] == "trial": return # Enable xpack trial try: - es.transport.perform_request('POST', license_url + "/start_trial?acknowledge=true") + self.es.transport.perform_request('POST', self.license_url + "/start_trial?acknowledge=true") except: e = sys.exc_info()[0] print "Trial already enabled. Error: {}".format(e) - def check_skip(self, metricset, es): + def check_skip(self, metricset): if metricset != "ccr": return - es_version = self.get_version(es) + es_version = self.get_version() if es_version["major"] <= 6 and es_version["minor"] < 5: # Skip CCR metricset system test for Elasticsearch versions < 6.5.0 as CCR Stats # API endpoint is not available raise SkipTest("elasticsearch/ccr metricset system test only valid with Elasticsearch versions >= 6.5.0") - def get_version(self, es): - es_info = es.info() + def get_version(self): + es_info = self.es.info() return semver.parse(es_info["version"]["number"]) diff --git a/metricbeat/tests/system/metricbeat.py b/metricbeat/tests/system/metricbeat.py index ae10edb8698..e54007987c5 100644 --- a/metricbeat/tests/system/metricbeat.py +++ b/metricbeat/tests/system/metricbeat.py @@ -27,6 +27,12 @@ def setUpClass(self): super(BaseTest, self).setUpClass() + def setUp(self): + super(BaseTest, self).setUp() + + def tearDown(self): + super(BaseTest, self).tearDown() + def de_dot(self, existing_fields): fields = {}