diff --git a/genie-client/src/main/python/pygenie/exceptions.py b/genie-client/src/main/python/pygenie/exceptions.py
index 12b83d2f11b..d71f313d894 100644
--- a/genie-client/src/main/python/pygenie/exceptions.py
+++ b/genie-client/src/main/python/pygenie/exceptions.py
@@ -69,3 +69,8 @@ class GenieJobNotFoundError(GenieError):
 class GenieLogNotFoundError(GenieError):
     """Error when a log is not found."""
     pass
+
+
+class JobTimeoutError(GenieError):
+    """Error when a job runs longer than a specified timeout."""
+    pass
diff --git a/genie-client/src/main/python/pygenie/jobs/running.py b/genie-client/src/main/python/pygenie/jobs/running.py
--- a/genie-client/src/main/python/pygenie/jobs/running.py
+++ b/genie-client/src/main/python/pygenie/jobs/running.py
@@ -18,6 +18,8 @@
 from ..conf import GenieConf
 from ..utils import dttm_to_epoch
 
+from ..exceptions import JobTimeoutError
+
 logger = logging.getLogger('com.netflix.genie.jobs.running')
 
 
@@ -600,7 +602,8 @@ def username(self):
             str: The username.
         """
 
-    def wait(self, sleep_seconds=10, suppress_stream=False, until_running=False):
+    def wait(self, sleep_seconds=10, suppress_stream=False, until_running=False,
+             job_timeout=None, kill_after_job_timeout=False):
         """
         Blocking call that will wait for the job to complete.
 
@@ -615,6 +618,12 @@ def wait(self, sleep_seconds=10, suppress_stream=False, until_running=False):
                 complete (default: False).
             until_running (bool, optional): If True, only block until the job
                 status becomes 'RUNNING' (default: False).
+            job_timeout (int, optional): The number of seconds to wait for the
+                job to finish before giving up client-side; None waits
+                indefinitely (default: None).
+            kill_after_job_timeout (bool, optional): If True, kill the job and
+                return once job_timeout is exceeded; if False, raise
+                JobTimeoutError instead (default: False).
 
         Returns:
             :py:class:`RunningJob`: self
@@ -625,10 +634,28 @@ def wait(self, sleep_seconds=10, suppress_stream=False, until_running=False):
         statuses = {s for s in RUNNING_STATUSES \
                     if not until_running or s.upper() != 'RUNNING'}
 
+        start_time = time.time()
+
         while self._adapter.get_status(self._job_id).upper() in statuses:
+            pause_seconds = sleep_seconds
+
+            # handle client-side job timeout; checked right after the status
+            # poll so a job that finished while we slept is never timed out
+            if job_timeout is not None:
+                remaining = job_timeout - (time.time() - start_time)
+                if remaining <= 0:
+                    if kill_after_job_timeout:
+                        self.kill()
+                        return self
+                    raise JobTimeoutError(
+                        "Timed out while waiting for job {} to finish"
+                        .format(self.job_id))
+                # never sleep past the client-side deadline
+                pause_seconds = min(sleep_seconds, remaining)
+
             if i % 3 == 0 and not suppress_stream:
                 self._write_to_stream('.')
-            time.sleep(sleep_seconds)
+            time.sleep(pause_seconds)
             i += 1
 
         if not suppress_stream:
diff --git a/genie-client/src/main/python/setup.py b/genie-client/src/main/python/setup.py
index dcf05052674..5494572b276 100644
--- a/genie-client/src/main/python/setup.py
+++ b/genie-client/src/main/python/setup.py
@@ -26,7 +26,7 @@
 
 setup(
     name='nflx-genie-client',
-    version='3.0.28',
+    version='3.0.29',
     author='Netflix Inc.',
     author_email='genieoss@googlegroups.com',
     keywords='genie hadoop cloud netflix client bigdata presto',