Skip to content

Commit

Permalink
add more options for maintainers to expedite XLoader runs, GitHub cka…
Browse files Browse the repository at this point in the history
…n#202

- Add CLI flag for loading immediately instead of queueing
- Allow sysadmins to specify an alternate queue to run on
  • Loading branch information
ThrawnCA committed Mar 13, 2024
1 parent 95edb97 commit d1328e8
Show file tree
Hide file tree
Showing 8 changed files with 106 additions and 53 deletions.
10 changes: 7 additions & 3 deletions ckanext/xloader/action.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,13 @@ def xloader_submit(context, data_dict):
:rtype: bool
'''
p.toolkit.check_access('xloader_submit', context, data_dict)
custom_queue = data_dict.pop('queue', rq_jobs.DEFAULT_QUEUE_NAME)
schema = context.get('schema', ckanext.xloader.schema.xloader_submit_schema())
data_dict, errors = _validate(data_dict, schema, context)
if errors:
raise p.toolkit.ValidationError(errors)

p.toolkit.check_access('xloader_submit', context, data_dict)

res_id = data_dict['resource_id']
try:
resource_dict = p.toolkit.get_action('resource_show')(context, {
Expand Down Expand Up @@ -152,6 +152,10 @@ def xloader_submit(context, data_dict):
'original_url': resource_dict.get('url'),
}
}
if custom_queue != rq_jobs.DEFAULT_QUEUE_NAME:
# Don't automatically retry if it's a custom run
data['metadata']['tries'] = jobs.MAX_RETRIES

# Expand timeout for resources that have to be type-guessed
timeout = config.get(
'ckanext.xloader.job_timeout',
Expand All @@ -160,7 +164,7 @@ def xloader_submit(context, data_dict):

try:
job = enqueue_job(
jobs.xloader_data_into_datastore, [data],
jobs.xloader_data_into_datastore, [data], queue=custom_queue,
title="xloader_submit: package: {} resource: {}".format(resource_dict.get('package_id'), res_id),
rq_kwargs=dict(timeout=timeout)
)
Expand Down
7 changes: 7 additions & 0 deletions ckanext/xloader/auth.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,14 @@
from ckan import authz
from ckan.lib import jobs as rq_jobs

import ckanext.datastore.logic.auth as auth


def xloader_submit(context, data_dict):
# only sysadmins can specify a custom processing queue
custom_queue = data_dict.get('queue')
if custom_queue and custom_queue != rq_jobs.DEFAULT_QUEUE_NAME:
return authz.is_authorized('config_option_update', context, data_dict)
return auth.datastore_auth(context, data_dict)


Expand Down
11 changes: 7 additions & 4 deletions ckanext/xloader/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,23 +26,26 @@ def status():
@click.argument(u'dataset-spec')
@click.option('-y', is_flag=True, default=False, help='Always answer yes to questions')
@click.option('--dry-run', is_flag=True, default=False, help='Don\'t actually submit any resources')
def submit(dataset_spec, y, dry_run):
@click.option('--queue', help='Queue name for asynchronous processing, unused if executing immediately')
@click.option('--sync', is_flag=True, default=False,
help='Execute immediately instead of enqueueing for asynchronous processing')
def submit(dataset_spec, y, dry_run, queue, sync):
"""
xloader submit [options] <dataset-spec>
"""
cmd = XloaderCmd(dry_run)

if dataset_spec == 'all':
cmd._setup_xloader_logger()
cmd._submit_all()
cmd._submit_all(sync=sync, queue=queue)
elif dataset_spec == 'all-existing':
_confirm_or_abort(y, dry_run)
cmd._setup_xloader_logger()
cmd._submit_all_existing()
cmd._submit_all_existing(sync=sync, queue=queue)
else:
pkg_name_or_id = dataset_spec
cmd._setup_xloader_logger()
cmd._submit_package(pkg_name_or_id)
cmd._submit_package(pkg_name_or_id, sync=sync, queue=queue)

if cmd.error_occured:
print('Finished but saw errors - see above for details')
Expand Down
50 changes: 32 additions & 18 deletions ckanext/xloader/command.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import sys
import logging
import ckan.plugins.toolkit as tk

from ckanext.xloader.jobs import xloader_data_into_datastore_
from ckanext.xloader.utils import XLoaderFormats


Expand All @@ -23,7 +25,7 @@ def _setup_xloader_logger(self):
logger.setLevel(logging.DEBUG)
logger.propagate = False # in case the config

def _submit_all_existing(self):
def _submit_all_existing(self, sync=False, queue=None):
from ckanext.datastore.backend \
import get_all_resources_ids_in_datastore
resource_ids = get_all_resources_ids_in_datastore()
Expand All @@ -38,9 +40,9 @@ def _submit_all_existing(self):
print(' Skipping resource {} found in datastore but not in '
'metadata'.format(resource_id))
continue
self._submit_resource(resource_dict, user, indent=2)
self._submit_resource(resource_dict, user, indent=2, sync=sync, queue=queue)

def _submit_all(self):
def _submit_all(self, sync=False, queue=None):
# submit every package
# for each package in the package list,
# submit each resource w/ _submit_package
Expand All @@ -51,9 +53,9 @@ def _submit_all(self):
user = tk.get_action('get_site_user')(
{'ignore_auth': True}, {})
for p_id in package_list:
self._submit_package(p_id, user, indent=2)
self._submit_package(p_id, user, indent=2, sync=sync, queue=queue)

def _submit_package(self, pkg_id, user=None, indent=0):
def _submit_package(self, pkg_id, user=None, indent=0, sync=False, queue=None):
indentation = ' ' * indent
if not user:
user = tk.get_action('get_site_user')(
Expand All @@ -73,15 +75,15 @@ def _submit_package(self, pkg_id, user=None, indent=0):
for resource in pkg['resources']:
try:
resource['package_name'] = pkg['name'] # for debug output
self._submit_resource(resource, user, indent=indent + 2)
self._submit_resource(resource, user, indent=indent + 2, sync=sync, queue=queue)
except Exception as e:
self.error_occured = True
print(e)
print(str(e))
print(indentation + 'ERROR submitting resource "{}" '.format(
resource['id']))
continue

def _submit_resource(self, resource, user, indent=0):
def _submit_resource(self, resource, user, indent=0, sync=False, queue=None):
'''resource: resource dictionary
'''
indentation = ' ' * indent
Expand All @@ -99,23 +101,35 @@ def _submit_resource(self, resource, user, indent=0):
r=resource))
return
dataset_ref = resource.get('package_name', resource['package_id'])
print('{indent}Submitting /dataset/{dataset}/resource/{r[id]}\n'
print('{indent}{sync_style} /dataset/{dataset}/resource/{r[id]}\n'
'{indent} url={r[url]}\n'
'{indent} format={r[format]}'
.format(dataset=dataset_ref, r=resource, indent=indentation))
.format(sync_style='Processing' if sync else 'Submitting',
dataset=dataset_ref, r=resource, indent=indentation))
if self.dry_run:
print(indentation + '(not submitted - dry-run)')
return
data_dict = {
'resource_id': resource['id'],
'ignore_hash': True,
}
if self.dry_run:
print(indentation + '(not submitted - dry-run)')
return
success = tk.get_action('xloader_submit')({'user': user['name']}, data_dict)
if success:
print(indentation + '...ok')
if sync:
data_dict['ckan_url'] = tk.config.get('ckan.site_url')
input_dict = {
'metadata': data_dict,
'api_key': 'TODO'
}
logger = logging.getLogger('ckanext.xloader.cli')
xloader_data_into_datastore_(input_dict, None, logger)
else:
print(indentation + 'ERROR submitting resource')
self.error_occured = True
if queue:
data_dict['queue'] = queue
success = tk.get_action('xloader_submit')({'user': user['name']}, data_dict)
if success:
print(indentation + '...ok')
else:
print(indentation + 'ERROR submitting resource')
self.error_occured = True

def print_status(self):
import ckan.lib.jobs as rq_jobs
Expand Down
4 changes: 2 additions & 2 deletions ckanext/xloader/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ def xloader_status_description(status):
def is_resource_supported_by_xloader(res_dict, check_access=True):
is_supported_format = XLoaderFormats.is_it_an_xloader_format(res_dict.get('format'))
is_datastore_active = res_dict.get('datastore_active', False)
user_has_access = not check_access or toolkit.h.check_access('package_update',
{'id':res_dict.get('package_id')})
user_has_access = not check_access or toolkit.h.check_access(
'package_update', {'id': res_dict.get('package_id')})
url_type = res_dict.get('url_type')
if url_type:
try:
Expand Down
56 changes: 30 additions & 26 deletions ckanext/xloader/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@
errors.LockNotAvailable,
errors.ObjectInUse,
)
# Retries can only occur in cases where the datastore entry exists,
# so use the standard timeout
RETRIED_JOB_TIMEOUT = config.get('ckanext.xloader.job_timeout', '3600')


Expand Down Expand Up @@ -85,10 +87,31 @@ def xloader_data_into_datastore(input):

job_id = get_current_job().id
errored = False

# Set-up logging to the db
handler = StoringHandler(job_id, input)
level = logging.DEBUG
handler.setLevel(level)
logger = logging.getLogger(job_id)
handler.setFormatter(logging.Formatter('%(message)s'))
logger.addHandler(handler)
# also show logs on stderr
logger.addHandler(logging.StreamHandler())
logger.setLevel(logging.DEBUG)

db.init(config)
try:
xloader_data_into_datastore_(input, job_dict)
# Store details of the job in the db
db.add_pending_job(job_id, **input)
xloader_data_into_datastore_(input, job_dict, logger)
job_dict['status'] = 'complete'
db.mark_job_as_completed(job_id, job_dict)
except sa.exc.IntegrityError as e:
db.mark_job_as_errored(job_id, str(e))
job_dict['status'] = 'error'
job_dict['error'] = str(e)
log.error('xloader error: job_id %s already exists', job_id)
errored = True
except JobError as e:
db.mark_job_as_errored(job_id, str(e))
job_dict['status'] = 'error'
Expand Down Expand Up @@ -127,7 +150,7 @@ def xloader_data_into_datastore(input):
return 'error' if errored else None


def xloader_data_into_datastore_(input, job_dict):
def xloader_data_into_datastore_(input, job_dict, logger):
'''This function:
* downloads the resource (metadata) from CKAN
* downloads the data
Expand All @@ -136,26 +159,6 @@ def xloader_data_into_datastore_(input, job_dict):
(datapusher called this function 'push_to_datastore')
'''
job_id = get_current_job().id
db.init(config)

# Store details of the job in the db
try:
db.add_pending_job(job_id, **input)
except sa.exc.IntegrityError:
raise JobError('job_id {} already exists'.format(job_id))

# Set-up logging to the db
handler = StoringHandler(job_id, input)
level = logging.DEBUG
handler.setLevel(level)
logger = logging.getLogger(job_id)
handler.setFormatter(logging.Formatter('%(message)s'))
logger.addHandler(handler)
# also show logs on stderr
logger.addHandler(logging.StreamHandler())
logger.setLevel(logging.DEBUG)

validate_input(input)

data = input['metadata']
Expand Down Expand Up @@ -199,10 +202,11 @@ def direct_load():
loader.calculate_record_count(
resource_id=resource['id'], logger=logger)
set_datastore_active(data, resource, logger)
job_dict['status'] = 'running_but_viewable'
callback_xloader_hook(result_url=input['result_url'],
api_key=api_key,
job_dict=job_dict)
if 'result_url' in input:
job_dict['status'] = 'running_but_viewable'
callback_xloader_hook(result_url=input['result_url'],
api_key=api_key,
job_dict=job_dict)
logger.info('Data now available to users: %s', resource_ckan_url)
loader.create_column_indexes(
fields=fields,
Expand Down
20 changes: 20 additions & 0 deletions ckanext/xloader/tests/test_action.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
except ImportError:
import mock

from ckan.plugins.toolkit import NotAuthorized
from ckan.tests import helpers, factories

from ckanext.xloader.utils import get_xloader_user_apitoken
Expand All @@ -30,6 +31,25 @@ def test_submit(self):
)
assert 1 == enqueue_mock.call_count

def test_submit_to_custom_queue_without_auth(self):
# check that xloader_submit doesn't allow regular users to change queues
user = factories.User()
with pytest.raises(NotAuthorized):
helpers.call_auth(
"xloader_submit",
context=dict(user=user["name"], model=None),
queue='foo',
)

def test_submit_to_custom_queue_as_sysadmin(self):
# check that xloader_submit allows sysadmins to change queues
user = factories.Sysadmin()
assert helpers.call_auth(
"xloader_submit",
context=dict(user=user["name"], model=None),
queue='foo',
) is True

def test_duplicated_submits(self):
def submit(res, user):
return helpers.call_action(
Expand Down
1 change: 1 addition & 0 deletions ckanext/xloader/tests/test_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
except ImportError:
import mock
from six import text_type as str

from ckan.tests import helpers, factories
from ckan.logic import _actions
from ckanext.xloader.plugin import _should_remove_unsupported_resource_from_datastore
Expand Down

0 comments on commit d1328e8

Please sign in to comment.