Skip to content

Commit

Permalink
System test for elasticsearch module, xpack code path (#12482)
Browse files Browse the repository at this point in the history
* System test for elasticsearch xpack code path

* Fixing formatting

* Adding integration test skip annotation
  • Loading branch information
ycombinator authored Jun 17, 2019
1 parent 8707c9b commit d8d48fc
Show file tree
Hide file tree
Showing 2 changed files with 100 additions and 38 deletions.
132 changes: 94 additions & 38 deletions metricbeat/module/elasticsearch/test_elasticsearch.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import sys
import os
import unittest
from elasticsearch import Elasticsearch, TransportError
from elasticsearch import Elasticsearch, TransportError, client
from parameterized import parameterized
from nose.plugins.skip import SkipTest
import urllib2
Expand All @@ -19,6 +19,28 @@ class Test(metricbeat.BaseTest):
COMPOSE_SERVICES = ['elasticsearch']
FIELDS = ["elasticsearch"]

def setUp(self):
super(Test, self).setUp()
self.es = Elasticsearch(self.get_hosts())
self.ml_es = client.xpack.ml.MlClient(self.es)

es_version = self.get_version()
if es_version["major"] < 7:
self.license_url = "/_xpack/license"
self.ml_anomaly_detectors_url = "/_xpack/ml/anomaly_detectors"
else:
self.license_url = "/_license"
self.ml_anomaly_detectors_url = "/_ml/anomaly_detectors"

self.start_trial()
self.es.indices.create(index='test_index', ignore=400)

def tearDown(self):
self.ccr_unfollow_index()
self.es.indices.delete(index='test_index,pied_piper,rats', ignore_unavailable=True)
self.delete_ml_job()
super(Test, self).tearDown()

@parameterized.expand([
"ccr",
"index",
Expand All @@ -34,28 +56,53 @@ def test_metricsets(self, metricset):
"""
elasticsearch metricset tests
"""
es = Elasticsearch(self.get_hosts())
self.check_skip(metricset, es)
self.check_skip(metricset)

self.start_trial(es)
if metricset == "ml_job":
self.create_ml_job(es)
self.create_ml_job()
if metricset == "ccr":
self.create_ccr_stats(es)
self.create_ccr_stats()

es.indices.create(index='test-index', ignore=400)
self.check_metricset("elasticsearch", metricset, self.get_hosts(), self.FIELDS +
["service"], extras={"index_recovery.active_only": "false"})

def create_ml_job(self, es):
es_version = self.get_version(es)
if es_version["major"] < 7:
ml_anomaly_detectors_url = "/_xpack/ml/anomaly_detectors"
else:
ml_anomaly_detectors_url = "/_ml/anomaly_detectors"
@unittest.skipUnless(metricbeat.INTEGRATION_TESTS, "integration test")
def test_xpack(self):
"""
elasticsearch-xpack module tests
"""
es = Elasticsearch(self.get_hosts())

self.create_ml_job()
self.create_ccr_stats()

self.render_config_template(modules=[{
"name": "elasticsearch",
"metricsets": [
"ccr",
"cluster_stats",
"index",
"index_recovery",
"index_summary",
"ml_job",
"node_stats",
"shard"
],
"hosts": self.get_hosts(),
"period": "1s",
"extras": {
"xpack.enabled": "true"
}
}])

proc = self.start_beat()
self.wait_until(lambda: self.output_lines() > 0)
proc.check_kill_and_wait()
self.assert_no_logged_warnings()

def create_ml_job(self):
# Check if an ml job already exists
response = es.transport.perform_request('GET', ml_anomaly_detectors_url + "/_all")
response = self.ml_es.get_jobs()
if response["count"] > 0:
return

Expand All @@ -65,15 +112,21 @@ def create_ml_job(self, es):
with open(file, 'r') as f:
body = json.load(f)

path = ml_anomaly_detectors_url + "/test"
es.transport.perform_request('PUT', path, body=body)
self.ml_es.put_job(job_id='test', body=body)

def delete_ml_job(self):
response = self.ml_es.get_jobs()
if response["count"] == 0:
return

self.ml_es.delete_job(job_id='test')

def create_ccr_stats(self, es):
self.setup_ccr_remote(es)
self.create_ccr_leader_index(es)
self.create_ccr_follower_index(es)
def create_ccr_stats(self):
self.setup_ccr_remote()
self.create_ccr_leader_index()
self.create_ccr_follower_index()

def setup_ccr_remote(self, es):
def setup_ccr_remote(self):
file = os.path.join(self.beat_path, "module", "elasticsearch", "ccr",
"_meta", "test", "test_remote_settings.json")

Expand All @@ -82,19 +135,19 @@ def setup_ccr_remote(self, es):
body = json.load(f)

path = "/_cluster/settings"
es.transport.perform_request('PUT', path, body=body)
self.es.transport.perform_request('PUT', path, body=body)

def create_ccr_leader_index(self, es):
def create_ccr_leader_index(self):
file = os.path.join(self.beat_path, "module", "elasticsearch", "ccr", "_meta", "test", "test_leader_index.json")

body = {}
with open(file, 'r') as f:
body = json.load(f)

path = "/pied_piper"
es.transport.perform_request('PUT', path, body=body)
self.es.transport.perform_request('PUT', path, body=body)

def create_ccr_follower_index(self, es):
def create_ccr_follower_index(self):
file = os.path.join(self.beat_path, "module", "elasticsearch", "ccr",
"_meta", "test", "test_follower_index.json")

Expand All @@ -103,37 +156,40 @@ def create_ccr_follower_index(self, es):
body = json.load(f)

path = "/rats/_ccr/follow"
es.transport.perform_request('PUT', path, body=body)
self.es.transport.perform_request('PUT', path, body=body)

def start_trial(self, es):
es_version = self.get_version(es)
if es_version["major"] < 7:
license_url = "/_xpack/license"
else:
license_url = "/_license"
def ccr_unfollow_index(self):
exists = self.es.indices.exists('rats')
if not exists:
return

self.es.transport.perform_request('POST', '/rats/_ccr/pause_follow')
self.es.indices.close('rats')
self.es.transport.perform_request('POST', '/rats/_ccr/unfollow')

def start_trial(self):
# Check if trial is already enabled
response = es.transport.perform_request('GET', license_url)
response = self.es.transport.perform_request('GET', self.license_url)
if response["license"]["type"] == "trial":
return

# Enable xpack trial
try:
es.transport.perform_request('POST', license_url + "/start_trial?acknowledge=true")
self.es.transport.perform_request('POST', self.license_url + "/start_trial?acknowledge=true")
except:
e = sys.exc_info()[0]
print "Trial already enabled. Error: {}".format(e)

def check_skip(self, metricset, es):
def check_skip(self, metricset):
if metricset != "ccr":
return

es_version = self.get_version(es)
es_version = self.get_version()
if es_version["major"] <= 6 and es_version["minor"] < 5:
# Skip CCR metricset system test for Elasticsearch versions < 6.5.0 as CCR Stats
# API endpoint is not available
raise SkipTest("elasticsearch/ccr metricset system test only valid with Elasticsearch versions >= 6.5.0")

def get_version(self, es):
es_info = es.info()
def get_version(self):
es_info = self.es.info()
return semver.parse(es_info["version"]["number"])
6 changes: 6 additions & 0 deletions metricbeat/tests/system/metricbeat.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,12 @@ def setUpClass(self):

super(BaseTest, self).setUpClass()

def setUp(self):
super(BaseTest, self).setUp()

def tearDown(self):
super(BaseTest, self).tearDown()

def de_dot(self, existing_fields):
fields = {}

Expand Down

0 comments on commit d8d48fc

Please sign in to comment.