diff --git a/openeogeotrellis/backend.py b/openeogeotrellis/backend.py index ee5a4c1bf..564a9d7c5 100644 --- a/openeogeotrellis/backend.py +++ b/openeogeotrellis/backend.py @@ -943,6 +943,7 @@ def _start_job(self, job_id: str, user_id: str, batch_process_dependencies: Unio max_executors = extra_options.get("max-executors", "200") queue = extra_options.get("queue", "default") profile = extra_options.get("profile", "false") + soft_errors = extra_options.get("soft-errors", "false") def serialize_dependencies() -> str: dependencies = batch_process_dependencies or job_info.get('dependencies') or [] @@ -1110,6 +1111,7 @@ def as_arg_element(dependency: dict) -> dict: args.append(max_executors) args.append(user_id) args.append(job_id) + args.append(soft_errors) try: logger.info("Submitting job: {a!r}".format(a=args), extra={'job_id': job_id}) diff --git a/openeogeotrellis/deploy/batch_job.py b/openeogeotrellis/deploy/batch_job.py index 32dea9ba8..97e6703b5 100644 --- a/openeogeotrellis/deploy/batch_job.py +++ b/openeogeotrellis/deploy/batch_job.py @@ -225,11 +225,11 @@ def main(argv: List[str]) -> None: logger.info("argv: {a!r}".format(a=argv)) logger.info("pid {p}; ppid {pp}; cwd {c}".format(p=os.getpid(), pp=os.getppid(), c=os.getcwd())) - if len(argv) != 9: + if len(argv) != 10: raise Exception( f"usage: {argv[0]} " " " - " " + " " ) job_specification_file = argv[1] @@ -240,6 +240,7 @@ def main(argv: List[str]) -> None: api_version = argv[6] dependencies = _deserialize_dependencies(argv[7]) user_id = argv[8] + soft_errors = argv[9].upper() == "TRUE" _create_job_dir(job_dir) @@ -278,7 +279,8 @@ def main(argv: List[str]) -> None: def run_driver(): run_job( job_specification=job_specification, output_file=output_file, metadata_file=metadata_file, - api_version=api_version, job_dir=job_dir, dependencies=dependencies, user_id=user_id + api_version=api_version, job_dir=job_dir, dependencies=dependencies, user_id=user_id, + soft_errors=soft_errors ) if sc.getConf().get('spark.python.profile', 'false').lower() == 'true': @@ -311,8 +313,10 @@ def run_driver(): user_facing_logger.error("Your batch job failed because workers used too much Python memory. The same task was attempted multiple times. Consider increasing executor-memoryOverhead or contact the developers to investigate.") raise e + @log_memory -def run_job(job_specification, output_file: Path, metadata_file: Path, api_version, job_dir, dependencies: dict, user_id:str=None): +def run_job(job_specification, output_file: Path, metadata_file: Path, api_version, job_dir, dependencies: dict, + user_id: str = None, soft_errors: bool = False): logger.info(f"Job spec: {json.dumps(job_specification,indent=1)}") process_graph = job_specification['process_graph'] @@ -327,7 +331,8 @@ def run_job(job_specification, output_file: Path, metadata_file: Path, api_versi 'require_bounds': True, 'correlation_id': correlation_id, 'dependencies': dependencies, - "backend_implementation": backend_implementation, + 'backend_implementation': backend_implementation, + 'soft_errors': soft_errors }) tracer = DryRunDataTracer() logger.info("Starting process graph evaluation") diff --git a/openeogeotrellis/deploy/submit_batch_job.sh b/openeogeotrellis/deploy/submit_batch_job.sh index cb9128ee0..67760f77c 100755 --- a/openeogeotrellis/deploy/submit_batch_job.sh +++ b/openeogeotrellis/deploy/submit_batch_job.sh @@ -44,6 +44,7 @@ pyfiles=${20} maxexecutors=${21-500} userId=${22} batchJobId=${23} +softErrors=${24-false} pysparkPython="venv/bin/python" @@ -134,4 +135,4 @@ spark-submit \ --conf spark.yarn.tags=openeo \ --jars "${extensions}","${backend_assembly}" \ --name "${jobName}" \ - "${main_py_file}" "$(basename "${processGraphFile}")" "${outputDir}" "${outputFileName}" "${userLogFileName}" "${metadataFileName}" "${apiVersion}" "${dependencies}" "${userId}" + "${main_py_file}" "$(basename "${processGraphFile}")" "${outputDir}" "${outputFileName}" "${userLogFileName}" "${metadataFileName}" "${apiVersion}" "${dependencies}" "${userId}" "${softErrors}" diff --git a/openeogeotrellis/deploy/submit_batch_job_spark24.sh b/openeogeotrellis/deploy/submit_batch_job_spark24.sh index 56ebc823a..74b9f5883 100755 --- a/openeogeotrellis/deploy/submit_batch_job_spark24.sh +++ b/openeogeotrellis/deploy/submit_batch_job_spark24.sh @@ -44,6 +44,7 @@ pyfiles=${20} maxexecutors=${21-500} userId=${22} batchJobId=${23} +softErrors=${24-false} pysparkPython="venv/bin/python" @@ -144,4 +145,4 @@ spark-submit \ --conf spark.yarn.tags=openeo \ --jars "${extensions}","${backend_assembly}" \ --name "${jobName}" \ - "${main_py_file}" "$(basename "${processGraphFile}")" "${outputDir}" "${outputFileName}" "${userLogFileName}" "${metadataFileName}" "${apiVersion}" "${dependencies}" "${userId}" + "${main_py_file}" "$(basename "${processGraphFile}")" "${outputDir}" "${outputFileName}" "${userLogFileName}" "${metadataFileName}" "${apiVersion}" "${dependencies}" "${userId}" "${softErrors}" diff --git a/openeogeotrellis/deploy/submit_batch_job_spark3.sh b/openeogeotrellis/deploy/submit_batch_job_spark3.sh index 9cd17173d..9b709565b 100755 --- a/openeogeotrellis/deploy/submit_batch_job_spark3.sh +++ b/openeogeotrellis/deploy/submit_batch_job_spark3.sh @@ -44,6 +44,7 @@ pyfiles=${20} maxexecutors=${21-500} userId=${22} batchJobId=${23} +softErrors=${24-false} pysparkPython="venv/bin/python" @@ -165,4 +166,4 @@ spark-submit \ --conf spark.yarn.tags=openeo \ --jars "${extensions}","${backend_assembly}" \ --name "${jobName}" \ - "${main_py_file}" "$(basename "${processGraphFile}")" "${outputDir}" "${outputFileName}" "${userLogFileName}" "${metadataFileName}" "${apiVersion}" "${dependencies}" "${userId}" + "${main_py_file}" "$(basename "${processGraphFile}")" "${outputDir}" "${outputFileName}" "${userLogFileName}" "${metadataFileName}" "${apiVersion}" "${dependencies}" "${userId}" "${softErrors}" diff --git a/openeogeotrellis/layercatalog.py b/openeogeotrellis/layercatalog.py index 3a6f1b5d1..5dfc59db7 100644 --- a/openeogeotrellis/layercatalog.py +++ b/openeogeotrellis/layercatalog.py @@ -392,7 +392,7 @@ def sentinel_hub_pyramid(): if len(band_gsds) > 0 else jvm.geotrellis.raster.CellSize(cell_width, cell_height)) - soft_errors = feature_flags.get("soft_errors", False) + soft_errors = env.get("soft_errors", False) pyramid_factory = jvm.org.openeo.geotrellissentinelhub.PyramidFactory.withGuardedRateLimiting( endpoint, diff --git a/tests/test_views.py b/tests/test_views.py index c7f5009ec..7403dc277 100644 --- a/tests/test_views.py +++ b/tests/test_views.py @@ -439,7 +439,8 @@ def test_create_and_start_and_download(self, api, tmp_path, monkeypatch): 'default', 'false', '[]', "__pyfiles__/custom_processes.py,foolib.whl", '200' ] - assert batch_job_args[22:] == [TEST_USER, job_id] + assert batch_job_args[22:24] == [TEST_USER, job_id] + assert batch_job_args[24] == 'false' # Check metadata in zookeeper raw, _ = zk.get('/openeo/jobs/ongoing/{u}/{j}'.format(u=TEST_USER, j=job_id)) @@ -556,7 +557,8 @@ def test_create_and_start_job_options(self, api, tmp_path, monkeypatch): 'somequeue', 'false', '[]', '__pyfiles__/custom_processes.py,foolib.whl', '200' ] - assert batch_job_args[22:] == [TEST_USER, job_id] + assert batch_job_args[22:24] == [TEST_USER, job_id] + assert batch_job_args[24] == 'false' def test_cancel_job(self, api, tmp_path): with self._mock_kazoo_client() as zk: