From 183dff0ea777d157effba1e9388b151f4a21b1c7 Mon Sep 17 00:00:00 2001
From: Peter Vorman
Date: Fri, 27 Sep 2024 17:24:37 +0300
Subject: [PATCH 1/3] added is_data_dict_populated flag to load jobs

---
 ckanext/xloader/jobs.py              | 16 +++++---
 ckanext/xloader/loader.py            | 15 +++++++-
 ckanext/xloader/tests/conftest.py    | 10 +++++
 ckanext/xloader/tests/fixtures.py    | 23 +----------
 ckanext/xloader/tests/test_loader.py | 56 ++++++++++++++++++----------
 5 files changed, 71 insertions(+), 49 deletions(-)
 create mode 100644 ckanext/xloader/tests/conftest.py

diff --git a/ckanext/xloader/jobs.py b/ckanext/xloader/jobs.py
index 6ce5cdd3..add0d731 100644
--- a/ckanext/xloader/jobs.py
+++ b/ckanext/xloader/jobs.py
@@ -200,11 +200,12 @@ def xloader_data_into_datastore_(input, job_dict, logger):
         resource['hash'] = file_hash
 
     def direct_load():
-        fields = loader.load_csv(
+        fields, is_data_dict_populated = loader.load_csv(
             tmp_file.name,
             resource_id=resource['id'],
             mimetype=resource.get('format'),
             logger=logger)
+        data['is_data_dict_populated'] = is_data_dict_populated
         loader.calculate_record_count(
             resource_id=resource['id'], logger=logger)
         set_datastore_active(data, resource, logger)
@@ -223,16 +224,19 @@ def direct_load():
         # logger.info('File Hash updated for resource: %s', resource['hash'])
 
     def tabulator_load():
+        is_data_dict_populated = False
         try:
-            loader.load_table(tmp_file.name,
-                              resource_id=resource['id'],
-                              mimetype=resource.get('format'),
-                              logger=logger)
+            is_data_dict_populated = loader.load_table(
+                tmp_file.name,
+                resource_id=resource['id'],
+                mimetype=resource.get('format'),
+                logger=logger)
         except JobError as e:
             logger.error('Error during tabulator load: %s', e)
             raise
         loader.calculate_record_count(
             resource_id=resource['id'], logger=logger)
+        data['is_data_dict_populated'] = is_data_dict_populated
         set_datastore_active(data, resource, logger)
         logger.info('Finished loading with tabulator')
         # update_resource(resource={'id': resource['id'], 'hash': resource['hash']},
@@ -251,7 +255,7 @@ def tabulator_load():
     logger.info("'use_type_guessing' mode is: %s", use_type_guessing)
     try:
         if use_type_guessing:
-            tabulator_load()
+                tabulator_load()
         else:
             try:
                 direct_load()
diff --git a/ckanext/xloader/loader.py b/ckanext/xloader/loader.py
index 8c913e0a..66722fa2 100644
--- a/ckanext/xloader/loader.py
+++ b/ckanext/xloader/loader.py
@@ -225,7 +225,7 @@ def load_csv(csv_filepath, resource_id, mimetype='text/csv', logger=None):
             {'id': header_name,
              'type': 'text'}
             for header_name in headers]
-
+    is_data_dict_populated = _is_data_dict_populated(fields, logger)
     logger.info('Fields: %s', fields)
 
     # Create table
@@ -319,8 +319,16 @@ def load_csv(csv_filepath, resource_id, mimetype='text/csv', logger=None):
         _populate_fulltext(connection, resource_id, fields=fields)
         logger.info('...search index created')
 
-    return fields
+    return fields, is_data_dict_populated
 
+def _is_data_dict_populated(fields, logger):
+    """Return True if the data_dict has been populated with the fields"""
+    if (any((field for field in fields if field.get('info', {}).get('label'))) or
+            any((field for field in fields if field.get('info', {}).get('notes'))) or
+            any((field for field in fields if field.get('info', {}).get('type_override')))):
+        logger.info('Setting resource.is_data_dict_populated = True')
+        return True
+    return False
 
 def create_column_indexes(fields, resource_id, logger):
     logger.info('Creating column indexes (a speed optimization for queries)...')
@@ -442,6 +450,8 @@ def row_iterator():
         logger.info('Deleting "%s" from datastore.',
                     resource_id)
         delete_datastore_resource(resource_id)
+    is_data_dict_populated = _is_data_dict_populated(headers_dicts, logger)
+    logger.info('Fields: %s', headers_dicts)
     logger.info('Copying to database...')
     count = 0
     # Some types cannot be stored as empty strings and must be converted to None,
@@ -462,6 +472,7 @@ def row_iterator():
         else:
             # no datastore table is created
             raise LoaderError('No entries found - nothing to load')
+    return is_data_dict_populated
 
 
 _TYPE_MAPPING = {
diff --git a/ckanext/xloader/tests/conftest.py b/ckanext/xloader/tests/conftest.py
new file mode 100644
index 00000000..bb474a6b
--- /dev/null
+++ b/ckanext/xloader/tests/conftest.py
@@ -0,0 +1,10 @@
+import pytest
+
+import ckan.plugins as p
+
+
+@pytest.fixture
+def clean_db(reset_db, migrate_db_for):
+    reset_db()
+    if p.get_plugin('harvest'):
+        migrate_db_for('harvest')
diff --git a/ckanext/xloader/tests/fixtures.py b/ckanext/xloader/tests/fixtures.py
index 9a7ad37f..7ea55850 100644
--- a/ckanext/xloader/tests/fixtures.py
+++ b/ckanext/xloader/tests/fixtures.py
@@ -1,7 +1,7 @@
 # -*- coding: utf-8 -*-
 from sqlalchemy import orm
 import os
-
+import pytest
 from ckanext.datastore.tests import helpers as datastore_helpers
 from ckanext.xloader.loader import get_write_engine
 
@@ -99,27 +99,6 @@ def reset_index():
     """
     return search.clear_all
 
-@pytest.fixture
-def clean_db(reset_db):
-    """Resets the database to the initial state.
-
-    This can be used either for all tests in a class::
-
-        @pytest.mark.usefixtures("clean_db")
-        class TestExample(object):
-
-            def test_example(self):
-
-    or for a single test::
-
-        class TestExample(object):
-
-            @pytest.mark.usefixtures("clean_db")
-            def test_example(self):
-
-    """
-    reset_db()
-
 @pytest.fixture
 def clean_index(reset_index):
     """Clear search index before starting the test.
diff --git a/ckanext/xloader/tests/test_loader.py b/ckanext/xloader/tests/test_loader.py
index e8816a13..814cfe37 100644
--- a/ckanext/xloader/tests/test_loader.py
+++ b/ckanext/xloader/tests/test_loader.py
@@ -95,13 +95,13 @@ def test_simple(self, Session):
         csv_filepath = get_sample_filepath("simple.csv")
         resource = factories.Resource()
         resource_id = resource['id']
-        loader.load_csv(
+        _, is_data_dict_populated = loader.load_csv(
             csv_filepath,
             resource_id=resource_id,
             mimetype="text/csv",
             logger=logger,
         )
-
+        assert is_data_dict_populated == False
         assert self._get_records(
             Session, resource_id, limit=1, exclude_full_text_column=False
         ) == [
@@ -140,7 +140,7 @@ def test_simple_with_indexing(self, Session):
         csv_filepath = get_sample_filepath("simple.csv")
         resource = factories.Resource()
         resource_id = resource['id']
-        fields = loader.load_csv(
+        fields, is_data_dict_populated = loader.load_csv(
             csv_filepath,
             resource_id=resource_id,
             mimetype="text/csv",
@@ -149,6 +149,7 @@ def test_simple_with_indexing(self, Session):
         loader.create_column_indexes(
             fields=fields, resource_id=resource_id, logger=logger
         )
+        assert is_data_dict_populated == False
 
         assert (
             self._get_records(
@@ -742,7 +743,7 @@ def test_reload_with_overridden_types(self, Session):
         )
 
         # Load it again with new types
-        fields = loader.load_csv(
+        fields, is_data_dict_populated = loader.load_csv(
             csv_filepath,
             resource_id=resource_id,
             mimetype="text/csv",
@@ -751,6 +752,7 @@ def test_reload_with_overridden_types(self, Session):
         loader.create_column_indexes(
             fields=fields, resource_id=resource_id, logger=logger
         )
+        assert is_data_dict_populated == True
 
         assert len(self._get_records(Session, resource_id)) == 6
         assert self._get_column_names(Session, resource_id) == [
@@ -797,13 +799,13 @@ def test_column_names(self, Session):
         csv_filepath = get_sample_filepath("column_names.csv")
         resource = factories.Resource()
         resource_id = resource['id']
-        loader.load_csv(
+        _, is_data_dict_populated = loader.load_csv(
             csv_filepath,
             resource_id=resource_id,
             mimetype="text/csv",
             logger=logger,
         )
-
+        assert is_data_dict_populated == False
         assert self._get_column_names(Session, resource_id)[2:] == [
             u"d@t$e",
             u"t^e&m*pe!r(a)t?u:r%%e",
@@ -881,13 +883,13 @@ def test_shapefile_zip_python3(self, Session):
         filepath = get_sample_filepath("polling_locations.shapefile.zip")
         resource = factories.Resource()
         resource_id = resource['id']
-        loader.load_csv(
+        _, is_data_dict_populated = loader.load_csv(
             filepath,
             resource_id=resource_id,
             mimetype="text/csv",
             logger=logger,
         )
-
+        assert is_data_dict_populated == False
         assert self._get_records(Session, resource_id) == []
         assert self._get_column_names(Session, resource_id) == [
             '_id',
@@ -901,13 +903,13 @@ def test_simple(self, Session):
         csv_filepath = get_sample_filepath("simple.xls")
         resource = factories.Resource()
         resource_id = resource['id']
-        loader.load_table(
+        is_data_dict_populated = loader.load_table(
            csv_filepath,
            resource_id=resource_id,
            mimetype="xls",
            logger=logger,
        )
-
+        assert is_data_dict_populated == False
         assert (
             "'galway':"
             in self._get_records(
@@ -966,12 +968,14 @@ def test_simple_large_file(self, Session):
         csv_filepath = get_sample_filepath("simple-large.csv")
         resource = factories.Resource()
         resource_id = resource['id']
-        loader.load_table(
+        is_data_dict_populated = loader.load_table(
             csv_filepath,
             resource_id=resource_id,
             mimetype="text/csv",
             logger=logger,
         )
+        assert is_data_dict_populated == False
+
         assert self._get_column_types(Session, resource_id) == [
             u"int4",
             u"tsvector",
@@ -983,12 +987,14 @@ def test_with_mixed_types(self, Session):
         csv_filepath = get_sample_filepath("mixed_numeric_string_sample.csv")
         resource = factories.Resource()
         resource_id = resource['id']
-        loader.load_table(
+        is_data_dict_populated = loader.load_table(
             csv_filepath,
             resource_id=resource_id,
             mimetype="text/csv",
             logger=logger,
         )
+        assert is_data_dict_populated == False
+
 
         assert len(self._get_records(Session, resource_id)) == 6
         assert self._get_column_types(Session, resource_id) == [
@@ -1016,12 +1022,14 @@ def test_boston_311_complete(self):
                 time.strftime("%H:%M:%S", time.localtime(t0))
             )
         )
-        loader.load_table(
+        is_data_dict_populated = loader.load_table(
             csv_filepath,
             resource_id=resource_id,
             mimetype="csv",
             logger=logger,
         )
+        assert is_data_dict_populated == False
+
         print("Load: {}s".format(time.time() - t0))
 
     # test disabled by default to avoid adding large file to repo and slow test
@@ -1040,19 +1048,20 @@ def test_boston_311_sample5(self):
                 time.strftime("%H:%M:%S", time.localtime(t0))
             )
         )
-        loader.load_table(
+        is_data_dict_populated = loader.load_table(
             csv_filepath,
             resource_id=resource_id,
             mimetype="csv",
             logger=logger,
         )
+        assert is_data_dict_populated == False
         print("Load: {}s".format(time.time() - t0))
 
     def test_boston_311(self, Session):
         csv_filepath = get_sample_filepath("boston_311_sample.csv")
         resource = factories.Resource()
         resource_id = resource['id']
-        loader.load_table(
+        is_data_dict_populated = loader.load_table(
             csv_filepath,
             resource_id=resource_id,
             mimetype="csv",
@@ -1061,6 +1070,8 @@ def test_boston_311(self, Session):
         records = self._get_records(Session, resource_id)
         print(records)
 
+        assert is_data_dict_populated == False
+
         assert records == [
             (
                 1,
@@ -1246,36 +1257,41 @@ def test_with_quoted_commas(self, Session):
         csv_filepath = get_sample_filepath("sample_with_quoted_commas.csv")
         resource = factories.Resource()
         resource_id = resource['id']
-        loader.load_table(
+        is_data_dict_populated = loader.load_table(
             csv_filepath,
             resource_id=resource_id,
             mimetype="text/csv",
             logger=logger,
         )
+        assert is_data_dict_populated == False
+
         assert len(self._get_records(Session, resource_id)) == 3
 
     def test_with_iso_8859_1(self, Session):
         csv_filepath = get_sample_filepath("non_utf8_sample.csv")
         resource = factories.Resource()
         resource_id = resource['id']
-        loader.load_table(
+        is_data_dict_populated = loader.load_table(
             csv_filepath,
             resource_id=resource_id,
             mimetype="text/csv",
             logger=logger,
         )
+        assert is_data_dict_populated == False
+
         assert len(self._get_records(Session, resource_id)) == 266
 
     def test_with_mixed_quotes(self, Session):
         csv_filepath = get_sample_filepath("sample_with_mixed_quotes.csv")
         resource = factories.Resource()
         resource_id = resource['id']
-        loader.load_table(
+        is_data_dict_populated = loader.load_table(
             csv_filepath,
             resource_id=resource_id,
             mimetype="text/csv",
             logger=logger,
         )
+        assert is_data_dict_populated == False
         assert len(self._get_records(Session, resource_id)) == 2
 
     def test_preserving_time_ranges(self, Session):
@@ -1284,12 +1300,14 @@ def test_preserving_time_ranges(self, Session):
         csv_filepath = get_sample_filepath("non_timestamp_sample.csv")
         resource = factories.Resource()
         resource_id = resource['id']
-        loader.load_table(
+        is_data_dict_populated = loader.load_table(
             csv_filepath,
             resource_id=resource_id,
             mimetype="text/csv",
             logger=logger,
         )
+        assert is_data_dict_populated == False
+
         assert self._get_records(Session, resource_id) == [
             (1, "Adavale", 4474, Decimal("-25.9092582"), Decimal("144.5975769"), "8:00", "16:00", datetime.datetime(2018,
              7, 19)),

From acd39807749ce5b77a596273d22f6c35d9833cce Mon Sep 17 00:00:00 2001
From: Peter Vorman
Date: Fri, 4 Oct 2024 17:46:31 +0300
Subject: [PATCH 2/3] removed clean_db

---
 ckanext/xloader/jobs.py           |  2 +-
 ckanext/xloader/tests/conftest.py | 10 ----------
 2 files changed, 1 insertion(+), 11 deletions(-)
 delete mode 100644 ckanext/xloader/tests/conftest.py

diff --git a/ckanext/xloader/jobs.py b/ckanext/xloader/jobs.py
index add0d731..1291ca2c 100644
--- a/ckanext/xloader/jobs.py
+++ b/ckanext/xloader/jobs.py
@@ -255,7 +255,7 @@ def tabulator_load():
     logger.info("'use_type_guessing' mode is: %s", use_type_guessing)
     try:
         if use_type_guessing:
-                tabulator_load()
+            tabulator_load()
         else:
             try:
                 direct_load()
diff --git a/ckanext/xloader/tests/conftest.py b/ckanext/xloader/tests/conftest.py
deleted file mode 100644
index bb474a6b..00000000
--- a/ckanext/xloader/tests/conftest.py
+++ /dev/null
@@ -1,10 +0,0 @@
-import pytest
-
-import ckan.plugins as p
-
-
-@pytest.fixture
-def clean_db(reset_db, migrate_db_for):
-    reset_db()
-    if p.get_plugin('harvest'):
-        migrate_db_for('harvest')

From 4fecab71f72a05275c417d8565acca913de67542 Mon Sep 17 00:00:00 2001
From: Jay Guo
Date: Fri, 4 Oct 2024 17:11:04 -0400
Subject: [PATCH 3/3] Undo change to fixtures

---
 ckanext/xloader/loader.py         |  2 +-
 ckanext/xloader/tests/fixtures.py | 23 ++++++++++++++++++++++-
 2 files changed, 23 insertions(+), 2 deletions(-)

diff --git a/ckanext/xloader/loader.py b/ckanext/xloader/loader.py
index 66722fa2..bd4a7744 100644
--- a/ckanext/xloader/loader.py
+++ b/ckanext/xloader/loader.py
@@ -322,7 +322,7 @@ def load_csv(csv_filepath, resource_id, mimetype='text/csv', logger=None):
     return fields, is_data_dict_populated
 
 def _is_data_dict_populated(fields, logger):
-    """Return True if the data_dict has been populated with the fields"""
+    """Return True if the data dictionary has been populated"""
     if (any((field for field in fields if field.get('info', {}).get('label'))) or
             any((field for field in fields if field.get('info', {}).get('notes'))) or
             any((field for field in fields if field.get('info', {}).get('type_override')))):
diff --git a/ckanext/xloader/tests/fixtures.py b/ckanext/xloader/tests/fixtures.py
index 7ea55850..9a7ad37f 100644
--- a/ckanext/xloader/tests/fixtures.py
+++ b/ckanext/xloader/tests/fixtures.py
@@ -1,7 +1,7 @@
 # -*- coding: utf-8 -*-
 from sqlalchemy import orm
 import os
-import pytest
+
 from ckanext.datastore.tests import helpers as datastore_helpers
 from ckanext.xloader.loader import get_write_engine
 
@@ -99,6 +99,27 @@ def reset_index():
     """
     return search.clear_all
 
+@pytest.fixture
+def clean_db(reset_db):
+    """Resets the database to the initial state.
+
+    This can be used either for all tests in a class::
+
+        @pytest.mark.usefixtures("clean_db")
+        class TestExample(object):
+
+            def test_example(self):
+
+    or for a single test::
+
+        class TestExample(object):
+
+            @pytest.mark.usefixtures("clean_db")
+            def test_example(self):
+
+    """
+    reset_db()
+
 @pytest.fixture
 def clean_index(reset_index):
     """Clear search index before starting the test.