diff --git a/ckanext/datapusher_plus/jobs.py b/ckanext/datapusher_plus/jobs.py index d368161..26ae4d0 100644 --- a/ckanext/datapusher_plus/jobs.py +++ b/ckanext/datapusher_plus/jobs.py @@ -19,6 +19,7 @@ import sys import traceback +import sqlalchemy as sa from pathlib import Path from datasize import DataSize from psycopg2 import sql @@ -535,7 +536,6 @@ def push_to_datastore(input, task_id, dry_run=False): file_last_modified = response.headers.get("last-modified") if file_last_modified: file_last_modified = parsedate(file_last_modified).replace(tzinfo=None) - breakpoint() if file_last_modified < resource_last_modified: resource_updated = True diff --git a/ckanext/datapusher_plus/logic/action.py b/ckanext/datapusher_plus/logic/action.py index a933b0c..98a3ab4 100644 --- a/ckanext/datapusher_plus/logic/action.py +++ b/ckanext/datapusher_plus/logic/action.py @@ -6,10 +6,9 @@ import logging import json import datetime -import time from dateutil.parser import parse as parse_date - +from six.moves.urllib.parse import urljoin import ckan.lib.helpers as h import ckan.lib.navl.dictization_functions @@ -71,6 +70,16 @@ def datapusher_submit(context, data_dict: dict[str, Any]): }) except logic.NotFound: return False + + callback_url_base = config.get('ckan.datapusher.callback_url_base') + if callback_url_base: + site_url = callback_url_base + callback_url = urljoin( + callback_url_base.rstrip('/'), '/api/3/action/datapusher_hook') + else: + site_url = h.url_for('/', qualified=True) + callback_url = h.url_for( + '/api/3/action/datapusher_hook', qualified=True) for plugin in p.PluginImplementations(interfaces.IDataPusher): upload = plugin.can_upload(res_id) @@ -150,12 +159,6 @@ def datapusher_submit(context, data_dict: dict[str, Any]): timeout = config.get('ckan.requests.timeout') # This setting is checked on startup api_token = utils.get_dp_plus_user_apitoken() - callback_url = tk.url_for( - "api.action", - ver=3, - logic_function="datapusher_hook", - qualified=True - ) data = { 'api_key': api_token, @@ -163,7 +166,7 @@ def datapusher_submit(context, data_dict: dict[str, Any]): 'result_url': callback_url, 'metadata': { 'ignore_hash': data_dict.get('ignore_hash', False), - 'ckan_url': tk.config.get('datapusher.site_url') or tk.config.get('ckan.site_url'), + 'ckan_url': site_url, 'resource_id': res_id, 'set_url_type': data_dict.get('set_url_type', False), 'task_created': task['last_updated'], @@ -176,52 +179,6 @@ def datapusher_submit(context, data_dict: dict[str, Any]): except Exception as e: log.error("Error submitting job to DataPusher: %s", e) return False - # try: - # r = requests.post( - # urljoin(datapusher_url, 'job'), - # headers={ - # 'Content-Type': 'application/json' - # }, - # timeout=timeout, - # data=json.dumps({ - # 'api_key': api_token, - # 'job_type': 'push_to_datastore', - # 'result_url': callback_url, - # 'metadata': { - # 'ignore_hash': data_dict.get('ignore_hash', False), - # 'ckan_url': site_url, - # 'resource_id': res_id, - # 'set_url_type': data_dict.get('set_url_type', False), - # 'task_created': task['last_updated'], - # 'original_url': resource_dict.get('url'), - # } - # })) - # except requests.exceptions.ConnectionError as e: - # error: dict[str, Any] = {'message': 'Could not connect to DataPusher.', - # 'details': str(e)} - # task['error'] = json.dumps(error) - # task['state'] = 'error' - # task['last_updated'] = str(datetime.datetime.utcnow()), - # p.toolkit.get_action('task_status_update')(context, task) - # raise p.toolkit.ValidationError(error) - # try: - # r.raise_for_status() - # except requests.exceptions.HTTPError as e: - # m = 'An Error occurred while sending the job: {0}'.format(str(e)) - # try: - # body = e.response.json() - # if body.get('error'): - # m += ' ' + body['error'] - # except ValueError: - # body = e.response.text - # error = {'message': m, - # 'details': body, - # 'status_code': r.status_code} - # task['error'] = json.dumps(error) - # task['state'] = 'error' - # task['last_updated'] = str(datetime.datetime.utcnow()), - # p.toolkit.get_action('task_status_update')(context, task) - # raise p.toolkit.ValidationError(error) value = json.dumps({'job_id': job.id}) task['value'] = value diff --git a/ckanext/datapusher_plus/utils.py b/ckanext/datapusher_plus/utils.py index aa843a9..8862980 100644 --- a/ckanext/datapusher_plus/utils.py +++ b/ckanext/datapusher_plus/utils.py @@ -101,7 +101,7 @@ def check_response( """ if not response.status_code: raise HTTPError( - "Xloader received an HTTP response with no status code", + "DP+ received an HTTP response with no status code", status_code=None, request_url=request_url, response=response.text,