Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Quality of life improvements #195

Merged
merged 7 commits into from
Feb 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions .github/dependabot.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
version: 2
registries:
python-index-pypi-org:
type: python-index
url: https://pypi.org/
replaces-base: true
username: "${{secrets.PYTHON_INDEX_PYPI_ORG_USERNAME}}"
password: "${{secrets.PYTHON_INDEX_PYPI_ORG_PASSWORD}}"

updates:
- package-ecosystem: pip
directory: "/"
schedule:
interval: daily
time: "19:00"
open-pull-requests-limit: 10
registries:
- python-index-pypi-org
6 changes: 6 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,12 @@ This setting is shared with other plugins that download resource files, such as

ckan.download_proxy = http://my-proxy:1234/

You may also wish to configure the database to use your preferred date input style on COPY.
For example, to make `PostgreSQL <https://www.postgresql.org/docs/current/runtime-config-client.html#RUNTIME-CONFIG-CLIENT-FORMAT>`_
expect European (day-first) dates, you could add to ``postgresql.conf``:

datestyle=ISO,DMY

------------------------
Developer installation
------------------------
Expand Down
11 changes: 9 additions & 2 deletions ckanext/xloader/action.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,10 +152,17 @@ def xloader_submit(context, data_dict):
'original_url': resource_dict.get('url'),
}
}
timeout = config.get('ckanext.xloader.job_timeout', '3600')
# Expand timeout for resources that have to be type-guessed
timeout = config.get(
'ckanext.xloader.job_timeout',
'3600' if utils.datastore_resource_exists(res_id) else '10800')
log.debug("Timeout for XLoading resource %s is %s", res_id, timeout)

try:
job = enqueue_job(
jobs.xloader_data_into_datastore, [data], rq_kwargs=dict(timeout=timeout)
jobs.xloader_data_into_datastore, [data],
title="xloader_submit: package: {} resource: {}".format(resource_dict.get('package_id'), res_id),
rq_kwargs=dict(timeout=timeout)
)
except Exception:
log.exception('Unable to enqueued xloader res_id=%s', res_id)
Expand Down
13 changes: 10 additions & 3 deletions ckanext/xloader/config_declaration.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,7 @@ groups:
default: 1_000_000_000
example: 100000
description: |
The connection string for the jobs database used by XLoader. The
default of an sqlite file is fine for development. For production use a
Postgresql database.
The maximum file size that XLoader will attempt to load.
type: int
required: false
- key: ckanext.xloader.use_type_guessing
Expand All @@ -48,6 +46,15 @@ groups:
type: bool
required: false
legacy_key: ckanext.xloader.just_load_with_messytables
- key: ckanext.xloader.max_type_guessing_length
default: 0
example: 100000
description: |
The maximum file size, in bytes, that will be passed to Tabulator if the
use_type_guessing flag is enabled. Larger files will use COPY even if
the flag is set. If unset or 0, defaults to 1/10 of the maximum content length.
type: int
required: false
- key: ckanext.xloader.parse_dates_dayfirst
default: False
example: False
Expand Down
10 changes: 1 addition & 9 deletions ckanext/xloader/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,9 +191,7 @@ def add_pending_job(job_id, job_type, api_key,
if not metadata:
metadata = {}

conn = ENGINE.connect()
trans = conn.begin()
try:
with ENGINE.begin() as conn:
conn.execute(JOBS_TABLE.insert().values(
job_id=job_id,
job_type=job_type,
Expand Down Expand Up @@ -225,12 +223,6 @@ def add_pending_job(job_id, job_type, api_key,
)
if inserts:
conn.execute(METADATA_TABLE.insert(), inserts)
trans.commit()
except Exception:
trans.rollback()
raise
finally:
conn.close()


class InvalidErrorObjectError(Exception):
Expand Down
14 changes: 9 additions & 5 deletions ckanext/xloader/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,17 @@ def xloader_status_description(status):
return _('Not Uploaded Yet')


def is_resource_supported_by_xloader(res_dict, check_access = True):
def is_resource_supported_by_xloader(res_dict, check_access=True):
is_supported_format = XLoaderFormats.is_it_an_xloader_format(res_dict.get('format'))
is_datastore_active = res_dict.get('datastore_active', False)
user_has_access = not check_access or toolkit.h.check_access('package_update',
{'id':res_dict.get('package_id')})
try:
is_supported_url_type = res_dict.get('url_type') not in toolkit.h.datastore_rw_resource_url_types()
except AttributeError:
is_supported_url_type = (res_dict.get('url_type') == 'upload' or not res_dict.get('url_type'))
url_type = res_dict.get('url_type')
if url_type:
try:
is_supported_url_type = url_type not in toolkit.h.datastore_rw_resource_url_types()
except AttributeError:
is_supported_url_type = (url_type == 'upload')
else:
is_supported_url_type = True
return (is_supported_format or is_datastore_active) and user_has_access and is_supported_url_type
33 changes: 19 additions & 14 deletions ckanext/xloader/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import tempfile
import json
import datetime
import os
import traceback
import sys

Expand All @@ -21,7 +22,7 @@

from . import db, loader
from .job_exceptions import JobError, HTTPError, DataTooBigError, FileCouldNotBeLoadedError
from .utils import set_resource_metadata
from .utils import datastore_resource_exists, set_resource_metadata

try:
from ckan.lib.api_token import get_user_from_token
Expand All @@ -35,10 +36,13 @@
requests.packages.urllib3.disable_warnings()

MAX_CONTENT_LENGTH = int(config.get('ckanext.xloader.max_content_length') or 1e9)
# Don't try Tabulator load on large files
MAX_TYPE_GUESSING_LENGTH = int(config.get('ckanext.xloader.max_type_guessing_length') or MAX_CONTENT_LENGTH / 10)
MAX_EXCERPT_LINES = int(config.get('ckanext.xloader.max_excerpt_lines') or 0)
CHUNK_SIZE = 16 * 1024 # 16kb
DOWNLOAD_TIMEOUT = 30

MAX_RETRIES = 1
RETRYABLE_ERRORS = (
errors.DeadlockDetected,
errors.LockNotAvailable,
Expand Down Expand Up @@ -89,18 +93,21 @@ def xloader_data_into_datastore(input):
db.mark_job_as_errored(job_id, str(e))
job_dict['status'] = 'error'
job_dict['error'] = str(e)
log.error('xloader error: {0}, {1}'.format(e, traceback.format_exc()))
log.error('xloader error: %s, %s', e, traceback.format_exc())
errored = True
except Exception as e:
if isinstance(e, RETRYABLE_ERRORS):
tries = job_dict['metadata'].get('tries', 0)
if tries == 0:
if tries < MAX_RETRIES:
tries = tries + 1
log.info("Job %s failed due to temporary error [%s], retrying", job_id, e)
job_dict['status'] = 'pending'
job_dict['metadata']['tries'] = tries + 1
job_dict['metadata']['tries'] = tries
enqueue_job(
xloader_data_into_datastore,
[input],
title="retry xloader_data_into_datastore: resource: {} attempt {}".format(
job_dict['metadata']['resource_id'], tries),
rq_kwargs=dict(timeout=RETRIED_JOB_TIMEOUT)
)
return None
Expand All @@ -109,7 +116,7 @@ def xloader_data_into_datastore(input):
job_id, traceback.format_tb(sys.exc_info()[2])[-1] + repr(e))
job_dict['status'] = 'error'
job_dict['error'] = str(e)
log.error('xloader error: {0}, {1}'.format(e, traceback.format_exc()))
log.error('xloader error: %s, %s', e, traceback.format_exc())
errored = True
finally:
# job_dict is defined in xloader_hook's docstring
Expand Down Expand Up @@ -226,11 +233,12 @@ def tabulator_load():
logger.info('Loading CSV')
# If ckanext.xloader.use_type_guessing is not configured, fall back to
# deprecated ckanext.xloader.just_load_with_messytables
use_type_guessing = asbool(config.get(
'ckanext.xloader.use_type_guessing', config.get(
'ckanext.xloader.just_load_with_messytables', False)))
logger.info("'use_type_guessing' mode is: %s",
use_type_guessing)
use_type_guessing = asbool(
config.get('ckanext.xloader.use_type_guessing', config.get(
'ckanext.xloader.just_load_with_messytables', False))) \
and not datastore_resource_exists(resource['id']) \
and os.path.getsize(tmp_file.name) <= MAX_TYPE_GUESSING_LENGTH
logger.info("'use_type_guessing' mode is: %s", use_type_guessing)
try:
if use_type_guessing:
tabulator_load()
Expand Down Expand Up @@ -558,8 +566,7 @@ def __init__(self, task_id, input):
self.input = input

def emit(self, record):
conn = db.ENGINE.connect()
try:
with db.ENGINE.connect() as conn:
# Turn strings into unicode to stop SQLAlchemy
# "Unicode type received non-unicode bind param value" warnings.
message = str(record.getMessage())
Expand All @@ -575,8 +582,6 @@ def emit(self, record):
module=module,
funcName=funcName,
lineno=record.lineno))
finally:
conn.close()


class DatetimeJsonEncoder(json.JSONEncoder):
Expand Down
100 changes: 74 additions & 26 deletions ckanext/xloader/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,16 @@
from decimal import Decimal

import psycopg2
from chardet.universaldetector import UniversalDetector
from six.moves import zip
from tabulator import config as tabulator_config, Stream, TabulatorException
from tabulator import config as tabulator_config, EncodingError, Stream, TabulatorException
from unidecode import unidecode

import ckan.plugins as p

from .job_exceptions import FileCouldNotBeLoadedError, LoaderError
from .parser import CSV_SAMPLE_LINES, XloaderCSVParser
from .utils import headers_guess, type_guess
from .parser import CSV_SAMPLE_LINES, TypeConverter
from .utils import datastore_resource_exists, headers_guess, type_guess

from ckan.plugins.toolkit import config

Expand All @@ -30,6 +31,52 @@
MAX_COLUMN_LENGTH = 63
tabulator_config.CSV_SAMPLE_LINES = CSV_SAMPLE_LINES

SINGLE_BYTE_ENCODING = 'cp1252'


class UnknownEncodingStream(object):
    """Context manager that opens a Tabulator ``Stream`` for a file whose
    encoding is not known for certain.

    If an encoding detection result is supplied with a confidence above 0.7,
    the stream is opened with that encoding; otherwise Tabulator is left to
    pick the encoding itself. If opening raises an encoding error, the stream
    is reopened using ``SINGLE_BYTE_ENCODING``.

    The fallback matters for files such as Latin-1 text that is mostly ASCII:
    the sample can be sniffed as UTF-8, only to run into problems later in
    the file.
    """

    def __init__(self, filepath, file_format, decoding_result, **kwargs):
        """Remember what to open and how.

        :param filepath: path of the file to stream
        :param file_format: format name handed to ``Stream`` as ``format``
        :param decoding_result: encoding detection result, e.g.
            ``{'encoding': 'EUC-JP', 'confidence': 0.99}``; may be falsy
        :param kwargs: extra keyword arguments passed through to ``Stream``
        """
        self.filepath, self.file_format = filepath, file_format
        self.stream_args = kwargs
        self.decoding_result = decoding_result

    def __enter__(self):
        """Open the underlying stream and return it."""
        detected = self.decoding_result
        try:
            # Only trust the detected encoding when the detector was
            # reasonably sure about it.
            trusted = (detected and detected['confidence']
                       and detected['confidence'] > 0.7)
            if trusted:
                candidate = Stream(self.filepath, format=self.file_format,
                                   encoding=detected['encoding'],
                                   **self.stream_args)
            else:
                candidate = Stream(self.filepath, format=self.file_format,
                                   **self.stream_args)
            self.stream = candidate.__enter__()
        except (EncodingError, UnicodeDecodeError):
            # Last resort: reopen with the single-byte fallback encoding.
            fallback = Stream(self.filepath, format=self.file_format,
                              encoding=SINGLE_BYTE_ENCODING,
                              **self.stream_args)
            self.stream = fallback.__enter__()
        return self.stream

    def __exit__(self, *exc_info):
        """Delegate clean-up to the stream that was actually opened."""
        return self.stream.__exit__(*exc_info)


def detect_encoding(file_path):
    """Guess the character encoding of the file at ``file_path``.

    The file is opened in binary mode and fed to a chardet
    ``UniversalDetector`` one line at a time; reading stops early once the
    detector reports that it is done.

    :param file_path: path of the file to inspect
    :returns: the detector's result,
        e.g. ``{'encoding': 'EUC-JP', 'confidence': 0.99}``
    """
    sniffer = UniversalDetector()
    with open(file_path, 'rb') as raw_file:
        # readline() returns b'' at end of file, which ends the iteration.
        for raw_line in iter(raw_file.readline, b''):
            sniffer.feed(raw_line)
            if sniffer.done:
                break
    sniffer.close()
    return sniffer.result


def _fields_match(fields, existing_fields, logger):
''' Check whether all columns have the same names and types as previously,
Expand Down Expand Up @@ -77,15 +124,17 @@ def _clear_datastore_resource(resource_id):
def load_csv(csv_filepath, resource_id, mimetype='text/csv', logger=None):
'''Loads a CSV into DataStore. Does not create the indexes.'''

decoding_result = detect_encoding(csv_filepath)
logger.info("load_csv: Decoded encoding: %s", decoding_result)
# Determine the header row
try:
file_format = os.path.splitext(csv_filepath)[1].strip('.')
with Stream(csv_filepath, format=file_format) as stream:
with UnknownEncodingStream(csv_filepath, file_format, decoding_result) as stream:
header_offset, headers = headers_guess(stream.sample)
except TabulatorException:
try:
file_format = mimetype.lower().split('/')[-1]
with Stream(csv_filepath, format=file_format) as stream:
with UnknownEncodingStream(csv_filepath, file_format, decoding_result) as stream:
header_offset, headers = headers_guess(stream.sample)
except TabulatorException as e:
raise LoaderError('Tabulator error: {}'.format(e))
Expand Down Expand Up @@ -116,10 +165,16 @@ def load_csv(csv_filepath, resource_id, mimetype='text/csv', logger=None):
logger.info('Ensuring character coding is UTF8')
f_write = tempfile.NamedTemporaryFile(suffix=file_format, delete=False)
try:
with Stream(csv_filepath, format=file_format, skip_rows=skip_rows) as stream:
stream.save(target=f_write.name, format='csv', encoding='utf-8',
delimiter=delimiter)
csv_filepath = f_write.name
save_args = {'target': f_write.name, 'format': 'csv', 'encoding': 'utf-8', 'delimiter': delimiter}
try:
with UnknownEncodingStream(csv_filepath, file_format, decoding_result,
skip_rows=skip_rows) as stream:
stream.save(**save_args)
except (EncodingError, UnicodeDecodeError):
with Stream(csv_filepath, format=file_format, encoding=SINGLE_BYTE_ENCODING,
skip_rows=skip_rows) as stream:
stream.save(**save_args)
csv_filepath = f_write.name

# datastore db connection
engine = get_write_engine()
Expand Down Expand Up @@ -287,16 +342,18 @@ def load_table(table_filepath, resource_id, mimetype='text/csv', logger=None):

# Determine the header row
logger.info('Determining column names and types')
decoding_result = detect_encoding(table_filepath)
logger.info("load_table: Decoded encoding: %s", decoding_result)
try:
file_format = os.path.splitext(table_filepath)[1].strip('.')
with Stream(table_filepath, format=file_format,
custom_parsers={'csv': XloaderCSVParser}) as stream:
with UnknownEncodingStream(table_filepath, file_format, decoding_result,
post_parse=[TypeConverter().convert_types]) as stream:
header_offset, headers = headers_guess(stream.sample)
except TabulatorException:
try:
file_format = mimetype.lower().split('/')[-1]
with Stream(table_filepath, format=file_format,
custom_parsers={'csv': XloaderCSVParser}) as stream:
with UnknownEncodingStream(table_filepath, file_format, decoding_result,
post_parse=[TypeConverter().convert_types]) as stream:
header_offset, headers = headers_guess(stream.sample)
except TabulatorException as e:
raise LoaderError('Tabulator error: {}'.format(e))
Expand Down Expand Up @@ -332,9 +389,11 @@ def load_table(table_filepath, resource_id, mimetype='text/csv', logger=None):
for t, h in zip(types, headers)]

headers = [header.strip()[:MAX_COLUMN_LENGTH] for header in headers if header.strip()]
type_converter = TypeConverter(types=types)

with Stream(table_filepath, format=file_format, skip_rows=skip_rows,
custom_parsers={'csv': XloaderCSVParser}) as stream:
with UnknownEncodingStream(table_filepath, file_format, decoding_result,
skip_rows=skip_rows,
post_parse=[type_converter.convert_types]) as stream:
def row_iterator():
for row in stream:
data_row = {}
Expand Down Expand Up @@ -457,17 +516,6 @@ def send_resource_to_datastore(resource_id, headers, records):
.format(str(e)))


def datastore_resource_exists(resource_id):
    """Look up ``resource_id`` in the DataStore.

    Calls the ``datastore_search`` action with ``limit=0`` and auth checks
    disabled.

    :param resource_id: id of the resource to look up
    :returns: ``False`` if the action raises ``ObjectNotFound``; otherwise
        the action's response, or ``{'fields': []}`` when that response is
        falsy
    """
    from ckan import model

    search_context = {'model': model, 'ignore_auth': True}
    search_params = {'id': resource_id, 'limit': 0}
    try:
        found = p.toolkit.get_action('datastore_search')(
            search_context, search_params)
    except p.toolkit.ObjectNotFound:
        return False
    if found:
        return found
    return {'fields': []}


def delete_datastore_resource(resource_id):
from ckan import model
context = {'model': model, 'user': '', 'ignore_auth': True}
Expand Down
Loading
Loading