Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature db models #120

Merged
merged 5 commits into from
Sep 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions ckanext/datapusher_plus/cli.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
from ckanext.datapusher.cli import datapusher
from ckanext.datapusher_plus.model import init_tables


@datapusher.command()
def init_db():
"""Initialise the Datapusher Plus tables."""
init_tables()
print('Datapusher Plus tables created')

def get_commands():
return [init_db]
2 changes: 0 additions & 2 deletions ckanext/datapusher_plus/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ class DataPusherPlusConfig(MutableMapping):

PREFER_DMY: bool = False
PREVIEW_ROWS: int = 0
DOWNLOAD_PREVIEW_ONLY: bool = False

AUTO_INDEX_THRESHOLD: int = 3
AUTO_UNIQUE_INDEX:bool = True
Expand All @@ -79,7 +78,6 @@ class DataPusherPlusConfig(MutableMapping):
DEFAULT_EXCEL_SHEET: int = 0

ADD_SUMMARY_STATS_RESOURCE: bool = False
SUMMARY_STATS_WITH_PREVIEW: bool = False
SUMMARY_STATS_OPTIONS: str = ""

AUTO_ALIAS: bool = True
Expand Down
344 changes: 344 additions & 0 deletions ckanext/datapusher_plus/helpers.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,22 @@
# encoding: utf-8
from __future__ import annotations


import json
import logging
import datetime
from typing import Any
from sqlalchemy.orm import Query

import ckan.plugins.toolkit as toolkit
from ckan import model as ckan_model

from ckanext.datapusher_plus.model import Jobs, Metadata, Logs
import ckanext.datapusher_plus.job_exceptions as jex


log = logging.getLogger(__name__)

def datapusher_status(resource_id: str):
try:
return toolkit.get_action('datapusher_status')(
Expand All @@ -29,3 +41,335 @@ def datapusher_status_description(status: dict[str, Any]):
return captions.get(status['status'], status['status'].capitalize())
else:
return _('Not Uploaded Yet')


def get_job(job_id, limit=None, use_aps_id=False):
"""Return the job with the given job_id as a dict.

The dict also includes any metadata or logs associated with the job.

Returns None instead of a dict if there's no job with the given job_id.

The keys of a job dict are:

"job_id": The unique identifier for the job (unicode)

"job_type": The name of the job function that will be executed for this
job (unicode)

"status": The current status of the job, e.g. "pending", "complete", or
"error" (unicode)

"data": Any output data returned by the job if it has completed
successfully. This may be any JSON-serializable type, e.g. None, a
string, a dict, etc.

"error": If the job failed with an error this will be a dict with a
"message" key whose value is a string error message. The dict may also
have other keys specific to the particular type of error. If the job
did not fail with an error then "error" will be None.

"requested_timestamp": The time at which the job was requested (string)

"finished_timestamp": The time at which the job finished (string)

"sent_data": The input data for the job, provided by the client site.
This may be any JSON-serializable type, e.g. None, a string, a dict,
etc.

"result_url": The callback URL that CKAN Service Provider will post the
result to when the job finishes (unicode)

"api_key": The API key that CKAN Service Provider will use when posting
the job result to the result_url (unicode or None). A None here doesn't
mean that there was no API key: CKAN Service Provider deletes the API
key from the database after it has posted the result to the result_url.

"job_key": The key that users must provide (in the Authorization header of
the HTTP request) to be authorized to modify the job (unicode).
For example requests to the CKAN Service Provider API need this to get
the status or output data of a job or to delete a job.
If you login to CKAN Service Provider as an administrator then you can
administer any job without providing its job_key.

"metadata": Any custom metadata associated with the job (dict)

"logs": Any logs associated with the job (list)

"""
# Avoid SQLAlchemy "Unicode type received non-unicode bind param value"
# warnings.
if job_id:
job_id = str(job_id)
if use_aps_id:
result = Jobs.get_by_aps_id(use_aps_id)
else:
result = Jobs.get(job_id)

if not result:
return None

# Turn the result into a dictionary representation of the job.
result_dict = {}
for field in list(result.keys()):
value = getattr(result, field)
if value is None:
result_dict[field] = value
elif field in ("sent_data", "data", "error"):
result_dict[field] = json.loads(value)
elif isinstance(value, datetime.datetime):
result_dict[field] = value.isoformat()
else:
result_dict[field] = str(value)

result_dict["metadata"] = Metadata.get(job_id)
result_dict["logs"] = Logs.get_with_limit(job_id, limit=limit)

return result_dict

def add_pending_job(
job_id, api_key, job_type, job_key=None, data=None, metadata=None, result_url=None
):
"""Add a new job with status "pending" to the jobs table.

All code that adds jobs to the jobs table should go through this function.
Code that adds to the jobs table manually should be refactored to use this
function.

May raise unspecified exceptions from Python core, SQLAlchemy or JSON!
TODO: Document and unit test these!

:param job_id: a unique identifier for the job, used as the primary key in
ckanserviceprovider's "jobs" database table
:type job_id: unicode

:param job_key: the key required to administer the job via the API
:type job_key: unicode

:param job_type: the name of the job function that will be executed for
this job
:type job_key: unicode

:param api_key: the client site API key that ckanserviceprovider will use
when posting the job result to the result_url
:type api_key: unicode

:param data: The input data for the job (called sent_data elsewhere)
:type data: Any JSON-serializable type

:param metadata: A dict of arbitrary (key, value) metadata pairs to be
stored along with the job. The keys should be strings, the values can
be strings or any JSON-encodable type.
:type metadata: dict

:param result_url: the callback URL that ckanserviceprovider will post the
job result to when the job has finished
:type result_url: unicode

"""
if not data:
data = {}
data = json.dumps(data)

# Turn strings into unicode to stop SQLAlchemy
# "Unicode type received non-unicode bind param value" warnings.
if job_id:
job_id = str(job_id)
if job_type:
job_type = str(job_type)
if result_url:
result_url = str(result_url)
if api_key:
api_key = str(api_key)
if job_key:
job_key = str(job_key)
data = str(data)

if not metadata:
metadata = {}

job = Jobs(job_id, job_type, "pending", data, None, None, None, None, None, result_url, api_key, job_key)
try:
job.save()
except Exception as e:
raise e

inserts = {}
for key, value in metadata.items():
type_ = "string"
if not isinstance(value, str):
value = json.dumps(value)
type_ = "json"

# Turn strings into unicode to stop SQLAlchemy
# "Unicode type received non-unicode bind param value" warnings.
key = str(key)
value = str(value)

inserts.update({"job_id": job_id, "key": key, "value": value, "type": type_})
if inserts:
md = Metadata(**inserts)
try:
md.save()
except Exception as e:
raise e


def validate_error(error):
"""Validate and return the given error object.

Based on the given error object, return either None or a dict with a
"message" key whose value is a string (the dict may also have any other
keys that it wants).

The given "error" object can be:

- None, in which case None is returned

- A string, in which case a dict like this will be returned:
{"message": error_string}

- A dict with a "message" key whose value is a string, in which case the
dict will be returned unchanged

:param error: the error object to validate

:raises InvalidErrorObjectError: If the error object doesn't match any of
the allowed types

"""
if error is None:
return None
elif isinstance(error, str):
return {"message": error}
else:
try:
message = error["message"]
if isinstance(message, str):
return error
else:
raise jex.InvalidErrorObjectError("error['message'] must be a string")
except (TypeError, KeyError):
raise jex.InvalidErrorObjectError(
"error must be either a string or a dict with a message key"
)

def update_job(job_id, job_dict): # sourcery skip: raise-specific-error
"""Update the database row for the given job_id with the given job_dict.

All functions that update rows in the jobs table do it by calling this
helper function.

job_dict is a dict with values corresponding to the database columns that
should be updated, e.g.:

{"status": "complete", "data": ...}

"""
# Avoid SQLAlchemy "Unicode type received non-unicode bind param value"
# warnings.
if job_id:
job_id = str(job_id)

if "error" in job_dict:
job_dict["error"] = validate_error(job_dict["error"])
job_dict["error"] = json.dumps(job_dict["error"])
# Avoid SQLAlchemy "Unicode type received non-unicode bind param value"
# warnings.
job_dict["error"] = str(job_dict["error"])

# Avoid SQLAlchemy "Unicode type received non-unicode bind param value"
# warnings.
if "data" in job_dict:
job_dict["data"] = str(job_dict["data"])


try:
job = Jobs.get(job_id)
if not job:
raise Exception("Job not found")
#dicticize the job
jobs_dict = job.as_dict()
jobs_dict.update(job_dict)

Jobs.update(jobs_dict)

except Exception as e:
log.error("Failed to update job %s: %s", job_id, e)
raise e


def mark_job_as_completed(job_id, data=None):
"""Mark a job as completed successfully.

:param job_id: the job_id of the job to be updated
:type job_id: unicode

:param data: the output data returned by the job
:type data: any JSON-serializable type (including None)

"""
update_dict = {
"status": "complete",
"data": json.dumps(data),
"finished_timestamp": datetime.datetime.now(),
}
update_job(job_id, update_dict)


def mark_job_as_errored(job_id, error_object):
"""Mark a job as failed with an error.

:param job_id: the job_id of the job to be updated
:type job_id: unicode

:param error_object: the error returned by the job
:type error_object: either a string or a dict with a "message" key whose
value is a string

"""
update_dict = {
"status": "error",
"error": error_object,
"finished_timestamp": datetime.datetime.now(),
}
update_job(job_id, update_dict)


def mark_job_as_failed_to_post_result(job_id):
"""Mark a job as 'failed to post result'.

This happens when a job completes (either successfully or with an error)
then trying to post the job result back to the job's callback URL fails.

FIXME: This overwrites any error from the job itself!

:param job_id: the job_id of the job to be updated
:type job_id: unicode

"""
update_dict = {
"error": "Process completed but unable to post to result_url",
}
update_job(job_id, update_dict)


def delete_api_key(job_id):
"""Delete the given job's API key from the database.

The API key is used when posting the job's result to the client's callback
URL. This function should be called to delete the API key after the result
has been posted - the API key is no longer needed.

"""
update_job(job_id, {"api_key": None})


def set_aps_job_id(job_id, aps_job_id):

update_job(job_id, {"aps_job_id": aps_job_id})





3 changes: 3 additions & 0 deletions ckanext/datapusher_plus/job_exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,6 @@ class LoaderError(JobError):
"""Exception that's raised if a load fails"""

pass

class InvalidErrorObjectError(Exception):
pass
Loading