Skip to content

Commit

Permalink
Instrument using custom attributes + add Worker tests (#18)
Browse files Browse the repository at this point in the history
* Refactor worker to instrument errors
* Set up tests and test worker.run
* Add tests for worker.handle_job
* Add Github action to run tests
  • Loading branch information
hammady authored Dec 18, 2023
1 parent fe559b7 commit 03e31c6
Show file tree
Hide file tree
Showing 8 changed files with 287 additions and 41 deletions.
22 changes: 22 additions & 0 deletions .github/workflows/run-tests.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# CI workflow: installs the test requirements and runs pytest on every push
# and pull request that targets the master branch.
name: Run tests

on:
  push:
    branches: [ master ]
  pull_request:
    branches: [ master ]

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - name: Checkout
        # v2 of the official actions is deprecated; use a maintained major.
        uses: actions/checkout@v4
      - name: Set up Python 3.11
        uses: actions/setup-python@v5
        with:
          # Quoted so YAML does not parse the version as a float.
          python-version: '3.11'
      - name: Install dependencies
        run: pip install -r requirements-test.txt
      - name: Run tests
        run: pytest
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,6 @@
*.pyc
/dist
/build
*.egg-info
*.egg-info
.venv

6 changes: 5 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,6 @@ You can also provide a logger class (from `logging` module) to have full control
- No access to your Ruby classes, you should implement all your logic from scratch in Python
- Reads only raw attributes of jobs from the database (job table columns), no relations
- Assumes you only need to call the `run` method in your job with no arguments
- No unit tests

## Contribute

Expand All @@ -127,6 +126,11 @@ Install the code for development:

Do your changes, then send a pull request.

## Test

pip install -r requirements-test.txt
pytest

## Publish

### Using Python
Expand Down
11 changes: 9 additions & 2 deletions pyworker/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,20 @@ def __new__(meta, name, bases, class_dict):
class Job(object, metaclass=Meta):
"""docstring for Job"""
def __init__(self, class_name, database, logger,
job_id, queue, run_at, attempts=0, max_attempts=1, attributes=None):
job_id, queue, run_at, attempts=0, max_attempts=1,
attributes=None, abstract=False):
super(Job, self).__init__()
self.class_name = class_name
self.database = database
self.logger = logger
self.job_id = job_id
self.job_name = '%s#run' % class_name
self.attempts = attempts
self.run_at = run_at
self.queue = queue
self.max_attempts = max_attempts
self.attributes = attributes
self.abstract = abstract

# Debug-friendly representation: "<ClassName>: <instance __dict__>".
def __str__(self):
return "%s: %s" % (self.__class__.__name__, str(self.__dict__))
Expand Down Expand Up @@ -69,7 +72,8 @@ def extract_attributes(lines):
return Job(class_name=class_name, logger=logger,
max_attempts=max_attempts,
job_id=job_id, attempts=attempts,
run_at=run_at, queue=queue, database=database)
run_at=run_at, queue=queue, database=database,
abstract=True)

attributes = extract_attributes(handler[2:])
logger.debug("Found attributes: %s" % str(attributes))
Expand Down Expand Up @@ -100,6 +104,7 @@ def success(self):
self.logger.debug("Running Job.success hook")

def set_error_unlock(self, error):
failed = False
self.logger.error('Job %d raised error: %s' % (self.job_id, error))
# run error hook
self.error(error)
Expand All @@ -115,6 +120,7 @@ def set_error_unlock(self, error):
error
]
if self.attempts >= self.max_attempts:
failed = True
# set failed_at = now
setters.append('failed_at = %s')
values.append(now)
Expand All @@ -130,6 +136,7 @@ def set_error_unlock(self, error):
self.logger.debug('set error values: %s' % str(values))
self.database.cursor().execute(query, tuple(values))
self.database.commit()
return failed

def remove(self):
self.logger.debug('Job %d finished successfully' % self.job_id)
Expand Down
87 changes: 50 additions & 37 deletions pyworker/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,18 +75,16 @@ def _latency(job_run_at):

with newrelic.agent.BackgroundTask(
application=self.newrelic_app,
name='%s#run' % job.class_name,
name=job.job_name,
group='DelayedJob') as task:

# Record custom metrics
# 1) Custom/DelayedJobQueueLatency/<job.queue> => latency
# 2) Custom/DelayedJobMethodLatency/<job.name> => latency
# 3) Custom/DelayedJobMethodAttempts/<job.name> => attempts
newrelic.agent.record_custom_metrics([
('Custom/DelayedJobQueueLatency/%s' % job.queue, latency),
('Custom/DelayedJobMethodLatency/%s' % job.class_name, latency),
('Custom/DelayedJobMethodAttempts/%s' % job.class_name, job.attempts)
], application=self.newrelic_app)
# Record custom attributes for the job transaction
newrelic.agent.add_custom_attribute('job_id', job.job_id)
newrelic.agent.add_custom_attribute('job_name', job.job_name)
newrelic.agent.add_custom_attribute('job_queue', job.queue)
newrelic.agent.add_custom_attribute('job_latency', latency)
newrelic.agent.add_custom_attribute('job_attempts', job.attempts)
# TODO report job.enqueue_attributes if available

yield task
else:
Expand All @@ -100,35 +98,11 @@ def run(self):
self.logger.debug('Picking up jobs...')
job = self.get_job()
self._current_job = job # used in signal handlers
start_time = time.time()
try:
if type(job) == Job:
raise ValueError(('Unsupported Job: %s, please import it ' \
+ 'before you can handle it') % job.class_name)
elif job is not None:
with self._instrument(job):
self.logger.info('Running Job %d' % job.job_id)
with self._time_limit(self.max_run_time):
job.before()
job.run()
job.after()
job.success()
job.remove()
except Exception as exception:
if job is not None:
error_str = traceback.format_exc()
job.set_error_unlock(error_str)
if type(exception) == TerminatedException:
break
finally:
if job is not None:
time_diff = time.time() - start_time
self.logger.info('Job %d finished in %d seconds' % \
(job.job_id, time_diff))

# Sleep for a while between each job and break if received SIGTERM
try:
time.sleep(self.sleep_delay)
self.handle_job(job)
else: # sleep for a while before checking again for new jobs
time.sleep(self.sleep_delay)
except TerminatedException:
break

Expand Down Expand Up @@ -165,3 +139,42 @@ def get_job_row():
database=self.database, logger=self.logger)
else:
return None

def handle_job(self, job):
    """Run one job inside an instrumented transaction and report its outcome.

    Does nothing when ``job`` is None. Otherwise the job's ``before``,
    ``run`` and ``after`` hooks are executed under the worker's time limit,
    followed by ``success`` and ``remove``. Any ``Exception`` raised along
    the way is handed to ``job.set_error_unlock`` as a formatted traceback.

    When ``self.newrelic_app`` is set, the ``error`` and ``job_failure``
    custom attributes are recorded and the caught exception, if any, is
    reported to New Relic. The job duration is always logged.

    Args:
        job: the job to run, or None.

    Raises:
        TerminatedException: re-raised after the error has been stored on
            the job, so the caller can stop its loop.
    """
    if job is None:
        return
    with self._instrument(job):
        start_time = time.time()
        error = failed = False
        caught_exception = None
        try:
            if job.abstract:
                # Abstract jobs have no importable implementation to run.
                raise ValueError(
                    ('Unsupported Job: %s, please import it '
                     'before you can handle it') % job.class_name)
            self.logger.info('Running Job %d' % job.job_id)
            with self._time_limit(self.max_run_time):
                job.before()
                job.run()
                job.after()
            # NOTE(review): indentation was lost in the reviewed source;
            # success/remove are assumed to run outside the time limit —
            # confirm against the repository.
            job.success()
            job.remove()
        except Exception as exception:
            error = True
            caught_exception = exception
            # Persist the traceback on the job; the return value tells
            # whether the job has now permanently failed.
            error_str = traceback.format_exc()
            failed = job.set_error_unlock(error_str)
            # Termination (including subclasses) must bubble up to the
            # caller; a bare raise keeps the original traceback intact.
            if isinstance(exception, TerminatedException):
                raise
        finally:
            # report error status
            if self.newrelic_app:
                newrelic.agent.add_custom_attribute('error', error)
                newrelic.agent.add_custom_attribute('job_failure', failed)
                if caught_exception is not None:
                    # The except clause has usually ended by now, so
                    # sys.exc_info() cannot be relied upon: pass the
                    # explicit (type, value, traceback) triple.
                    newrelic.agent.record_exception(
                        type(caught_exception),
                        caught_exception,
                        caught_exception.__traceback__)
            time_diff = time.time() - start_time
            self.logger.info('Job %d finished in %d seconds' %
                             (job.job_id, time_diff))
6 changes: 6 additions & 0 deletions requirements-test.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pytest==7.4.3
pytest-cov==4.1.0
newrelic==9.3.0
PyYAML==6.0.1
python-dateutil==2.6.0
psycopg2-binary==2.9.9
Empty file added tests/__init__.py
Empty file.
Loading

0 comments on commit 03e31c6

Please sign in to comment.