Skip to content

Commit

Permalink
[MapReduce] Adds skip ssl validation (#1470)
Browse files Browse the repository at this point in the history
* adds ssl_verify

* bumps version, updates changelog

* adds verify to mapreduce
  • Loading branch information
gmmeyer authored Apr 27, 2018
1 parent ba2fc8e commit 76b3e4e
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 16 deletions.
7 changes: 7 additions & 0 deletions mapreduce/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
# CHANGELOG - mapreduce

1.2.0 / Unreleased
==================

### Changes

* [FEATURE] Adds an SSL verification option

1.1.0 / 2018-03-23
==================

Expand Down
4 changes: 4 additions & 0 deletions mapreduce/conf.yaml.example
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ instances:
# - key:value
# - instance:production

# Whether to enable SSL certificate verification for HTTP requests. Defaults to true; you may
# need to set this to false when using self-signed certificates.
# ssl_verify: true

init_config:
#
# Optional metrics can be specified for counters. For more information on
Expand Down
2 changes: 1 addition & 1 deletion mapreduce/datadog_checks/mapreduce/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@

MapReduceCheck = mapreduce.MapReduceCheck

__version__ = "1.1.0"
__version__ = "1.2.0"

__all__ = ['mapreduce']
36 changes: 22 additions & 14 deletions mapreduce/datadog_checks/mapreduce/mapreduce.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,8 @@ def check(self, instance):

collect_task_metrics = _is_affirmative(instance.get('collect_task_metrics', False))

ssl_verify = _is_affirmative(instance.get('ssl_verify', True))

# Get additional tags from the conf file
custom_tags = instance.get('tags') or [] # this handles the case when the YAML `tags` key has an empty value
tags = list(set(custom_tags)) if custom_tags else []
Expand All @@ -144,7 +146,7 @@ def check(self, instance):
tags.append('cluster_name:%s' % cluster_name)

# Get the running MR applications from YARN
running_apps = self._get_running_app_ids(rm_address)
running_apps = self._get_running_app_ids(rm_address, ssl_verify=ssl_verify)

# Report success after gathering all metrics from ResourceManager
self.service_check(YARN_SERVICE_CHECK,
Expand All @@ -153,14 +155,14 @@ def check(self, instance):
message='Connection to ResourceManager "%s" was successful' % rm_address)

# Get the applications from the application master
running_jobs = self._mapreduce_job_metrics(running_apps, tags)
running_jobs = self._mapreduce_job_metrics(running_apps, tags, ssl_verify=ssl_verify)

# Get job counter metrics
self._mapreduce_job_counters_metrics(running_jobs, tags)
self._mapreduce_job_counters_metrics(running_jobs, tags, ssl_verify=ssl_verify)

# Get task metrics
if collect_task_metrics:
self._mapreduce_task_metrics(running_jobs, tags)
self._mapreduce_task_metrics(running_jobs, tags, ssl_verify=ssl_verify)

# Report success after gathering all metrics from Application Master
if running_jobs:
Expand Down Expand Up @@ -266,15 +268,16 @@ def _parse_job_specific_counters(self, init_config):

return job_counter

def _get_running_app_ids(self, rm_address, **kwargs):
def _get_running_app_ids(self, rm_address, ssl_verify=False, **kwargs):
'''
Return a dictionary of {app_id: (app_name, tracking_url)} for the running MapReduce applications
'''
metrics_json = self._rest_request_to_json(rm_address,
YARN_APPS_PATH,
YARN_SERVICE_CHECK,
states=YARN_APPLICATION_STATES,
applicationTypes=YARN_APPLICATION_TYPES)
applicationTypes=YARN_APPLICATION_TYPES,
ssl_verify=ssl_verify)

running_apps = {}

Expand All @@ -291,7 +294,7 @@ def _get_running_app_ids(self, rm_address, **kwargs):

return running_apps

def _mapreduce_job_metrics(self, running_apps, addl_tags):
def _mapreduce_job_metrics(self, running_apps, addl_tags, ssl_verify=True):
'''
Get metrics for each MapReduce job.
Return a dictionary for each MapReduce job
Expand All @@ -309,7 +312,8 @@ def _mapreduce_job_metrics(self, running_apps, addl_tags):

metrics_json = self._rest_request_to_json(tracking_url,
MAPREDUCE_JOBS_PATH,
MAPREDUCE_SERVICE_CHECK)
MAPREDUCE_SERVICE_CHECK,
ssl_verify=ssl_verify)

if metrics_json.get('jobs'):
if metrics_json['jobs'].get('job'):
Expand Down Expand Up @@ -337,7 +341,7 @@ def _mapreduce_job_metrics(self, running_apps, addl_tags):

return running_jobs

def _mapreduce_job_counters_metrics(self, running_jobs, addl_tags):
def _mapreduce_job_counters_metrics(self, running_jobs, addl_tags, ssl_verify=True):
'''
Get custom metrics specified for each counter
'''
Expand All @@ -350,7 +354,9 @@ def _mapreduce_job_counters_metrics(self, running_jobs, addl_tags):

metrics_json = self._rest_request_to_json(job_metrics['tracking_url'],
'counters',
MAPREDUCE_SERVICE_CHECK, tags=addl_tags)
MAPREDUCE_SERVICE_CHECK,
tags=addl_tags,
ssl_verify=ssl_verify)

if metrics_json.get('jobCounters'):
if metrics_json['jobCounters'].get('counterGroup'):
Expand Down Expand Up @@ -389,7 +395,7 @@ def _mapreduce_job_counters_metrics(self, running_jobs, addl_tags):
counter,
MAPREDUCE_JOB_COUNTER_METRICS)

def _mapreduce_task_metrics(self, running_jobs, addl_tags):
def _mapreduce_task_metrics(self, running_jobs, addl_tags, ssl_verify=True):
'''
Get metrics for each MapReduce task
Return a dictionary of {task_id: 'tracking_url'} for each MapReduce task
Expand All @@ -398,7 +404,9 @@ def _mapreduce_task_metrics(self, running_jobs, addl_tags):

metrics_json = self._rest_request_to_json(job_stats['tracking_url'],
'tasks',
MAPREDUCE_SERVICE_CHECK, tags=addl_tags)
MAPREDUCE_SERVICE_CHECK,
tags=addl_tags,
ssl_verify=ssl_verify)

if metrics_json.get('tasks'):
if metrics_json['tasks'].get('task'):
Expand Down Expand Up @@ -444,7 +452,7 @@ def _set_metric(self, metric_name, metric_type, value, tags=None, device_name=No
else:
self.log.error('Metric type "%s" unknown' % (metric_type))

def _rest_request_to_json(self, address, object_path, service_name, tags=[], *args, **kwargs):
def _rest_request_to_json(self, address, object_path, service_name, tags=[], ssl_verify=True, *args, **kwargs):
'''
Query the given URL and return the JSON response
'''
Expand All @@ -470,7 +478,7 @@ def _rest_request_to_json(self, address, object_path, service_name, tags=[], *ar
url = urljoin(url, '?' + query)

try:
response = requests.get(url, timeout=self.default_integration_http_timeout)
response = requests.get(url, timeout=self.default_integration_http_timeout, verify=ssl_verify)
response.raise_for_status()
response_json = response.json()

Expand Down
2 changes: 1 addition & 1 deletion mapreduce/manifest.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
"mac_os",
"windows"
],
"version": "1.1.0",
"version": "1.2.0",
"guid": "1c143492-84ac-42d2-89d5-a45c718092b0",
"public_title": "Datadog-Map Reduce Integration",
"categories":["processing"],
Expand Down

0 comments on commit 76b3e4e

Please sign in to comment.