Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add job timeout to RunningJob.wait() for client-side timeout in Python client #359

Merged
merged 1 commit into from
Aug 18, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions genie-client/src/main/python/pygenie/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,3 +69,8 @@ class GenieJobNotFoundError(GenieError):
class GenieLogNotFoundError(GenieError):
    """Raised when a requested log cannot be found."""


class JobTimeoutError(GenieError):
    """Raised when a job is still running after a caller-specified timeout."""
21 changes: 20 additions & 1 deletion genie-client/src/main/python/pygenie/jobs/running.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
from ..conf import GenieConf
from ..utils import dttm_to_epoch

from ..exceptions import JobTimeoutError


logger = logging.getLogger('com.netflix.genie.jobs.running')

Expand Down Expand Up @@ -600,7 +602,8 @@ def username(self):
str: The username.
"""

def wait(self, sleep_seconds=10, suppress_stream=False, until_running=False):
def wait(self, sleep_seconds=10, suppress_stream=False, until_running=False,
job_timeout=None, kill_after_job_timeout=False):
"""
Blocking call that will wait for the job to complete.

Expand All @@ -615,6 +618,10 @@ def wait(self, sleep_seconds=10, suppress_stream=False, until_running=False):
complete (default: False).
until_running (bool, optional): If True, only block until the job
status becomes 'RUNNING' (default: False).
job_timeout (int, optional): Maximum number of seconds to wait for the
job to finish; None disables the client-side timeout (default: None).
kill_after_job_timeout (bool, optional): If True, kill the job and
return once job_timeout is exceeded; if False, raise JobTimeoutError
instead (default: False).

Returns:
:py:class:`RunningJob`: self
Expand All @@ -625,10 +632,22 @@ def wait(self, sleep_seconds=10, suppress_stream=False, until_running=False):
statuses = {s for s in RUNNING_STATUSES \
if not until_running or s.upper() != 'RUNNING'}

start_time = time.time()

while self._adapter.get_status(self._job_id).upper() in statuses:
if i % 3 == 0 and not suppress_stream:
self._write_to_stream('.')
time.sleep(sleep_seconds)

# handle client-side job timeout
if (job_timeout is not None) and (time.time() - start_time > job_timeout):
if kill_after_job_timeout:
self.kill()
return self
else:
raise JobTimeoutError("Timed out while waiting for job {} to finish" \
.format(self.job_id))

i += 1

if not suppress_stream:
Expand Down
2 changes: 1 addition & 1 deletion genie-client/src/main/python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

setup(
name='nflx-genie-client',
version='3.0.28',
version='3.0.29',
author='Netflix Inc.',
author_email='genieoss@googlegroups.com',
keywords='genie hadoop cloud netflix client bigdata presto',
Expand Down