Skip to content

Commit

Permalink
Revert "Gracefully handle unavailable apps and their aspects (#19561)" (#19663)
Browse files Browse the repository at this point in the history

This reverts commit 7c6d13f.
  • Loading branch information
iliakur authored Feb 20, 2025
1 parent db11d64 commit d71f81c
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 103 deletions.
1 change: 0 additions & 1 deletion spark/changelog.d/19561.fixed

This file was deleted.

123 changes: 73 additions & 50 deletions spark/datadog_checks/spark/spark.py
Original file line number Diff line number Diff line change
Expand Up @@ -392,110 +392,133 @@ def _get_spark_app_ids(self, running_apps, tags):

return spark_apps

def _describe_app(self, property, running_apps, addl_tags):
def _spark_job_metrics(self, running_apps, addl_tags):
"""
Get payloads that describe certain property of the running apps.
Examples of "aspects":
- the app's jobs
- the app's spark stages
Get metrics for each Spark job.
"""
for app_id, (app_name, tracking_url) in running_apps.items():

base_url = self._get_request_url(tracking_url)
response = self._rest_request(base_url, SPARK_APPS_PATH, SPARK_SERVICE_CHECK, addl_tags, app_id, property)
try:
yield (response.json(), [f'app_name:{app_name}'] + addl_tags)
except JSONDecodeError:
self.log.debug(
'Skipping metrics for %s from app %s due to unparseable JSON payload.', property, app_name
)
continue
response = self._rest_request_to_json(
base_url, SPARK_APPS_PATH, SPARK_SERVICE_CHECK, addl_tags, app_id, 'jobs'
)

def _spark_job_metrics(self, running_apps, addl_tags):
"""
Get metrics for each Spark job.
"""
for jobs, app_tags in self._describe_app('jobs', running_apps, addl_tags):
for job in jobs:
job_tags = []
for job in response:

status = job.get('status')
job_tags.append('status:%s' % str(status).lower())

tags = ['app_name:%s' % str(app_name)]
tags.extend(addl_tags)
tags.append('status:%s' % str(status).lower())

job_id = job.get('jobId')
if job_id is not None:
job_tags.append('job_id:{}'.format(job_id))
tags.append('job_id:{}'.format(job_id))

if not self._disable_spark_job_stage_tags:
for stage_id in job.get('stageIds', []):
job_tags.append('stage_id:{}'.format(stage_id))
tags.append('stage_id:{}'.format(stage_id))

tags = app_tags + job_tags
self._set_metrics_from_json(tags, job, SPARK_JOB_METRICS)
self._set_metric('spark.job.count', COUNT, 1, tags)

def _spark_stage_metrics(self, running_apps, addl_tags):
"""
Get metrics for each Spark stage.
"""
for stages, app_tags in self._describe_app('stages', running_apps, addl_tags):
for stage in stages:
stage_tags = []
for app_id, (app_name, tracking_url) in running_apps.items():

base_url = self._get_request_url(tracking_url)
response = self._rest_request_to_json(
base_url, SPARK_APPS_PATH, SPARK_SERVICE_CHECK, addl_tags, app_id, 'stages'
)

for stage in response:

status = stage.get('status')
stage_tags.append('status:%s' % str(status).lower())

tags = ['app_name:%s' % str(app_name)]
tags.extend(addl_tags)
tags.append('status:%s' % str(status).lower())

stage_id = stage.get('stageId')
if stage_id is not None:
stage_tags.append('stage_id:{}'.format(stage_id))
tags.append('stage_id:{}'.format(stage_id))

tags = app_tags + stage_tags
self._set_metrics_from_json(tags, stage, SPARK_STAGE_METRICS)
self._set_metric('spark.stage.count', COUNT, 1, tags)

def _spark_executor_metrics(self, running_apps, addl_tags):
"""
Get metrics for each Spark executor.
"""
for executors, app_tags in self._describe_app('executors', running_apps, addl_tags):
for executor in executors:
for app_id, (app_name, tracking_url) in running_apps.items():

base_url = self._get_request_url(tracking_url)
response = self._rest_request_to_json(
base_url, SPARK_APPS_PATH, SPARK_SERVICE_CHECK, addl_tags, app_id, 'executors'
)

tags = ['app_name:%s' % str(app_name)]
tags.extend(addl_tags)

for executor in response:
if executor.get('id') == 'driver':
self._set_metrics_from_json(app_tags, executor, SPARK_DRIVER_METRICS)
self._set_metrics_from_json(tags, executor, SPARK_DRIVER_METRICS)
else:
self._set_metrics_from_json(app_tags, executor, SPARK_EXECUTOR_METRICS)
self._set_metrics_from_json(tags, executor, SPARK_EXECUTOR_METRICS)

if is_affirmative(self.instance.get('executor_level_metrics', False)):
self._set_metrics_from_json(
app_tags + ['executor_id:{}'.format(executor.get('id', 'unknown'))],
tags + ['executor_id:{}'.format(executor.get('id', 'unknown'))],
executor,
SPARK_EXECUTOR_LEVEL_METRICS,
)

if executors:
self._set_metric('spark.executor.count', COUNT, len(executors), app_tags)
if len(response):
self._set_metric('spark.executor.count', COUNT, len(response), tags)

def _spark_rdd_metrics(self, running_apps, addl_tags):
"""
Get metrics for each Spark RDD.
"""
for rdds, app_tags in self._describe_app('storage/rdd', running_apps, addl_tags):
for rdd in rdds:
self._set_metrics_from_json(app_tags, rdd, SPARK_RDD_METRICS)
for app_id, (app_name, tracking_url) in running_apps.items():

base_url = self._get_request_url(tracking_url)
response = self._rest_request_to_json(
base_url, SPARK_APPS_PATH, SPARK_SERVICE_CHECK, addl_tags, app_id, 'storage/rdd'
)

if rdds:
self._set_metric('spark.rdd.count', COUNT, len(rdds), app_tags)
tags = ['app_name:%s' % str(app_name)]
tags.extend(addl_tags)

for rdd in response:
self._set_metrics_from_json(tags, rdd, SPARK_RDD_METRICS)

if len(response):
self._set_metric('spark.rdd.count', COUNT, len(response), tags)

def _spark_streaming_statistics_metrics(self, running_apps, addl_tags):
"""
Get metrics for each application streaming statistics.
"""
try:
for stats, app_tags in self._describe_app('streaming/statistics', running_apps, addl_tags):
self.log.debug('streaming/statistics: %s', stats)
self._set_metrics_from_json(app_tags, stats, SPARK_STREAMING_STATISTICS_METRICS)
except HTTPError as e:
self.log.debug("Got an error collecting streaming/statistics", e, exc_info=True)
pass
for app_id, (app_name, tracking_url) in running_apps.items():
try:
base_url = self._get_request_url(tracking_url)
response = self._rest_request_to_json(
base_url, SPARK_APPS_PATH, SPARK_SERVICE_CHECK, addl_tags, app_id, 'streaming/statistics'
)
self.log.debug('streaming/statistics: %s', response)
tags = ['app_name:%s' % str(app_name)]
tags.extend(addl_tags)

# NOTE: response is a dict
self._set_metrics_from_json(tags, response, SPARK_STREAMING_STATISTICS_METRICS)
except HTTPError as e:
# NOTE: If api call returns response 404
# then it means that the application is not a streaming application, we should skip metric submission
if e.response.status_code != 404:
raise

def _spark_structured_streams_metrics(self, running_apps, addl_tags):
"""
Expand Down
2 changes: 1 addition & 1 deletion spark/tests/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
EXPECTED_E2E_METRICS = [
'spark.driver.total_shuffle_read',
'spark.stage.num_active_tasks',
'spark.streaming.statistics.num_inactive_receivers',
'spark.driver.max_memory',
'spark.streaming.statistics.num_active_batches',
'spark.driver.total_tasks',
Expand Down Expand Up @@ -90,7 +91,6 @@
'spark.structured_streaming.input_rate',
'spark.streaming.statistics.avg_input_rate',
'spark.streaming.statistics.avg_scheduling_delay',
'spark.streaming.statistics.num_inactive_receivers',
# The peak memory metrics are only available in Spark 3.0+ and after one loop of garbage collection
'spark.driver.peak_mem.jvm_heap_memory',
'spark.driver.peak_mem.jvm_off_heap_memory',
Expand Down
51 changes: 0 additions & 51 deletions spark/tests/test_spark.py
Original file line number Diff line number Diff line change
Expand Up @@ -1248,57 +1248,6 @@ def test_no_running_apps(aggregator, dd_run_check, instance, service_check, capl
assert 'No running apps found. No metrics will be collected.' in caplog.text


@pytest.mark.unit
@pytest.mark.parametrize(
'property_url, missing_metrics',
[
pytest.param(YARN_SPARK_JOB_URL, SPARK_JOB_RUNNING_METRIC_VALUES, id='jobs'),
pytest.param(YARN_SPARK_STAGE_URL, SPARK_STAGE_RUNNING_METRIC_VALUES, id='stages'),
pytest.param(
YARN_SPARK_EXECUTOR_URL,
SPARK_EXECUTOR_METRIC_VALUES.keys() | SPARK_EXECUTOR_LEVEL_METRIC_VALUES.keys(),
id='executors',
),
pytest.param(YARN_SPARK_RDD_URL, SPARK_RDD_METRIC_VALUES, id='storage/rdd'),
pytest.param(
YARN_SPARK_STREAMING_STATISTICS_URL, SPARK_STREAMING_STATISTICS_METRIC_VALUES, id='streaming/statistics'
),
],
)
def test_yarn_no_json_for_app_properties(aggregator, dd_run_check, mocker, property_url, missing_metrics):
"""
In some yarn deployments apps stop exposing properties (such as jobs and stages) by the time we query them.
In these cases we skip only the specific missing apps and metrics while collecting all others.
"""

def get_without_json(url, *args, **kwargs):
arg_url = Url(url)
if arg_url == property_url:
return MockResponse(content="") # this should trigger json error
return yarn_requests_get_mock(url, *args, **kwargs)

mocker.patch('requests.get', get_without_json)
dd_run_check(SparkCheck('spark', {}, [YARN_CONFIG]))
for m in missing_metrics:
aggregator.assert_metric(m, count=0)


@pytest.mark.unit
@pytest.mark.parametrize('status_code', [404, 500])
def test_yarn_streaming_metrics_http_error(aggregator, dd_run_check, mocker, caplog, status_code):
def get_raise_error(url, *args, **kwargs):
arg_url = Url(url)
if arg_url == YARN_SPARK_STREAMING_STATISTICS_URL:
return MockResponse(status_code=status_code)
return yarn_requests_get_mock(url, *args, **kwargs)

mocker.patch('requests.get', get_raise_error)
dd_run_check(SparkCheck('spark', {}, [YARN_CONFIG]))
for m in SPARK_STREAMING_STATISTICS_METRIC_VALUES:
aggregator.assert_metric(m, count=0)


class StandaloneAppsResponseHandler(BaseHTTPServer.BaseHTTPRequestHandler):
def do_GET(self):
self.send_response(200)
Expand Down

0 comments on commit d71f81c

Please sign in to comment.