Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[QOLSVC-4689] add CLI option to process datasets immediately, #202 #82

Merged
merged 4 commits
Feb 2, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions ckanext/xloader/action.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,13 @@ def xloader_submit(context, data_dict):

:rtype: bool
'''
p.toolkit.check_access('xloader_submit', context, data_dict)
custom_queue = data_dict.pop('queue', rq_jobs.DEFAULT_QUEUE_NAME)
schema = context.get('schema', ckanext.xloader.schema.xloader_submit_schema())
data_dict, errors = _validate(data_dict, schema, context)
if errors:
raise p.toolkit.ValidationError(errors)

p.toolkit.check_access('xloader_submit', context, data_dict)

res_id = data_dict['resource_id']
try:
resource_dict = p.toolkit.get_action('resource_show')(context, {
Expand Down Expand Up @@ -160,7 +160,7 @@ def xloader_submit(context, data_dict):

try:
job = enqueue_job(
jobs.xloader_data_into_datastore, [data], rq_kwargs=dict(timeout=timeout)
jobs.xloader_data_into_datastore, [data], queue=custom_queue, rq_kwargs=dict(timeout=timeout)
)
except Exception:
log.exception('Unable to enqueued xloader res_id=%s', res_id)
Expand Down
7 changes: 7 additions & 0 deletions ckanext/xloader/auth.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,14 @@
from ckan import authz
from ckan.lib import jobs as rq_jobs

import ckanext.datastore.logic.auth as auth


def xloader_submit(context, data_dict):
    """Authorization check for submitting a resource to xloader.

    Routing a job to a non-default queue is a privileged operation:
    only users who may update config options (i.e. sysadmins) can pick
    a custom queue, while ordinary submissions are governed by the
    standard datastore auth check.
    """
    requested_queue = data_dict.get('queue')
    if not requested_queue or requested_queue == rq_jobs.DEFAULT_QUEUE_NAME:
        # No queue given, or the default one: same rules as datastore access.
        return auth.datastore_auth(context, data_dict)
    # A custom queue was requested - require sysadmin-level authorization.
    return authz.is_authorized('config_option_update', context, data_dict)


Expand Down
11 changes: 7 additions & 4 deletions ckanext/xloader/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,23 +26,26 @@ def status():
@click.argument(u'dataset-spec')
@click.option('-y', is_flag=True, default=False, help='Always answer yes to questions')
@click.option('--dry-run', is_flag=True, default=False, help='Don\'t actually submit any resources')
def submit(dataset_spec, y, dry_run):
@click.option('--queue', help='Queue name for asynchronous processing, unused if executing immediately')
@click.option('--sync', is_flag=True, default=False,
help='Execute immediately instead of enqueueing for asynchronous processing')
def submit(dataset_spec, y, dry_run, queue, sync):
"""
xloader submit [options] <dataset-spec>
"""
cmd = XloaderCmd(dry_run)

if dataset_spec == 'all':
cmd._setup_xloader_logger()
cmd._submit_all()
cmd._submit_all(sync=sync, queue=queue)
elif dataset_spec == 'all-existing':
_confirm_or_abort(y, dry_run)
cmd._setup_xloader_logger()
cmd._submit_all_existing()
cmd._submit_all_existing(sync=sync, queue=queue)
else:
pkg_name_or_id = dataset_spec
cmd._setup_xloader_logger()
cmd._submit_package(pkg_name_or_id)
cmd._submit_package(pkg_name_or_id, sync=sync, queue=queue)

if cmd.error_occured:
print('Finished but saw errors - see above for details')
Expand Down
50 changes: 32 additions & 18 deletions ckanext/xloader/command.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import sys
import logging
import ckan.plugins.toolkit as tk

from ckanext.xloader.jobs import xloader_data_into_datastore_
from ckanext.xloader.utils import XLoaderFormats


Expand All @@ -23,7 +25,7 @@ def _setup_xloader_logger(self):
logger.setLevel(logging.DEBUG)
logger.propagate = False # in case the config

def _submit_all_existing(self):
def _submit_all_existing(self, sync=False, queue=None):
from ckanext.datastore.backend \
import get_all_resources_ids_in_datastore
resource_ids = get_all_resources_ids_in_datastore()
Expand All @@ -38,9 +40,9 @@ def _submit_all_existing(self):
print(' Skipping resource {} found in datastore but not in '
'metadata'.format(resource_id))
continue
self._submit_resource(resource_dict, user, indent=2)
self._submit_resource(resource_dict, user, indent=2, sync=sync, queue=queue)

def _submit_all(self):
def _submit_all(self, sync=False, queue=None):
# submit every package
# for each package in the package list,
# submit each resource w/ _submit_package
Expand All @@ -51,9 +53,9 @@ def _submit_all(self):
user = tk.get_action('get_site_user')(
{'ignore_auth': True}, {})
for p_id in package_list:
self._submit_package(p_id, user, indent=2)
self._submit_package(p_id, user, indent=2, sync=sync, queue=queue)

def _submit_package(self, pkg_id, user=None, indent=0):
def _submit_package(self, pkg_id, user=None, indent=0, sync=False, queue=None):
indentation = ' ' * indent
if not user:
user = tk.get_action('get_site_user')(
Expand All @@ -73,15 +75,15 @@ def _submit_package(self, pkg_id, user=None, indent=0):
for resource in pkg['resources']:
try:
resource['package_name'] = pkg['name'] # for debug output
self._submit_resource(resource, user, indent=indent + 2)
self._submit_resource(resource, user, indent=indent + 2, sync=sync, queue=queue)
except Exception as e:
self.error_occured = True
print(e)
print(str(e))
print(indentation + 'ERROR submitting resource "{}" '.format(
resource['id']))
continue

def _submit_resource(self, resource, user, indent=0):
def _submit_resource(self, resource, user, indent=0, sync=False, queue=None):
'''resource: resource dictionary
'''
indentation = ' ' * indent
Expand All @@ -99,23 +101,35 @@ def _submit_resource(self, resource, user, indent=0):
r=resource))
return
dataset_ref = resource.get('package_name', resource['package_id'])
print('{indent}Submitting /dataset/{dataset}/resource/{r[id]}\n'
print('{indent}{sync_style} /dataset/{dataset}/resource/{r[id]}\n'
'{indent} url={r[url]}\n'
'{indent} format={r[format]}'
.format(dataset=dataset_ref, r=resource, indent=indentation))
.format(sync_style='Processing' if sync else 'Submitting',
dataset=dataset_ref, r=resource, indent=indentation))
if self.dry_run:
print(indentation + '(not submitted - dry-run)')
return
data_dict = {
'resource_id': resource['id'],
'ignore_hash': True,
}
if self.dry_run:
print(indentation + '(not submitted - dry-run)')
return
success = tk.get_action('xloader_submit')({'user': user['name']}, data_dict)
if success:
print(indentation + '...ok')
if sync:
data_dict['ckan_url'] = tk.config.get('ckan.site_url')
input_dict = {
'metadata': data_dict,
'api_key': 'TODO'
}
logger = logging.getLogger('ckanext.xloader.cli')
xloader_data_into_datastore_(input_dict, None, logger)
else:
print(indentation + 'ERROR submitting resource')
self.error_occured = True
if queue:
data_dict['queue'] = queue
success = tk.get_action('xloader_submit')({'user': user['name']}, data_dict)
if success:
print(indentation + '...ok')
else:
print(indentation + 'ERROR submitting resource')
self.error_occured = True

def print_status(self):
import ckan.lib.jobs as rq_jobs
Expand Down
58 changes: 30 additions & 28 deletions ckanext/xloader/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,15 +86,36 @@ def xloader_data_into_datastore(input):

job_id = get_current_job().id
errored = False

# Set-up logging to the db
handler = StoringHandler(job_id, input)
level = logging.DEBUG
handler.setLevel(level)
logger = logging.getLogger(job_id)
handler.setFormatter(logging.Formatter('%(message)s'))
logger.addHandler(handler)
# also show logs on stderr
logger.addHandler(logging.StreamHandler())
logger.setLevel(logging.DEBUG)

db.init(config)
try:
xloader_data_into_datastore_(input, job_dict)
# Store details of the job in the db
db.add_pending_job(job_id, **input)
xloader_data_into_datastore_(input, job_dict, logger)
job_dict['status'] = 'complete'
db.mark_job_as_completed(job_id, job_dict)
except sa.exc.IntegrityError as e:
db.mark_job_as_errored(job_id, str(e))
job_dict['status'] = 'error'
job_dict['error'] = str(e)
log.error('xloader error: job_id %s already exists', job_id)
errored = True
except JobError as e:
db.mark_job_as_errored(job_id, str(e))
job_dict['status'] = 'error'
job_dict['error'] = str(e)
log.error('xloader error: {0}, {1}'.format(e, traceback.format_exc()))
log.error('xloader error: %s, %s', e, traceback.format_exc())
errored = True
except Exception as e:
if isinstance(e, RETRYABLE_ERRORS):
Expand All @@ -114,7 +135,7 @@ def xloader_data_into_datastore(input):
job_id, traceback.format_tb(sys.exc_info()[2])[-1] + repr(e))
job_dict['status'] = 'error'
job_dict['error'] = str(e)
log.error('xloader error: {0}, {1}'.format(e, traceback.format_exc()))
log.error('xloader error: %s, %s', e, traceback.format_exc())
errored = True
finally:
# job_dict is defined in xloader_hook's docstring
Expand All @@ -125,7 +146,7 @@ def xloader_data_into_datastore(input):
return 'error' if errored else None


def xloader_data_into_datastore_(input, job_dict):
def xloader_data_into_datastore_(input, job_dict, logger):
'''This function:
* downloads the resource (metadata) from CKAN
* downloads the data
Expand All @@ -134,26 +155,6 @@ def xloader_data_into_datastore_(input, job_dict):

(datapusher called this function 'push_to_datastore')
'''
job_id = get_current_job().id
db.init(config)

# Store details of the job in the db
try:
db.add_pending_job(job_id, **input)
except sa.exc.IntegrityError:
raise JobError('job_id {} already exists'.format(job_id))

# Set-up logging to the db
handler = StoringHandler(job_id, input)
level = logging.DEBUG
handler.setLevel(level)
logger = logging.getLogger(job_id)
handler.setFormatter(logging.Formatter('%(message)s'))
logger.addHandler(handler)
# also show logs on stderr
logger.addHandler(logging.StreamHandler())
logger.setLevel(logging.DEBUG)

validate_input(input)

data = input['metadata']
Expand Down Expand Up @@ -197,10 +198,11 @@ def direct_load():
loader.calculate_record_count(
resource_id=resource['id'], logger=logger)
set_datastore_active(data, resource, logger)
job_dict['status'] = 'running_but_viewable'
callback_xloader_hook(result_url=input['result_url'],
api_key=api_key,
job_dict=job_dict)
if 'result_url' in input:
job_dict['status'] = 'running_but_viewable'
callback_xloader_hook(result_url=input['result_url'],
api_key=api_key,
job_dict=job_dict)
logger.info('Data now available to users: %s', resource_ckan_url)
loader.create_column_indexes(
fields=fields,
Expand Down
20 changes: 20 additions & 0 deletions ckanext/xloader/tests/test_action.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
except ImportError:
import mock

from ckan.plugins.toolkit import NotAuthorized
from ckan.tests import helpers, factories

from ckanext.xloader.utils import get_xloader_user_apitoken
Expand All @@ -30,6 +31,25 @@ def test_submit(self):
)
assert 1 == enqueue_mock.call_count

def test_submit_to_custom_queue_without_auth(self):
    """A regular (non-sysadmin) user must not be able to pick a custom queue."""
    regular_user = factories.User()
    context = {"user": regular_user["name"], "model": None}
    with pytest.raises(NotAuthorized):
        helpers.call_auth(
            "xloader_submit",
            context=context,
            queue='foo',
        )

def test_submit_to_custom_queue_as_sysadmin(self):
    """A sysadmin is allowed to direct a job to any queue."""
    admin = factories.Sysadmin()
    context = {"user": admin["name"], "model": None}
    result = helpers.call_auth(
        "xloader_submit",
        context=context,
        queue='foo',
    )
    assert result is True

def test_duplicated_submits(self):
def submit(res, user):
return helpers.call_action(
Expand Down
2 changes: 1 addition & 1 deletion ckanext/xloader/tests/test_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -962,7 +962,7 @@ def test_simple_large_file(self, Session):
csv_filepath = get_sample_filepath("simple-large.csv")
resource = factories.Resource()
resource_id = resource['id']
fields = loader.load_table(
loader.load_table(
csv_filepath,
resource_id=resource_id,
mimetype="text/csv",
Expand Down
4 changes: 2 additions & 2 deletions ckanext/xloader/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
import ckan.plugins as p
from ckan.plugins.toolkit import config

from .job_exceptions import JobError

# resource.formats accepted by ckanext-xloader. Must be lowercase here.
DEFAULT_FORMATS = [
"csv",
Expand All @@ -26,8 +28,6 @@
"application/vnd.oasis.opendocument.spreadsheet",
]

from .job_exceptions import JobError


class XLoaderFormats(object):
formats = None
Expand Down
Loading