From b6e3d60de9a53e374e5ce3b0838a9585533bc0d9 Mon Sep 17 00:00:00 2001 From: Konstantin Sivakov Date: Fri, 6 Oct 2023 21:39:42 +0200 Subject: [PATCH 1/3] Fix migrations --- .../01_e9c4a88839c8_upgrade_jobs_table.py | 14 +------- .../02_ad4dccf78307_upgrade_logs_table.py | 32 +++++++++++++++++++ 2 files changed, 33 insertions(+), 13 deletions(-) create mode 100644 ckanext/datapusher_plus/migration/datapusher_plus/versions/02_ad4dccf78307_upgrade_logs_table.py diff --git a/ckanext/datapusher_plus/migration/datapusher_plus/versions/01_e9c4a88839c8_upgrade_jobs_table.py b/ckanext/datapusher_plus/migration/datapusher_plus/versions/01_e9c4a88839c8_upgrade_jobs_table.py index fd2a606..566fe7d 100644 --- a/ckanext/datapusher_plus/migration/datapusher_plus/versions/01_e9c4a88839c8_upgrade_jobs_table.py +++ b/ckanext/datapusher_plus/migration/datapusher_plus/versions/01_e9c4a88839c8_upgrade_jobs_table.py @@ -18,7 +18,6 @@ def upgrade(): #upgrade jobs table if it not exists - op.add_column( u'jobs', sa.Column( @@ -26,16 +25,5 @@ def upgrade(): sa.UnicodeText), ) - #upgrade logs table - op.add_column( - 'logs', - sa.Column( - 'id', - sa.Integer, - primary_key=True, - autoincrement=True), - ) - - def downgrade(): - pass + op.drop_column(u'jobs', 'aps_job_id') diff --git a/ckanext/datapusher_plus/migration/datapusher_plus/versions/02_ad4dccf78307_upgrade_logs_table.py b/ckanext/datapusher_plus/migration/datapusher_plus/versions/02_ad4dccf78307_upgrade_logs_table.py new file mode 100644 index 0000000..6d01aeb --- /dev/null +++ b/ckanext/datapusher_plus/migration/datapusher_plus/versions/02_ad4dccf78307_upgrade_logs_table.py @@ -0,0 +1,32 @@ +"""empty message + +Revision ID: ad4dccf78307 +Revises: e9c4a88839c8 +Create Date: 2023-10-06 21:32:16.409225 + +""" +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision = 'ad4dccf78307' +down_revision = 'e9c4a88839c8' +branch_labels = None +depends_on = None + + +def upgrade(): + #upgrade logs table + op.add_column( + 'logs', + sa.Column( + 'id', + sa.Integer, + primary_key=True, + autoincrement=True), + ) + + +def downgrade(): + op.drop_column('logs', 'id') From ca0d59a360ab675cc5960fc0dbb8e13475550672 Mon Sep 17 00:00:00 2001 From: Konstantin Sivakov Date: Mon, 9 Oct 2023 21:09:19 +0200 Subject: [PATCH 2/3] Small update --- ckanext/datapusher_plus/jobs.py | 119 +++++++++++--------------------- 1 file changed, 42 insertions(+), 77 deletions(-) diff --git a/ckanext/datapusher_plus/jobs.py b/ckanext/datapusher_plus/jobs.py index 020bdd2..b041e7c 100644 --- a/ckanext/datapusher_plus/jobs.py +++ b/ckanext/datapusher_plus/jobs.py @@ -52,6 +52,7 @@ import ckanext.datapusher_plus.helpers as dph from ckanext.datapusher_plus.config import config + if locale.getdefaultlocale()[0]: lang, encoding = locale.getdefaultlocale() locale.setlocale(locale.LC_ALL, locale=(lang, encoding)) @@ -102,70 +103,41 @@ def default(self, obj): return json.JSONEncoder.default(self, obj) -def delete_datastore_resource(resource_id, api_key, ckan_url): + +def delete_datastore_resource(resource_id): try: - delete_url = get_url("datastore_delete", ckan_url) - response = requests.post( - delete_url, - verify=SSL_VERIFY, - data=json.dumps({"id": resource_id, "force": True}), - headers={"Content-Type": "application/json", "Authorization": api_key}, - ) - utils.check_response( - response, - delete_url, - "CKAN", - good_status=(201, 200, 404), - ignore_no_success=True, - ) - except requests.exceptions.RequestException: + tk.get_action("datastore_delete")( + {"ignore_auth": True}, {"resource_id": resource_id, "force": True}) + except tk.ObjectNotFound: raise utils.JobError("Deleting existing datastore failed.") def delete_resource(resource_id, api_key, ckan_url): + if not tk.user: + raise utils.JobError("No user found.") try: - delete_url = get_url("resource_delete", ckan_url) - response = requests.post( - delete_url, - verify=SSL_VERIFY, - data=json.dumps({"id": resource_id, "force": True}), - headers={"Content-Type": "application/json", "Authorization": api_key}, - ) - utils.check_response( - response, - delete_url, - "CKAN", - good_status=(201, 200, 404), - ignore_no_success=True, - ) - except requests.exceptions.RequestException: + tk.get_action("resource_delete")({"user": tk.user}, {"id": resource_id, "force": True}) + except tk.ObjectNotFound: raise utils.JobError("Deleting existing resource failed.") def datastore_resource_exists(resource_id, api_key, ckan_url): - from ckanext.datapusher_plus.job_exceptions import HTTPError, JobError + from ckanext.datapusher_plus.job_exceptions import JobError + + data_dict = { + "resource_id": resource_id, + "limit": 0, + "include_total": False, + } + + context = {'ignore_auth': True } try: - search_url = get_url("datastore_search", ckan_url) - response = requests.post( - search_url, - verify=SSL_VERIFY, - data=json.dumps({"id": resource_id, "limit": 0}), - headers={"Content-Type": "application/json", "Authorization": api_key}, - ) - if response.status_code == 404: - return False - elif response.status_code == 200: - return response.json().get("result", {"fields": []}) - else: - raise HTTPError( - "Error getting datastore resource.", - response.status_code, - search_url, - response, - ) - except requests.exceptions.RequestException as e: - raise JobError("Error getting datastore resource ({!s}).".format(e)) + result = tk.get_action("datastore_search")(context, data_dict) + return result + except tk.ObjectNotFound: + return False + def send_resource_to_datastore( @@ -201,28 +173,23 @@ def send_resource_to_datastore( "aliases": aliases, "calculate_record_count": calculate_record_count, } - - url = get_url("datastore_create", ckan_url) - r = requests.post( - url, - verify=SSL_VERIFY, - data=json.dumps(request, cls=DatastoreEncoder), - headers={"Content-Type": "application/json", "Authorization": api_key}, - ) - utils.check_response(r, url, "CKAN DataStore") - return r.json() + try: + resource_dict = tk.get_action("datastore_create")({"ignore_auth": True}, request) + return resource_dict + except Exception as e: + raise utils.JobError("Error sending data to datastore ({!s}).".format(e)) def update_resource(resource, ckan_url, api_key): - url = get_url("resource_update", ckan_url) - r = requests.post( - url, - verify=SSL_VERIFY, - data=json.dumps(resource), - headers={"Content-Type": "application/json", "Authorization": api_key}, - ) - - utils.check_response(r, url, "CKAN") + """ + Updates resource in CKAN + """ + try: + tk.get_action("resource_update")( + {"ignore_auth": True}, resource + ) + except tk.ObjectNotFound: + raise utils.JobError("Updating existing resource failed.") def get_resource(resource_id, ckan_url, api_key): @@ -918,7 +885,7 @@ def _push_to_datastore(task_id, input, dry_run=False, temp_dir=None): res_id=resource_id ) ) - delete_datastore_resource(resource_id, api_key, ckan_url) + delete_datastore_resource(resource_id) # 1st pass of building headers_dict # here we map inferred types to postgresql data types @@ -1204,7 +1171,7 @@ def _push_to_datastore(task_id, input, dry_run=False, temp_dir=None): if pii_alias_result: existing_pii_alias_of = pii_alias_result[0] - delete_datastore_resource(existing_pii_alias_of, api_key, ckan_url) + delete_datastore_resource(existing_pii_alias_of) delete_resource(existing_pii_alias_of, api_key, ckan_url) pii_alias = [pii_resource_id] @@ -1500,7 +1467,7 @@ def _push_to_datastore(task_id, input, dry_run=False, temp_dir=None): if stats_alias_result: existing_stats_alias_of = stats_alias_result[0] - delete_datastore_resource(existing_stats_alias_of, api_key, ckan_url) + delete_datastore_resource(existing_stats_alias_of) delete_resource(existing_stats_alias_of, api_key, ckan_url) stats_aliases = [stats_resource_id] @@ -1530,9 +1497,7 @@ def _push_to_datastore(task_id, input, dry_run=False, temp_dir=None): if result: existing_stats_alias_of = result[0] - delete_datastore_resource( - existing_stats_alias_of, api_key, ckan_url - ) + delete_datastore_resource(existing_stats_alias_of) delete_resource(existing_stats_alias_of, api_key, ckan_url) # run stats on stats CSV to get header names and infer data types From b5aca7b0830d58751e1043c5d193fedf9c30b2f1 Mon Sep 17 00:00:00 2001 From: tino097 Date: Wed, 15 May 2024 22:53:17 +0200 Subject: [PATCH 3/3] Replace rest of the requests calls --- ckanext/datapusher_plus/jobs.py | 61 ++++++++----------- .../02_ad4dccf78307_upgrade_logs_table.py | 32 ---------- 2 files changed, 25 insertions(+), 68 deletions(-) delete mode 100644 ckanext/datapusher_plus/migration/datapusher_plus/versions/02_ad4dccf78307_upgrade_logs_table.py diff --git a/ckanext/datapusher_plus/jobs.py b/ckanext/datapusher_plus/jobs.py index 340ba08..293c06f 100644 --- a/ckanext/datapusher_plus/jobs.py +++ b/ckanext/datapusher_plus/jobs.py @@ -104,7 +104,6 @@ def default(self, obj): return json.JSONEncoder.default(self, obj) - def delete_datastore_resource(resource_id): try: tk.get_action("datastore_delete")( @@ -138,7 +137,6 @@ def datastore_resource_exists(resource_id, api_key, ckan_url): return result except tk.ObjectNotFound: return False - def send_resource_to_datastore( @@ -181,48 +179,40 @@ def send_resource_to_datastore( raise utils.JobError("Error sending data to datastore ({!s}).".format(e)) -def update_resource(resource, ckan_url, api_key): +def update_resource(resource): """ - Updates resource in CKAN + Updates resource metadata """ + site_user = tk.get_action('get_site_user')({'ignore_auth': True}, {}) + context = { + 'ignore_auth': True, + 'user': site_user['name'], + 'auth_user_obj': None + } try: - tk.get_action("resource_update")( - {"ignore_auth": True}, resource - ) + tk.get_action("resource_update")(context, resource) except tk.ObjectNotFound: raise utils.JobError("Updating existing resource failed.") -def get_resource(resource_id, ckan_url, api_key): +def get_resource(resource_id): """ Gets available information about the resource from CKAN """ - url = get_url("resource_show", ckan_url) - r = requests.post( - url, - verify=SSL_VERIFY, - data=json.dumps({"id": resource_id}), - headers={"Content-Type": "application/json", "Authorization": api_key}, - ) - utils.check_response(r, url, "CKAN") + resource_dict = tk.get_action('resource_show')({'ignore_auth': True}, + {'id': resource_id}) - return r.json()["result"] + return resource_dict -def get_package(package_id, ckan_url, api_key): +def get_package(package_id): """ Gets available information about a package from CKAN """ - url = get_url("package_show", ckan_url) - r = requests.post( - url, - verify=SSL_VERIFY, - data=json.dumps({"id": package_id}), - headers={"Content-Type": "application/json", "Authorization": api_key}, - ) - utils.check_response(r, url, "CKAN") + dataset_dict = tk.get_action('package_show')({'ignore_auth': True}, + {'id': package_id}) - return r.json()["result"] + return dataset_dict def validate_input(input): @@ -326,9 +316,9 @@ def push_to_datastore(input, task_id, dry_run=False): with tempfile.TemporaryDirectory() as temp_dir: return _push_to_datastore(task_id, input, dry_run=dry_run, temp_dir=temp_dir) - + def _push_to_datastore(task_id, input, dry_run=False, temp_dir=None): - #add job to dn (datapusher_plus_jobs table) + # add job to dn (datapusher_plus_jobs table) try: dph.add_pending_job(task_id, **input) except sa.exc.IntegrityError: @@ -362,8 +352,7 @@ def _push_to_datastore(task_id, input, dry_run=False, temp_dir=None): raise utils.JobError("qsv version check error: {}".format(e)) qsv_version_info = str(qsv_version.stdout) qsv_semver = qsv_version_info[ - qsv_version_info.find(" ") : qsv_version_info.find("-") - ].lstrip() + qsv_version_info.find(" "): qsv_version_info.find("-")].lstrip() try: if semver.compare(qsv_semver, MINIMUM_QSV_VERSION) < 0: raise utils.JobError( @@ -383,11 +372,11 @@ def _push_to_datastore(task_id, input, dry_run=False, temp_dir=None): api_key = input.get("api_key") try: - resource = get_resource(resource_id, ckan_url, api_key) + resource = get_resource(resource_id) except utils.JobError: # try again in 5 seconds just incase CKAN is slow at adding resource time.sleep(5) - resource = get_resource(resource_id, ckan_url, api_key) + resource = get_resource(resource_id) # check if the resource url_type is a datastore if resource.get("url_type") == "datastore": @@ -1313,7 +1302,7 @@ def _push_to_datastore(task_id, input, dry_run=False, temp_dir=None): pii_resource["pii_preview"] = True pii_resource["pii_of_resource"] = resource_id pii_resource["total_record_count"] = pii_rows_with_matches - update_resource(pii_resource, ckan_url, api_key) + update_resource(pii_resource) pii_msg = ( "{} PII candidate/s in {} row/s are available at {} for review".format( @@ -1634,7 +1623,7 @@ def _push_to_datastore(task_id, input, dry_run=False, temp_dir=None): stats_resource["id"] = new_stats_resource_id stats_resource["summary_statistics"] = True stats_resource["summary_of_resource"] = resource_id - update_resource(stats_resource, ckan_url, api_key) + update_resource(stats_resource) cur.close() raw_connection.commit() @@ -1648,7 +1637,7 @@ def _push_to_datastore(task_id, input, dry_run=False, temp_dir=None): resource["preview"] = False resource["preview_rows"] = None resource["partial_download"] = False - update_resource(resource, ckan_url, api_key) + update_resource(resource) # tell CKAN to calculate_record_count and set alias if set send_resource_to_datastore( diff --git a/ckanext/datapusher_plus/migration/datapusher_plus/versions/02_ad4dccf78307_upgrade_logs_table.py b/ckanext/datapusher_plus/migration/datapusher_plus/versions/02_ad4dccf78307_upgrade_logs_table.py deleted file mode 100644 index 6d01aeb..0000000 --- a/ckanext/datapusher_plus/migration/datapusher_plus/versions/02_ad4dccf78307_upgrade_logs_table.py +++ /dev/null @@ -1,32 +0,0 @@ -"""empty message - -Revision ID: ad4dccf78307 -Revises: e9c4a88839c8 -Create Date: 2023-10-06 21:32:16.409225 - -""" -from alembic import op -import sqlalchemy as sa - - -# revision identifiers, used by Alembic. -revision = 'ad4dccf78307' -down_revision = 'e9c4a88839c8' -branch_labels = None -depends_on = None - - -def upgrade(): - #upgrade logs table - op.add_column( - 'logs', - sa.Column( - 'id', - sa.Integer, - primary_key=True, - autoincrement=True), - ) - - -def downgrade(): - op.drop_column('logs', 'id')