refactor db retries
honzajavorek committed Sep 22, 2020
1 parent dcf506d commit 89cdc37
Showing 5 changed files with 116 additions and 81 deletions.
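In short: two near-identical decorator-based retry helpers, one in juniorguru/scrapers/monitoring.py and one in juniorguru/scrapers/pipelines/database.py, are replaced by a single retry_when_db_locked(db, op, stats=None, retries=10, wait_sec=0.5) function in juniorguru/models/base.py, and the new helper gains unit tests. The shape of the change, reduced to a sketch (bodies elided):

    # before: each module carried its own decorator wrapping whole methods
    @retry_when_db_locked
    def item_scraped(self, item, response, spider):
        with db:
            ...  # database work

    # after: one shared helper receives the database work as a closure
    def item_scraped(self, item, response, spider):
        def operation():
            ...  # database work
        retry_when_db_locked(db, operation, stats=self.stats)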
5 changes: 3 additions & 2 deletions juniorguru/models/__init__.py
@@ -1,4 +1,4 @@
-from juniorguru.models.base import db
+from juniorguru.models.base import db, retry_when_db_locked
 from juniorguru.models.job import Job, JobDropped, JobError, JobMetric, JobNewsletterMention
 from juniorguru.models.metric import Metric
 from juniorguru.models.story import Story
@@ -9,4 +9,5 @@


 __all__ = [db, Job, JobDropped, JobError, JobMetric, Metric, Story, Supporter,
-           LastModified, PressRelease, JobNewsletterMention, Logo, LogoMetric]
+           LastModified, PressRelease, JobNewsletterMention, Logo, LogoMetric,
+           retry_when_db_locked]
30 changes: 29 additions & 1 deletion juniorguru/models/base.py
@@ -1,11 +1,17 @@
+import time
 import json
 from pathlib import Path
 from collections.abc import Set

 import scrapy
-from peewee import Model, SqliteDatabase
+from peewee import Model, SqliteDatabase, OperationalError
 from playhouse.sqlite_ext import JSONField as BaseJSONField

+from juniorguru.lib.log import get_log
+
+
+log = get_log('db')
+

 db_file = Path(__file__).parent / '..' / 'data' / 'data.db'
 db = SqliteDatabase(db_file, check_same_thread=False)
@@ -34,3 +40,25 @@ def default(o):
             raise TypeError(f'Object of type {o.__class__.__name__} is not JSON serializable')

         return json.dumps(value, ensure_ascii=False, default=default)
+
+
+def retry_when_db_locked(db, op, stats=None, retries=10, wait_sec=0.5):
+    last_error = None
+    for i in range(retries):
+        try:
+            with db:
+                return op()
+        except OperationalError as error:
+            if str(error) == 'database is locked':
+                log.debug(f"Database operation '{op.__qualname__}' failed! ({error}, attempt: {i + 1})")
+                last_error = error
+                if stats:
+                    stats.inc_value('database/locked_retries')
+                time.sleep(wait_sec * i)
+            else:
+                if stats:
+                    stats.inc_value('database/uncaught_errors')
+                raise
+    if stats:
+        stats.inc_value('database/uncaught_errors')
+    raise last_error
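The helper runs the callable inside "with db:" and retries when SQLite reports 'database is locked', sleeping wait_sec * i between attempts, so the first retry happens immediately and later ones back off linearly; any other OperationalError is counted and re-raised at once. A minimal usage sketch — save_job and the one-field payload are illustrative, not part of this commit:

    from juniorguru.models import Job, db, retry_when_db_locked

    def save_job(item):
        def operation():
            # everything the callable needs is closed over
            Job.create(**item)
            return item

        # opens `with db:` around each attempt and retries up to 10 times
        # on 'database is locked' before re-raising the last error
        return retry_when_db_locked(db, operation)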
68 changes: 23 additions & 45 deletions juniorguru/scrapers/monitoring.py
@@ -1,18 +1,15 @@
 import hashlib
-import logging
-import time
-from functools import wraps
 from pathlib import Path
 from urllib.parse import urlparse

-from peewee import OperationalError
 from scrapy import signals

-from juniorguru.models import Job, JobDropped, JobError, db
+from juniorguru.lib.log import get_log
+from juniorguru.models import Job, JobDropped, JobError, db, retry_when_db_locked
 from juniorguru.scrapers.pipelines.database import create_id


-logger = logging.getLogger(__name__)
+log = get_log(__name__)


 RESPONSES_BACKUP_DIR = Path('juniorguru/data/responses/').absolute()
@@ -21,43 +18,20 @@
 class BackupResponseMiddleware():
     def process_response(self, request, response, spider):
         if hasattr(spider, 'override_response_backup_path'):
-            logger.debug(f"Skipping backup of '{response.url}' per spider override")
+            log.debug(f"Skipping backup of '{response.url}' per spider override")
         else:
             try:
                 response_text = response.text
             except AttributeError:
-                logger.debug(f"Unable to backup '{response.url}'")
+                log.debug(f"Unable to backup '{response.url}'")
             else:
                 path = url_to_backup_path(response.url)
                 path.parent.mkdir(parents=True, exist_ok=True)
                 path.write_text(response_text)
-                logger.debug(f"Backed up '{response.url}' as '{path.absolute()}'")
+                log.debug(f"Backed up '{response.url}' as '{path.absolute()}'")
         return response


-def retry_when_db_locked(method):
-    @wraps(method)
-    def wrapper(ext, *args, **kwargs):
-        kwargs.pop('signal')
-        kwargs.pop('sender')
-        last_error = None
-        for i in range(10):
-            try:
-                return method(ext, *args, **kwargs)
-            except OperationalError as error:
-                if str(error) == 'database is locked':
-                    logger.debug(f"Monitoring operation '{method.__name__}' failed! ({error}, attempt: {i + 1})")
-                    last_error = error
-                    ext.stats.inc_value('monitoring/db_locked_retries')
-                    time.sleep(0.5 * i)
-                else:
-                    ext.stats.inc_value('monitoring/uncaught_errors')
-                    raise
-        ext.stats.inc_value('monitoring/uncaught_errors')
-        raise last_error
-    return wrapper
-
-
 class MonitoringExtension():
     def __init__(self, stats):
         self.stats = stats
@@ -72,50 +46,54 @@ def from_crawler(cls, crawler):
         crawler.signals.connect(ext.item_scraped, signal=signals.item_scraped)
         return ext

-    @retry_when_db_locked
     def spider_error(self, failure, response, spider):
         response_data = get_response_data(spider, response.url)
-        with db:
+
+        def operation():
             JobError.create(message=get_failure_message(failure),
                             trace=failure.getTraceback(),
                             signal='spider',
                             spider=spider.name,
                             **response_data)
-        self.stats.inc_value('monitoring/job_error_saved')
+            self.stats.inc_value('monitoring/job_error_saved')
+        retry_when_db_locked(db, operation, stats=self.stats)

-    @retry_when_db_locked
     def item_error(self, item, response, spider, failure):
         response_data = get_response_data(spider, response.url)
-        with db:
+
+        def operation():
             JobError.create(message=get_failure_message(failure),
                             trace=failure.getTraceback(),
                             signal='item',
                             spider=spider.name,
                             item=item,
                             **response_data)
-        self.stats.inc_value('monitoring/job_error_saved')
+            self.stats.inc_value('monitoring/job_error_saved')
+        retry_when_db_locked(db, operation, stats=self.stats)

-    @retry_when_db_locked
     def item_dropped(self, item, response, exception, spider):
         response_data = get_response_data(spider, response.url)
-        with db:
+
+        def operation():
             JobDropped.create(type=exception.__class__.__name__,
                               reason=str(exception),
                               item=item,
                               **response_data)
-        self.stats.inc_value('monitoring/job_dropped_saved')
+            self.stats.inc_value('monitoring/job_dropped_saved')
+        retry_when_db_locked(db, operation, stats=self.stats)

-    @retry_when_db_locked
     def item_scraped(self, item, response, spider):
         response_data = get_response_data(spider, response.url)
-        with db:
+
+        def operation():
             job = Job.get_by_id(item.get('id', create_id(item)))
             job.item = item
             for attr, value in response_data.items():
                 setattr(job, attr, value)
             job.save()
-            logger.debug(f"Updated job '{job.id}' with monitoring data")
-        self.stats.inc_value('monitoring/job_saved')
+            log.debug(f"Updated job '{job.id}' with monitoring data")
+            self.stats.inc_value('monitoring/job_saved')
+        retry_when_db_locked(db, operation, stats=self.stats)


 def get_response_data(spider, response_url):
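Moving the retry loop out of the decorator also removes the awkward kwargs.pop('signal') / kwargs.pop('sender') dance: each signal handler now packs its database work into a local operation() closure and hands it to the shared helper, which takes care of the db context, the retries, and the stats counters. A new handler would follow the same shape — this one is a hypothetical illustration, not part of the commit:

    def spider_opened(self, spider):
        def operation():
            ...  # whatever database work the signal needs
            self.stats.inc_value('monitoring/example_saved')
        retry_when_db_locked(db, operation, stats=self.stats)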
38 changes: 5 additions & 33 deletions juniorguru/scrapers/pipelines/database.py
@@ -1,33 +1,6 @@
 import hashlib
-import logging
-import time
-from functools import wraps
-
-from peewee import OperationalError

-from juniorguru.models import Job
-from juniorguru.models import db as default_db
-
-
-logger = logging.getLogger(__name__)
-
-
-def retry_when_db_locked(method):
-    @wraps(method)
-    def wrapper(self, *args, **kwargs):
-        last_error = None
-        for i in range(10):
-            try:
-                return method(self, *args, **kwargs)
-            except OperationalError as error:
-                if str(error) == 'database is locked':
-                    logger.debug(f"Operation '{self.__class__.__name__}.{method.__name__}' failed! ({error}, attempt: {i + 1})")
-                    last_error = error
-                    time.sleep(0.5 * i)
-                else:
-                    raise
-        raise last_error
-    return wrapper
+from juniorguru.models import Job, retry_when_db_locked, db as default_db


 class Pipeline():
@@ -40,13 +13,12 @@ def __init__(self, db=None, model=None, stats=None):
     def from_crawler(cls, crawler):
         return cls(stats=crawler.stats)

-    @retry_when_db_locked
     def process_item(self, item, spider):
-        with self.db:
+        def operation():
             self.model.create(**prepare_data(item, spider.name))
-        if self.stats:
-            self.stats.inc_value('item_saved_count')
-        return item
+            if self.stats:
+                self.stats.inc_value('item_saved_count')
+        return retry_when_db_locked(self.db, operation, stats=self.stats)


 def prepare_data(item, spider_name):
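One behavioral detail worth noting: retry_when_db_locked returns whatever the callable returns, so process_item now returns the result of operation() rather than ending with an explicit return item at the method level. A hedged sketch of driving the pipeline by hand, outside a running crawl — DummySpider and the single-field item are illustrative, and a real Job item carries more fields:

    from juniorguru.scrapers.pipelines.database import Pipeline

    class DummySpider:
        name = 'jobs'  # process_item only reads spider.name

    # presumably falls back to the default db and the Job model
    pipeline = Pipeline(stats=None)
    pipeline.process_item({'title': 'Junior developer'}, DummySpider())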
56 changes: 56 additions & 0 deletions tests/test_models_base.py
@@ -1,11 +1,41 @@
 from datetime import date, datetime, time

 import pytest
+from peewee import OperationalError

 from juniorguru.models import base as models_base
 from juniorguru.scrapers.items import Job


+@pytest.fixture
+def db():
+    class DummyDB():
+        def __init__(self):
+            self.entered = 0
+            self.exited = 0
+
+        def __enter__(self):
+            self.entered += 1
+
+        def __exit__(self, *args, **kwargs):
+            self.exited += 1
+
+    return DummyDB()
+
+
+@pytest.fixture
+def stats():
+    class DummyStats():
+        def __init__(self):
+            self.values = {}
+
+        def inc_value(self, name):
+            self.values.setdefault(name, 0)
+            self.values[name] += 1
+
+    return DummyStats()
+
+
 @pytest.mark.parametrize('o,expected', [
     ([1, 2, 3], '[1, 2, 3]'),
     (datetime(2020, 4, 30, 14, 35, 10), '"2020-04-30T14:35:10"'),
@@ -30,3 +60,29 @@ def test_json_dumps_item():
             '"title": "Junior developer", '
             '"employment_types": ["full-time"]'
             '}')
+
+
+def test_retry_when_db_locked(db, stats):
+    def operation():
+        if stats.values.get('database/locked_retries', 0) < 5:
+            raise OperationalError('database is locked')
+        return 42
+
+    _ = models_base.retry_when_db_locked(db, operation, stats=stats, wait_sec=0)
+
+    assert _ == 42
+    assert db.entered == 6
+    assert db.exited == 6
+    assert stats.values == {'database/locked_retries': 5}
+
+
+def test_retry_when_db_locked_raises(db, stats):
+    def operation():
+        raise OperationalError('database is locked')
+
+    with pytest.raises(OperationalError):
+        models_base.retry_when_db_locked(db, operation, stats=stats, wait_sec=0)
+
+    assert db.entered == 10
+    assert db.exited == 10
+    assert stats.values == {'database/locked_retries': 10, 'database/uncaught_errors': 1}
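The DummyDB fixture counts __enter__/__exit__ calls, so the assertions entered == 6 / exited == 6 pin down that the helper opens and closes the database context once per attempt: five locked failures plus the final success. A natural companion test — a sketch, not part of this commit — would cover the else branch, where an OperationalError other than 'database is locked' escapes on the first attempt:

    def test_retry_when_db_locked_reraises_other_errors(db, stats):
        def operation():
            raise OperationalError('no such table: job')

        with pytest.raises(OperationalError):
            models_base.retry_when_db_locked(db, operation, stats=stats, wait_sec=0)

        assert db.entered == 1
        assert db.exited == 1
        assert stats.values == {'database/uncaught_errors': 1}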
