Skip to content

Commit

Permalink
add job timeout to RunningJob.wait() for client-side timeout in Python client (#359)
Browse files Browse the repository at this point in the history
  • Loading branch information
irontablee authored Aug 18, 2016
1 parent 5975a12 commit 0461777
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 2 deletions.
5 changes: 5 additions & 0 deletions genie-client/src/main/python/pygenie/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,3 +69,8 @@ class GenieJobNotFoundError(GenieError):
class GenieLogNotFoundError(GenieError):
    """Error when a log is not found."""
    pass


class JobTimeoutError(GenieError):
    """Error when a job runs longer than a specified timeout.

    Raised by ``RunningJob.wait()`` when more than ``job_timeout`` seconds
    have elapsed while it is still waiting on the job and
    ``kill_after_job_timeout`` is False.
    """
    pass
21 changes: 20 additions & 1 deletion genie-client/src/main/python/pygenie/jobs/running.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
from ..conf import GenieConf
from ..utils import dttm_to_epoch

from ..exceptions import JobTimeoutError


logger = logging.getLogger('com.netflix.genie.jobs.running')

Expand Down Expand Up @@ -600,7 +602,8 @@ def username(self):
str: The username.
"""

def wait(self, sleep_seconds=10, suppress_stream=False, until_running=False):
def wait(self, sleep_seconds=10, suppress_stream=False, until_running=False,
job_timeout=None, kill_after_job_timeout=False):
"""
Blocking call that will wait for the job to complete.
Expand All @@ -615,6 +618,10 @@ def wait(self, sleep_seconds=10, suppress_stream=False, until_running=False):
complete (default: False).
until_running (bool, optional): If True, only block until the job
status becomes 'RUNNING' (default: False).
job_timeout (int, optional): The number of seconds to wait for the
job to finish (default: None).
kill_after_job_timeout (bool, optional): Whether to kill the job or
not after the job_timeout (default: False).
Returns:
:py:class:`RunningJob`: self
Expand All @@ -625,10 +632,22 @@ def wait(self, sleep_seconds=10, suppress_stream=False, until_running=False):
statuses = {s for s in RUNNING_STATUSES \
if not until_running or s.upper() != 'RUNNING'}

start_time = time.time()

while self._adapter.get_status(self._job_id).upper() in statuses:
if i % 3 == 0 and not suppress_stream:
self._write_to_stream('.')
time.sleep(sleep_seconds)

# handle client-side job timeout
if (job_timeout is not None) and (time.time() - start_time > job_timeout):
if kill_after_job_timeout:
self.kill()
return self
else:
raise JobTimeoutError("Timed out while waiting for job {} to finish" \
.format(self.job_id))

i += 1

if not suppress_stream:
Expand Down
2 changes: 1 addition & 1 deletion genie-client/src/main/python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

setup(
name='nflx-genie-client',
version='3.0.28',
version='3.0.29',
author='Netflix Inc.',
author_email='genieoss@googlegroups.com',
keywords='genie hadoop cloud netflix client bigdata presto',
Expand Down

0 comments on commit 0461777

Please sign in to comment.