Skip to content

Commit

Permalink
Merge pull request #3041 from DataDog/feature/configure_application_tags
Browse files Browse the repository at this point in the history
[yarn] Add configurable application tags
  • Loading branch information
degemer authored Dec 12, 2016
2 parents a687cd0 + 0358b58 commit 0369d53
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 28 deletions.
70 changes: 49 additions & 21 deletions checks.d/yarn.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,11 +143,33 @@ class YarnCheck(AgentCheck):
'''
Extract statistics from YARN's ResourceManger REST API
'''
_ALLOWED_APPLICATION_TAGS = [
'applicationTags',
'applicationType',
'name',
'queue',
'user'
]

def check(self, instance):

# Get properties from conf file
rm_address = instance.get('resourcemanager_uri', DEFAULT_RM_URI)
app_tags = instance.get('application_tags', {})

if type(app_tags) is not dict:
self.log.error('application_tags is incorrect: %s is not a dictionary', app_tags)
app_tags = {}

filtered_app_tags = {}
for dd_prefix, yarn_key in app_tags.iteritems():
if yarn_key in self._ALLOWED_APPLICATION_TAGS:
filtered_app_tags[dd_prefix] = yarn_key
app_tags = filtered_app_tags

# Collected by default
app_tags['app_name'] = 'name'


# Get additional tags from the conf file
tags = instance.get('tags', [])
Expand All @@ -166,7 +188,7 @@ def check(self, instance):

# Get metrics from the Resource Manager
self._yarn_cluster_metrics(rm_address, tags)
self._yarn_app_metrics(rm_address, tags)
self._yarn_app_metrics(rm_address, app_tags, tags)
self._yarn_node_metrics(rm_address, tags)

def _yarn_cluster_metrics(self, rm_address, addl_tags):
Expand All @@ -182,44 +204,50 @@ def _yarn_cluster_metrics(self, rm_address, addl_tags):
if yarn_metrics is not None:
self._set_yarn_metrics_from_json(addl_tags, yarn_metrics, YARN_CLUSTER_METRICS)

def _yarn_app_metrics(self, rm_address, addl_tags):
def _yarn_app_metrics(self, rm_address, app_tags, addl_tags):
'''
Get metrics for running applications
'''
metrics_json = self._rest_request_to_json(rm_address,
metrics_json = self._rest_request_to_json(
rm_address,
YARN_APPS_PATH,
states=YARN_APPLICATION_STATES)
states=YARN_APPLICATION_STATES
)

if metrics_json:
if metrics_json['apps'] is not None:
if metrics_json['apps']['app'] is not None:
if (metrics_json and metrics_json['apps'] is not None and
metrics_json['apps']['app'] is not None):

for app_json in metrics_json['apps']['app']:
for app_json in metrics_json['apps']['app']:

app_name = app_json['name']
tags = []
for dd_tag, yarn_key in app_tags.iteritems():
try:
tags.append("{tag}:{value}".format(
tag=dd_tag, value=app_json[yarn_key]
))
except KeyError:
self.log.error("Invalid value %s for application_tag", yarn_key)

tags = ['app_name:%s' % str(app_name)]
tags.extend(addl_tags)
tags.extend(addl_tags)

self._set_yarn_metrics_from_json(tags, app_json, YARN_APP_METRICS)
self._set_yarn_metrics_from_json(tags, app_json, YARN_APP_METRICS)

def _yarn_node_metrics(self, rm_address, addl_tags):
'''
Get metrics related to YARN nodes
'''
metrics_json = self._rest_request_to_json(rm_address, YARN_NODES_PATH)

if metrics_json:
if metrics_json['nodes'] is not None:
if metrics_json['nodes']['node'] is not None:
if (metrics_json and metrics_json['nodes'] is not None and
metrics_json['nodes']['node'] is not None):

for node_json in metrics_json['nodes']['node']:
node_id = node_json['id']
for node_json in metrics_json['nodes']['node']:
node_id = node_json['id']

tags = ['node_id:%s' % str(node_id)]
tags.extend(addl_tags)
tags = ['node_id:%s' % str(node_id)]
tags.extend(addl_tags)

self._set_yarn_metrics_from_json(tags, node_json, YARN_NODE_METRICS)
self._set_yarn_metrics_from_json(tags, node_json, YARN_NODE_METRICS)

def _set_yarn_metrics_from_json(self, tags, metrics_json, yarn_metrics):
'''
Expand All @@ -243,7 +271,7 @@ def _set_metric(self, metric_name, metric_type, value, tags=None, device_name=No
elif metric_type == INCREMENT:
self.increment(metric_name, value, tags=tags, device_name=device_name)
else:
self.log.error('Metric type "%s" unknown' % (metric_type))
self.log.error('Metric type "%s" unknown', metric_type)

def _rest_request_to_json(self, address, object_path, *args, **kwargs):
'''
Expand Down
17 changes: 14 additions & 3 deletions conf.d/yarn.yaml.example
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ init_config:
instances:
# The YARN check retrieves metrics from YARNS's ResourceManager. This
# check must be run from the Master Node and the ResourceManager URI must
# be specified below. The ResourceManager URI is composed of the
# be specified below. The ResourceManager URI is composed of the
# ResourceManager's hostname and port.
#
# The ResourceManager hostname can be found in the yarn-site.xml conf file
Expand All @@ -19,5 +19,16 @@ instances:

# Optional tags to be applied to every emitted metric.
# tags:
# - key:value
# - instance:production
# - "key:value"
# - "instance:production"

# Optional tags retrieved from the application data to be applied to the
# application metrics.
# application_tags:
# # tag_prefix: yarn_key
# app_queue: queue
# This will add a tag 'app_queue:name_of_the_queue' to the app metrics,
# app_queue being the tag_prefix and queue the actual YARN key.
# Allowed yarn keys: applicationType, applicationTags, name, queue, user
# By default, the application name is collected with the prefix app_name.

21 changes: 17 additions & 4 deletions tests/checks/mock/test_yarn.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,14 @@ class YARNCheck(AgentCheckTest):

YARN_CONFIG = {
'resourcemanager_uri': 'http://localhost:8088',
'cluster_name': CLUSTER_NAME
'cluster_name': CLUSTER_NAME,
'tags': [
'opt_key:opt_value'
],
'application_tags': {
'app_id': 'id',
'app_queue': 'queue'
}
}

YARN_CLUSTER_METRICS_VALUES = {
Expand Down Expand Up @@ -82,7 +89,10 @@ class YARNCheck(AgentCheckTest):
'yarn.metrics.rebooted_nodes': 0,
}

YARN_CLUSTER_METRICS_TAGS = ['cluster_name:%s' % CLUSTER_NAME]
YARN_CLUSTER_METRICS_TAGS = [
'cluster_name:%s' % CLUSTER_NAME,
'opt_key:opt_value'
]

YARN_APP_METRICS_VALUES = {
'yarn.apps.progress': 100,
Expand All @@ -98,7 +108,9 @@ class YARNCheck(AgentCheckTest):

YARN_APP_METRICS_TAGS = [
'cluster_name:%s' % CLUSTER_NAME,
'app_name:word count'
'app_name:word count',
'app_queue:default',
'opt_key:opt_value'
]

YARN_NODE_METRICS_VALUES = {
Expand All @@ -112,7 +124,8 @@ class YARNCheck(AgentCheckTest):

YARN_NODE_METRICS_TAGS = [
'cluster_name:%s' % CLUSTER_NAME,
'node_id:h2:1235'
'node_id:h2:1235',
'opt_key:opt_value'
]

@mock.patch('requests.get', side_effect=requests_get_mock)
Expand Down

0 comments on commit 0369d53

Please sign in to comment.