Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add is_data_dict_populated flag to load jobs #22

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 10 additions & 6 deletions ckanext/xloader/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,11 +200,12 @@ def xloader_data_into_datastore_(input, job_dict, logger):
resource['hash'] = file_hash

def direct_load():
fields = loader.load_csv(
fields, is_data_dict_populated = loader.load_csv(
tmp_file.name,
resource_id=resource['id'],
mimetype=resource.get('format'),
logger=logger)
data['is_data_dict_populated'] = is_data_dict_populated
loader.calculate_record_count(
resource_id=resource['id'], logger=logger)
set_datastore_active(data, resource, logger)
Expand All @@ -223,16 +224,19 @@ def direct_load():
# logger.info('File Hash updated for resource: %s', resource['hash'])

def tabulator_load():
is_data_dict_populated = False
try:
loader.load_table(tmp_file.name,
resource_id=resource['id'],
mimetype=resource.get('format'),
logger=logger)
is_data_dict_populated = loader.load_table(
tmp_file.name,
resource_id=resource['id'],
mimetype=resource.get('format'),
logger=logger)
except JobError as e:
logger.error('Error during tabulator load: %s', e)
raise
loader.calculate_record_count(
resource_id=resource['id'], logger=logger)
data['is_data_dict_populated'] = is_data_dict_populated
set_datastore_active(data, resource, logger)
logger.info('Finished loading with tabulator')
# update_resource(resource={'id': resource['id'], 'hash': resource['hash']},
Expand All @@ -251,7 +255,7 @@ def tabulator_load():
logger.info("'use_type_guessing' mode is: %s", use_type_guessing)
try:
if use_type_guessing:
tabulator_load()
tabulator_load()
jguo144 marked this conversation as resolved.
Show resolved Hide resolved
else:
try:
direct_load()
Expand Down
15 changes: 13 additions & 2 deletions ckanext/xloader/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ def load_csv(csv_filepath, resource_id, mimetype='text/csv', logger=None):
{'id': header_name,
'type': 'text'}
for header_name in headers]

is_data_dict_populated = _is_data_dict_populated(fields, logger)
logger.info('Fields: %s', fields)

# Create table
Expand Down Expand Up @@ -319,8 +319,16 @@ def load_csv(csv_filepath, resource_id, mimetype='text/csv', logger=None):
_populate_fulltext(connection, resource_id, fields=fields)
logger.info('...search index created')

return fields
return fields, is_data_dict_populated

def _is_data_dict_populated(fields, logger):
"""Return True if the data_dict has been populated with the fields"""
if (any((field for field in fields if field.get('info', {}).get('label'))) or
any((field for field in fields if field.get('info', {}).get('notes'))) or
any((field for field in fields if field.get('info', {}).get('type_override')))):
logger.info('Setting resource.is_data_dict_populated = True')
return True
return False

def create_column_indexes(fields, resource_id, logger):
logger.info('Creating column indexes (a speed optimization for queries)...')
Expand Down Expand Up @@ -442,6 +450,8 @@ def row_iterator():
logger.info('Deleting "%s" from datastore.', resource_id)
delete_datastore_resource(resource_id)

is_data_dict_populated = _is_data_dict_populated(headers_dicts, logger)
logger.info('Fields: %s', headers_dicts)
logger.info('Copying to database...')
count = 0
# Some types cannot be stored as empty strings and must be converted to None,
Expand All @@ -462,6 +472,7 @@ def row_iterator():
else:
# no datastore table is created
raise LoaderError('No entries found - nothing to load')
return is_data_dict_populated


_TYPE_MAPPING = {
Expand Down
10 changes: 10 additions & 0 deletions ckanext/xloader/tests/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import pytest

import ckan.plugins as p


@pytest.fixture
def clean_db(reset_db, migrate_db_for):
    """Reset the database to a clean state before a test.

    Also runs the harvest extension's migrations, but only when the
    harvest plugin is actually loaded.
    """
    reset_db()
    harvest_plugin = p.get_plugin('harvest')
    if harvest_plugin:
        migrate_db_for('harvest')
23 changes: 1 addition & 22 deletions ckanext/xloader/tests/fixtures.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# -*- coding: utf-8 -*-
from sqlalchemy import orm
import os

import pytest
from ckanext.datastore.tests import helpers as datastore_helpers
from ckanext.xloader.loader import get_write_engine

Expand Down Expand Up @@ -99,27 +99,6 @@ def reset_index():
"""
return search.clear_all

@pytest.fixture
def clean_db(reset_db):
"""Resets the database to the initial state.

This can be used either for all tests in a class::

@pytest.mark.usefixtures("clean_db")
class TestExample(object):

def test_example(self):

or for a single test::

class TestExample(object):

@pytest.mark.usefixtures("clean_db")
def test_example(self):

"""
reset_db()

@pytest.fixture
def clean_index(reset_index):
"""Clear search index before starting the test.
Expand Down
56 changes: 37 additions & 19 deletions ckanext/xloader/tests/test_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,13 +95,13 @@ def test_simple(self, Session):
csv_filepath = get_sample_filepath("simple.csv")
resource = factories.Resource()
resource_id = resource['id']
loader.load_csv(
_, is_data_dict_populated = loader.load_csv(
csv_filepath,
resource_id=resource_id,
mimetype="text/csv",
logger=logger,
)

assert is_data_dict_populated == False
assert self._get_records(
Session, resource_id, limit=1, exclude_full_text_column=False
) == [
Expand Down Expand Up @@ -140,7 +140,7 @@ def test_simple_with_indexing(self, Session):
csv_filepath = get_sample_filepath("simple.csv")
resource = factories.Resource()
resource_id = resource['id']
fields = loader.load_csv(
fields, is_data_dict_populated = loader.load_csv(
csv_filepath,
resource_id=resource_id,
mimetype="text/csv",
Expand All @@ -149,6 +149,7 @@ def test_simple_with_indexing(self, Session):
loader.create_column_indexes(
fields=fields, resource_id=resource_id, logger=logger
)
assert is_data_dict_populated == False

assert (
self._get_records(
Expand Down Expand Up @@ -742,7 +743,7 @@ def test_reload_with_overridden_types(self, Session):
)

# Load it again with new types
fields = loader.load_csv(
fields, is_data_dict_populated = loader.load_csv(
csv_filepath,
resource_id=resource_id,
mimetype="text/csv",
Expand All @@ -751,6 +752,7 @@ def test_reload_with_overridden_types(self, Session):
loader.create_column_indexes(
fields=fields, resource_id=resource_id, logger=logger
)
assert is_data_dict_populated == True

assert len(self._get_records(Session, resource_id)) == 6
assert self._get_column_names(Session, resource_id) == [
Expand Down Expand Up @@ -797,13 +799,13 @@ def test_column_names(self, Session):
csv_filepath = get_sample_filepath("column_names.csv")
resource = factories.Resource()
resource_id = resource['id']
loader.load_csv(
_, is_data_dict_populated = loader.load_csv(
csv_filepath,
resource_id=resource_id,
mimetype="text/csv",
logger=logger,
)

assert is_data_dict_populated == False
assert self._get_column_names(Session, resource_id)[2:] == [
u"d@t$e",
u"t^e&m*pe!r(a)t?u:r%%e",
Expand Down Expand Up @@ -881,13 +883,13 @@ def test_shapefile_zip_python3(self, Session):
filepath = get_sample_filepath("polling_locations.shapefile.zip")
resource = factories.Resource()
resource_id = resource['id']
loader.load_csv(
_, is_data_dict_populated = loader.load_csv(
filepath,
resource_id=resource_id,
mimetype="text/csv",
logger=logger,
)

assert is_data_dict_populated == False
assert self._get_records(Session, resource_id) == []
assert self._get_column_names(Session, resource_id) == [
'_id',
Expand All @@ -901,13 +903,13 @@ def test_simple(self, Session):
csv_filepath = get_sample_filepath("simple.xls")
resource = factories.Resource()
resource_id = resource['id']
loader.load_table(
is_data_dict_populated = loader.load_table(
csv_filepath,
resource_id=resource_id,
mimetype="xls",
logger=logger,
)

assert is_data_dict_populated == False
assert (
"'galway':"
in self._get_records(
Expand Down Expand Up @@ -966,12 +968,14 @@ def test_simple_large_file(self, Session):
csv_filepath = get_sample_filepath("simple-large.csv")
resource = factories.Resource()
resource_id = resource['id']
loader.load_table(
is_data_dict_populated = loader.load_table(
csv_filepath,
resource_id=resource_id,
mimetype="text/csv",
logger=logger,
)
assert is_data_dict_populated == False

assert self._get_column_types(Session, resource_id) == [
u"int4",
u"tsvector",
Expand All @@ -983,12 +987,14 @@ def test_with_mixed_types(self, Session):
csv_filepath = get_sample_filepath("mixed_numeric_string_sample.csv")
resource = factories.Resource()
resource_id = resource['id']
loader.load_table(
is_data_dict_populated = loader.load_table(
csv_filepath,
resource_id=resource_id,
mimetype="text/csv",
logger=logger,
)
assert is_data_dict_populated == False

assert len(self._get_records(Session, resource_id)) == 6

assert self._get_column_types(Session, resource_id) == [
Expand Down Expand Up @@ -1016,12 +1022,14 @@ def test_boston_311_complete(self):
time.strftime("%H:%M:%S", time.localtime(t0))
)
)
loader.load_table(
is_data_dict_populated = loader.load_table(
csv_filepath,
resource_id=resource_id,
mimetype="csv",
logger=logger,
)
assert is_data_dict_populated == False

print("Load: {}s".format(time.time() - t0))

# test disabled by default to avoid adding large file to repo and slow test
Expand All @@ -1040,19 +1048,20 @@ def test_boston_311_sample5(self):
time.strftime("%H:%M:%S", time.localtime(t0))
)
)
loader.load_table(
is_data_dict_populated = loader.load_table(
csv_filepath,
resource_id=resource_id,
mimetype="csv",
logger=logger,
)
assert is_data_dict_populated == False
print("Load: {}s".format(time.time() - t0))

def test_boston_311(self, Session):
csv_filepath = get_sample_filepath("boston_311_sample.csv")
resource = factories.Resource()
resource_id = resource['id']
loader.load_table(
is_data_dict_populated = loader.load_table(
csv_filepath,
resource_id=resource_id,
mimetype="csv",
Expand All @@ -1061,6 +1070,8 @@ def test_boston_311(self, Session):

records = self._get_records(Session, resource_id)
print(records)
assert is_data_dict_populated == False

assert records == [
(
1,
Expand Down Expand Up @@ -1246,36 +1257,41 @@ def test_with_quoted_commas(self, Session):
csv_filepath = get_sample_filepath("sample_with_quoted_commas.csv")
resource = factories.Resource()
resource_id = resource['id']
loader.load_table(
is_data_dict_populated = loader.load_table(
csv_filepath,
resource_id=resource_id,
mimetype="text/csv",
logger=logger,
)
assert is_data_dict_populated == False

assert len(self._get_records(Session, resource_id)) == 3

def test_with_iso_8859_1(self, Session):
csv_filepath = get_sample_filepath("non_utf8_sample.csv")
resource = factories.Resource()
resource_id = resource['id']
loader.load_table(
is_data_dict_populated = loader.load_table(
csv_filepath,
resource_id=resource_id,
mimetype="text/csv",
logger=logger,
)
assert is_data_dict_populated == False

assert len(self._get_records(Session, resource_id)) == 266

def test_with_mixed_quotes(self, Session):
csv_filepath = get_sample_filepath("sample_with_mixed_quotes.csv")
resource = factories.Resource()
resource_id = resource['id']
loader.load_table(
is_data_dict_populated = loader.load_table(
csv_filepath,
resource_id=resource_id,
mimetype="text/csv",
logger=logger,
)
assert is_data_dict_populated == False
assert len(self._get_records(Session, resource_id)) == 2

def test_preserving_time_ranges(self, Session):
Expand All @@ -1284,12 +1300,14 @@ def test_preserving_time_ranges(self, Session):
csv_filepath = get_sample_filepath("non_timestamp_sample.csv")
resource = factories.Resource()
resource_id = resource['id']
loader.load_table(
is_data_dict_populated = loader.load_table(
csv_filepath,
resource_id=resource_id,
mimetype="text/csv",
logger=logger,
)
assert is_data_dict_populated == False

assert self._get_records(Session, resource_id) == [
(1, "Adavale", 4474, Decimal("-25.9092582"), Decimal("144.5975769"),
"8:00", "16:00", datetime.datetime(2018, 7, 19)),
Expand Down