Skip to content

Commit

Permalink
#34: moved soft errors flag from load_collection to job_options
Browse files Browse the repository at this point in the history
  • Loading branch information
bossie committed Mar 24, 2022
1 parent 66078e2 commit 8d0365d
Show file tree
Hide file tree
Showing 7 changed files with 23 additions and 11 deletions.
2 changes: 2 additions & 0 deletions openeogeotrellis/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -943,6 +943,7 @@ def _start_job(self, job_id: str, user_id: str, batch_process_dependencies: Unio
max_executors = extra_options.get("max-executors", "200")
queue = extra_options.get("queue", "default")
profile = extra_options.get("profile", "false")
soft_errors = extra_options.get("soft-errors", "false")

def serialize_dependencies() -> str:
dependencies = batch_process_dependencies or job_info.get('dependencies') or []
Expand Down Expand Up @@ -1110,6 +1111,7 @@ def as_arg_element(dependency: dict) -> dict:
args.append(max_executors)
args.append(user_id)
args.append(job_id)
args.append(soft_errors)

try:
logger.info("Submitting job: {a!r}".format(a=args), extra={'job_id': job_id})
Expand Down
15 changes: 10 additions & 5 deletions openeogeotrellis/deploy/batch_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,11 +225,11 @@ def main(argv: List[str]) -> None:
logger.info("argv: {a!r}".format(a=argv))
logger.info("pid {p}; ppid {pp}; cwd {c}".format(p=os.getpid(), pp=os.getppid(), c=os.getcwd()))

if len(argv) != 9:
if len(argv) != 10:
raise Exception(
f"usage: {argv[0]} "
"<job specification input file> <job directory> <results output file name> <user log file name> "
"<metadata file name> <api version> <dependencies> <user id>"
"<metadata file name> <api version> <dependencies> <user id> <soft errors>"
)

job_specification_file = argv[1]
Expand All @@ -240,6 +240,7 @@ def main(argv: List[str]) -> None:
api_version = argv[6]
dependencies = _deserialize_dependencies(argv[7])
user_id = argv[8]
soft_errors = argv[9].upper() == "TRUE"

_create_job_dir(job_dir)

Expand Down Expand Up @@ -278,7 +279,8 @@ def main(argv: List[str]) -> None:
def run_driver():
run_job(
job_specification=job_specification, output_file=output_file, metadata_file=metadata_file,
api_version=api_version, job_dir=job_dir, dependencies=dependencies, user_id=user_id
api_version=api_version, job_dir=job_dir, dependencies=dependencies, user_id=user_id,
soft_errors=soft_errors
)

if sc.getConf().get('spark.python.profile', 'false').lower() == 'true':
Expand Down Expand Up @@ -311,8 +313,10 @@ def run_driver():
user_facing_logger.error("Your batch job failed because workers used too much Python memory. The same task was attempted multiple times. Consider increasing executor-memoryOverhead or contact the developers to investigate.")
raise e


@log_memory
def run_job(job_specification, output_file: Path, metadata_file: Path, api_version, job_dir, dependencies: dict, user_id:str=None):
def run_job(job_specification, output_file: Path, metadata_file: Path, api_version, job_dir, dependencies: dict,
user_id: str = None, soft_errors: bool = False):
logger.info(f"Job spec: {json.dumps(job_specification,indent=1)}")
process_graph = job_specification['process_graph']

Expand All @@ -327,7 +331,8 @@ def run_job(job_specification, output_file: Path, metadata_file: Path, api_versi
'require_bounds': True,
'correlation_id': correlation_id,
'dependencies': dependencies,
"backend_implementation": backend_implementation,
'backend_implementation': backend_implementation,
'soft_errors': soft_errors
})
tracer = DryRunDataTracer()
logger.info("Starting process graph evaluation")
Expand Down
3 changes: 2 additions & 1 deletion openeogeotrellis/deploy/submit_batch_job.sh
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ pyfiles=${20}
maxexecutors=${21-500}
userId=${22}
batchJobId=${23}
softErrors=${24-false}

pysparkPython="venv/bin/python"

Expand Down Expand Up @@ -134,4 +135,4 @@ spark-submit \
--conf spark.yarn.tags=openeo \
--jars "${extensions}","${backend_assembly}" \
--name "${jobName}" \
"${main_py_file}" "$(basename "${processGraphFile}")" "${outputDir}" "${outputFileName}" "${userLogFileName}" "${metadataFileName}" "${apiVersion}" "${dependencies}" "${userId}"
"${main_py_file}" "$(basename "${processGraphFile}")" "${outputDir}" "${outputFileName}" "${userLogFileName}" "${metadataFileName}" "${apiVersion}" "${dependencies}" "${userId}" "${softErrors}"
3 changes: 2 additions & 1 deletion openeogeotrellis/deploy/submit_batch_job_spark24.sh
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ pyfiles=${20}
maxexecutors=${21-500}
userId=${22}
batchJobId=${23}
softErrors=${24-false}

pysparkPython="venv/bin/python"

Expand Down Expand Up @@ -144,4 +145,4 @@ spark-submit \
--conf spark.yarn.tags=openeo \
--jars "${extensions}","${backend_assembly}" \
--name "${jobName}" \
"${main_py_file}" "$(basename "${processGraphFile}")" "${outputDir}" "${outputFileName}" "${userLogFileName}" "${metadataFileName}" "${apiVersion}" "${dependencies}" "${userId}"
"${main_py_file}" "$(basename "${processGraphFile}")" "${outputDir}" "${outputFileName}" "${userLogFileName}" "${metadataFileName}" "${apiVersion}" "${dependencies}" "${userId}" "${softErrors}"
3 changes: 2 additions & 1 deletion openeogeotrellis/deploy/submit_batch_job_spark3.sh
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ pyfiles=${20}
maxexecutors=${21-500}
userId=${22}
batchJobId=${23}
softErrors=${24-false}

pysparkPython="venv/bin/python"

Expand Down Expand Up @@ -165,4 +166,4 @@ spark-submit \
--conf spark.yarn.tags=openeo \
--jars "${extensions}","${backend_assembly}" \
--name "${jobName}" \
"${main_py_file}" "$(basename "${processGraphFile}")" "${outputDir}" "${outputFileName}" "${userLogFileName}" "${metadataFileName}" "${apiVersion}" "${dependencies}" "${userId}"
"${main_py_file}" "$(basename "${processGraphFile}")" "${outputDir}" "${outputFileName}" "${userLogFileName}" "${metadataFileName}" "${apiVersion}" "${dependencies}" "${userId}" "${softErrors}"
2 changes: 1 addition & 1 deletion openeogeotrellis/layercatalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,7 @@ def sentinel_hub_pyramid():
if len(band_gsds) > 0
else jvm.geotrellis.raster.CellSize(cell_width, cell_height))

soft_errors = feature_flags.get("soft_errors", False)
soft_errors = env.get("soft_errors", False)

pyramid_factory = jvm.org.openeo.geotrellissentinelhub.PyramidFactory.withGuardedRateLimiting(
endpoint,
Expand Down
6 changes: 4 additions & 2 deletions tests/test_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,8 @@ def test_create_and_start_and_download(self, api, tmp_path, monkeypatch):
'default', 'false', '[]',
"__pyfiles__/custom_processes.py,foolib.whl", '200'
]
assert batch_job_args[22:] == [TEST_USER, job_id]
assert batch_job_args[22:24] == [TEST_USER, job_id]
assert batch_job_args[24] == 'false'

# Check metadata in zookeeper
raw, _ = zk.get('/openeo/jobs/ongoing/{u}/{j}'.format(u=TEST_USER, j=job_id))
Expand Down Expand Up @@ -556,7 +557,8 @@ def test_create_and_start_job_options(self, api, tmp_path, monkeypatch):
'somequeue', 'false', '[]',
'__pyfiles__/custom_processes.py,foolib.whl', '200'
]
assert batch_job_args[22:] == [TEST_USER, job_id]
assert batch_job_args[22:24] == [TEST_USER, job_id]
assert batch_job_args[24] == 'false'

def test_cancel_job(self, api, tmp_path):
with self._mock_kazoo_client() as zk:
Expand Down

0 comments on commit 8d0365d

Please sign in to comment.