From 6a2665e05e9cf1fe8a793e0cccaee7cb045bf013 Mon Sep 17 00:00:00 2001
From: Reid Hewitt
Date: Tue, 27 Feb 2024 09:14:50 -0700
Subject: [PATCH 01/20] add custom exceptions

---
 harvester/exceptions.py |  31 ++++++++
 harvester/harvest.py    | 152 +++++++++++++++++++++++++++-------------
 2 files changed, 133 insertions(+), 50 deletions(-)
 create mode 100644 harvester/exceptions.py

diff --git a/harvester/exceptions.py b/harvester/exceptions.py
new file mode 100644
index 00000000..3a6b6ae8
--- /dev/null
+++ b/harvester/exceptions.py
@@ -0,0 +1,31 @@
+# critical exceptions
+
+
+# irrecoverable/critical exceptions
+class ExtractHarvestSourceException(Exception):
+    pass
+
+
+class ExtractCKANSourceException(Exception):
+    pass
+
+
+class CompareException(Exception):
+    pass
+
+
+# non-critical exceptions
+class ValidationException(Exception):
+    pass
+
+
+class TranformationException(Exception):
+    pass
+
+
+class DCATUSToCKANException(Exception):
+    pass
+
+
+class SynchronizeException(Exception):
+    pass
diff --git a/harvester/harvest.py b/harvester/harvest.py
index e128dfea..82c232d1 100644
--- a/harvester/harvest.py
+++ b/harvester/harvest.py
@@ -24,6 +24,7 @@
     sort_dataset,
 )
 from .ckan_utils import munge_tag, munge_title_to_name
+from .exceptions import *
 
 load_dotenv()
 
@@ -243,16 +244,19 @@
     def compare(self) -> None:
        """Compares records"""
         logger.info("comparing harvest and ckan records")
 
-        harvest_ids = set(self.records.keys())
-        ckan_ids = set(self.ckan_records.keys())
-        same_ids = harvest_ids & ckan_ids
+        try:
+            harvest_ids = set(self.records.keys())
+            ckan_ids = set(self.ckan_records.keys())
+            same_ids = harvest_ids & ckan_ids
 
-        self.compare_data["delete"] = ckan_ids - harvest_ids
-        self.compare_data["create"] = harvest_ids - ckan_ids
+            self.compare_data["delete"] = ckan_ids - harvest_ids
+            self.compare_data["create"] = harvest_ids - ckan_ids
 
-        for i in same_ids:
-            if self.records[i].metadata_hash != self.ckan_records[i][0]:
-                self.compare_data["update"].add(i)
+            for i in same_ids:
+                if self.records[i].metadata_hash != self.ckan_records[i][0]:
+                    self.compare_data["update"].add(i)
+        except Exception as e:
+            raise CompareException("failed to run compare. exiting job")
 
     def get_ckan_records(self, results=[]) -> None:
         logger.info("querying ckan")
@@ -265,40 +269,61 @@
     def get_ckan_records_as_id_hash(self) -> None:
         logger.info("retrieving and preparing ckan records")
-        self.ckan_to_id_hash(self.get_ckan_records(results=[]))
+        try:
+            self.ckan_to_id_hash(self.get_ckan_records(results=[]))
+        except Exception as e:
+            raise ExtractCKANSourceException(
+                "failed to extract ckan records. exiting job"
+            )
 
     def get_harvest_records_as_id_hash(self) -> None:
         logger.info("retrieving and preparing harvest records")
-        if self.extract_type == "datajson":
-            download_res = self.download_dcatus()
-            if download_res is None or "dataset" not in download_res:
-                logger.warning(
-                    "no valid response from server or lack of 'dataset' array"
-                )
-                self.no_harvest_resp = True
-                return
-            self.harvest_to_id_hash(download_res["dataset"])
-        if self.extract_type == "waf-collection":
-            # TODO: break out the xml catalogs as records
-            # TODO: handle no response
-            self.harvest_to_id_hash(
-                self.download_waf(self.traverse_waf(self.url, **self.waf_config))
-            )
+        try:
+            if self.extract_type == "datajson":
+                download_res = self.download_dcatus()
+                if download_res is None or "dataset" not in download_res:
+                    logger.warning(
+                        "no valid response from server or lack of 'dataset' array"
+                    )
+                    self.no_harvest_resp = True
+                    raise ExtractHarvestSourceException(
+                        "no valid response from server or lack of 'dataset' array. exiting job"
+                    )
+                self.harvest_to_id_hash(download_res["dataset"])
+            if self.extract_type == "waf-collection":
+                # TODO: break out the xml catalogs as records
+                # TODO: handle no response
+                self.harvest_to_id_hash(
+                    self.download_waf(self.traverse_waf(self.url, **self.waf_config))
+                )
+        except Exception as e:
+            raise ExtractHarvestSourceException(
+                "extract from harvest failed. exiting job"
+            )
 
     def get_record_changes(self) -> None:
         """determine which records needs to be updated, deleted, or created"""
+        # irrecoverable error
         logger.info(f"getting records changes for {self.title} using {self.url}")
         try:
             self.get_ckan_records_as_id_hash()
             self.get_harvest_records_as_id_hash()
-            if self.no_harvest_resp is True:
-                return
             self.compare()
-        except Exception as e:
-            logger.error(self.title + " " + self.url + " " "\n")
-            logger.error(
-                "\n".join(traceback.format_exception(None, e, e.__traceback__))
-            )
+        except ExtractCKANSourceException as e:
+            # write to log and write to db error table
+            pass
+        except ExtractHarvestSourceException as e:
+            # write to log and write to db error table
+            pass
+        except CompareException as e:
+            # write to log and write to db error table
+            pass
+
+        # except Exception as e:
+        #     logger.error(self.title + " " + self.url + " " "\n")
+        #     logger.error(
+        #         "\n".join(traceback.format_exception(None, e, e.__traceback__))
+        #     )
 
     def synchronize_records(self) -> None:
         """runs the delete, update, and create
@@ -324,11 +349,22 @@
                     record.validate()
                     # TODO: add transformation and validation
                     record.sync()
-                except Exception as e:
-                    logger.error(f"error processing '{operation}' on record {i}\n")
-                    logger.error(
-                        "\n".join(traceback.format_exception(None, e, e.__traceback__))
-                    )
+
+                except ValidationException as e:
+                    # do something
+                    pass
+                except DCATUSToCKANException as e:
+                    # do something
+                    pass
+                except SynchronizeException as e:
+                    # do something
+                    pass
+
+                # except Exception as e:
+                #     logger.error(f"error processing '{operation}' on record {i}\n")
+                #     logger.error(
+                #         "\n".join(traceback.format_exception(None, e, e.__traceback__))
+                #     )
 
     def report(self) -> None:
         logger.info("report results")
@@ -601,16 +637,24 @@
 
     def ckanify_dcatus(self) -> None:
         logger.info("ckanifying dcatus record")
-        self.ckanified_metadata = self.simple_transform(self.metadata)
 
-        self.ckanified_metadata["resources"] = self.create_ckan_resources(self.metadata)
-        self.ckanified_metadata["tags"] = (
-            self.create_ckan_tags(self.metadata["keyword"])
-            if "keyword" in self.metadata
-            else []
-        )
-        self.ckanified_metadata["extras"] = self.create_ckan_extras()
-        logger.info("completed ckanifying dcatus record")
+        try:
+            self.ckanified_metadata = self.simple_transform(self.metadata)
+
+            self.ckanified_metadata["resources"] = self.create_ckan_resources(
+                self.metadata
+            )
+            self.ckanified_metadata["tags"] = (
+                self.create_ckan_tags(self.metadata["keyword"])
+                if "keyword" in self.metadata
+                else []
+            )
+            self.ckanified_metadata["extras"] = self.create_ckan_extras()
+            logger.info("completed ckanifying dcatus record")
+        except Exception as e:
+            raise DCATUSToCKANException(
+                "failed to convert from dcatus to ckan. skipping record"
+            )
 
     def validate(self) -> None:
         logger.info(f"validating {self.identifier}")
@@ -621,6 +665,7 @@
         except Exception as e:
             self.validation_msg = str(e)  # TODO: verify this is what we want
             self.valid = False
+            raise ValidationException("failed validation")
 
     # def transform(self, metadata: dict):
     #     """Transforms records"""
@@ -646,9 +691,12 @@
     def update_record(self) -> dict:
         self.status = "updated"
 
     def delete_record(self) -> None:
-        # purge returns nothing
-        ckan.action.dataset_purge(**{"id": self.identifier})
-        self.status = "deleted"
+        try:
+            ckan.action.dataset_purge(**{"id": self.identifier})
+            self.status = "deleted"
+        except Exception as e:
+            # do something with e
+            raise SynchronizeException("failed to synchronize record")
 
     def sync(self) -> None:
         if self.valid is False:
@@ -659,10 +707,14 @@
 
         start = datetime.now()
 
-        if self.operation == "create":
-            self.create_record()
-        if self.operation == "update":
-            self.update_record()
+        try:
+            if self.operation == "create":
+                self.create_record()
+            if self.operation == "update":
+                self.update_record()
+        except Exception as e:
+            # do something with e
+            raise SynchronizeException("failed to synchronize record")
 
         logger.info(
             f"time to {self.operation} {self.identifier} {datetime.now()-start}"

From 62c6f51a7878282e3f68be609e8788a147d56fa5 Mon Sep 17 00:00:00 2001
From: Reid Hewitt
Date: Tue, 5 Mar 2024 12:40:54 -0700
Subject: [PATCH 02/20] add log configuration and custom logger

---
 harvester/__init__.py      | 11 +++++++++++
 harvester/logger_config.py | 26 ++++++++++++++++++++++++++
 harvester/loggers.py       | 21 +++++++++++++++++++++
 3 files changed, 58 insertions(+)
 create mode 100644 harvester/logger_config.py
 create mode 100644 harvester/loggers.py

diff --git a/harvester/__init__.py b/harvester/__init__.py
index e69de29b..2d037132 100644
--- a/harvester/__init__.py
+++ b/harvester/__init__.py
@@ -0,0 +1,11 @@
+import logging.config
+from .logger_config import LOGGING_CONFIG
+
+# logging data
+# TODO: wire this up!
+# db_session = get_sqlachemy_session(
+#     "postgresql://placeholder-user:placeholder-pass@0.0.0.0:5432/testdb"
+# )
+db_session = None  # TODO: wire this up!
+LOGGING_CONFIG["handlers"]["db"]["session"] = db_session
+logging.config.dictConfig(LOGGING_CONFIG)
diff --git a/harvester/logger_config.py b/harvester/logger_config.py
new file mode 100644
index 00000000..070d80b7
--- /dev/null
+++ b/harvester/logger_config.py
@@ -0,0 +1,26 @@
+LOGGING_CONFIG = {
+    "version": 1,
+    "formatters": {
+        "standard": {
+            "format": (
+                "[%(asctime)s] %(levelname)s "
+                "[%(name)s.%(funcName)s:%(lineno)d] %(message)s"
+            )
+        },
+    },
+    "handlers": {
+        "console": {
+            "level": "INFO",
+            "formatter": "standard",
+            "class": "logging.StreamHandler",
+            "stream": "ext://sys.stdout",
+        },
+        "db": {"level": "ERROR", "class": "harvester.loggers.LogDBHandler"},
+    },
+    "loggers": {
+        "harvest_runner": {
+            "handlers": ["console", "db"],
+            "level": "INFO",
+        },
+    },
+}
diff --git a/harvester/loggers.py b/harvester/loggers.py
new file mode 100644
index 00000000..15a1f5e5
--- /dev/null
+++ b/harvester/loggers.py
@@ -0,0 +1,21 @@
+import logging
+
+
+class LogDBHandler(logging.Handler):
+    def __init__(self, session):
+        logging.Handler.__init__(self)
+        self.session = session
+
+    def emit(self, record):
+        print(record)
+
+        # TODO: wire up storing the harvest_error records somewhere
+        # harvest_error = HarvestError(
+        #     message=record.__dict__["msg"],
+        #     harvest_job_id=record.__dict__["harvest_job_id"],
+        #     severity=record.__dict__["severity"],
+        #     type=record.__dict__["type"],
+        #     date_created=datetime.now(),
+        # )
+        # self.session.add(harvest_error)
+        # self.session.commit()

From 8aa04e422c805b1762514875ea1d9203b6a3cea7 Mon Sep 17 00:00:00 2001
From: Reid Hewitt
Date: Tue, 5 Mar 2024 12:41:18 -0700
Subject: [PATCH 03/20] add custom exceptions

---
 harvester/exceptions.py |  59 ++++++++++--
 harvester/harvest.py    | 193 ++++++++++++++++++++++------------------
 2 files changed, 157 insertions(+), 95 deletions(-)

diff --git a/harvester/exceptions.py b/harvester/exceptions.py
index 3a6b6ae8..8f44cb39 100644
--- a/harvester/exceptions.py
+++ b/harvester/exceptions.py
@@ -1,31 +1,74 @@
+import logging
+
+# logging.getLogger(name) follows a singleton pattern
+logger = logging.getLogger("harvest_runner")
+
+
 # critical exceptions
+class HarvestCriticalException(Exception):
+    def __init__(self, msg, harvest_job_id):
+        super().__init__(msg, harvest_job_id)
+
+        self.msg = msg
+        self.harvest_job_id = harvest_job_id
+        self.severity = "CRITICAL"
+        self.type = "job"
+
+        self.extras = {
+            # m = message. need to avoid conflicting with existing kwargs
+            "m": self.msg,
+            "harvest_job_id": self.harvest_job_id,
+            "severity": self.severity,
+            "type": self.type,
+        }
+
+        logger.critical(self.msg, exc_info=True, extra=self.extras)
 
 
-# irrecoverable/critical exceptions
-class ExtractHarvestSourceException(Exception):
+class ExtractHarvestSourceException(HarvestCriticalException):
     pass
 
 
-class ExtractCKANSourceException(Exception):
+class ExtractCKANSourceException(HarvestCriticalException):
     pass
 
 
-class CompareException(Exception):
+class CompareException(HarvestCriticalException):
     pass
 
 
 # non-critical exceptions
-class ValidationException(Exception):
+class HarvestNonCriticalException(Exception):
+    def __init__(self, msg, harvest_job_id):
+        super().__init__(msg, harvest_job_id)
+
+        self.msg = msg
+        self.harvest_job_id = harvest_job_id
+        self.severity = "ERROR"
+        self.type = "record"
+
+        extras = {
+            # m = message. need to avoid conflicting with existing kwargs
+            "m": self.msg,
+            "harvest_job_id": self.harvest_job_id,
+            "severity": self.severity,
+            "type": self.type,
+        }
+
+        logger.error(msg, exc_info=True, extra=extras)
+
+
+class ValidationException(HarvestNonCriticalException):
     pass
 
 
-class TranformationException(Exception):
+class TranformationException(HarvestNonCriticalException):
     pass
 
 
-class DCATUSToCKANException(Exception):
+class DCATUSToCKANException(HarvestNonCriticalException):
     pass
 
 
-class SynchronizeException(Exception):
+class SynchronizeException(HarvestNonCriticalException):
     pass
diff --git a/harvester/harvest.py b/harvester/harvest.py
index 82c232d1..fec8ecb9 100644
--- a/harvester/harvest.py
+++ b/harvester/harvest.py
@@ -1,14 +1,12 @@
 import json
-import logging
 import os
 import re
-import sys
-import traceback
 import xml.etree.ElementTree as ET
 from dataclasses import asdict, dataclass, field
 from datetime import datetime
 from pathlib import Path
 import functools
+import logging
 
 import ckanapi
 import requests
@@ -23,32 +21,35 @@
     open_json,
     sort_dataset,
 )
-from .ckan_utils import munge_tag, munge_title_to_name
-from .exceptions import *
-
-load_dotenv()
 
-logging.basicConfig(
-    level=logging.NOTSET,
-    format="[%(asctime)s] %(levelname)s [%(name)s.%(funcName)s:%(lineno)d] %(message)s",
-    handlers=[
-        logging.FileHandler("harvest_load.log"),
-        logging.StreamHandler(sys.stdout),
-    ],
+from .ckan_utils import munge_tag, munge_title_to_name
+from .exceptions import (
+    ExtractHarvestSourceException,
+    ExtractCKANSourceException,
+    ValidationException,
+    DCATUSToCKANException,
+    SynchronizeException,
+    CompareException,
 )
-logger = logging.getLogger(__name__)
+load_dotenv()
 
-# TODO: make sure this doesn't change all requests
+# requests data
 session = requests.Session()
+# TODO: make sure this timeout config doesn't change all requests!
 session.request = functools.partial(session.request, timeout=15)
 
+# ckan entrypoint
 ckan = ckanapi.RemoteCKAN(
     os.getenv("CKAN_URL_STAGE"),
    apikey=os.getenv("CKAN_API_TOKEN_STAGE"),
     session=session,
 )
+# logging data
+# logging.getLogger(name) follows a singleton pattern
+logger = logging.getLogger("harvest_runner")
 
 ROOT_DIR = Path(__file__).parents[0]
@@ -61,9 +62,10 @@ class HarvestSource:
     _title: str
     _url: str
     _owner_org: str  # uuid
+    _job_id: str
     _extract_type: str  # "datajson" or "waf-collection"
     _waf_config: dict = field(default_factory=lambda: {})
-    _extra_source_name: str = "harvest_source_name"
+    _extra_source_name: str = "harvest_source_name"  # TODO: change this!
     _dataset_schema: dict = field(
         default_factory=lambda: open_json(
             ROOT_DIR / "data" / "dcatus" / "schemas" / "dataset.json"
@@ -112,9 +114,12 @@
     def owner_org(self) -> str:
         return self._owner_org
 
+    @property
+    def job_id(self) -> str:
+        return self._job_id
+
     @property
     def extract_type(self) -> str:
-        # TODO: this is probably safe to assume right?
         if "json" in self._extract_type:
             return "datajson"
         if "waf" in self._extract_type:
@@ -171,18 +176,26 @@
     def download_dcatus(self):
         return resp.json()
 
     def harvest_to_id_hash(self, records: list[dict]) -> None:
+        # ruff: noqa: F841
         logger.info("converting harvest records to id: hash")
         for record in records:
-            if self.extract_type == "datajson":
-                if "identifier" not in record:  # TODO: what to do?
-                    continue
-                identifier = record["identifier"]
-                dataset_hash = dataset_to_hash(sort_dataset(record))
-            if self.extract_type == "waf-collection":
-                identifier = self.get_title_from_fgdc(record["content"])
-                dataset_hash = dataset_to_hash(record["content"].decode("utf-8"))
-
-            self.records[identifier] = Record(self, identifier, record, dataset_hash)
+            try:
+                if self.extract_type == "datajson":
+                    identifier = record["identifier"]
+                    dataset_hash = dataset_to_hash(sort_dataset(record))
+                if self.extract_type == "waf-collection":
+                    identifier = self.get_title_from_fgdc(record["content"])
+                    dataset_hash = dataset_to_hash(record["content"].decode("utf-8"))
+
+                self.records[identifier] = Record(
+                    self, identifier, record, dataset_hash
+                )
+            except Exception as e:
+                # TODO: do something with 'e'
+                raise ExtractHarvestSourceException(
+                    f"{self.title} {self.url} failed to convert record to id: hash format. exiting.",
+                    self.job_id,
+                )
 
     def ckan_to_id_hash(self, results: list[dict]) -> None:
         logger.info("converting ckan records to id: hash")
@@ -243,6 +255,7 @@
     def compare(self) -> None:
         """Compares records"""
+        # ruff: noqa: F841
         logger.info("comparing harvest and ckan records")
 
         try:
@@ -256,7 +270,10 @@
             if self.records[i].metadata_hash != self.ckan_records[i][0]:
                 self.compare_data["update"].add(i)
         except Exception as e:
-            raise CompareException("failed to run compare. exiting job")
+            # TODO: do something with 'e'
+            raise CompareException(
+                f"{self.title} {self.url} failed to run compare. exiting.", self.job_id
+            )
 
     def get_ckan_records(self, results=[]) -> None:
         logger.info("querying ckan")
@@ -283,42 +300,38 @@
     def get_ckan_records_as_id_hash(self) -> None:
         logger.info("retrieving and preparing ckan records")
         try:
             self.ckan_to_id_hash(self.get_ckan_records(results=[]))
-        except Exception as e:
+        except Exception as e:  # ruff: noqa: E841
+            # TODO: do something with 'e'
             raise ExtractCKANSourceException(
-                "failed to extract ckan records. exiting job"
+                f"{self.title} {self.url} failed to extract ckan records. exiting.",
+                self.job_id,
             )
 
     def get_harvest_records_as_id_hash(self) -> None:
         logger.info("retrieving and preparing harvest records")
         try:
             if self.extract_type == "datajson":
                 download_res = self.download_dcatus()
-                if download_res is None or "dataset" not in download_res:
-                    logger.warning(
-                        "no valid response from server or lack of 'dataset' array"
-                    )
-                    self.no_harvest_resp = True
-                    raise ExtractHarvestSourceException(
-                        "no valid response from server or lack of 'dataset' array. exiting job"
-                    )
                 self.harvest_to_id_hash(download_res["dataset"])
             if self.extract_type == "waf-collection":
                 # TODO: break out the xml catalogs as records
                 # TODO: handle no response
                 self.harvest_to_id_hash(
                     self.download_waf(self.traverse_waf(self.url, **self.waf_config))
                 )
-        except Exception as e:
+        except Exception as e:  # ruff: noqa: E841
+            # TODO: do something with 'e'
             raise ExtractHarvestSourceException(
-                "extract from harvest failed. exiting job"
+                f"{self.title} {self.url} failed to extract harvest source. exiting",
+                self.job_id,
             )
 
     def get_record_changes(self) -> None:
         """determine which records needs to be updated, deleted, or created"""
-        # irrecoverable error
         logger.info(f"getting records changes for {self.title} using {self.url}")
         try:
             self.get_ckan_records_as_id_hash()
             self.get_harvest_records_as_id_hash()
             self.compare()
-        except ExtractCKANSourceException as e:
-            # write to log and write to db error table
-            pass
-        except ExtractHarvestSourceException as e:
-            # write to log and write to db error table
-            pass
-        except CompareException as e:
-            # write to log and write to db error table
-            pass
-
-        # except Exception as e:
-        #     logger.error(self.title + " " + self.url + " " "\n")
-        #     logger.error(
-        #         "\n".join(traceback.format_exception(None, e, e.__traceback__))
-        #     )
+        except (
+            ExtractCKANSourceException,
+            ExtractHarvestSourceException,
+            CompareException,
+        ) as e:
+            # TODO: do something with 'e'?
+            raise
 
     def synchronize_records(self) -> None:
         """runs the delete, update, and create
@@ -350,22 +354,14 @@
                     record.validate()
                     # TODO: add transformation and validation
                     record.sync()
 
-                except ValidationException as e:
-                    # do something
-                    pass
-                except DCATUSToCKANException as e:
-                    # do something
-                    pass
-                except SynchronizeException as e:
-                    # do something
+                except (
+                    ValidationException,
+                    DCATUSToCKANException,
+                    SynchronizeException,
+                ) as e:
+                    # TODO: do something with 'e'?
                     pass
 
-                # except Exception as e:
-                #     logger.error(f"error processing '{operation}' on record {i}\n")
-                #     logger.error(
-                #         "\n".join(traceback.format_exception(None, e, e.__traceback__))
-                #     )
-
     def report(self) -> None:
         logger.info("report results")
@@ -416,6 +412,7 @@
         return json.dumps(data, default=convert_set_to_list)
 
     def upload_to_s3(self, s3handler: S3Handler) -> None:
+        # ruff: noqa: F841
         try:
             # TODO: confirm out_path
             out_path = f"{s3handler.endpoint_url}/{self.title}/job-id/{self.title}.json"
@@ -423,10 +420,11 @@
             logger.info(f"saved harvest source {self.title} in s3 at {out_path}")
             return out_path
         except Exception as e:
-            logger.error(f"error uploading harvest source ({self.title}) to s3 \n")
-            logger.error(
-                "\n".join(traceback.format_exception(None, e, e.__traceback__))
-            )
+            # logger.error(f"error uploading harvest source ({self.title}) to s3 \n")
+            # logger.error(
+            #     "\n".join(traceback.format_exception(None, e, e.__traceback__))
+            # )
+            pass
 
         return False
@@ -636,6 +634,7 @@
         return output
 
     def ckanify_dcatus(self) -> None:
+        # ruff: noqa: F841
         logger.info("ckanifying dcatus record")
 
         try:
@@ -652,12 +651,15 @@
             self.ckanified_metadata["extras"] = self.create_ckan_extras()
             logger.info("completed ckanifying dcatus record")
         except Exception as e:
+            # TODO: something with 'e'
             raise DCATUSToCKANException(
-                "failed to convert from dcatus to ckan. skipping record"
+                f"unable to ckanify dcatus record {self.identifier}",
+                self.harvest_source.job_id,
             )
 
     def validate(self) -> None:
         logger.info(f"validating {self.identifier}")
+        # ruff: noqa: F841
         validator = Draft202012Validator(self.harvest_source.dataset_schema)
         try:
             validator.validate(self.metadata)
@@ -668,7 +671,10 @@
         except Exception as e:
             self.validation_msg = str(e)  # TODO: verify this is what we want
             self.valid = False
-            raise ValidationException("failed validation")
+            # TODO: do something with 'e' in logger?
+            raise ValidationException(
+                f"{self.identifier} failed validation", self.harvest_source.job_id
+            )
 
     # def transform(self, metadata: dict):
     #     """Transforms records"""
@@ -693,9 +699,12 @@
     def update_record(self) -> dict:
         self.status = "updated"
 
     def delete_record(self) -> None:
+        # ruff: noqa: F841
         try:
             ckan.action.dataset_purge(**{"id": self.identifier})
             self.status = "deleted"
         except Exception as e:
-            # do something with e
-            raise SynchronizeException("failed to synchronize record")
+            # TODO: something with 'e'
+            raise SynchronizeException(
+                f"failed to delete {self.identifier}",
+                self.harvest_source.job_id,
+            )
 
     def sync(self) -> None:
+        # ruff: noqa: F841
         if self.valid is False:
             logger.warning(f"{self.identifier} is invalid. bypassing {self.operation}")
             return
@@ -713,10 +724,14 @@
         start = datetime.now()
 
         try:
             if self.operation == "create":
                 self.create_record()
             if self.operation == "update":
                 self.update_record()
         except Exception as e:
-            # do something with e
-            raise SynchronizeException("failed to synchronize record")
+            # TODO: something with 'e'
+            raise SynchronizeException(
+                f"failed to {self.operation} for {self.identifier}",
+                self.harvest_source.job_id,
+            )
 
         logger.info(
             f"time to {self.operation} {self.identifier} {datetime.now()-start}"
@@ -735,6 +750,7 @@
     def upload_to_s3(self, s3handler: S3Handler) -> None:
         # ruff: noqa: E501
+        # ruff: noqa: F841
         try:
             out_path = f"{s3handler.endpoint_url}/{self.harvest_source.title}/job-id/{self.status}/{self.identifier}.json"
             s3handler.put_object(self.prepare_for_s3_upload(), out_path)
@@ -743,11 +760,16 @@
             )
             return out_path
         except Exception as e:
-            logger.error(
-                f"error uploading harvest record ({self.identifier} of {self.harvest_source.title}) to s3 \n"
-            )
-            logger.error(
-                "\n".join(traceback.format_exception(None, e, e.__traceback__))
-            )
+            # TODO: this exception
+            pass
 
         return False
+
+
+def main():
+
+    pass
+
+
+if __name__ == "__main__":
+    main()

From 7f37dc37aac3f041636a6e920f2e4520a502eaa8 Mon Sep 17 00:00:00 2001
From: Reid Hewitt
Date: Tue, 5 Mar 2024 12:41:59 -0700
Subject: [PATCH 04/20] tweak test catalogs

---
 .../data/dcatus/jsons/null-spatial.data.json  | 43 -------------------
 .../harvest-sources/dcatus/missing_title.json | 36 ++++++++++++++++
 2 files changed, 36 insertions(+), 43 deletions(-)
 delete mode 100644 harvester/data/dcatus/jsons/null-spatial.data.json
 create mode 100644 tests/harvest-sources/dcatus/missing_title.json

diff --git a/harvester/data/dcatus/jsons/null-spatial.data.json b/harvester/data/dcatus/jsons/null-spatial.data.json
deleted file mode 100644
index fd146ea5..00000000
--- a/harvester/data/dcatus/jsons/null-spatial.data.json
+++ /dev/null
@@ -1,43 +0,0 @@
-{
-    "@type": "dcat:Catalog",
-    "describedBy": "https://project-open-data.cio.gov/v1.1/schema/catalog.json",
-    "conformsTo": "https://project-open-data.cio.gov/v1.1/schema",
-    "@context": "https://project-open-data.cio.gov/v1.1/schema/catalog.jsonld",
-    "dataset": [
-        {
-            "identifier": "null-spatial",
-            "accessLevel": "public",
-            "contactPoint": {
-                "hasEmail": "mailto:Alexis.Graves@ocio.usda.gov",
-                "@type": "vcard:Contact",
-                "fn": "Alexi Graves"
-            },
-            "programCode": [
-                "005:059"
-            ],
-            "description": "Sample dataset. Spatial can be null",
-            "title": "Sample Title NUll Spatial",
-            "distribution": [
-                {
-                    "@type": "dcat:Distribution",
-                    "downloadURL": "http://www.dm.usda.gov/foia/docs/Copy%20of%20ECM%20Congressional%20Logs%20FY14.xls",
-                    "mediaType": "application/vnd.ms-excel",
-                    "title": "Congressional Logs for Fiscal Year 2014"
-                }
-            ],
-            "license": "https://creativecommons.org/publicdomain/zero/1.0/",
-            "bureauCode": [
-                "005:12"
-            ],
-            "modified": "2014-10-03",
-            "publisher": {
-                "@type": "org:Organization",
-                "name": "Department of Agriculture"
-            },
-            "spatial": null,
-            "keyword": [
-                "Congressional Logs"
-            ]
-        }
-    ]
-}
\ No newline at end of file
diff --git a/tests/harvest-sources/dcatus/missing_title.json b/tests/harvest-sources/dcatus/missing_title.json
new file mode 100644
index 00000000..faca3065
--- /dev/null
+++ b/tests/harvest-sources/dcatus/missing_title.json
@@ -0,0 +1,36 @@
+{
+  "@type": "dcat:Catalog",
+  "describedBy": "https://project-open-data.cio.gov/v1.1/schema/catalog.json",
+  "conformsTo": "https://project-open-data.cio.gov/v1.1/schema",
+  "@context": "https://project-open-data.cio.gov/v1.1/schema/catalog.jsonld",
+  "dataset": [
+    {
+      "identifier": "null-spatial",
+      "accessLevel": "public",
+      "contactPoint": {
+        "hasEmail": "mailto:Alexis.Graves@ocio.usda.gov",
+        "@type": "vcard:Contact",
+        "fn": "Alexi Graves"
+      },
+      "programCode": ["005:059"],
+      "description": "Sample dataset. Spatial can be null",
+      "distribution": [
+        {
+          "@type": "dcat:Distribution",
+          "downloadURL": "http://www.dm.usda.gov/foia/docs/Copy%20of%20ECM%20Congressional%20Logs%20FY14.xls",
+          "mediaType": "application/vnd.ms-excel",
+          "title": "Congressional Logs for Fiscal Year 2014"
+        }
+      ],
+      "license": "https://creativecommons.org/publicdomain/zero/1.0/",
+      "bureauCode": ["005:12"],
+      "modified": "2014-10-03",
+      "publisher": {
+        "@type": "org:Organization",
+        "name": "Department of Agriculture"
+      },
+      "spatial": null,
+      "keyword": ["Congressional Logs"]
+    }
+  ]
+}

From 21197753274b333033d3ba9062716f9499bff8cc Mon Sep 17 00:00:00 2001
From: Reid Hewitt
Date: Tue, 5 Mar 2024 12:42:26 -0700
Subject: [PATCH 05/20] update fixtures

---
 tests/conftest.py | 26 ++++++++++++++++++++++++++
 1 file changed, 26 insertions(+)

diff --git a/tests/conftest.py b/tests/conftest.py
index 204d31f6..b1875132 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -1,4 +1,5 @@
 from pathlib import Path
+from uuid import uuid4
 
 import pytest
 
@@ -7,6 +8,17 @@
 HARVEST_SOURCES = Path(__file__).parents[0] / "harvest-sources"
 
 
+@pytest.fixture
+def bad_url_dcatus_config() -> dict:
+    return {
+        "_title": "test_harvest_source_title",
+        "_url": "http://localhost/dcatus/bad_url.json",
+        "_extract_type": "datajson",
+        "_owner_org": "example_organization",
+        "_job_id": str(uuid4()),
+    }
+
+
 @pytest.fixture
 def dcatus_config() -> dict:
     """example dcatus job payload"""
@@ -15,6 +27,18 @@
         "_url": "http://localhost/dcatus/dcatus.json",
         "_extract_type": "datajson",
         "_owner_org": "example_organization",
+        "_job_id": str(uuid4()),
+    }
+
+
+@pytest.fixture
+def invalid_dcatus_config() -> dict:
+    return {
+        "_title": "test_harvest_source_name",
+        "_url": "http://localhost/dcatus/missing_title.json",
+        "_extract_type": "datajson",
+        "_owner_org": "example_organization",
+        "_job_id": str(uuid4()),
     }
 
 
@@ -27,6 +51,7 @@
         "_extract_type": "waf-collection",
         "_owner_org": "example_organization",
         "_waf_config": {"filters": ["../", "dcatus/"]},
+        "_job_id": str(uuid4()),
     }
 
 
@@ -38,6 +63,7 @@
         "_url": "http://localhost/dcatus/dcatus_compare.json",
         "_extract_type": "datajson",
         "_owner_org": "example_organization",
+        "_job_id": str(uuid4()),
     }

From 8544f40bbfedb89ed6f7d0ece4ac5f6d49b04507 Mon Sep 17 00:00:00 2001
From: Reid Hewitt
Date: Tue, 5 Mar 2024 12:42:37 -0700
Subject: [PATCH 06/20] add exception handling tests

---
 .../unit/exception/test_exception_handling.py | 62 +++++++++++++++++++
 1 file changed, 62 insertions(+)
 create mode 100644 tests/unit/exception/test_exception_handling.py

diff --git a/tests/unit/exception/test_exception_handling.py b/tests/unit/exception/test_exception_handling.py
new file mode 100644
index 00000000..d31a998b
--- /dev/null
+++ b/tests/unit/exception/test_exception_handling.py
@@ -0,0 +1,62 @@
+import pytest
+from unittest.mock import patch
+
+import harvester
+from harvester.harvest import HarvestSource
+from harvester.exceptions import (
+    ExtractHarvestSourceException,
+    ExtractCKANSourceException,
+    ValidationException,
+    DCATUSToCKANException,
+    SynchronizeException,
+)
+
+import ckanapi
+
+# ruff: noqa: F401
+# ruff: noqa: F841
+
+
+class TestExceptionHandling:
+    def test_bad_harvest_source_url_exception(self, bad_url_dcatus_config):
+        harvest_source = HarvestSource(**bad_url_dcatus_config)
+
+        with pytest.raises(ExtractHarvestSourceException) as e:
+            harvest_source.get_harvest_records_as_id_hash()
+
+    @patch("harvester.harvest.ckan", ckanapi.RemoteCKAN("mock_address"))
+    def test_get_ckan_records_exception(self, bad_url_dcatus_config):
+        # using bad_url_dcatus_config just to populate required fields
+        harvest_source = HarvestSource(**bad_url_dcatus_config)
+
+        with pytest.raises(ExtractCKANSourceException) as e:
+            harvest_source.get_ckan_records_as_id_hash()
+
+    def test_validation_exception(self, invalid_dcatus_config):
+        harvest_source = HarvestSource(**invalid_dcatus_config)
+        harvest_source.get_harvest_records_as_id_hash()
+
+        test_record = harvest_source.records["null-spatial"]
+
+        with pytest.raises(ValidationException) as e:
+            test_record.validate()
+
+    def test_dcatus_to_ckan_exception(self, invalid_dcatus_config):
+        harvest_source = HarvestSource(**invalid_dcatus_config)
+        harvest_source.get_harvest_records_as_id_hash()
+
+        test_record = harvest_source.records["null-spatial"]
+
+        with pytest.raises(DCATUSToCKANException) as e:
+            test_record.ckanify_dcatus()
+
+    @patch("harvester.harvest.ckan", ckanapi.RemoteCKAN("mock_address"))
+    def test_synchronization_exception(self, dcatus_config):
+        harvest_source = HarvestSource(**dcatus_config)
+        harvest_source.get_harvest_records_as_id_hash()
+
+        test_record = harvest_source.records["cftc-dc1"]
+        test_record.operation = "create"
+
+        with pytest.raises(SynchronizeException) as e:
+            test_record.sync()

From f478433d8d0d1ebf8cee9eb6e133b100e2b8da73 Mon Sep 17 00:00:00 2001
From: Reid Hewitt
Date: Tue, 5 Mar 2024 12:42:53 -0700
Subject: [PATCH 07/20] remove unneeded functions and add hash comment

---
 harvester/utils.py | 14 ++------------
 1 file changed, 2 insertions(+), 12 deletions(-)

diff --git a/harvester/utils.py b/harvester/utils.py
index 042f86df..51614b0e 100644
--- a/harvester/utils.py
+++ b/harvester/utils.py
@@ -1,8 +1,6 @@
 import hashlib
 import json
 import os
-import string
-import random
 
 import boto3
 import sansjson
@@ -10,16 +8,6 @@
 # ruff: noqa: F841
 
 
-def create_random_text(str_len: int) -> str:
-
-    alphabet = string.ascii_lowercase
-    output = ""
-
-    for _ in range(str_len):
-        output += alphabet[random.randint(0, len(alphabet))]
-    return output
-
-
 def convert_set_to_list(obj):
     if isinstance(obj, set):
         return list(obj)
@@ -31,6 +19,8 @@ def sort_dataset(d):
 
 
 def dataset_to_hash(d):
+    # TODO: check for sh1 or sha256?
+    # https://github.com/GSA/ckanext-datajson/blob/a3bc214fa7585115b9ff911b105884ef209aa416/ckanext/datajson/datajson.py#L279
     return hashlib.sha256(json.dumps(d, sort_keys=True).encode("utf-8")).hexdigest()

From a4a875ea5f94284e1c87de8c7f72e7ee1563960a Mon Sep 17 00:00:00 2001
From: Reid Hewitt
Date: Tue, 5 Mar 2024 13:03:45 -0700
Subject: [PATCH 08/20] disable console capture

---
 .github/workflows/test.yml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml
index 089f150b..dd43cd7c 100644
--- a/.github/workflows/test.yml
+++ b/.github/workflows/test.yml
@@ -44,7 +44,7 @@
         run: docker-compose up -d
 
       - name: Run Pytest
-        run: set -o pipefail; poetry run pytest --junitxml=pytest.xml --cov=harvester ./tests/unit | tee pytest-coverage.txt
+        run: set -o pipefail; poetry run pytest -s --junitxml=pytest.xml --cov=harvester ./tests/unit | tee pytest-coverage.txt
 
       - name: Report test coverage
         uses: MishaKav/pytest-coverage-comment@main

From ba2dfee5e05a8c69cc1fbf1c7707c1e0e67e9aa6 Mon Sep 17 00:00:00 2001
From: Reid Hewitt
Date: Tue, 12 Mar 2024 10:16:27 -0600
Subject: [PATCH 09/20] update to localhost

---
 .env | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/.env b/.env
index 70192494..028dbe68 100644
--- a/.env
+++ b/.env
@@ -8,7 +8,7 @@ S3FILESTORE__SIGNATURE_VERSION=s3v4
 
 MDTRANSLATOR_URL=http://127.0.0.1:3000/translates
 
-DATABASE_SERVER=db
+DATABASE_SERVER=localhost
 DATABASE_PORT=5432
 DATABASE_NAME=mydb
 DATABASE_USER=myuser

From 5fa0fcadd074def0475db8d805b916be0a89946e Mon Sep 17 00:00:00 2001
From: Reid Hewitt
Date: Tue, 12 Mar 2024 10:16:50 -0600
Subject: [PATCH 10/20] remove db log handler

---
 harvester/__init__.py      |  9 +--------
 harvester/logger_config.py |  3 +--
 harvester/loggers.py       | 21 ---------------------
 3 files changed, 2 insertions(+), 31 deletions(-)
 delete mode 100644 harvester/loggers.py

diff --git a/harvester/__init__.py b/harvester/__init__.py
index 2d037132..1435f74e 100644
--- a/harvester/__init__.py
+++ b/harvester/__init__.py
@@ -1,11 +1,4 @@
 import logging.config
-from .logger_config import LOGGING_CONFIG
+from harvester.logger_config import LOGGING_CONFIG
 
-# logging data
-# TODO: wire this up!
-# db_session = get_sqlachemy_session(
-#     "postgresql://placeholder-user:placeholder-pass@0.0.0.0:5432/testdb"
-# )
-db_session = None  # TODO: wire this up!
-LOGGING_CONFIG["handlers"]["db"]["session"] = db_session
 logging.config.dictConfig(LOGGING_CONFIG)
diff --git a/harvester/logger_config.py b/harvester/logger_config.py
index 070d80b7..219d7575 100644
--- a/harvester/logger_config.py
+++ b/harvester/logger_config.py
@@ -15,11 +15,10 @@
             "class": "logging.StreamHandler",
             "stream": "ext://sys.stdout",
         },
-        "db": {"level": "ERROR", "class": "harvester.loggers.LogDBHandler"},
     },
     "loggers": {
         "harvest_runner": {
-            "handlers": ["console", "db"],
+            "handlers": ["console"],
             "level": "INFO",
         },
     },
diff --git a/harvester/loggers.py b/harvester/loggers.py
deleted file mode 100644
index 15a1f5e5..00000000
--- a/harvester/loggers.py
+++ /dev/null
@@ -1,21 +0,0 @@
-import logging
-
-
-class LogDBHandler(logging.Handler):
-    def __init__(self, session):
-        logging.Handler.__init__(self)
-        self.session = session
-
-    def emit(self, record):
-        print(record)
-
-        # TODO: wire up storing the harvest_error records somewhere
-        # harvest_error = HarvestError(
-        #     message=record.__dict__["msg"],
-        #     harvest_job_id=record.__dict__["harvest_job_id"],
-        #     severity=record.__dict__["severity"],
-        #     type=record.__dict__["type"],
-        #     date_created=datetime.now(),
-        # )
-        # self.session.add(harvest_error)
-        # self.session.commit()

From 1ea68d32cfae8d42a7f83318bfc4c3479d9011c9 Mon Sep 17 00:00:00 2001
From: Reid Hewitt
Date: Tue, 12 Mar 2024 10:17:22 -0600
Subject: [PATCH 11/20] move fixtures

---
 tests/conftest.py         |  50 +++++++--
 tests/database/test_db.py | 177 ++++++++++++++++----------------------
 2 files changed, 120 insertions(+), 107 deletions(-)

diff --git a/tests/conftest.py b/tests/conftest.py
index b1875132..efc7985e 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -1,13 +1,51 @@
 from pathlib import Path
-from uuid import uuid4
+import os
 
 import pytest
+from dotenv import load_dotenv
+
+from sqlalchemy import create_engine, text
+from sqlalchemy.orm import sessionmaker, scoped_session
+from harvester.database.models import Base
+from harvester.database.interface import HarvesterDBInterface
 
 from harvester.utils import open_json
 
+load_dotenv()
+
 HARVEST_SOURCES = Path(__file__).parents[0] / "harvest-sources"
 
 
+@pytest.fixture(scope="session")
+def db_session():
+    DATABASE_SERVER = os.getenv("DATABASE_SERVER")
+    DATABASE_URI = os.getenv("DATABASE_URI")
+    TEST_SCHEMA = "test_schema"
+    modified_uri = DATABASE_URI.replace("@" + DATABASE_SERVER, "@localhost")
+    engine = create_engine(modified_uri)
+
+    with engine.connect() as connection:
+        connection.execute(text(f"CREATE SCHEMA IF NOT EXISTS {TEST_SCHEMA};"))
+        connection.execute(text(f"SET search_path TO {TEST_SCHEMA};"))
+
+    Base.metadata.create_all(engine)
+    SessionLocal = sessionmaker(bind=engine)
+
+    session = scoped_session(SessionLocal)
+    yield session
+
+    session.remove()
+    engine.dispose()
+
+    with engine.begin() as connection:
+        connection.execute(text(f"DROP SCHEMA IF EXISTS {TEST_SCHEMA} CASCADE;"))
+
+
+@pytest.fixture(scope="session")
+def db_interface(db_session):
+    return HarvesterDBInterface(db_session)
+
+
 @pytest.fixture
 def bad_url_dcatus_config() -> dict:
     return {
@@ -15,7 +53,7 @@
         "_url": "http://localhost/dcatus/bad_url.json",
         "_extract_type": "datajson",
         "_owner_org": "example_organization",
-        "_job_id": str(uuid4()),
+        "_job_id": "1db556ff-fb02-438b-b7d2-ad914e1f2531",
     }
 
 
@@ -27,7 +65,7 @@
         "_url": "http://localhost/dcatus/dcatus.json",
         "_extract_type": "datajson",
         "_owner_org": "example_organization",
-        "_job_id": str(uuid4()),
+        "_job_id": "1db556ff-fb02-438b-b7d2-ad914e1f2531",
     }
 
 
@@ -38,7 +76,7 @@
         "_url": "http://localhost/dcatus/missing_title.json",
         "_extract_type": "datajson",
         "_owner_org": "example_organization",
-        "_job_id": str(uuid4()),
+        "_job_id": "1db556ff-fb02-438b-b7d2-ad914e1f2531",
    }
 
 
@@ -51,7 +89,7 @@
         "_extract_type": "waf-collection",
         "_owner_org": "example_organization",
         "_waf_config": {"filters": ["../", "dcatus/"]},
-        "_job_id": str(uuid4()),
+        "_job_id": "1db556ff-fb02-438b-b7d2-ad914e1f2531",
     }
 
 
@@ -63,7 +101,7 @@
         "_url": "http://localhost/dcatus/dcatus_compare.json",
         "_extract_type": "datajson",
         "_owner_org": "example_organization",
-        "_job_id": str(uuid4()),
+        "_job_id": "1db556ff-fb02-438b-b7d2-ad914e1f2531",
     }
diff --git a/tests/database/test_db.py b/tests/database/test_db.py
index 7c3c1462..b9d15e8b 100644
--- a/tests/database/test_db.py
+++ b/tests/database/test_db.py
@@ -1,121 +1,96 @@
-import pytest
-from sqlalchemy import create_engine, text
-from sqlalchemy.orm import sessionmaker, scoped_session
-from harvester.database.models import Base
-from harvester.database.interface import HarvesterDBInterface
-from dotenv import load_dotenv
-import os
-
-load_dotenv()
-
-@pytest.fixture(scope='session')
-def db_session():
-    DATABASE_SERVER = os.getenv("DATABASE_SERVER")
-    DATABASE_URI = os.getenv("DATABASE_URI")
-    TEST_SCHEMA = "test_schema"
-    modified_uri = DATABASE_URI.replace('@' + DATABASE_SERVER, '@localhost')
-    engine = create_engine(modified_uri)
-
-    with engine.connect() as connection:
-        connection.execute(text(f"CREATE SCHEMA IF NOT EXISTS {TEST_SCHEMA};"))
-        connection.execute(text(f"SET search_path TO {TEST_SCHEMA};"))
-
-    Base.metadata.create_all(engine)
-    SessionLocal = sessionmaker(bind=engine)
-
-    session = scoped_session(SessionLocal)
-    yield session
-
-    session.remove()
-    engine.dispose()
-
-    with engine.begin() as connection:
-        connection.execute(text(f"DROP SCHEMA IF EXISTS {TEST_SCHEMA} CASCADE;"))
-
-@pytest.fixture(scope='session')
-def db_interface(db_session):
-    return HarvesterDBInterface(db_session)
-
 def test_add_harvest_source(db_interface):
-    source_data = {'name': 'Test Source',
-                   'organization_id': 'Test Org',
-                   'frequency': 'daily',
-                   'url': "http://example.com",
-                   'schema_type': 'strict',
-                   'source_type': 'json'}
+    source_data = {
+        "name": "Test Source",
+        "organization_id": "Test Org",
+        "frequency": "daily",
+        "url": "http://example.com",
+        "schema_type": "strict",
+        "source_type": "json",
+    }
     new_source = db_interface.add_harvest_source(source_data)
-    assert new_source.name == 'Test Source'
+    assert new_source.name == "Test Source"
+
 
 def test_add_and_get_harvest_source(db_interface):
-    new_source = db_interface.add_harvest_source({
-        'name': 'Test Source',
-        'notification_emails': ['test@example.com'],
-        'organization_id': 'Test Org',
-        'frequency': 'daily',
-        'url': "http://example.com",
-        'schema_type': 'strict',
-        'source_type': 'json'
-    })
-    assert new_source.name == 'Test Source'
-
+    new_source = db_interface.add_harvest_source(
+        {
+            "name": "Test Source",
+            "notification_emails": ["test@example.com"],
+            "organization_id": "Test Org",
+            "frequency": "daily",
+            "url": "http://example.com",
+            "schema_type": "strict",
+            "source_type": "json",
+        }
+    )
+    assert new_source.name == "Test Source"
+
     sources = db_interface.get_all_harvest_sources()
-    assert any(source['name'] == 'Test Source' for source in sources)
+    assert any(source["name"] == "Test Source" for source in sources)
 
 
 def test_add_harvest_job(db_interface):
-    new_source = db_interface.add_harvest_source({
-        'name': 'Test Source',
-        'notification_emails': ['test@example.com'],
-        'organization_id': 'Test Org',
-        'frequency': 'daily',
-        'url': "http://example.com",
-        'schema_type': 'strict',
-        'source_type': 'json'
-    })
-
+    new_source = db_interface.add_harvest_source(
+        {
+            "name": "Test Source",
+            "notification_emails": ["test@example.com"],
+            "organization_id": "Test Org",
+            "frequency": "daily",
+            "url": "http://example.com",
+            "schema_type": "strict",
+            "source_type": "json",
+        }
+    )
+
     job_data = {
-        'date_created': '2022-01-01',
-        'date_finished': '2022-01-02',
-        'records_added': 10,
-        'records_updated': 5,
-        'records_deleted': 2,
-        'records_errored': 1,
-        'records_ignored': 0
+        "date_created": "2022-01-01",
+        "date_finished": "2022-01-02",
+        "records_added": 10,
+        "records_updated": 5,
+        "records_deleted": 2,
+        "records_errored": 1,
+        "records_ignored": 0,
     }
     new_job = db_interface.add_harvest_job(job_data, str(new_source.id))
     assert new_job.harvest_source_id == new_source.id
 
 
 def test_add_harvest_error(db_interface):
-    new_source = db_interface.add_harvest_source({
-        'name': 'Error Test Source',
-        'notification_emails': ['error@example.com'],
-        'organization_id': 'Error Org',
-        'frequency': 'weekly',
-        'url': "http://example.com",
-        'schema_type': 'strict',
-        'source_type': 'json'
-    })
-    new_job = db_interface.add_harvest_job({
-        'date_created': '2022-01-03',
-        'date_finished': '2022-01-04',
-        'records_added': 5,
-        'records_updated': 3,
-        'records_deleted': 1,
-        'records_errored': 0,
-        'records_ignored': 2
-    }, str(new_source.id))
+    new_source = db_interface.add_harvest_source(
+        {
+            "name": "Error Test Source",
+            "notification_emails": ["error@example.com"],
+            "organization_id": "Error Org",
+            "frequency": "weekly",
+            "url": "http://example.com",
+            "schema_type": "strict",
+            "source_type": "json",
+        }
+    )
+
+    new_job = db_interface.add_harvest_job(
+        {
+            "date_created": "2022-01-03",
+            "date_finished": "2022-01-04",
+            "records_added": 5,
+            "records_updated": 3,
+            "records_deleted": 1,
+            "records_errored": 0,
+            "records_ignored": 2,
+        },
+        str(new_source.id),
+    )
 
     error_data = {
-        'harvest_job_id': str(new_job.id),
-        'record_reported_id': 'test_record',
-        'date_created': '2022-01-04',
-        'type': 'Test Error',
-        'severity': 'high',
-        'message': 'This is a test error'
+        "harvest_job_id": str(new_job.id),
+        "record_reported_id": "test_record",
+        "date_created": "2022-01-04",
+        "type": "Test Error",
+        "severity": "high",
+        "message": "This is a test error",
     }
     new_error = db_interface.add_harvest_error(error_data, str(new_job.id))
     assert new_error.harvest_job_id == new_job.id
-    assert new_error.type == 'Test Error'
-    assert new_error.message == 'This is a test error'
+    assert new_error.type == "Test Error"
+    assert new_error.message == "This is a test error"

From b82612220faf64903989fafe79727870c36063ac Mon Sep 17 00:00:00 2001
From: Reid Hewitt
Date: Tue, 12 Mar 2024 10:18:44 -0600
Subject: [PATCH 12/20] add db interface

---
 harvester/exceptions.py | 35 ++++++++++++++++++++-------------
 1 file changed, 22 insertions(+), 13 deletions(-)

diff --git a/harvester/exceptions.py b/harvester/exceptions.py
index 8f44cb39..103250de 100644
--- a/harvester/exceptions.py
+++ b/harvester/exceptions.py
@@ -1,7 +1,6 @@
 import logging
-
-# logging.getLogger(name) follows a singleton pattern
-logger = logging.getLogger("harvest_runner")
+from datetime import datetime
+from .database.interface import HarvesterDBInterface
 
 
 # critical exceptions
@@ -14,15 +13,19 @@
         self.severity = "CRITICAL"
         self.type = "job"
 
-        self.extras = {
-            # m = message. need to avoid conflicting with existing kwargs
-            "m": self.msg,
+        self.db_interface = HarvesterDBInterface()
+        self.logger = logging.getLogger("harvest_runner")
+
+        error_data = {
             "harvest_job_id": self.harvest_job_id,
+            "message": self.msg,
             "severity": self.severity,
             "type": self.type,
+            "date_created": datetime.utcnow(),
         }
 
-        logger.critical(self.msg, exc_info=True, extra=self.extras)
+        self.db_interface.add_harvest_error(error_data, self.harvest_job_id)
+        self.logger.critical(self.msg, exc_info=True)
 
 
 class ExtractHarvestSourceException(HarvestCriticalException):
@@ -39,23 +42,29 @@
 # non-critical exceptions
 class HarvestNonCriticalException(Exception):
-    def __init__(self, msg, harvest_job_id):
-        super().__init__(msg, harvest_job_id)
+    def __init__(self, msg, harvest_job_id, title):
+        super().__init__(msg, harvest_job_id, title)
 
+        self.title = title
         self.msg = msg
         self.harvest_job_id = harvest_job_id
         self.severity = "ERROR"
         self.type = "record"
 
-        extras = {
-            # m = message. need to avoid conflicting with existing kwargs
-            "m": self.msg,
+        self.db_interface = HarvesterDBInterface()
+        self.logger = logging.getLogger("harvest_runner")
+
+        error_data = {
             "harvest_job_id": self.harvest_job_id,
+            "message": self.msg,
             "severity": self.severity,
             "type": self.type,
+            "date_created": datetime.utcnow(),
+            "record_id": self.title,
         }
 
-        logger.error(msg, exc_info=True, extra=extras)
+        self.db_interface.add_harvest_error(error_data, self.harvest_job_id)
+        self.logger.error(self.msg, exc_info=True)
 
 
 class ValidationException(HarvestNonCriticalException):

From 23b42c613102a38ac6db774ca2f09bd887d39605 Mon Sep 17 00:00:00 2001
From: Reid Hewitt
Date: Tue, 12 Mar 2024 10:18:50 -0600
Subject: [PATCH 13/20] fix lint

---
 scripts/smoke-test.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/scripts/smoke-test.py b/scripts/smoke-test.py
index d782c0b7..73a693fe 100755
--- a/scripts/smoke-test.py
+++ b/scripts/smoke-test.py
@@ -2,14 +2,14 @@
 
 import os
 import sys
+from harvester import HarvestSource, Record
 
 sys.path.insert(1, "/".join(os.path.realpath(__file__).split("/")[0:-2]))
 
-from harvester import HarvestSource, Record
 
-harvest_source = HarvestSource('a', 'a', 'a', 'a')
+harvest_source = HarvestSource("a", "a", "a", "a")
 
-record = Record('a', 'a')
+record = Record("a", "a")
 
 print(harvest_source)

From 51d56b51a63ce9a7ac735b50fa1fbcc8e5041d77 Mon Sep 17 00:00:00 2001
From: Reid Hewitt
Date: Tue, 12 Mar 2024 10:19:35 -0600
Subject: [PATCH 14/20] add records to satisfy FK constraints

---
 .../unit/exception/test_exception_handling.py | 32 ++++++++++++++++++-
 1 file changed, 31 insertions(+), 1 deletion(-)

diff --git a/tests/unit/exception/test_exception_handling.py b/tests/unit/exception/test_exception_handling.py
index d31a998b..86d68c9b 100644
--- a/tests/unit/exception/test_exception_handling.py
+++ b/tests/unit/exception/test_exception_handling.py
@@ -1,6 +1,8 @@
-import pytest
+from datetime import datetime
 from unittest.mock import patch
 
+import pytest
+
 import harvester
 from harvester.harvest import HarvestSource
 from harvester.exceptions import (
@@ -18,6 +20,34 @@
 
 
 class TestExceptionHandling:
+    def test_add_harvest_source(self, db_interface):
+
+        harvest_source = {
+            "id": "9347a852-2498-4bee-b817-90b8e93c9cec",
+            "name": "harvest_source_test",
+            "notification_emails": ["admin@example.com"],
+            "organization_id": "Example Organization",
+            "frequency": "daily",
+            "url": "http://example.com",
+            "schema_type": "strict",
+            "source_type": "json",
+            "harvest_source_name": "source name from ckan",
+        }
+
+        harvest_job = {
+            "harvest_source_id": "9347a852-2498-4bee-b817-90b8e93c9cec",
+            "id": "1db556ff-fb02-438b-b7d2-ad914e1f2531",
+            "date_created": datetime.utcnow(),
+            "date_finished": datetime.utcnow(),
+            "records_added": 0,
+            "records_updated": 0,
+            "records_deleted": 0,
+            "records_errored": 0,
+            "records_ignored": 0,
+        }
+        db_interface.add_harvest_source(harvest_source)
+        db_interface.add_harvest_job(harvest_job, harvest_job["harvest_source_id"])
+
     def test_bad_harvest_source_url_exception(self, bad_url_dcatus_config):
         harvest_source = HarvestSource(**bad_url_dcatus_config)

From 5eb516eb26d6e9984d3627fe275dc1d449efafb8 Mon Sep 17 00:00:00 2001
From: Reid Hewitt
Date: Tue, 12 Mar 2024 10:19:49 -0600
Subject: [PATCH 15/20] update exception args

---
 harvester/harvest.py | 11 ++++++++---
 1 file changed, 8 insertions(+), 3 deletions(-)

diff --git a/harvester/harvest.py b/harvester/harvest.py
index af67e42a..5b79ca9a 100644
--- a/harvester/harvest.py
+++ b/harvester/harvest.py
@@ -274,7 +274,8 @@
         except Exception as e:
             # TODO: do something with 'e'
             raise CompareException(
-                f"{self.title} {self.url} failed to run compare. exiting.", self.job_id
+                f"{self.title} {self.url} failed to run compare. exiting.",
+                self.job_id,
             )
@@ -310,7 +311,6 @@
                     self.download_waf(self.traverse_waf(self.url, **self.waf_config))
                 )
         except Exception as e:  # ruff: noqa: E841
-            # TODO: do something with 'e'
             raise ExtractHarvestSourceException(
                 f"{self.title} {self.url} failed to extract harvest source. exiting",
                 self.job_id,
             )
@@ -657,6 +657,7 @@
             raise DCATUSToCKANException(
                 f"unable to ckanify dcatus record {self.identifier}",
                 self.harvest_source.job_id,
+                self.identifier,
             )
@@ -671,7 +672,9 @@
             self.valid = False
             # TODO: do something with 'e' in logger?
             raise ValidationException(
-                f"{self.identifier} failed validation", self.harvest_source.job_id
+                f"{self.identifier} failed validation",
+                self.harvest_source.job_id,
+                self.identifier,
             )
@@ -707,6 +710,7 @@
             raise SynchronizeException(
                 f"failed to delete {self.identifier}",
                 self.harvest_source.job_id,
+                self.identifier,
             )
@@ -729,6 +733,7 @@
             raise SynchronizeException(
                 f"failed to {self.operation} for {self.identifier}",
                 self.harvest_source.job_id,
+                self.identifier,
             )
 
         logger.info(

From c90ace15060a83d40e4d6e8e3a9f9fb2548a0439 Mon Sep 17 00:00:00 2001
From: Reid Hewitt
Date: Tue, 12 Mar 2024 10:33:29 -0600
Subject: [PATCH 16/20] move dot env call to one place

---
 harvester/__init__.py          | 3 +++
 harvester/database/__init__.py | 2 +-
 harvester/harvest.py           | 3 ---
 3 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/harvester/__init__.py b/harvester/__init__.py
index 1435f74e..99a4defd 100644
--- a/harvester/__init__.py
+++ b/harvester/__init__.py
@@ -1,4 +1,7 @@
 import logging.config
 from harvester.logger_config import LOGGING_CONFIG
+from dotenv import load_dotenv
+
+load_dotenv()
 
 logging.config.dictConfig(LOGGING_CONFIG)
diff --git a/harvester/database/__init__.py b/harvester/database/__init__.py
--- a/harvester/database/__init__.py
+++ b/harvester/database/__init__.py
@@ -1,3 +1,3 @@
 import os
 
-DATABASE_URI = os.getenv('DATABASE_URI')
+DATABASE_URI = os.getenv("DATABASE_URI")
diff --git a/harvester/harvest.py b/harvester/harvest.py
index 5b79ca9a..32a158b5 100644
--- a/harvester/harvest.py
+++ b/harvester/harvest.py
@@ -11,7 +11,6 @@
 import ckanapi
 import requests
 from bs4 import BeautifulSoup
-from dotenv import load_dotenv
 from jsonschema import Draft202012Validator
 
 from .utils import (
@@ -32,8 +31,6 @@
     CompareException,
 )
 
-load_dotenv()
-
 # requests data
 session = requests.Session()
 # TODO: make sure this timeout config doesn't change all requests!
From eb62344e73a73a814e4b71414fd887f20a09d4e8 Mon Sep 17 00:00:00 2001 From: Reid Hewitt Date: Tue, 12 Mar 2024 10:39:11 -0600 Subject: [PATCH 17/20] bump pypi --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 28d043fb..33f93ad3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "datagov-harvesting-logic" -version = "0.3.3" +version = "0.3.4" description = "" # authors = [ # {name = "Jin Sun", email = "jin.sun@gsa.gov"}, From 719c58c96722033fe5279f1bd64f64d57f793ad8 Mon Sep 17 00:00:00 2001 From: Jin-Sun-tts Date: Tue, 19 Mar 2024 17:43:58 -0400 Subject: [PATCH 18/20] add some indexes --- .github/workflows/commit.yml | 6 +- .gitignore | 1 + Dockerfile | 5 +- README.md | 16 +- app.py | 91 ---------- docker-compose.yml | 4 + harvester/database/__init__.py | 3 - harvester/database/init_db.py | 14 -- harvester/database/interface.py | 79 --------- harvester/database/models.py | 58 ------ harvester/exceptions.py | 4 +- manifest.yml | 3 +- poetry.lock | 113 +++++++++++- pyproject.toml | 3 + requirements.txt | 9 +- tests/conftest.py | 4 +- tests/database/data.py | 14 +- tests/database/test_db.py | 165 +++++++++--------- .../unit/exception/test_exception_handling.py | 13 +- vars.development.yml | 1 - 20 files changed, 253 insertions(+), 353 deletions(-) delete mode 100644 app.py delete mode 100644 harvester/database/__init__.py delete mode 100644 harvester/database/init_db.py delete mode 100644 harvester/database/interface.py delete mode 100644 harvester/database/models.py diff --git a/.github/workflows/commit.yml b/.github/workflows/commit.yml index 450c74e4..087315cd 100644 --- a/.github/workflows/commit.yml +++ b/.github/workflows/commit.yml @@ -74,7 +74,7 @@ jobs: - name: deploy DHL uses: cloud-gov/cg-cli-tools@main with: - command: cf push datagov-harvesting-logic --vars-file vars.development.yml --strategy rolling --no-wait + command: cf push --vars-file vars.development.yml --strategy rolling --no-wait cf_org: gsa-datagov cf_space: ${{vars.ENVIRONMENT_NAME}} cf_username: ${{secrets.CF_SERVICE_USER}} @@ -82,7 +82,7 @@ jobs: - name: smoke test uses: cloud-gov/cg-cli-tools@main with: - command: cf run-task datagov-harvesting-logic -c "/home/vcap/app/scripts/smoke-test.py" --name smoke-test + command: cf run-task harvesting-logic -c "/home/vcap/app/scripts/smoke-test.py" --name smoke-test cf_org: gsa-datagov cf_space: ${{vars.ENVIRONMENT_NAME}} cf_username: ${{secrets.CF_SERVICE_USER}} @@ -91,7 +91,7 @@ jobs: uses: cloud-gov/cg-cli-tools@main with: command: > - scripts/monitor-cf-logs.sh datagov-harvesting-logic smoke-test + scripts/monitor-cf-logs.sh harvesting-logic smoke-test cf_org: gsa-datagov cf_space: ${{vars.ENVIRONMENT_NAME}} cf_username: ${{secrets.CF_SERVICE_USER}} diff --git a/.gitignore b/.gitignore index a5e2d643..074aac09 100644 --- a/.gitignore +++ b/.gitignore @@ -24,4 +24,5 @@ tmp/ # vscode debugger .vscode/ .env +requirements.txt diff --git a/Dockerfile b/Dockerfile index 2c313462..7f3d6ab2 100644 --- a/Dockerfile +++ b/Dockerfile @@ -8,4 +8,7 @@ RUN pip install --no-cache-dir -r requirements.txt EXPOSE 8080 -CMD ["python", "app.py"] \ No newline at end of file +ENV FLASK_APP=run.py + +# Run run.py when the container launches +CMD ["flask", "run", "--host=0.0.0.0", "--port=8080"] \ No newline at end of file diff --git a/README.md b/README.md index 6767a3fb..ffd6400d 100644 --- a/README.md +++ b/README.md @@ -107,6 +107,14 @@ If you followed the instructions for `CKAN 
load testing` and `Harvester testing`
 
    This will start the necessary services and execute the test.
 
+3. When there are database DDL changes, use the following steps to generate migration scripts and update the database:
+
+   ```bash
+   docker-compose up db
+   docker-compose run app flask db migrate -m "migration description"
+   docker-compose run app flask db upgrade
+   ```
+
 ### Deployment to cloud.gov
 
 #### Database Service Setup
@@ -143,4 +151,10 @@ Accessing the service can be done with service keys. They can be created with `c
    ```bash
    poetry export -f requirements.txt --output requirements.txt --without-hashes
    cf push --vars-file vars.development.yml
-   ```
\ No newline at end of file
+   ```
+
+3. When there are database DDL changes, use the following to update the database:
+
+   ```bash
+   cf run-task harvesting-logic --command "flask db upgrade" --name database-upgrade
+   ```
\ No newline at end of file
diff --git a/app.py b/app.py
deleted file mode 100644
index 5580ba4d..00000000
--- a/app.py
+++ /dev/null
@@ -1,91 +0,0 @@
-from flask import Flask, request
-from harvester.database.interface import HarvesterDBInterface
-from harvester.database import init_db
-from tests.database.data import new_source, new_job, new_error
-
-app = Flask(__name__)
-db = HarvesterDBInterface()
-
-@app.route('/', methods=['GET'])
-def index():
-    html = "" + init_db.create_tables() + ""
-    html += "<ul>"
-    for rule in app.url_map.iter_rules():
-        if 'static' not in rule.endpoint and 'index' not in rule.endpoint:
-            html += (f"<li>{rule.endpoint} : "
-                     f"{rule.rule}</li>")
-    html += "</ul>
" - return html - -@app.route('/add_source', methods=['GET']) -def add_harvest_source(): - source=db.add_harvest_source(new_source) - return(f"Added new source with ID: {source.id}") - -@app.route('/add_job', methods=['GET']) -def add_harvest_job(): - source_id = request.args.get('source_id', None) - if source_id is None: - return 'Please provide source_id: /add_job?source_id=xxx' - else: - job=db.add_harvest_job(new_job, source_id) - return(f"Added new job with ID: {job.id}") - -@app.route('/add_error', methods=['GET']) -def add_harvest_error(): - job_id = request.args.get('job_id', None) - if job_id is None: - return 'Please provide job_id: /add_error?job_id=xxx' - else: - err=db.add_harvest_error(new_error, job_id) - return(f"Added new error with ID: {err.id}") - -@app.route('/harvest_sources', methods=['GET']) -def get_all_harvest_sources(): - result = db.get_all_harvest_sources() - return result - -@app.route('/harvest_jobs', methods=['GET']) -def get_all_harvest_jobs(): - result = db.get_all_harvest_jobs() - return result - -@app.route('/harvest_errors_by_job/', methods=['GET']) -def get_all_harvest_errors_by_job(job_id): - try: - result = db.get_all_harvest_errors_by_job(job_id) - return result - except Exception: - return " provide job_id" - - -@app.route('/harvest_source/', methods=['GET']) -def get_harvest_source(source_id): - try: - result = db.get_harvest_source(source_id) - return result - except Exception: - return " provide source_id" - -@app.route('/harvest_job/', methods=['GET']) -def get_harvest_job(job_id): - try: - result = db.get_harvest_job(job_id) - return result - except Exception: - return "provide job_id" - -@app.route('/harvest_error/', methods=['GET']) -def get_harvest_error(error_id): - try: - result = db.get_harvest_error(error_id) - return result - except Exception: - return "provide error_id" - -@app.teardown_appcontext -def shutdown_session(exception=None): - db.close() - -if __name__ == '__main__': - app.run(host="0.0.0.0", port=8080) diff --git a/docker-compose.yml b/docker-compose.yml index 1ede7a6b..ba57f9c2 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -54,10 +54,14 @@ services: build: . depends_on: - db + volumes: + - .:/app environment: DATABASE_URI: ${DATABASE_URI} + FLASK_APP: run.py ports: - "8080:8080" + command: flask run --host=0.0.0.0 --port=8080 volumes: postgres_data: \ No newline at end of file diff --git a/harvester/database/__init__.py b/harvester/database/__init__.py deleted file mode 100644 index f70900f0..00000000 --- a/harvester/database/__init__.py +++ /dev/null @@ -1,3 +0,0 @@ -import os - -DATABASE_URI = os.getenv("DATABASE_URI") diff --git a/harvester/database/init_db.py b/harvester/database/init_db.py deleted file mode 100644 index 5b93ad5e..00000000 --- a/harvester/database/init_db.py +++ /dev/null @@ -1,14 +0,0 @@ -from sqlalchemy import create_engine -from harvester.database.models import Base -from sqlalchemy.engine.reflection import Inspector -from . 
import DATABASE_URI - -def create_tables(): - engine = create_engine(DATABASE_URI) - Base.metadata.create_all(engine) - inspector = Inspector.from_engine(engine) - table_names = inspector.get_table_names() - return (f"Database tables : {table_names}") - - - diff --git a/harvester/database/interface.py b/harvester/database/interface.py deleted file mode 100644 index a7eb0b4f..00000000 --- a/harvester/database/interface.py +++ /dev/null @@ -1,79 +0,0 @@ -from sqlalchemy import create_engine, inspect -from sqlalchemy.orm import sessionmaker, scoped_session -from harvester.database.models import HarvestSource, HarvestJob, HarvestError -from . import DATABASE_URI - -class HarvesterDBInterface: - def __init__(self, session=None): - if session is None: - engine = create_engine(DATABASE_URI) - session_factory = sessionmaker(bind=engine, - autocommit=False, - autoflush=False) - self.db = scoped_session(session_factory) - else: - self.db = session - - @staticmethod - def _to_dict(obj): - return {c.key: getattr(obj, c.key) - for c in inspect(obj).mapper.column_attrs} - - def add_harvest_source(self, source_data): - new_source = HarvestSource(**source_data) - self.db.add(new_source) - self.db.commit() - self.db.refresh(new_source) - return new_source - - def get_all_harvest_sources(self): - harvest_sources = self.db.query(HarvestSource).all() - harvest_sources_data = [ - HarvesterDBInterface._to_dict(source) for source in harvest_sources] - return harvest_sources_data - - def get_harvest_source(self, source_id): - result = self.db.query(HarvestSource).filter_by(id=source_id).first() - return HarvesterDBInterface._to_dict(result) - - def add_harvest_job(self, job_data, source_id): - job_data['harvest_source_id'] = source_id - new_job = HarvestJob(**job_data) - self.db.add(new_job) - self.db.commit() - self.db.refresh(new_job) - return new_job - - def get_all_harvest_jobs(self): - harvest_jobs = self.db.query(HarvestJob).all() - harvest_jobs_data = [ - HarvesterDBInterface._to_dict(job) for job in harvest_jobs] - return harvest_jobs_data - - def get_harvest_job(self, job_id): - result = self.db.query(HarvestJob).filter_by(id=job_id).first() - return HarvesterDBInterface._to_dict(result) - - def add_harvest_error(self, error_data, job_id): - error_data['harvest_job_id'] = job_id - new_error = HarvestError(**error_data) - self.db.add(new_error) - self.db.commit() - self.db.refresh(new_error) - return new_error - - def get_all_harvest_errors_by_job(self, job_id): - harvest_errors = self.db.query(HarvestError).filter_by(harvest_job_id=job_id) - harvest_errors_data = [ - HarvesterDBInterface._to_dict(err) for err in harvest_errors] - return harvest_errors_data - - def get_harvest_error(self, error_id): - result = self.db.query(HarvestError).filter_by(id=error_id).first() - return HarvesterDBInterface._to_dict(result) - - def close(self): - if hasattr(self.db, 'remove'): - self.db.remove() - elif hasattr(self.db, 'close'): - self.db.close() \ No newline at end of file diff --git a/harvester/database/models.py b/harvester/database/models.py deleted file mode 100644 index 1d775e6e..00000000 --- a/harvester/database/models.py +++ /dev/null @@ -1,58 +0,0 @@ -from sqlalchemy import text, String, Integer, ARRAY, ForeignKey, DateTime -from sqlalchemy.orm import DeclarativeBase, relationship, mapped_column -from sqlalchemy.dialects.postgresql import UUID - - -class Base(DeclarativeBase): - id = mapped_column( - UUID(as_uuid=True), primary_key=True, server_default=text("gen_random_uuid()") - ) - -class 
HarvestSource(Base): - __tablename__ = 'harvest_source' - - name = mapped_column(String, nullable=False) - notification_emails = mapped_column(ARRAY(String)) - organization_id = mapped_column(String, nullable=False) - frequency = mapped_column(String, nullable=False) - url = mapped_column(String, nullable=False) - schema_type = mapped_column(String, nullable=False) - source_type = mapped_column(String, nullable=False) - harvest_source_id = mapped_column(String) - harvest_source_name = mapped_column(String) - -class HarvestJob(Base): - __tablename__ = 'harvest_job' - - harvest_source_id = mapped_column(UUID(as_uuid=True), - ForeignKey('harvest_source.id'), - nullable=False) - date_created = mapped_column(DateTime) - date_finished = mapped_column(DateTime) - records_added = mapped_column(Integer) - records_updated = mapped_column(Integer) - records_deleted = mapped_column(Integer) - records_errored = mapped_column(Integer) - records_ignored = mapped_column(Integer) - - source = relationship("HarvestSource", back_populates="jobs") - -class HarvestError(Base): - __tablename__ = 'harvest_error' - - harvest_job_id = mapped_column(UUID(as_uuid=True), - ForeignKey('harvest_job.id'), - nullable=False) - record_id = mapped_column(String, nullable=True) - record_reported_id = mapped_column(String) - date_created = mapped_column(DateTime) - type = mapped_column(String) - severity = mapped_column(String) - message = mapped_column(String) - - job = relationship("HarvestJob", back_populates="errors") - -HarvestSource.jobs = relationship( - "HarvestJob", order_by=HarvestJob.id,back_populates="source") -HarvestJob.errors = relationship( - "HarvestError", order_by=HarvestError.id, back_populates="job") diff --git a/harvester/exceptions.py b/harvester/exceptions.py index 103250de..2fccd18d 100644 --- a/harvester/exceptions.py +++ b/harvester/exceptions.py @@ -1,6 +1,6 @@ import logging from datetime import datetime -from .database.interface import HarvesterDBInterface +from app.interface import HarvesterDBInterface # critical exceptions @@ -60,7 +60,7 @@ def __init__(self, msg, harvest_job_id, title): "severity": self.severity, "type": self.type, "date_created": datetime.utcnow(), - "record_id": self.title, + "harvest_record_id": self.title # to-do } self.db_interface.add_harvest_error(error_data, self.harvest_job_id) diff --git a/manifest.yml b/manifest.yml index 0bf990ab..a40107e2 100644 --- a/manifest.yml +++ b/manifest.yml @@ -10,4 +10,5 @@ applications: instances: 1 env: DATABASE_NAME: ((database_name)) - command: python app.py + FLASK_APP: run.py + command: flask run --host=0.0.0.0 --port=8080 diff --git a/poetry.lock b/poetry.lock index d77068df..a7a7f8c6 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,5 +1,25 @@ # This file is automatically @generated by Poetry 1.4.2 and should not be changed by hand. +[[package]] +name = "alembic" +version = "1.13.1" +description = "A database migration tool for SQLAlchemy." 
+category = "main" +optional = false +python-versions = ">=3.8" +files = [ + {file = "alembic-1.13.1-py3-none-any.whl", hash = "sha256:2edcc97bed0bd3272611ce3a98d98279e9c209e7186e43e75bbb1b2bdfdbcc43"}, + {file = "alembic-1.13.1.tar.gz", hash = "sha256:4932c8558bf68f2ee92b9bbcb8218671c627064d5b08939437af6d77dc05e595"}, +] + +[package.dependencies] +Mako = "*" +SQLAlchemy = ">=1.3.0" +typing-extensions = ">=4" + +[package.extras] +tz = ["backports.zoneinfo"] + [[package]] name = "attrs" version = "23.2.0" @@ -388,6 +408,59 @@ Werkzeug = ">=3.0.0" async = ["asgiref (>=3.2)"] dotenv = ["python-dotenv"] +[[package]] +name = "flask-migrate" +version = "4.0.7" +description = "SQLAlchemy database migrations for Flask applications using Alembic." +category = "main" +optional = false +python-versions = ">=3.6" +files = [ + {file = "Flask-Migrate-4.0.7.tar.gz", hash = "sha256:dff7dd25113c210b069af280ea713b883f3840c1e3455274745d7355778c8622"}, + {file = "Flask_Migrate-4.0.7-py3-none-any.whl", hash = "sha256:5c532be17e7b43a223b7500d620edae33795df27c75811ddf32560f7d48ec617"}, +] + +[package.dependencies] +alembic = ">=1.9.0" +Flask = ">=0.9" +Flask-SQLAlchemy = ">=1.0" + +[[package]] +name = "flask-sqlalchemy" +version = "3.1.1" +description = "Add SQLAlchemy support to your Flask application." +category = "main" +optional = false +python-versions = ">=3.8" +files = [ + {file = "flask_sqlalchemy-3.1.1-py3-none-any.whl", hash = "sha256:4ba4be7f419dc72f4efd8802d69974803c37259dd42f3913b0dcf75c9447e0a0"}, + {file = "flask_sqlalchemy-3.1.1.tar.gz", hash = "sha256:e4b68bb881802dda1a7d878b2fc84c06d1ee57fb40b874d3dc97dabfa36b8312"}, +] + +[package.dependencies] +flask = ">=2.2.5" +sqlalchemy = ">=2.0.16" + +[[package]] +name = "flask-wtf" +version = "1.2.1" +description = "Form rendering, validation, and CSRF protection for Flask with WTForms." +category = "main" +optional = false +python-versions = ">=3.8" +files = [ + {file = "flask_wtf-1.2.1-py3-none-any.whl", hash = "sha256:fa6793f2fb7e812e0fe9743b282118e581fb1b6c45d414b8af05e659bd653287"}, + {file = "flask_wtf-1.2.1.tar.gz", hash = "sha256:8bb269eb9bb46b87e7c8233d7e7debdf1f8b74bf90cc1789988c29b37a97b695"}, +] + +[package.dependencies] +flask = "*" +itsdangerous = "*" +wtforms = "*" + +[package.extras] +email = ["email-validator"] + [[package]] name = "greenlet" version = "3.0.3" @@ -563,6 +636,26 @@ files = [ [package.dependencies] referencing = ">=0.31.0" +[[package]] +name = "mako" +version = "1.3.2" +description = "A super-fast templating language that borrows the best ideas from the existing templating languages." +category = "main" +optional = false +python-versions = ">=3.8" +files = [ + {file = "Mako-1.3.2-py3-none-any.whl", hash = "sha256:32a99d70754dfce237019d17ffe4a282d2d3351b9c476e90d8a60e63f133b80c"}, + {file = "Mako-1.3.2.tar.gz", hash = "sha256:2a0c8ad7f6274271b3bb7467dd37cf9cc6dab4bc19cb69a4ef10669402de698e"}, +] + +[package.dependencies] +MarkupSafe = ">=0.9.2" + +[package.extras] +babel = ["Babel"] +lingua = ["lingua"] +testing = ["pytest"] + [[package]] name = "markupsafe" version = "2.1.5" @@ -1361,7 +1454,25 @@ MarkupSafe = ">=2.1.1" [package.extras] watchdog = ["watchdog (>=2.3)"] +[[package]] +name = "wtforms" +version = "3.1.2" +description = "Form validation and rendering for Python web development." 
+category = "main" +optional = false +python-versions = ">=3.8" +files = [ + {file = "wtforms-3.1.2-py3-none-any.whl", hash = "sha256:bf831c042829c8cdbad74c27575098d541d039b1faa74c771545ecac916f2c07"}, + {file = "wtforms-3.1.2.tar.gz", hash = "sha256:f8d76180d7239c94c6322f7990ae1216dae3659b7aa1cee94b6318bdffb474b9"}, +] + +[package.dependencies] +markupsafe = "*" + +[package.extras] +email = ["email-validator"] + [metadata] lock-version = "2.0" python-versions = ">=3.10" -content-hash = "b07530d8a96c663721dca3c45b225e7b5e612e7a20dd9c5b3af9587856554096" +content-hash = "455443e5c7bb03de7b2d454997bb6c971f6e6f74397226c2808e5a09e521ce89" diff --git a/pyproject.toml b/pyproject.toml index 33f93ad3..213f8a7f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -30,6 +30,9 @@ boto3 = "^1.34.29" sqlalchemy = "^2.0.25" flask = "^3.0.2" psycopg2-binary = "^2.9.9" +flask-sqlalchemy = "^3.1.1" +flask-wtf = "^1.2.1" +flask-migrate = "^4.0.7" [tool.poetry.group.dev.dependencies] pytest = "^7.3.0" diff --git a/requirements.txt b/requirements.txt index 9c414ba7..ebc69b25 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,8 @@ # poetry not supported by cloud.gov, use this file instead for manually push -# run this command to generate the file: -# poetry export -f requirements.txt --output requirements.txt --without-hashes \ No newline at end of file + +flask-migrate==4.0.7 +flask-sqlalchemy==3.1.1 +flask==3.0.2 +psycopg2-binary==2.9.9 +pytest==7.4.4 +python-dotenv==1.0.1 diff --git a/tests/conftest.py b/tests/conftest.py index efc7985e..1e0dbf1b 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -6,8 +6,8 @@ from sqlalchemy import create_engine, text from sqlalchemy.orm import sessionmaker, scoped_session -from harvester.database.models import Base -from harvester.database.interface import HarvesterDBInterface +from app.models import Base +from app.interface import HarvesterDBInterface from harvester.utils import open_json diff --git a/tests/database/data.py b/tests/database/data.py index d32f3b5c..7e7d7fe9 100644 --- a/tests/database/data.py +++ b/tests/database/data.py @@ -1,18 +1,21 @@ from datetime import datetime +new_org = { + 'name': 'GSA', + 'logo' : 'url for the logo' +} + new_source = { 'name': 'Example Harvest Source', 'notification_emails': ['admin@example.com'], - 'organization_id': 'Example Organization', 'frequency': 'daily', 'url': "http://example.com", 'schema_type': 'strict', - 'source_type': 'json', - 'harvest_source_id': '8d15bfa3-2b48-4166-bce0-631a1e336ae4', - 'harvest_source_name': 'source name from ckan' + 'source_type': 'json' } new_job = { + "status": "in_progress", "date_created": datetime.utcnow(), "date_finished": datetime.utcnow(), "records_added": 100, @@ -23,8 +26,7 @@ } new_error = { - "record_id": "record123", - "record_reported_id": "recordXYZ", + "harvest_record_id": "record123", "date_created": datetime.utcnow(), "type": "Validation Error", "severity": "ERROR", diff --git a/tests/database/test_db.py b/tests/database/test_db.py index b9d15e8b..a150bc4f 100644 --- a/tests/database/test_db.py +++ b/tests/database/test_db.py @@ -1,96 +1,91 @@ -def test_add_harvest_source(db_interface): - source_data = { - "name": "Test Source", - "organization_id": "Test Org", - "frequency": "daily", - "url": "http://example.com", - "schema_type": "strict", - "source_type": "json", - } - new_source = db_interface.add_harvest_source(source_data) - assert new_source.name == "Test Source" +import pytest +from sqlalchemy import create_engine, text +from sqlalchemy.orm import 
sessionmaker, scoped_session +from app.models import Base +from app.interface import HarvesterDBInterface +from dotenv import load_dotenv +import os +load_dotenv() -def test_add_and_get_harvest_source(db_interface): - new_source = db_interface.add_harvest_source( - { - "name": "Test Source", - "notification_emails": ["test@example.com"], - "organization_id": "Test Org", - "frequency": "daily", - "url": "http://example.com", - "schema_type": "strict", - "source_type": "json", - } - ) - assert new_source.name == "Test Source" +@pytest.fixture(scope='session') +def db_session(): + DATABASE_SERVER = os.getenv("DATABASE_SERVER") + DATABASE_URI = os.getenv("DATABASE_URI") + TEST_SCHEMA = "test_schema" + modified_uri = DATABASE_URI.replace('@' + DATABASE_SERVER, '@localhost') + engine = create_engine(modified_uri) - sources = db_interface.get_all_harvest_sources() - assert any(source["name"] == "Test Source" for source in sources) + with engine.connect() as connection: + connection.execute(text(f"CREATE SCHEMA IF NOT EXISTS {TEST_SCHEMA};")) + connection.execute(text(f"SET search_path TO {TEST_SCHEMA};")) + Base.metadata.create_all(engine) + SessionLocal = sessionmaker(bind=engine) -def test_add_harvest_job(db_interface): - new_source = db_interface.add_harvest_source( - { - "name": "Test Source", - "notification_emails": ["test@example.com"], - "organization_id": "Test Org", - "frequency": "daily", - "url": "http://example.com", - "schema_type": "strict", - "source_type": "json", - } - ) + session = scoped_session(SessionLocal) + yield session - job_data = { - "date_created": "2022-01-01", - "date_finished": "2022-01-02", - "records_added": 10, - "records_updated": 5, - "records_deleted": 2, - "records_errored": 1, - "records_ignored": 0, - } - new_job = db_interface.add_harvest_job(job_data, str(new_source.id)) - assert new_job.harvest_source_id == new_source.id + session.remove() + with engine.begin() as connection: + connection.execute(text(f"DROP SCHEMA IF EXISTS {TEST_SCHEMA} CASCADE;")) -def test_add_harvest_error(db_interface): - new_source = db_interface.add_harvest_source( - { - "name": "Error Test Source", - "notification_emails": ["error@example.com"], - "organization_id": "Error Org", - "frequency": "weekly", - "url": "http://example.com", - "schema_type": "strict", - "source_type": "json", - } - ) + engine.dispose() - new_job = db_interface.add_harvest_job( - { - "date_created": "2022-01-03", - "date_finished": "2022-01-04", - "records_added": 5, - "records_updated": 3, - "records_deleted": 1, - "records_errored": 0, - "records_ignored": 2, - }, - str(new_source.id), - ) +@pytest.fixture(scope='session') +def db_interface(db_session): + return HarvesterDBInterface(db_session) + +def test_add_harvest_source(db_interface): + org_data = db_interface.add_organization({'name': 'GSA', + 'logo': 'url for the logo'}) + source_data = {'name': 'Test Source', + 'frequency': 'daily', + 'url': "http://example-1.com", + 'schema_type': 'strict', + 'source_type': 'json'} + new_source = db_interface.add_harvest_source(source_data, str(org_data.id)) + assert new_source.name == 'Test Source' + +def test_add_and_get_harvest_source(db_interface): + org_data = db_interface.add_organization({'name': 'GSA', + 'logo': 'url for the logo'}) + new_source = db_interface.add_harvest_source({ + 'name': 'Test Source', + 'notification_emails': ['test@example.com'], + 'frequency': 'daily', + 'url': "http://example-2.com", + 'schema_type': 'strict', + 'source_type': 'json' + }, str(org_data.id)) + assert 
new_source.name == 'Test Source' + + sources = db_interface.get_all_harvest_sources() + assert any(source['name'] == 'Test Source' for source in sources) - error_data = { - "harvest_job_id": str(new_job.id), - "record_reported_id": "test_record", - "date_created": "2022-01-04", - "type": "Test Error", - "severity": "high", - "message": "This is a test error", - } - new_error = db_interface.add_harvest_error(error_data, str(new_job.id)) - assert new_error.harvest_job_id == new_job.id - assert new_error.type == "Test Error" - assert new_error.message == "This is a test error" +def test_add_harvest_job(db_interface): + org_data = db_interface.add_organization({'name': 'GSA', + 'logo': 'url for the logo'}) + new_source = db_interface.add_harvest_source({ + 'name': 'Test Source', + 'notification_emails': ['test@example.com'], + 'frequency': 'daily', + 'url': "http://example-3.com", + 'schema_type': 'strict', + 'source_type': 'json' + }, str(org_data.id)) + + job_data = { + 'status': 'in_progress', + 'date_created': '2022-01-01', + 'date_finished': '2022-01-02', + 'records_added': 10, + 'records_updated': 5, + 'records_deleted': 2, + 'records_errored': 1, + 'records_ignored': 0 + } + new_job = db_interface.add_harvest_job(job_data, str(new_source.id)) + assert new_job.harvest_source_id == new_source.id diff --git a/tests/unit/exception/test_exception_handling.py b/tests/unit/exception/test_exception_handling.py index 86d68c9b..73e361df 100644 --- a/tests/unit/exception/test_exception_handling.py +++ b/tests/unit/exception/test_exception_handling.py @@ -21,22 +21,27 @@ class TestExceptionHandling: def test_add_harvest_source(self, db_interface): + organization = { + "id": "919bfb9e-89eb-4032-9abf-eee54be5a00c", + "logo": "url for the logo", + "name": "GSA" + } harvest_source = { "id": "9347a852-2498-4bee-b817-90b8e93c9cec", "name": "harvest_source_test", "notification_emails": ["admin@example.com"], - "organization_id": "Example Organization", + "organization_id": "919bfb9e-89eb-4032-9abf-eee54be5a00c", "frequency": "daily", "url": "http://example.com", "schema_type": "strict", "source_type": "json", - "harvest_source_name": "source name from ckan", } harvest_job = { "harvest_source_id": "9347a852-2498-4bee-b817-90b8e93c9cec", "id": "1db556ff-fb02-438b-b7d2-ad914e1f2531", + "status": "in_progress", "date_created": datetime.utcnow(), "date_finished": datetime.utcnow(), "records_added": 0, @@ -45,7 +50,9 @@ def test_add_harvest_source(self, db_interface): "records_errored": 0, "records_ignored": 0, } - db_interface.add_harvest_source(harvest_source) + db_interface.add_organization(organization) + db_interface.add_harvest_source(harvest_source, + harvest_source["organization_id"]) db_interface.add_harvest_job(harvest_job, harvest_job["harvest_source_id"]) def test_bad_harvest_source_url_exception(self, bad_url_dcatus_config): diff --git a/vars.development.yml b/vars.development.yml index 4bcd9c69..74f476ec 100644 --- a/vars.development.yml +++ b/vars.development.yml @@ -1,4 +1,3 @@ app_name: harvesting-logic database_name: harvesting-logic-db - route-external: harvester-dev-datagov.app.cloud.gov \ No newline at end of file From b67dd4aafd924cc758fc1b2b63c41062c2085e26 Mon Sep 17 00:00:00 2001 From: Jin-Sun-tts Date: Tue, 19 Mar 2024 18:59:27 -0400 Subject: [PATCH 19/20] fix the pytest --- app/__init__.py | 22 ++++ app/flask-app-structure.txt | 22 ++++ app/interface.py | 93 ++++++++++++++ app/models.py | 82 +++++++++++++ app/routes.py | 92 ++++++++++++++ app/static/styles.css | 0 
app/templates/index.html | 38 ++++++ migrations/README | 1 + migrations/alembic.ini | 50 ++++++++ migrations/env.py | 113 ++++++++++++++++++ migrations/script.py.mako | 24 ++++ .../versions/701baacbc2f2_base_models.py | 113 ++++++++++++++++++ run.py | 6 + 13 files changed, 656 insertions(+) create mode 100644 app/__init__.py create mode 100644 app/flask-app-structure.txt create mode 100644 app/interface.py create mode 100644 app/models.py create mode 100644 app/routes.py create mode 100644 app/static/styles.css create mode 100644 app/templates/index.html create mode 100644 migrations/README create mode 100644 migrations/alembic.ini create mode 100644 migrations/env.py create mode 100644 migrations/script.py.mako create mode 100644 migrations/versions/701baacbc2f2_base_models.py create mode 100644 run.py diff --git a/app/__init__.py b/app/__init__.py new file mode 100644 index 00000000..fe306ee1 --- /dev/null +++ b/app/__init__.py @@ -0,0 +1,22 @@ +from flask import Flask +from .models import db +from flask_migrate import Migrate +import os +from dotenv import load_dotenv + +load_dotenv() + +DATABASE_URI = os.getenv('DATABASE_URI') + +def create_app(): + app = Flask(__name__) + app.config['SQLALCHEMY_DATABASE_URI'] = DATABASE_URI + db.init_app(app) + + # Initialize Flask-Migrate + Migrate(app, db) + + from .routes import register_routes + register_routes(app) + + return app \ No newline at end of file diff --git a/app/flask-app-structure.txt b/app/flask-app-structure.txt new file mode 100644 index 00000000..26f38763 --- /dev/null +++ b/app/flask-app-structure.txt @@ -0,0 +1,22 @@ +DATAGOV-HARVESTING-LOGIC +├── app/ +│ ├── __init__.py +│ ├── models.py +│ ├── routes.py +│ ├── forms.py (to-do) +│ └── templates/ +│ ├── index.html +│ ├── harvest_source_form.html (to-do) +│ └── xxx.html (to-do) +│ └── static/ +│ └── styles.css (to-do) +│ +├── migrations/ +│ └── versions/ +│ ├── alembic.ini +│ ├── env.py +│ └── script.py.mako +│ +├── docker-compose.yml +├── Dockerfile +└── run.py diff --git a/app/interface.py b/app/interface.py new file mode 100644 index 00000000..ed3f91a8 --- /dev/null +++ b/app/interface.py @@ -0,0 +1,93 @@ +from sqlalchemy import create_engine, inspect +from sqlalchemy.orm import sessionmaker, scoped_session +from app.models import Organization, HarvestSource, HarvestJob, HarvestError +from . 
import DATABASE_URI + +class HarvesterDBInterface: + def __init__(self, session=None): + if session is None: + engine = create_engine(DATABASE_URI) + session_factory = sessionmaker(bind=engine, + autocommit=False, + autoflush=False) + self.db = scoped_session(session_factory) + else: + self.db = session + + @staticmethod + def _to_dict(obj): + return {c.key: getattr(obj, c.key) + for c in inspect(obj).mapper.column_attrs} + + def add_organization(self, org_data): + new_org = Organization(**org_data) + self.db.add(new_org) + self.db.commit() + self.db.refresh(new_org) + return new_org + + def add_harvest_source(self, source_data, org_id): + source_data['organization_id'] = org_id + new_source = HarvestSource(**source_data) + self.db.add(new_source) + self.db.commit() + self.db.refresh(new_source) + return new_source + + def get_all_organizations(self): + orgs = self.db.query(Organization).all() + orgs_data = [ + HarvesterDBInterface._to_dict(org) for org in orgs] + return orgs_data + + def get_all_harvest_sources(self): + harvest_sources = self.db.query(HarvestSource).all() + harvest_sources_data = [ + HarvesterDBInterface._to_dict(source) for source in harvest_sources] + return harvest_sources_data + + def get_harvest_source(self, source_id): + result = self.db.query(HarvestSource).filter_by(id=source_id).first() + return HarvesterDBInterface._to_dict(result) + + def add_harvest_job(self, job_data, source_id): + job_data['harvest_source_id'] = source_id + new_job = HarvestJob(**job_data) + self.db.add(new_job) + self.db.commit() + self.db.refresh(new_job) + return new_job + + def get_all_harvest_jobs(self): + harvest_jobs = self.db.query(HarvestJob).all() + harvest_jobs_data = [ + HarvesterDBInterface._to_dict(job) for job in harvest_jobs] + return harvest_jobs_data + + def get_harvest_job(self, job_id): + result = self.db.query(HarvestJob).filter_by(id=job_id).first() + return HarvesterDBInterface._to_dict(result) + + def add_harvest_error(self, error_data, job_id): + error_data['harvest_job_id'] = job_id + new_error = HarvestError(**error_data) + self.db.add(new_error) + self.db.commit() + self.db.refresh(new_error) + return new_error + + def get_all_harvest_errors_by_job(self, job_id): + harvest_errors = self.db.query(HarvestError).filter_by(harvest_job_id=job_id) + harvest_errors_data = [ + HarvesterDBInterface._to_dict(err) for err in harvest_errors] + return harvest_errors_data + + def get_harvest_error(self, error_id): + result = self.db.query(HarvestError).filter_by(id=error_id).first() + return HarvesterDBInterface._to_dict(result) + + def close(self): + if hasattr(self.db, 'remove'): + self.db.remove() + elif hasattr(self.db, 'close'): + self.db.close() \ No newline at end of file diff --git a/app/models.py b/app/models.py new file mode 100644 index 00000000..d035b929 --- /dev/null +++ b/app/models.py @@ -0,0 +1,82 @@ +from flask_sqlalchemy import SQLAlchemy +from sqlalchemy.dialects.postgresql import UUID, ARRAY +from sqlalchemy.sql import text +from sqlalchemy import Enum +from sqlalchemy.schema import Index + +db = SQLAlchemy() + +class Base(db.Model): + __abstract__ = True # Indicates that this class should not be created as a table + id = db.Column(UUID(as_uuid=True), primary_key=True, + server_default=text("gen_random_uuid()")) + +class Organization(Base): + __tablename__ = 'organization' + + name = db.Column(db.String(), nullable=False, index=True) + logo = db.Column(db.String()) + +class HarvestSource(Base): + __tablename__ = 'harvest_source' + + name = 
db.Column(db.String, nullable=False) + notification_emails = db.Column(ARRAY(db.String)) + organization_id = db.Column(UUID(as_uuid=True), + db.ForeignKey('organization.id'), + nullable=False) + frequency = db.Column(db.String, nullable=False) + url = db.Column(db.String, nullable=False, unique=True) + schema_type = db.Column(db.String, nullable=False) + source_type = db.Column(db.String, nullable=False) + jobs = db.relationship('HarvestJob', backref='source') + +class HarvestJob(Base): + __tablename__ = 'harvest_job' + + harvest_source_id = db.Column(UUID(as_uuid=True), + db.ForeignKey('harvest_source.id'), + nullable=False) + status = db.Column(Enum('new', 'in_progress', 'complete', name='job_status'), + nullable=False, + index=True) + date_created = db.Column(db.DateTime, index=True) + date_finished = db.Column(db.DateTime) + records_added = db.Column(db.Integer) + records_updated = db.Column(db.Integer) + records_deleted = db.Column(db.Integer) + records_errored = db.Column(db.Integer) + records_ignored = db.Column(db.Integer) + errors = db.relationship('HarvestError', backref='job', lazy=True) + +class HarvestError(Base): + __tablename__ = 'harvest_error' + + harvest_job_id = db.Column(UUID(as_uuid=True), + db.ForeignKey('harvest_job.id'), + nullable=False) + harvest_record_id = db.Column(db.String) + # to-do + # harvest_record_id = db.Column(UUID(as_uuid=True), + # db.ForeignKey('harvest_record.id'), + # nullable=True) + date_created = db.Column(db.DateTime) + type = db.Column(db.String) + severity = db.Column(Enum('CRITICAL', 'ERROR', 'WARN', name='error_serverity'), + nullable=False, + index=True) + message = db.Column(db.String) + +class HarvestRecord(Base): + __tablename__ = 'harvest_record' + + job_id = db.Column(UUID(as_uuid=True), + db.ForeignKey('harvest_job.id'), + nullable=False) + identifier = db.Column(db.String(), nullable=False) + ckan_id = db.Column(db.String(), nullable=False, index=True) + type = db.Column(db.String(), nullable=False) + source_metadata = db.Column(db.String(), nullable=True) + __table_args__ = ( + Index('ix_job_id_identifier', 'job_id', 'identifier'), + ) \ No newline at end of file diff --git a/app/routes.py b/app/routes.py new file mode 100644 index 00000000..648e505c --- /dev/null +++ b/app/routes.py @@ -0,0 +1,92 @@ +from flask import Blueprint, request, render_template +from .interface import HarvesterDBInterface +from tests.database.data import new_org, new_source, new_job, new_error + +mod = Blueprint('harvest', __name__) +db = HarvesterDBInterface() + +@mod.route('/', methods=['GET']) +def index(): + return render_template('index.html') + +@mod.route('/add_org', methods=['POST', 'GET']) +def add_organization(): + org=db.add_organization(new_org) + return(f"Added new organization with ID: {org.id}") + +@mod.route('/add_source', methods=['POST', 'GET']) +def add_harvest_source(): + org_id = request.args.get('org_id', None) + if org_id is None: + return 'Please provide org_id: /add_source?org_id=xxx' + else: + source=db.add_harvest_source(new_source, org_id) + return(f"Added new source with ID: {source.id}") + +@mod.route('/add_job', methods=['POST', 'GET']) +def add_harvest_job(): + source_id = request.args.get('source_id', None) + if source_id is None: + return 'Please provide source_id: /add_job?source_id=xxx' + else: + job=db.add_harvest_job(new_job, source_id) + return(f"Added new job with ID: {job.id}") + +@mod.route('/add_error', methods=['POST', 'GET']) +def add_harvest_error(): + job_id = request.args.get('job_id', None) + if job_id is 
None: + return 'Please provide job_id: /add_error?job_id=xxx' + else: + err=db.add_harvest_error(new_error, job_id) + return(f"Added new error with ID: {err.id}") + +@mod.route('/organizations', methods=['GET']) +def get_all_organizations(): + result = db.get_all_organizations() + return result + +@mod.route('/harvest_sources', methods=['GET']) +def get_all_harvest_sources(): + result = db.get_all_harvest_sources() + return result + +@mod.route('/harvest_jobs', methods=['GET']) +def get_all_harvest_jobs(): + result = db.get_all_harvest_jobs() + return result + +@mod.route('/harvest_errors_by_job/', methods=['GET']) +def get_all_harvest_errors_by_job(job_id): + try: + result = db.get_all_harvest_errors_by_job(job_id) + return result + except Exception: + return " provide job_id" + +@mod.route('/harvest_source/', methods=['GET']) +def get_harvest_source(source_id): + try: + result = db.get_harvest_source(source_id) + return result + except Exception: + return " provide source_id" + +@mod.route('/harvest_job/', methods=['GET']) +def get_harvest_job(job_id): + try: + result = db.get_harvest_job(job_id) + return result + except Exception: + return "provide job_id" + +@mod.route('/harvest_error/', methods=['GET']) +def get_harvest_error(error_id): + try: + result = db.get_harvest_error(error_id) + return result + except Exception: + return "provide error_id" + +def register_routes(app): + app.register_blueprint(mod) \ No newline at end of file diff --git a/app/static/styles.css b/app/static/styles.css new file mode 100644 index 00000000..e69de29b diff --git a/app/templates/index.html b/app/templates/index.html new file mode 100644 index 00000000..f3114184 --- /dev/null +++ b/app/templates/index.html @@ -0,0 +1,38 @@ + + + + + Harvest Actions + + + +

+    Harvest Actions
+    [remaining index.html markup lost in extraction; the diff adds 38 lines]
+ + + diff --git a/migrations/README b/migrations/README new file mode 100644 index 00000000..0e048441 --- /dev/null +++ b/migrations/README @@ -0,0 +1 @@ +Single-database configuration for Flask. diff --git a/migrations/alembic.ini b/migrations/alembic.ini new file mode 100644 index 00000000..ec9d45c2 --- /dev/null +++ b/migrations/alembic.ini @@ -0,0 +1,50 @@ +# A generic, single database configuration. + +[alembic] +# template used to generate migration files +# file_template = %%(rev)s_%%(slug)s + +# set to 'true' to run the environment during +# the 'revision' command, regardless of autogenerate +# revision_environment = false + + +# Logging configuration +[loggers] +keys = root,sqlalchemy,alembic,flask_migrate + +[handlers] +keys = console + +[formatters] +keys = generic + +[logger_root] +level = WARN +handlers = console +qualname = + +[logger_sqlalchemy] +level = WARN +handlers = +qualname = sqlalchemy.engine + +[logger_alembic] +level = INFO +handlers = +qualname = alembic + +[logger_flask_migrate] +level = INFO +handlers = +qualname = flask_migrate + +[handler_console] +class = StreamHandler +args = (sys.stderr,) +level = NOTSET +formatter = generic + +[formatter_generic] +format = %(levelname)-5.5s [%(name)s] %(message)s +datefmt = %H:%M:%S diff --git a/migrations/env.py b/migrations/env.py new file mode 100644 index 00000000..4c970927 --- /dev/null +++ b/migrations/env.py @@ -0,0 +1,113 @@ +import logging +from logging.config import fileConfig + +from flask import current_app + +from alembic import context + +# this is the Alembic Config object, which provides +# access to the values within the .ini file in use. +config = context.config + +# Interpret the config file for Python logging. +# This line sets up loggers basically. +fileConfig(config.config_file_name) +logger = logging.getLogger('alembic.env') + + +def get_engine(): + try: + # this works with Flask-SQLAlchemy<3 and Alchemical + return current_app.extensions['migrate'].db.get_engine() + except (TypeError, AttributeError): + # this works with Flask-SQLAlchemy>=3 + return current_app.extensions['migrate'].db.engine + + +def get_engine_url(): + try: + return get_engine().url.render_as_string(hide_password=False).replace( + '%', '%%') + except AttributeError: + return str(get_engine().url).replace('%', '%%') + + +# add your model's MetaData object here +# for 'autogenerate' support +# from myapp import mymodel +# target_metadata = mymodel.Base.metadata +config.set_main_option('sqlalchemy.url', get_engine_url()) +target_db = current_app.extensions['migrate'].db + +# other values from the config, defined by the needs of env.py, +# can be acquired: +# my_important_option = config.get_main_option("my_important_option") +# ... etc. + + +def get_metadata(): + if hasattr(target_db, 'metadatas'): + return target_db.metadatas[None] + return target_db.metadata + + +def run_migrations_offline(): + """Run migrations in 'offline' mode. + + This configures the context with just a URL + and not an Engine, though an Engine is acceptable + here as well. By skipping the Engine creation + we don't even need a DBAPI to be available. + + Calls to context.execute() here emit the given string to the + script output. + + """ + url = config.get_main_option("sqlalchemy.url") + context.configure( + url=url, target_metadata=get_metadata(), literal_binds=True + ) + + with context.begin_transaction(): + context.run_migrations() + + +def run_migrations_online(): + """Run migrations in 'online' mode. 
+ + In this scenario we need to create an Engine + and associate a connection with the context. + + """ + + # this callback is used to prevent an auto-migration from being generated + # when there are no changes to the schema + # reference: http://alembic.zzzcomputing.com/en/latest/cookbook.html + def process_revision_directives(context, revision, directives): + if getattr(config.cmd_opts, 'autogenerate', False): + script = directives[0] + if script.upgrade_ops.is_empty(): + directives[:] = [] + logger.info('No changes in schema detected.') + + conf_args = current_app.extensions['migrate'].configure_args + if conf_args.get("process_revision_directives") is None: + conf_args["process_revision_directives"] = process_revision_directives + + connectable = get_engine() + + with connectable.connect() as connection: + context.configure( + connection=connection, + target_metadata=get_metadata(), + **conf_args + ) + + with context.begin_transaction(): + context.run_migrations() + + +if context.is_offline_mode(): + run_migrations_offline() +else: + run_migrations_online() diff --git a/migrations/script.py.mako b/migrations/script.py.mako new file mode 100644 index 00000000..2c015630 --- /dev/null +++ b/migrations/script.py.mako @@ -0,0 +1,24 @@ +"""${message} + +Revision ID: ${up_revision} +Revises: ${down_revision | comma,n} +Create Date: ${create_date} + +""" +from alembic import op +import sqlalchemy as sa +${imports if imports else ""} + +# revision identifiers, used by Alembic. +revision = ${repr(up_revision)} +down_revision = ${repr(down_revision)} +branch_labels = ${repr(branch_labels)} +depends_on = ${repr(depends_on)} + + +def upgrade(): + ${upgrades if upgrades else "pass"} + + +def downgrade(): + ${downgrades if downgrades else "pass"} diff --git a/migrations/versions/701baacbc2f2_base_models.py b/migrations/versions/701baacbc2f2_base_models.py new file mode 100644 index 00000000..65035397 --- /dev/null +++ b/migrations/versions/701baacbc2f2_base_models.py @@ -0,0 +1,113 @@ +"""base models + +Revision ID: 701baacbc2f2 +Revises: +Create Date: 2024-03-19 21:36:25.741447 + +""" +from alembic import op +import sqlalchemy as sa +from sqlalchemy.dialects import postgresql + +# revision identifiers, used by Alembic. +revision = '701baacbc2f2' +down_revision = None +branch_labels = None +depends_on = None + + +def upgrade(): + # ### commands auto generated by Alembic - please adjust! 
### + op.create_table('organization', + sa.Column('name', sa.String(), nullable=False), + sa.Column('logo', sa.String(), nullable=True), + sa.Column('id', sa.UUID(), server_default=sa.text('gen_random_uuid()'), nullable=False), + sa.PrimaryKeyConstraint('id') + ) + with op.batch_alter_table('organization', schema=None) as batch_op: + batch_op.create_index(batch_op.f('ix_organization_name'), ['name'], unique=False) + + op.create_table('harvest_source', + sa.Column('name', sa.String(), nullable=False), + sa.Column('notification_emails', postgresql.ARRAY(sa.String()), nullable=True), + sa.Column('organization_id', sa.UUID(), nullable=False), + sa.Column('frequency', sa.String(), nullable=False), + sa.Column('url', sa.String(), nullable=False), + sa.Column('schema_type', sa.String(), nullable=False), + sa.Column('source_type', sa.String(), nullable=False), + sa.Column('id', sa.UUID(), server_default=sa.text('gen_random_uuid()'), nullable=False), + sa.ForeignKeyConstraint(['organization_id'], ['organization.id'], ), + sa.PrimaryKeyConstraint('id'), + sa.UniqueConstraint('url') + ) + op.create_table('harvest_job', + sa.Column('harvest_source_id', sa.UUID(), nullable=False), + sa.Column('status', sa.Enum('new', 'in_progress', 'complete', name='job_status'), nullable=False), + sa.Column('date_created', sa.DateTime(), nullable=True), + sa.Column('date_finished', sa.DateTime(), nullable=True), + sa.Column('records_added', sa.Integer(), nullable=True), + sa.Column('records_updated', sa.Integer(), nullable=True), + sa.Column('records_deleted', sa.Integer(), nullable=True), + sa.Column('records_errored', sa.Integer(), nullable=True), + sa.Column('records_ignored', sa.Integer(), nullable=True), + sa.Column('id', sa.UUID(), server_default=sa.text('gen_random_uuid()'), nullable=False), + sa.ForeignKeyConstraint(['harvest_source_id'], ['harvest_source.id'], ), + sa.PrimaryKeyConstraint('id') + ) + with op.batch_alter_table('harvest_job', schema=None) as batch_op: + batch_op.create_index(batch_op.f('ix_harvest_job_date_created'), ['date_created'], unique=False) + batch_op.create_index(batch_op.f('ix_harvest_job_status'), ['status'], unique=False) + + op.create_table('harvest_error', + sa.Column('harvest_job_id', sa.UUID(), nullable=False), + sa.Column('harvest_record_id', sa.String(), nullable=True), + sa.Column('date_created', sa.DateTime(), nullable=True), + sa.Column('type', sa.String(), nullable=True), + sa.Column('severity', sa.Enum('CRITICAL', 'ERROR', 'WARN', name='error_serverity'), nullable=False), + sa.Column('message', sa.String(), nullable=True), + sa.Column('id', sa.UUID(), server_default=sa.text('gen_random_uuid()'), nullable=False), + sa.ForeignKeyConstraint(['harvest_job_id'], ['harvest_job.id'], ), + sa.PrimaryKeyConstraint('id') + ) + with op.batch_alter_table('harvest_error', schema=None) as batch_op: + batch_op.create_index(batch_op.f('ix_harvest_error_severity'), ['severity'], unique=False) + + op.create_table('harvest_record', + sa.Column('job_id', sa.UUID(), nullable=False), + sa.Column('identifier', sa.String(), nullable=False), + sa.Column('ckan_id', sa.String(), nullable=False), + sa.Column('type', sa.String(), nullable=False), + sa.Column('source_metadata', sa.String(), nullable=True), + sa.Column('id', sa.UUID(), server_default=sa.text('gen_random_uuid()'), nullable=False), + sa.ForeignKeyConstraint(['job_id'], ['harvest_job.id'], ), + sa.PrimaryKeyConstraint('id') + ) + with op.batch_alter_table('harvest_record', schema=None) as batch_op: + 
batch_op.create_index(batch_op.f('ix_harvest_record_ckan_id'), ['ckan_id'], unique=False) + batch_op.create_index('ix_job_id_identifier', ['job_id', 'identifier'], unique=False) + + # ### end Alembic commands ### + + +def downgrade(): + # ### commands auto generated by Alembic - please adjust! ### + with op.batch_alter_table('harvest_record', schema=None) as batch_op: + batch_op.drop_index('ix_job_id_identifier') + batch_op.drop_index(batch_op.f('ix_harvest_record_ckan_id')) + + op.drop_table('harvest_record') + with op.batch_alter_table('harvest_error', schema=None) as batch_op: + batch_op.drop_index(batch_op.f('ix_harvest_error_severity')) + + op.drop_table('harvest_error') + with op.batch_alter_table('harvest_job', schema=None) as batch_op: + batch_op.drop_index(batch_op.f('ix_harvest_job_status')) + batch_op.drop_index(batch_op.f('ix_harvest_job_date_created')) + + op.drop_table('harvest_job') + op.drop_table('harvest_source') + with op.batch_alter_table('organization', schema=None) as batch_op: + batch_op.drop_index(batch_op.f('ix_organization_name')) + + op.drop_table('organization') + # ### end Alembic commands ### diff --git a/run.py b/run.py new file mode 100644 index 00000000..e3252c1e --- /dev/null +++ b/run.py @@ -0,0 +1,6 @@ +from app import create_app + +app = create_app() + +if __name__ == '__main__': + app.run(debug=True, port=8080) \ No newline at end of file From eb136bdc1db166631dd51c9d05f347e8ad23b1ea Mon Sep 17 00:00:00 2001 From: Jin-Sun-tts Date: Tue, 19 Mar 2024 19:02:42 -0400 Subject: [PATCH 20/20] lint fix --- .../versions/701baacbc2f2_base_models.py | 42 ++++++++++++------- 1 file changed, 28 insertions(+), 14 deletions(-) diff --git a/migrations/versions/701baacbc2f2_base_models.py b/migrations/versions/701baacbc2f2_base_models.py index 65035397..13c34ab3 100644 --- a/migrations/versions/701baacbc2f2_base_models.py +++ b/migrations/versions/701baacbc2f2_base_models.py @@ -21,28 +21,33 @@ def upgrade(): op.create_table('organization', sa.Column('name', sa.String(), nullable=False), sa.Column('logo', sa.String(), nullable=True), - sa.Column('id', sa.UUID(), server_default=sa.text('gen_random_uuid()'), nullable=False), + sa.Column('id', sa.UUID(), server_default=sa.text('gen_random_uuid()'), + nullable=False), sa.PrimaryKeyConstraint('id') ) with op.batch_alter_table('organization', schema=None) as batch_op: - batch_op.create_index(batch_op.f('ix_organization_name'), ['name'], unique=False) + batch_op.create_index(batch_op.f('ix_organization_name'), + ['name'], unique=False) op.create_table('harvest_source', sa.Column('name', sa.String(), nullable=False), - sa.Column('notification_emails', postgresql.ARRAY(sa.String()), nullable=True), + sa.Column('notification_emails', postgresql.ARRAY(sa.String()), + nullable=True), sa.Column('organization_id', sa.UUID(), nullable=False), sa.Column('frequency', sa.String(), nullable=False), sa.Column('url', sa.String(), nullable=False), sa.Column('schema_type', sa.String(), nullable=False), sa.Column('source_type', sa.String(), nullable=False), - sa.Column('id', sa.UUID(), server_default=sa.text('gen_random_uuid()'), nullable=False), + sa.Column('id', sa.UUID(), server_default=sa.text('gen_random_uuid()'), + nullable=False), sa.ForeignKeyConstraint(['organization_id'], ['organization.id'], ), sa.PrimaryKeyConstraint('id'), sa.UniqueConstraint('url') ) op.create_table('harvest_job', sa.Column('harvest_source_id', sa.UUID(), nullable=False), - sa.Column('status', sa.Enum('new', 'in_progress', 'complete', name='job_status'), 
nullable=False), + sa.Column('status', sa.Enum('new', 'in_progress', 'complete', + name='job_status'), nullable=False), sa.Column('date_created', sa.DateTime(), nullable=True), sa.Column('date_finished', sa.DateTime(), nullable=True), sa.Column('records_added', sa.Integer(), nullable=True), @@ -50,27 +55,33 @@ def upgrade(): sa.Column('records_deleted', sa.Integer(), nullable=True), sa.Column('records_errored', sa.Integer(), nullable=True), sa.Column('records_ignored', sa.Integer(), nullable=True), - sa.Column('id', sa.UUID(), server_default=sa.text('gen_random_uuid()'), nullable=False), + sa.Column('id', sa.UUID(), server_default=sa.text('gen_random_uuid()'), + nullable=False), sa.ForeignKeyConstraint(['harvest_source_id'], ['harvest_source.id'], ), sa.PrimaryKeyConstraint('id') ) with op.batch_alter_table('harvest_job', schema=None) as batch_op: - batch_op.create_index(batch_op.f('ix_harvest_job_date_created'), ['date_created'], unique=False) - batch_op.create_index(batch_op.f('ix_harvest_job_status'), ['status'], unique=False) + batch_op.create_index(batch_op.f('ix_harvest_job_date_created'), + ['date_created'], unique=False) + batch_op.create_index(batch_op.f('ix_harvest_job_status'), + ['status'], unique=False) op.create_table('harvest_error', sa.Column('harvest_job_id', sa.UUID(), nullable=False), sa.Column('harvest_record_id', sa.String(), nullable=True), sa.Column('date_created', sa.DateTime(), nullable=True), sa.Column('type', sa.String(), nullable=True), - sa.Column('severity', sa.Enum('CRITICAL', 'ERROR', 'WARN', name='error_serverity'), nullable=False), + sa.Column('severity', sa.Enum('CRITICAL', 'ERROR', 'WARN', + name='error_serverity'), nullable=False), sa.Column('message', sa.String(), nullable=True), - sa.Column('id', sa.UUID(), server_default=sa.text('gen_random_uuid()'), nullable=False), + sa.Column('id', sa.UUID(), server_default=sa.text('gen_random_uuid()'), + nullable=False), sa.ForeignKeyConstraint(['harvest_job_id'], ['harvest_job.id'], ), sa.PrimaryKeyConstraint('id') ) with op.batch_alter_table('harvest_error', schema=None) as batch_op: - batch_op.create_index(batch_op.f('ix_harvest_error_severity'), ['severity'], unique=False) + batch_op.create_index(batch_op.f('ix_harvest_error_severity'), + ['severity'], unique=False) op.create_table('harvest_record', sa.Column('job_id', sa.UUID(), nullable=False), @@ -78,13 +89,16 @@ def upgrade(): sa.Column('ckan_id', sa.String(), nullable=False), sa.Column('type', sa.String(), nullable=False), sa.Column('source_metadata', sa.String(), nullable=True), - sa.Column('id', sa.UUID(), server_default=sa.text('gen_random_uuid()'), nullable=False), + sa.Column('id', sa.UUID(), server_default=sa.text('gen_random_uuid()'), + nullable=False), sa.ForeignKeyConstraint(['job_id'], ['harvest_job.id'], ), sa.PrimaryKeyConstraint('id') ) with op.batch_alter_table('harvest_record', schema=None) as batch_op: - batch_op.create_index(batch_op.f('ix_harvest_record_ckan_id'), ['ckan_id'], unique=False) - batch_op.create_index('ix_job_id_identifier', ['job_id', 'identifier'], unique=False) + batch_op.create_index(batch_op.f('ix_harvest_record_ckan_id'), + ['ckan_id'], unique=False) + batch_op.create_index('ix_job_id_identifier', + ['job_id', 'identifier'], unique=False) # ### end Alembic commands ###
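
A closing note on patch 18's indexes: below is a hedged sketch of the query patterns that `ix_harvest_job_status`, `ix_harvest_job_date_created`, and the composite `ix_job_id_identifier` appear designed to serve, written against the models in `app/models.py`. The helper functions are illustrative assumptions, not code from these patches:

```python
from app.models import HarvestJob, HarvestRecord


def jobs_in_progress(session):
    # status filter plus date_created ordering: both columns gained indexes
    return (
        session.query(HarvestJob)
        .filter(HarvestJob.status == "in_progress")
        .order_by(HarvestJob.date_created)
        .all()
    )


def find_record(session, job_id, identifier):
    # the composite ix_job_id_identifier backs this two-column lookup
    return (
        session.query(HarvestRecord)
        .filter_by(job_id=job_id, identifier=identifier)
        .first()
    )
```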