Skip to content

Commit

Permalink
Add support for max_backoff_delay_seconds (#31)
Browse files Browse the repository at this point in the history
* add support for max delta

* rename max_delta to max_backoff_delay_seconds

* replace -1 with None for max_backoff_delay_seconds

* update readme

* add max_backoff_delay_seconds to the worker class

* add assertion in worker's test for max_backoff_delay_seconds

* make sure max_backoff_delay_seconds is 2 or more seconds

* add test for when max_backoff_delay_seconds_less_than_minimum and make max_backoff_delay_seconds unbounded
  • Loading branch information
amamd07 authored Jan 23, 2025
1 parent 1aac04d commit bacf943
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 11 deletions.
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,10 @@ w.sleep_delay = 3
# maximum attempts before marking the job as permanently failing (default 3)
w.max_attempts = 5

# sets a cap on how long a worker should wait between retry attempts.
# if not set, the backoff delay time will grow exponentially (minimum is 5)
w.max_backoff_delay_seconds = 60

# maximum run time allowed for the job, before it expires (default 3600)
w.max_run_time = 14400

Expand Down
15 changes: 11 additions & 4 deletions pyworker/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,17 @@ class Job(object, metaclass=Meta):
def __init__(self, class_name, database, logger,
job_id, queue, run_at, attempts=0, max_attempts=1,
attributes=None, abstract=False, extra_fields=None,
reporter=None):
reporter=None, max_backoff_delay_seconds=None):
super(Job, self).__init__()
self.class_name = class_name
self.database = database
self.logger = logger
self.job_id = job_id
self.job_name = '%s#run' % class_name
self.attempts = attempts
if max_backoff_delay_seconds:
max_backoff_delay_seconds = max(max_backoff_delay_seconds, 5) # max_backoff_delay_seconds can not be less than 5 seconds
self.max_backoff_delay_seconds = max_backoff_delay_seconds
self.run_at = run_at
self.queue = queue
self.max_attempts = max_attempts
Expand All @@ -41,7 +44,7 @@ def __str__(self):

@classmethod
def from_row(cls, job_row, max_attempts, database, logger,
extra_fields=None, reporter=None):
extra_fields=None, reporter=None, max_backoff_delay_seconds=None):
'''job_row is a tuple of (id, attempts, run_at, queue, handler, *extra_fields)'''
def extract_class_name(line):
regex = re.compile('object: !ruby/object:(.+)')
Expand Down Expand Up @@ -84,7 +87,8 @@ def extract_extra_fields(extra_fields, extra_field_values):
job_id=job_id, attempts=attempts,
run_at=run_at, queue=queue, database=database,
abstract=True, extra_fields=extra_fields_dict,
reporter=reporter)
reporter=reporter, max_backoff_delay_seconds=max_backoff_delay_seconds
)

attributes = extract_attributes(handler[2:])
logger.debug("Found attributes: %s" % str(attributes))
Expand All @@ -99,7 +103,8 @@ def extract_extra_fields(extra_fields, extra_field_values):
max_attempts=max_attempts,
attributes=payload['object']['attributes'],
abstract=False, extra_fields=extra_fields_dict,
reporter=reporter)
reporter=reporter, max_backoff_delay_seconds=max_backoff_delay_seconds
)

def before(self):
self.logger.debug("Running Job.before hook")
Expand Down Expand Up @@ -145,6 +150,8 @@ def set_error_unlock(self, error):
# set new exponential run_at
setters.append('run_at = %s')
delta = (self.attempts**4) + 5
if self.max_backoff_delay_seconds and delta > self.max_backoff_delay_seconds:
delta = self.max_backoff_delay_seconds
values.append(str(now + get_time_delta(seconds=delta)))

self._update_job(setters, values)
Expand Down
7 changes: 5 additions & 2 deletions pyworker/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,16 @@ class TerminatedException(Exception): pass
class Worker(object):
def __init__(self, dbstring, logger=None,
extra_delayed_job_fields=None,
reported_attributes_prefix=''):
reported_attributes_prefix='',
max_backoff_delay_seconds=None):
super(Worker, self).__init__()
self.logger = Logger(logger)
self.logger.info('Starting pyworker...')
self.database = DBConnector(dbstring, self.logger)
self.sleep_delay = 10
self.max_attempts = 3
self.max_run_time = 3600
self.max_backoff_delay_seconds = max_backoff_delay_seconds
self.queue_names = 'default'
hostname = os.uname()[1]
pid = os.getpid()
Expand Down Expand Up @@ -147,7 +149,8 @@ def get_job_row():
return Job.from_row(job_row, max_attempts=self.max_attempts,
database=self.database, logger=self.logger,
extra_fields=self.extra_delayed_job_fields,
reporter=self.reporter)
reporter=self.reporter, max_backoff_delay_seconds=self.max_backoff_delay_seconds
)
else:
return None

Expand Down
37 changes: 33 additions & 4 deletions tests/test_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def load_fixture(self, filename):
with open('tests/fixtures/%s' % filename) as f:
return f.read()

def load_job(self, filename):
def load_job(self, filename, max_backoff_delay_seconds=None):
mock_handler = self.load_fixture(filename)
mock_row = (
self.mock_job_id,
Expand All @@ -43,7 +43,9 @@ def load_job(self, filename):
)
return Job.from_row(mock_row,
self.mock_max_attempts,
MagicMock(), MagicMock())
MagicMock(), MagicMock(),
max_backoff_delay_seconds=max_backoff_delay_seconds
)

def load_job_with_extra_fields(self, filename):
mock_handler = self.load_fixture(filename)
Expand Down Expand Up @@ -71,8 +73,8 @@ def load_unregistered_job_with_reporter(self, reporter):
job.reporter = reporter
return job

def load_registered_job(self):
job = self.load_job('handler_registered.yaml')
def load_registered_job(self, max_backoff_delay_seconds=None):
job = self.load_job('handler_registered.yaml', max_backoff_delay_seconds=max_backoff_delay_seconds)
job.error = MagicMock()
job.failure = MagicMock()
job._update_job = MagicMock()
Expand Down Expand Up @@ -210,6 +212,33 @@ def test_set_error_unlock_if_max_attempts_not_exceeded_updates_run_at_exponentia

self.assert_job_updated_run_at(job, attempts=0, expected_value=datetime.datetime(2023, 10, 7, 0, 0, 6))

## run_at
@patch('pyworker.job.get_current_time')
def test_set_error_unlock_if_delta_doesnt_exceed_max_backoff_delay_seconds(
self, mock_get_current_time):
mock_get_current_time.return_value = self.mock_now
job = self.load_registered_job(max_backoff_delay_seconds=8)

self.assert_job_updated_run_at(job, attempts=0, expected_value=datetime.datetime(2023, 10, 7, 0, 0, 6))

## run_at
@patch('pyworker.job.get_current_time')
def test_set_error_unlock_if_delta_exceeds_max_backoff_delay_seconds(
self, mock_get_current_time):
mock_get_current_time.return_value = self.mock_now
job = self.load_registered_job(max_backoff_delay_seconds=5)

self.assert_job_updated_run_at(job, attempts=0, expected_value=datetime.datetime(2023, 10, 7, 0, 0, 5))

## run_at
@patch('pyworker.job.get_current_time')
def test_set_error_unlock_if_max_backoff_delay_seconds_less_than_minimum(
self, mock_get_current_time):
mock_get_current_time.return_value = self.mock_now
job = self.load_registered_job(max_backoff_delay_seconds=2)

self.assert_job_updated_run_at(job, attempts=0, expected_value=datetime.datetime(2023, 10, 7, 0, 0, 5))

@patch('pyworker.job.get_current_time')
def test_set_error_unlock_if_max_attempts_not_exceeded_updates_run_at_exponentially_when_attempts_1(
self, mock_get_current_time):
Expand Down
3 changes: 2 additions & 1 deletion tests/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,13 @@ def tearDown(self):
@patch('pyworker.worker.os.getpid', return_value=1234)
@patch('pyworker.worker.DBConnector')
def test_worker_init(self, mock_db, *_):
worker = Worker('dummy')
worker = Worker('dummy', max_backoff_delay_seconds=60)

self.assertEqual(worker.database, mock_db.return_value)
self.assertEqual(worker.sleep_delay, 10)
self.assertEqual(worker.max_attempts, 3)
self.assertEqual(worker.max_run_time, 3600)
self.assertEqual(worker.max_backoff_delay_seconds, 60)
self.assertEqual(worker.queue_names, 'default')
self.assertEqual(worker.name, 'host:localhost pid:1234')
self.assertIsNone(worker.extra_delayed_job_fields)
Expand Down

0 comments on commit bacf943

Please sign in to comment.