From 939aef50e69ff103faece1feb158542cbaf3deac Mon Sep 17 00:00:00 2001 From: Diego Orellana Date: Thu, 23 Jan 2025 15:20:14 -0800 Subject: [PATCH] Add flag to parse query times from logs for serverless DPB services. PiperOrigin-RevId: 719033039 --- perfkitbenchmarker/dpb_service.py | 36 +++- .../dpb_sparksql_benchmark.py | 164 +++++++++++++----- .../providers/aws/aws_dpb_emr.py | 58 ++++++- .../providers/aws/aws_dpb_glue.py | 17 ++ .../providers/gcp/gcp_dpb_dataproc.py | 2 + .../spark_sql_glue_wrapper.py | 18 +- .../spark_sql_runner.py | 92 +++++++--- 7 files changed, 310 insertions(+), 77 deletions(-) diff --git a/perfkitbenchmarker/dpb_service.py b/perfkitbenchmarker/dpb_service.py index 5c93d87bb4..526e296cc4 100644 --- a/perfkitbenchmarker/dpb_service.py +++ b/perfkitbenchmarker/dpb_service.py @@ -20,14 +20,14 @@ """ import abc -from collections.abc import MutableMapping +from collections.abc import Callable, MutableMapping import dataclasses import datetime import logging import os import shutil import tempfile -from typing import Dict, List, Type +from typing import Dict, List, Type, TypeAlias from absl import flags import jinja2 @@ -164,18 +164,37 @@ class JobSubmissionError(errors.Benchmarks.RunError): pass +FetchOutputFn: TypeAlias = Callable[[], tuple[str | None, str | None]] + + @dataclasses.dataclass class JobResult: - """Data class for the timing of a successful DPB job.""" + """Data class for the timing of a successful DPB job. + + Attributes: + run_time: Service reported execution time. + pending_time: Service reported pending time (0 if service does not report). + stdout: Job's stdout. Call FetchOutput before to ensure it's populated. + stderr: Job's stderr. Call FetchOutput before to ensure it's populated. + fetch_output_fn: Callback expected to return a 2-tuple of str or None whose + values correspond to stdout and stderr respectively. This is called by + FetchOutput which updates stdout and stderr if their respective value in + this callback's return tuple is not None. Defaults to a no-op. + """ - # Service reported execution time run_time: float - # Service reported pending time (0 if service does not report). pending_time: float = 0 - # Stdout of the job. stdout: str = '' - # Stderr of the job. stderr: str = '' + fetch_output_fn: FetchOutputFn = lambda: (None, None) + + def FetchOutput(self): + """Populates stdout and stderr according to fetch_output_fn callback.""" + stdout, stderr = self.fetch_output_fn() + if stdout is not None: + self.stdout = stdout + if stderr is not None: + self.stderr = stderr @property def wall_time(self) -> float: @@ -795,7 +814,8 @@ def CheckPrerequisites(self): if self.cloud == 'AWS' and not aws_flags.AWS_EC2_INSTANCE_PROFILE.value: raise ValueError( 'EC2 based Spark and Hadoop services require ' - '--aws_ec2_instance_profile.') + '--aws_ec2_instance_profile.' + ) def GetClusterCreateTime(self) -> float | None: """Returns the cluster creation time. 
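Reviewer note: a minimal usage sketch of the fetch_output_fn hook added to JobResult above, assuming a provider that defers log retrieval until the benchmark asks for it. The _ReadDriverLog helper and the log path are hypothetical and not part of this patch; only the JobResult/FetchOutput contract comes from the diff.

# Hypothetical sketch only; assumes the JobResult dataclass defined above.
def _ReadDriverLog() -> tuple[str | None, str | None]:
  # Pretend stdout comes from a previously downloaded driver log; stderr is
  # unknown here, so return None and leave JobResult.stderr at its default.
  with open('/tmp/driver_stdout.log') as f:
    return f.read(), None

job_result = JobResult(run_time=123.4, fetch_output_fn=_ReadDriverLog)
job_result.FetchOutput()  # Populates job_result.stdout from the callback.
print(job_result.stdout)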
diff --git a/perfkitbenchmarker/linux_benchmarks/dpb_sparksql_benchmark.py b/perfkitbenchmarker/linux_benchmarks/dpb_sparksql_benchmark.py index d1eec9c8c4..ca291a947c 100644 --- a/perfkitbenchmarker/linux_benchmarks/dpb_sparksql_benchmark.py +++ b/perfkitbenchmarker/linux_benchmarks/dpb_sparksql_benchmark.py @@ -48,8 +48,9 @@ import json import logging import os +import re import time -from typing import List +from typing import Any, List from absl import flags from perfkitbenchmarker import configs @@ -60,6 +61,7 @@ from perfkitbenchmarker import object_storage_service from perfkitbenchmarker import sample from perfkitbenchmarker import temp_dir +from perfkitbenchmarker import vm_util BENCHMARK_NAME = 'dpb_sparksql_benchmark' @@ -112,9 +114,34 @@ 'The record format to use when connecting to BigQuery storage. See: ' 'https://github.com/GoogleCloudDataproc/spark-bigquery-connector#properties', ) +_FETCH_RESULTS_FROM_LOGS = flags.DEFINE_bool( + 'dpb_sparksql_fetch_results_from_logs', + False, + 'Make the query runner script to log query timings to stdout/stderr ' + ' instead of writing them to some object storage location. Reduces runner ' + ' latency (and hence its total wall time), but it is not supported by all ' + ' DPB services.', +) FLAGS = flags.FLAGS +LOG_RESULTS_PATTERN = ( + r'----@spark_sql_runner:results_start@----' + r'(.*)' + r'----@spark_sql_runner:results_end@----' +) +POLL_LOGS_INTERVAL = 60 +POLL_LOGS_TIMEOUT = 6 * 60 +RESULTS_FROM_LOGS_SUPPORTED_DPB_SERVICES = ( + dpb_constants.DATAPROC_SERVERLESS, + dpb_constants.EMR_SERVERLESS, + dpb_constants.GLUE, +) + + +class QueryResultsNotReadyError(Exception): + """Used to signal a job is still running.""" + def GetConfig(user_config): return configs.LoadConfig(BENCHMARK_CONFIG, user_config, BENCHMARK_NAME) @@ -129,7 +156,6 @@ def CheckPrerequisites(benchmark_config): Raises: Config.InvalidValue: On encountering invalid configuration. """ - del benchmark_config # unused if not FLAGS.dpb_sparksql_data and FLAGS.dpb_sparksql_create_hive_tables: raise errors.Config.InvalidValue( 'You must pass dpb_sparksql_data with dpb_sparksql_create_hive_tables' @@ -160,7 +186,7 @@ def CheckPrerequisites(benchmark_config): bool(_BIGQUERY_TABLES.value), bool(FLAGS.dpb_sparksql_database), ]) - == 1 + > 1 ): logging.warning( 'You should only pass one of them: --dpb_sparksql_data,' @@ -176,6 +202,16 @@ def CheckPrerequisites(benchmark_config): '--dpb_sparksql_simultaneous is not compatible with ' '--dpb_sparksql_streams.' ) + if ( + _FETCH_RESULTS_FROM_LOGS.value + and benchmark_config.dpb_service.service_type + not in RESULTS_FROM_LOGS_SUPPORTED_DPB_SERVICES + ): + raise errors.Config.InvalidValue( + f'Current dpb service {benchmark_config.dpb_service.service_type!r} is' + ' not supported for --dpb_sparksql_fetch_results_from_logs. 
Supported' + f' dpb services are: {RESULTS_FROM_LOGS_SUPPORTED_DPB_SERVICES!r}' + ) def Prepare(benchmark_spec): @@ -275,7 +311,7 @@ def Run(benchmark_spec): # Run PySpark Spark SQL Runner report_dir, job_result = _RunQueries(benchmark_spec) - results = _GetQuerySamples(storage_service, report_dir, metadata) + results = _GetQuerySamples(storage_service, report_dir, job_result, metadata) results += _GetGlobalSamples(results, cluster, job_result, metadata) results += _GetPrepareSamples(benchmark_spec, metadata) return results @@ -319,7 +355,10 @@ def _RunQueries(benchmark_spec) -> tuple[str, dpb_service.JobResult]: else: for stream in benchmark_spec.query_streams: args += ['--sql-queries', ','.join(stream)] - args += ['--report-dir', report_dir] + if _FETCH_RESULTS_FROM_LOGS.value: + args += ['--log-results', 'True'] + else: + args += ['--report-dir', report_dir] if FLAGS.dpb_sparksql_database: args += ['--database', FLAGS.dpb_sparksql_database] if FLAGS.dpb_sparksql_create_hive_tables: @@ -365,46 +404,33 @@ def _RunQueries(benchmark_spec) -> tuple[str, dpb_service.JobResult]: def _GetQuerySamples( storage_service: object_storage_service.ObjectStorageService, report_dir: str, + job_result: dpb_service.JobResult, base_metadata: MutableMapping[str, str], ) -> list[sample.Sample]: - """Get Sample objects from metrics storage path.""" - # Spark can only write data to directories not files. So do a recursive copy - # of that directory and then search it for the collection of JSON files with - # the results. - temp_run_dir = temp_dir.GetRunDirPath() - storage_service.Copy(report_dir, temp_run_dir, recursive=True) - report_files = [] - for dir_name, _, files in os.walk( - os.path.join(temp_run_dir, os.path.basename(report_dir)) - ): - for filename in files: - if filename.endswith('.json'): - report_file = os.path.join(dir_name, filename) - report_files.append(report_file) - logging.info("Found report file '%s'.", report_file) - if not report_files: - raise errors.Benchmarks.RunError('Job report not found.') + """Get Sample objects from job's logs.""" + + if _FETCH_RESULTS_FROM_LOGS.value: + query_results = _FetchResultsFromLogs(job_result) + else: + query_results = _FetchResultsFromStorage(storage_service, report_dir) samples = [] - for report_file in report_files: - with open(report_file) as file: - for line in file: - result = json.loads(line) - logging.info('Timing: %s', result) - query_id = result['query_id'] - assert query_id - metadata_copy = base_metadata.copy() - metadata_copy['query'] = query_id - if FLAGS.dpb_sparksql_streams: - metadata_copy['stream'] = result['stream'] - samples.append( - sample.Sample( - 'sparksql_run_time', - result['duration'], - 'seconds', - metadata_copy, - ) + for result in query_results: + logging.info('Timing: %s', result) + query_id = result['query_id'] + assert query_id + metadata_copy = dict(base_metadata) + metadata_copy['query'] = query_id + if FLAGS.dpb_sparksql_streams: + metadata_copy['stream'] = result['stream'] + samples.append( + sample.Sample( + 'sparksql_run_time', + result['duration'], + 'seconds', + metadata_copy, ) + ) return samples @@ -524,6 +550,64 @@ def _GetPrepareSamples( return samples +def _FetchResultsFromStorage( + storage_service: object_storage_service.ObjectStorageService, + report_dir: str, +) -> list[dict[str, Any]]: + """Get Sample objects from metrics storage path.""" + # Spark can only write data to directories not files. 
So do a recursive copy + # of that directory and then search it for the collection of JSON files with + # the results. + temp_run_dir = temp_dir.GetRunDirPath() + storage_service.Copy(report_dir, temp_run_dir, recursive=True) + report_files = [] + for dir_name, _, files in os.walk( + os.path.join(temp_run_dir, os.path.basename(report_dir)) + ): + for filename in files: + if filename.endswith('.json'): + report_file = os.path.join(dir_name, filename) + report_files.append(report_file) + logging.info("Found report file '%s'.", report_file) + if not report_files: + raise errors.Benchmarks.RunError('Job report not found.') + results = [] + for report_file in report_files: + with open(report_file) as file: + for line in file: + results.append(json.loads(line)) + return results + + +@vm_util.Retry( + timeout=POLL_LOGS_TIMEOUT, + poll_interval=POLL_LOGS_INTERVAL, + fuzz=0, + retryable_exceptions=(QueryResultsNotReadyError,), +) +def _FetchResultsFromLogs(job_result: dpb_service.JobResult): + """Get samples from job results logs.""" + job_result.FetchOutput() + logs = '\n'.join([job_result.stdout or '', job_result.stderr]) + query_results = _ParseResultsFromLogs(logs) + if query_results is None: + raise QueryResultsNotReadyError + return query_results + + +def _ParseResultsFromLogs(logs: str) -> list[dict[str, Any]] | None: + json_str_match = re.search(LOG_RESULTS_PATTERN, logs, re.DOTALL) + if not json_str_match: + return None + try: + results = json.loads(json_str_match.group(1)) + except ValueError as e: + raise errors.Benchmarks.RunError( + 'Corrupted results from logs cannot be deserialized.' + ) from e + return results + + def _GetDistCpMetadata(base_dir: str, subdirs: List[str], extra_metadata=None): """Compute list of table metadata for spark_sql_distcp metadata flags.""" metadata = [] diff --git a/perfkitbenchmarker/providers/aws/aws_dpb_emr.py b/perfkitbenchmarker/providers/aws/aws_dpb_emr.py index faa827d1a7..181ec62c2f 100644 --- a/perfkitbenchmarker/providers/aws/aws_dpb_emr.py +++ b/perfkitbenchmarker/providers/aws/aws_dpb_emr.py @@ -18,8 +18,10 @@ import collections import dataclasses +import gzip import json import logging +import os from typing import Any, Dict from absl import flags @@ -28,6 +30,7 @@ from perfkitbenchmarker import dpb_service from perfkitbenchmarker import errors from perfkitbenchmarker import provider_info +from perfkitbenchmarker import temp_dir from perfkitbenchmarker import vm_util from perfkitbenchmarker.providers.aws import aws_disk from perfkitbenchmarker.providers.aws import aws_dpb_emr_serverless_prices @@ -36,6 +39,7 @@ from perfkitbenchmarker.providers.aws import s3 from perfkitbenchmarker.providers.aws import util + FLAGS = flags.FLAGS flags.DEFINE_string( 'dpb_emr_release_label', None, 'DEPRECATED use dpb_service.version.' @@ -551,6 +555,25 @@ def GetHdfsType(self) -> str | None: ' spec.' ) from None + def _FetchLogs(self, step_id: str) -> str | None: + local_stdout_path = os.path.join( + temp_dir.GetRunDirPath(), + f'emr_{self.cluster_id}_{step_id}.stdout.gz', + ) + get_stdout_cmd = self.cmd_prefix + [ + 's3', + 'cp', + os.path.join( + self.base_dir, + f'{self.cluster_id}/steps/{step_id}/stderr.gz' + ), + local_stdout_path, + ] + _, _, _ = vm_util.IssueCommand(get_stdout_cmd) + with gzip.open(local_stdout_path, 'rt') as f: + stdout = f.read() + return stdout + class AwsDpbEmrServerless( dpb_service.DpbServiceServerlessMixin, dpb_service.BaseDpbService @@ -635,6 +658,11 @@ def SubmitJob( f'Unsupported job type {job_type} for AWS EMR Serverless.' 
) + s3_monitoring_config = { + 's3MonitoringConfiguration': { + 'logUri': os.path.join(self.base_dir, 'logs') + } + } # Create the application. stdout, _, _ = vm_util.IssueCommand( self.cmd_prefix @@ -649,6 +677,8 @@ def SubmitJob( application_type, '--tags', json.dumps(util.MakeDefaultTags()), + '--monitoring-configuration', + json.dumps(s3_monitoring_config), ] ) result = json.loads(stdout) @@ -816,7 +846,33 @@ def _CallGetJobRunApi( start_time = result['jobRun']['createdAt'] end_time = result['jobRun']['updatedAt'] self._job_run = self._ParseCostMetrics(result) - return dpb_service.JobResult(run_time=end_time - start_time) + return dpb_service.JobResult( + run_time=end_time - start_time, + fetch_output_fn=lambda: ( + self._FetchLogs(job_run.application_id, job_run.job_run_id), + None, + ), + ) + + def _FetchLogs(self, application_id: str, job_run_id: str) -> str | None: + local_stdout_path = os.path.join( + temp_dir.GetRunDirPath(), + f'emrs8s_{application_id}_{job_run_id}.stdout.gz', + ) + get_stdout_cmd = self.cmd_prefix + [ + 's3', + 'cp', + os.path.join( + self.base_dir, + f'logs/applications/{application_id}/jobs/{job_run_id}/' + 'SPARK_DRIVER/stdout.gz', + ), + local_stdout_path, + ] + _, _, _ = vm_util.IssueCommand(get_stdout_cmd) + with gzip.open(local_stdout_path, 'rt') as f: + stdout = f.read() + return stdout def _FillMetadata(self) -> None: """Gets a dict to initialize this DPB service instance's metadata.""" diff --git a/perfkitbenchmarker/providers/aws/aws_dpb_glue.py b/perfkitbenchmarker/providers/aws/aws_dpb_glue.py index cfe31d3f36..15e0d78a22 100644 --- a/perfkitbenchmarker/providers/aws/aws_dpb_glue.py +++ b/perfkitbenchmarker/providers/aws/aws_dpb_glue.py @@ -66,6 +66,22 @@ def __init__(self, dpb_service_spec): def _glue_script_wrapper_url(self): return os.path.join(self.base_dir, self.SPARK_SQL_GLUE_WRAPPER_SCRIPT) + def _FetchStderr(self, job_run_id: str) -> str: + cmd = self.cmd_prefix + [ + 'logs', + 'get-log-events', + '--log-group-name', + '/aws-glue/jobs/error', + '--log-stream-name', + job_run_id, + '--output', + 'text', + '--query', + 'events[*].[message]', + ] + stdout, _, _ = vm_util.IssueCommand(cmd) + return stdout + def _GetCompletedJob(self, job_id): """See base class.""" job_name, job_run_id = job_id @@ -98,6 +114,7 @@ def _GetCompletedJob(self, job_id): return dpb_service.JobResult( run_time=execution_time, pending_time=completed_on - started_on - execution_time, + fetch_output_fn=lambda: (None, self._FetchStderr(job_run_id)), ) def SubmitJob( diff --git a/perfkitbenchmarker/providers/gcp/gcp_dpb_dataproc.py b/perfkitbenchmarker/providers/gcp/gcp_dpb_dataproc.py index d2f61b74fd..a2fe073ece 100644 --- a/perfkitbenchmarker/providers/gcp/gcp_dpb_dataproc.py +++ b/perfkitbenchmarker/providers/gcp/gcp_dpb_dataproc.py @@ -614,6 +614,7 @@ def SubmitJob( _, stderr, retcode = cmd.Issue(timeout=None, raise_on_failure=False) if retcode != 0: raise dpb_service.JobSubmissionError(stderr) + job_stderr = stderr fetch_batch_cmd = self.DataprocGcloudCommand( 'batches', 'describe', self.batch_name @@ -641,6 +642,7 @@ def SubmitJob( return dpb_service.JobResult( run_time=(done_time - start_time).total_seconds(), pending_time=(start_time - pending_time).total_seconds(), + fetch_output_fn=lambda: (None, job_stderr), ) def GetJobProperties(self) -> Dict[str, str]: diff --git a/perfkitbenchmarker/scripts/spark_sql_test_scripts/spark_sql_glue_wrapper.py b/perfkitbenchmarker/scripts/spark_sql_test_scripts/spark_sql_glue_wrapper.py index 0a0a2d6e75..8f1d93d1bd 100644 --- 
a/perfkitbenchmarker/scripts/spark_sql_test_scripts/spark_sql_glue_wrapper.py +++ b/perfkitbenchmarker/scripts/spark_sql_test_scripts/spark_sql_glue_wrapper.py @@ -3,9 +3,21 @@ import importlib import json import sys -from awsglue.utils import getResolvedOptions -args = getResolvedOptions(sys.argv, ['pkb_main', 'pkb_args']) +from awsglue import context +from awsglue import utils + + +def get_results_logger(spark_context): + glue_context = context.GlueContext(spark_context) + logger = glue_context.get_logger() + return logger + + +args = utils.getResolvedOptions(sys.argv, ['pkb_main', 'pkb_args']) pkb_args = json.loads(args['pkb_args']) pkb_main = importlib.import_module(args['pkb_main']) -pkb_main.main(pkb_main.parse_args(pkb_args)) +pkb_main.main( + pkb_main.parse_args(pkb_args), + results_logger_getter=get_results_logger, +) diff --git a/perfkitbenchmarker/scripts/spark_sql_test_scripts/spark_sql_runner.py b/perfkitbenchmarker/scripts/spark_sql_test_scripts/spark_sql_runner.py index 822986b33c..7c19ddb42a 100644 --- a/perfkitbenchmarker/scripts/spark_sql_test_scripts/spark_sql_runner.py +++ b/perfkitbenchmarker/scripts/spark_sql_test_scripts/spark_sql_runner.py @@ -10,6 +10,7 @@ import argparse from concurrent import futures +import json import logging import os import time @@ -18,6 +19,14 @@ from pyspark import sql +_results_logger = logging.getLogger('spark_sql_runner_results') +_results_logger.propagate = False +_results_logger.setLevel(logging.INFO) +_results_logger_handler = logging.StreamHandler() +_results_logger_handler.setFormatter(logging.Formatter()) +_results_logger.addHandler(_results_logger_handler) + + # This snippet will be replaced with an actual dict[str, str] from query_id to # SQL string before uploading this file. Not using a Jinja template, since we # also want to load this as a proper python file for unit testing. @@ -41,16 +50,18 @@ def parse_args(args=None): ' parallel.' ), ) - group = parser.add_mutually_exclusive_group() - group.add_argument('--database', help='Hive database to look for data in.') - group.add_argument( + data_group = parser.add_mutually_exclusive_group() + data_group.add_argument( + '--database', help='Hive database to look for data in.' + ) + data_group.add_argument( '--table-base-dir', help=( 'Base HCFS path containing the table data to be registered into Spark' ' temporary view.' ), ) - group.add_argument( + data_group.add_argument( '--bigquery-dataset', help=( 'BQ Dataset containing the tables passed in --table-names to be' @@ -91,9 +102,19 @@ def parse_args(args=None): choices=['eager', 'lazy'], help='Whether to cache the tables in memory spilling to local-disk.', ) - parser.add_argument( + results_group = parser.add_mutually_exclusive_group(required=True) + results_group.add_argument( + '--log-results', + type=bool, + default=False, + help=( + 'Log query timings to stdout/stderr instead of writing them to some' + ' object storage location. Reduces runner latency (and hence its' + ' total wall time), but it is not supported by all DPB services.' + ), + ) + results_group.add_argument( '--report-dir', - required=True, help='Directory to write out query timings to.', ) parser.add_argument( @@ -123,7 +144,22 @@ def _load_file(spark, object_path): return '\n'.join(spark.sparkContext.textFile(object_path).collect()) -def main(args): +def get_results_logger(spark_context): + """Gets results logger. + + Injected into main fn to be replaceable by wrappers. + + Args: + spark_context: Spark API context. + + Returns: + A python logger object. 
+ """ + del spark_context + return _results_logger + + +def main(args, results_logger_getter=get_results_logger): builder = sql.SparkSession.builder.appName('Spark SQL Query') if args.enable_hive: builder = builder.enableHiveSupport() @@ -157,8 +193,6 @@ def main(args): args.dump_spark_conf ) - results = [] - threads = len(query_streams) executor = futures.ThreadPoolExecutor(max_workers=threads) result_futures = [ @@ -171,10 +205,26 @@ def main(args): results = [] for f in result_futures: results += f.result() - logging.info('Writing results to %s', args.report_dir) - spark.createDataFrame(results).coalesce(1).write.mode('append').json( - args.report_dir - ) + + if args.log_results: + dumped_results = '\n'.join([ + '----@spark_sql_runner:results_start@----', + json.dumps(results), + '----@spark_sql_runner:results_end@----', + ]) + results_logger = results_logger_getter(spark.sparkContext) + results_logger.info(dumped_results) + else: + logging.info('Writing results to %s', args.report_dir) + results_as_rows = [ + sql.Row( + stream=r['stream'], query_id=r['query_id'], duration=r['duration'] + ) + for r in results + ] + spark.createDataFrame(results_as_rows).coalesce(1).write.mode( + 'append' + ).json(args.report_dir) def get_table_metadata(args): @@ -189,9 +239,7 @@ def get_table_metadata(args): metadata[table_name] = (args.table_format or 'parquet', option_params) elif args.bigquery_dataset: for table_name in args.table_names: - bq_options = { - 'table': '.'.join([args.bigquery_dataset, table_name]) - } + bq_options = {'table': '.'.join([args.bigquery_dataset, table_name])} if args.bigquery_read_data_format: bq_options['readDataFormat'] = args.bigquery_read_data_format metadata[table_name] = (args.table_format or 'bigquery', bq_options) @@ -201,7 +249,7 @@ def get_table_metadata(args): def run_sql_query( spark_session, query_stream, stream_id, raise_query_execution_errors ): - """Runs a SQL query stream, returns list[pyspark.sql.Row] with durations.""" + """Runs a SQL query stream, returns list[dict] with durations.""" results = [] for query_id in query_stream: @@ -210,17 +258,11 @@ def run_sql_query( try: logging.info('Running query %s', query_id) start = time.time() - # spark-sql does not limit its output. Replicate that here by setting - # limit to max Java Integer. Hopefully you limited the output in SQL or - # you are going to have a bad time. Note this is not true of all TPC-DS or - # TPC-H queries and they may crash with small JVMs. - # pylint: disable=protected-access df = spark_session.sql(query) - df.show(spark_session._jvm.java.lang.Integer.MAX_VALUE) - # pylint: enable=protected-access + df.collect() duration = time.time() - start results.append( - sql.Row(stream=stream_id, query_id=query_id, duration=duration) + {'stream': stream_id, 'query_id': query_id, 'duration': duration} ) # These correspond to errors in low level Spark Excecution. # Let ParseException and AnalysisException fail the job.