diff --git a/.env b/.env index 70192494..028dbe68 100644 --- a/.env +++ b/.env @@ -8,7 +8,7 @@ S3FILESTORE__SIGNATURE_VERSION=s3v4 MDTRANSLATOR_URL=http://127.0.0.1:3000/translates -DATABASE_SERVER=db +DATABASE_SERVER=localhost DATABASE_PORT=5432 DATABASE_NAME=mydb DATABASE_USER=myuser diff --git a/app/__init__.py b/app/__init__.py index ac390386..fe306ee1 100644 --- a/app/__init__.py +++ b/app/__init__.py @@ -2,6 +2,9 @@ from .models import db from flask_migrate import Migrate import os +from dotenv import load_dotenv + +load_dotenv() DATABASE_URI = os.getenv('DATABASE_URI') diff --git a/app/models.py b/app/models.py index a8795105..5735d2c5 100644 --- a/app/models.py +++ b/app/models.py @@ -55,9 +55,11 @@ class HarvestError(Base): harvest_job_id = db.Column(UUID(as_uuid=True), db.ForeignKey('harvest_job.id'), nullable=False) - harvest_record_id = db.Column(UUID(as_uuid=True), - db.ForeignKey('harvest_record.id'), - nullable=True) + harvest_record_id = db.Column(db.String) + # to-do + # harvest_record_id = db.Column(UUID(as_uuid=True), + # db.ForeignKey('harvest_record.id'), + # nullable=True) date_created = db.Column(db.DateTime) type = db.Column(db.String) severity = db.Column(Enum('CRITICAL', 'ERROR', 'WARN', name='error_serverity'), diff --git a/app/templates/index.html b/app/templates/index.html index b462ff16..f3114184 100644 --- a/app/templates/index.html +++ b/app/templates/index.html @@ -25,11 +25,14 @@

Harvest Actions (rendered link list from app/templates/index.html; "+" marks the three links this diff adds)

  • Add Organization
  • Add Harvest Source
  • Add Harvest Job
+ • Add Harvest Error
  • Get All Organizations
  • Get All Harvest Sources
  • Get All Harvest Jobs
+ • Get All Harvest Errors By Job
  • Get Harvest Source
  • Get Harvest Job
+ • Get Harvest Error
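With load_dotenv() now called in app/__init__.py, the DATABASE_* values from .env are available at import time. A minimal sketch of how the connection string can be resolved under that setup; it assumes DATABASE_URI is either defined directly in .env or composed from the individual DATABASE_* variables shown above (DATABASE_PASSWORD is an assumption, it is not visible in the hunk):

import os

from dotenv import load_dotenv

load_dotenv()  # reads .env (DATABASE_SERVER=localhost, DATABASE_PORT=5432, ...) into os.environ

# app/__init__.py reads DATABASE_URI directly; composing it from the parts
# shown in .env is an illustration only, not the project's confirmed layout.
DATABASE_URI = os.getenv("DATABASE_URI") or (
    f"postgresql://{os.getenv('DATABASE_USER')}:{os.getenv('DATABASE_PASSWORD')}"
    f"@{os.getenv('DATABASE_SERVER')}:{os.getenv('DATABASE_PORT')}"
    f"/{os.getenv('DATABASE_NAME')}"
)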
  • diff --git a/harvester/__init__.py b/harvester/__init__.py index ce8362ac..99a4defd 100644 --- a/harvester/__init__.py +++ b/harvester/__init__.py @@ -1 +1,7 @@ -from .harvest import HarvestSource, Record # noqa +import logging.config +from harvester.logger_config import LOGGING_CONFIG +from dotenv import load_dotenv + +load_dotenv() + +logging.config.dictConfig(LOGGING_CONFIG) diff --git a/harvester/data/dcatus/jsons/null-spatial.data.json b/harvester/data/dcatus/jsons/null-spatial.data.json deleted file mode 100644 index fd146ea5..00000000 --- a/harvester/data/dcatus/jsons/null-spatial.data.json +++ /dev/null @@ -1,43 +0,0 @@ -{ - "@type": "dcat:Catalog", - "describedBy": "https://project-open-data.cio.gov/v1.1/schema/catalog.json", - "conformsTo": "https://project-open-data.cio.gov/v1.1/schema", - "@context": "https://project-open-data.cio.gov/v1.1/schema/catalog.jsonld", - "dataset": [ - { - "identifier": "null-spatial", - "accessLevel": "public", - "contactPoint": { - "hasEmail": "mailto:Alexis.Graves@ocio.usda.gov", - "@type": "vcard:Contact", - "fn": "Alexi Graves" - }, - "programCode": [ - "005:059" - ], - "description": "Sample dataset. Spatial can be null", - "title": "Sample Title NUll Spatial", - "distribution": [ - { - "@type": "dcat:Distribution", - "downloadURL": "http://www.dm.usda.gov/foia/docs/Copy%20of%20ECM%20Congressional%20Logs%20FY14.xls", - "mediaType": "application/vnd.ms-excel", - "title": "Congressional Logs for Fiscal Year 2014" - } - ], - "license": "https://creativecommons.org/publicdomain/zero/1.0/", - "bureauCode": [ - "005:12" - ], - "modified": "2014-10-03", - "publisher": { - "@type": "org:Organization", - "name": "Department of Agriculture" - }, - "spatial": null, - "keyword": [ - "Congressional Logs" - ] - } - ] -} \ No newline at end of file diff --git a/harvester/exceptions.py b/harvester/exceptions.py new file mode 100644 index 00000000..2fccd18d --- /dev/null +++ b/harvester/exceptions.py @@ -0,0 +1,83 @@ +import logging +from datetime import datetime +from app.interface import HarvesterDBInterface + + +# critical exceptions +class HarvestCriticalException(Exception): + def __init__(self, msg, harvest_job_id): + super().__init__(msg, harvest_job_id) + + self.msg = msg + self.harvest_job_id = harvest_job_id + self.severity = "CRITICAL" + self.type = "job" + + self.db_interface = HarvesterDBInterface() + self.logger = logging.getLogger("harvest_runner") + + error_data = { + "harvest_job_id": self.harvest_job_id, + "message": self.msg, + "severity": self.severity, + "type": self.type, + "date_created": datetime.utcnow(), + } + + self.db_interface.add_harvest_error(error_data, self.harvest_job_id) + self.logger.critical(self.msg, exc_info=True) + + +class ExtractHarvestSourceException(HarvestCriticalException): + pass + + +class ExtractCKANSourceException(HarvestCriticalException): + pass + + +class CompareException(HarvestCriticalException): + pass + + +# non-critical exceptions +class HarvestNonCriticalException(Exception): + def __init__(self, msg, harvest_job_id, title): + super().__init__(msg, harvest_job_id, title) + + self.title = title + self.msg = msg + self.harvest_job_id = harvest_job_id + self.severity = "ERROR" + self.type = "record" + + self.db_interface = HarvesterDBInterface() + self.logger = logging.getLogger("harvest_runner") + + error_data = { + "harvest_job_id": self.harvest_job_id, + "message": self.msg, + "severity": self.severity, + "type": self.type, + "date_created": datetime.utcnow(), + "harvest_record_id": self.title # 
to-do + } + + self.db_interface.add_harvest_error(error_data, self.harvest_job_id) + self.logger.error(self.msg, exc_info=True) + + +class ValidationException(HarvestNonCriticalException): + pass + + +class TranformationException(HarvestNonCriticalException): + pass + + +class DCATUSToCKANException(HarvestNonCriticalException): + pass + + +class SynchronizeException(HarvestNonCriticalException): + pass diff --git a/harvester/harvest.py b/harvester/harvest.py index 22080f2b..32a158b5 100644 --- a/harvester/harvest.py +++ b/harvester/harvest.py @@ -1,19 +1,16 @@ import json -import logging import os import re -import sys -import traceback import xml.etree.ElementTree as ET from dataclasses import asdict, dataclass, field from datetime import datetime from pathlib import Path import functools +import logging import ckanapi import requests from bs4 import BeautifulSoup -from dotenv import load_dotenv from jsonschema import Draft202012Validator from .utils import ( @@ -23,31 +20,33 @@ open_json, sort_dataset, ) -from .ckan_utils import munge_tag, munge_title_to_name -load_dotenv() - -logging.basicConfig( - level=logging.NOTSET, - format="[%(asctime)s] %(levelname)s [%(name)s.%(funcName)s:%(lineno)d] %(message)s", - handlers=[ - logging.FileHandler("harvest_load.log"), - logging.StreamHandler(sys.stdout), - ], +from .ckan_utils import munge_tag, munge_title_to_name +from .exceptions import ( + ExtractHarvestSourceException, + ExtractCKANSourceException, + ValidationException, + DCATUSToCKANException, + SynchronizeException, + CompareException, ) -logger = logging.getLogger(__name__) - -# TODD: make sure this doesn't change all requests +# requests data session = requests.Session() +# TODD: make sure this timeout config doesn't change all requests! session.request = functools.partial(session.request, timeout=15) +# ckan entrypoint ckan = ckanapi.RemoteCKAN( os.getenv("CKAN_URL_STAGE"), apikey=os.getenv("CKAN_API_TOKEN_STAGE"), session=session, ) +# logging data +# logging.getLogger(name) follows a singleton pattern +logger = logging.getLogger("harvest_runner") + ROOT_DIR = Path(__file__).parents[0] @@ -60,9 +59,10 @@ class HarvestSource: _title: str _url: str _owner_org: str # uuid + _job_id: str _extract_type: str # "datajson" or "waf-collection" _waf_config: dict = field(default_factory=lambda: {}) - _extra_source_name: str = "harvest_source_name" + _extra_source_name: str = "harvest_source_name" # TODO: change this! _dataset_schema: dict = field( default_factory=lambda: open_json( ROOT_DIR / "data" / "dcatus" / "schemas" / "dataset.json" @@ -113,9 +113,12 @@ def url(self) -> str: def owner_org(self) -> str: return self._owner_org + @property + def job_id(self) -> str: + return self._job_id + @property def extract_type(self) -> str: - # TODO: this is probably safe to assume right? if "json" in self._extract_type: return "datajson" if "waf" in self._extract_type: @@ -172,18 +175,26 @@ def download_dcatus(self): return resp.json() def harvest_to_id_hash(self, records: list[dict]) -> None: + # ruff: noqa: F841 logger.info("converting harvest records to id: hash") for record in records: - if self.extract_type == "datajson": - if "identifier" not in record: # TODO: what to do? 
- continue - identifier = record["identifier"] - dataset_hash = dataset_to_hash(sort_dataset(record)) - if self.extract_type == "waf-collection": - identifier = self.get_title_from_fgdc(record["content"]) - dataset_hash = dataset_to_hash(record["content"].decode("utf-8")) - - self.records[identifier] = Record(self, identifier, record, dataset_hash) + try: + if self.extract_type == "datajson": + identifier = record["identifier"] + dataset_hash = dataset_to_hash(sort_dataset(record)) + if self.extract_type == "waf-collection": + identifier = self.get_title_from_fgdc(record["content"]) + dataset_hash = dataset_to_hash(record["content"].decode("utf-8")) + + self.records[identifier] = Record( + self, identifier, record, dataset_hash + ) + except Exception as e: + # TODO: do something with 'e' + raise ExtractHarvestSourceException( + f"{self.title} {self.url} failed to convert record to id: hash format. exiting.", + self.job_id, + ) def ckan_to_id_hash(self, results: list[dict]) -> None: logger.info("converting ckan records to id: hash") @@ -243,18 +254,26 @@ def download_waf(self, files): def compare(self) -> None: """Compares records""" + # ruff: noqa: F841 logger.info("comparing harvest and ckan records") - harvest_ids = set(self.records.keys()) - ckan_ids = set(self.ckan_records.keys()) - same_ids = harvest_ids & ckan_ids + try: + harvest_ids = set(self.records.keys()) + ckan_ids = set(self.ckan_records.keys()) + same_ids = harvest_ids & ckan_ids - self.compare_data["delete"] = ckan_ids - harvest_ids - self.compare_data["create"] = harvest_ids - ckan_ids + self.compare_data["delete"] = ckan_ids - harvest_ids + self.compare_data["create"] = harvest_ids - ckan_ids - for i in same_ids: - if self.records[i].metadata_hash != self.ckan_records[i][0]: - self.compare_data["update"].add(i) + for i in same_ids: + if self.records[i].metadata_hash != self.ckan_records[i][0]: + self.compare_data["update"].add(i) + except Exception as e: + # TODO: do something with 'e' + raise CompareException( + f"{self.title} {self.url} failed to run compare. exiting.", + self.job_id, + ) def get_ckan_records(self, results=[]) -> None: logger.info("querying ckan") @@ -267,24 +286,31 @@ def get_ckan_records(self, results=[]) -> None: def get_ckan_records_as_id_hash(self) -> None: logger.info("retrieving and preparing ckan records") - self.ckan_to_id_hash(self.get_ckan_records(results=[])) + try: + self.ckan_to_id_hash(self.get_ckan_records(results=[])) + except Exception as e: # ruff: noqa: E841 + # TODO: do something with 'e' + raise ExtractCKANSourceException( + f"{self.title} {self.url} failed to extract ckan records. 
exiting.", + self.job_id, + ) def get_harvest_records_as_id_hash(self) -> None: logger.info("retrieving and preparing harvest records") - if self.extract_type == "datajson": - download_res = self.download_dcatus() - if download_res is None or "dataset" not in download_res: - logger.warning( - "no valid response from server or lack of 'dataset' array" + try: + if self.extract_type == "datajson": + download_res = self.download_dcatus() + self.harvest_to_id_hash(download_res["dataset"]) + if self.extract_type == "waf-collection": + # TODO: break out the xml catalogs as records + # TODO: handle no response + self.harvest_to_id_hash( + self.download_waf(self.traverse_waf(self.url, **self.waf_config)) ) - self.no_harvest_resp = True - return - self.harvest_to_id_hash(download_res["dataset"]) - if self.extract_type == "waf-collection": - # TODO: break out the xml catalogs as records - # TODO: handle no response - self.harvest_to_id_hash( - self.download_waf(self.traverse_waf(self.url, **self.waf_config)) + except Exception as e: # ruff: noqa: E841 + raise ExtractHarvestSourceException( + f"{self.title} {self.url} failed to extract harvest source. exiting", + self.job_id, ) def get_record_changes(self) -> None: @@ -293,14 +319,14 @@ def get_record_changes(self) -> None: try: self.get_ckan_records_as_id_hash() self.get_harvest_records_as_id_hash() - if self.no_harvest_resp is True: - return self.compare() - except Exception as e: - logger.error(self.title + " " + self.url + " " "\n") - logger.error( - "\n".join(traceback.format_exception(None, e, e.__traceback__)) - ) + except ( + ExtractCKANSourceException, + ExtractHarvestSourceException, + CompareException, + ) as e: + # TODO: do something with 'e'? + raise def synchronize_records(self) -> None: """runs the delete, update, and create @@ -326,11 +352,14 @@ def synchronize_records(self) -> None: record.validate() # TODO: add transformation and validation record.sync() - except Exception as e: - logger.error(f"error processing '{operation}' on record {i}\n") - logger.error( - "\n".join(traceback.format_exception(None, e, e.__traceback__)) - ) + + except ( + ValidationException, + DCATUSToCKANException, + SynchronizeException, + ) as e: + # TODO: do something with 'e'? 
+ pass def report(self) -> None: logger.info("report results") @@ -382,6 +411,7 @@ def prepare_for_s3_upload(self) -> dict: return json.dumps(data, default=convert_set_to_list) def upload_to_s3(self, s3handler: S3Handler) -> None: + # ruff: noqa: F841 try: # TODO: confirm out_path out_path = f"{s3handler.endpoint_url}/{self.title}/job-id/{self.title}.json" @@ -389,10 +419,11 @@ def upload_to_s3(self, s3handler: S3Handler) -> None: logger.info(f"saved harvest source {self.title} in s3 at {out_path}") return out_path except Exception as e: - logger.error(f"error uploading harvest source ({self.title}) to s3 \n") - logger.error( - "\n".join(traceback.format_exception(None, e, e.__traceback__)) - ) + # logger.error(f"error uploading harvest source ({self.title}) to s3 \n") + # logger.error( + # "\n".join(traceback.format_exception(None, e, e.__traceback__)) + # ) + pass return False @@ -602,20 +633,33 @@ def simple_transform(self, metadata: dict) -> dict: return output def ckanify_dcatus(self) -> None: + # ruff: noqa: F841 logger.info("ckanifying dcatus record") - self.ckanified_metadata = self.simple_transform(self.metadata) - self.ckanified_metadata["resources"] = self.create_ckan_resources(self.metadata) - self.ckanified_metadata["tags"] = ( - self.create_ckan_tags(self.metadata["keyword"]) - if "keyword" in self.metadata - else [] - ) - self.ckanified_metadata["extras"] = self.create_ckan_extras() - logger.info("completed ckanifying dcatus record") + try: + self.ckanified_metadata = self.simple_transform(self.metadata) + + self.ckanified_metadata["resources"] = self.create_ckan_resources( + self.metadata + ) + self.ckanified_metadata["tags"] = ( + self.create_ckan_tags(self.metadata["keyword"]) + if "keyword" in self.metadata + else [] + ) + self.ckanified_metadata["extras"] = self.create_ckan_extras() + logger.info("completed ckanifying dcatus record") + except Exception as e: + # TODO: something with 'e' + raise DCATUSToCKANException( + f"unable to ckanify dcatus record {self.identifier}", + self.harvest_source.job_id, + self.identifier, + ) def validate(self) -> None: logger.info(f"validating {self.identifier}") + # ruff: noqa: F841 validator = Draft202012Validator(self.harvest_source.dataset_schema) try: validator.validate(self.metadata) @@ -623,6 +667,12 @@ def validate(self) -> None: except Exception as e: self.validation_msg = str(e) # TODO: verify this is what we want self.valid = False + # TODO: do something with 'e' in logger? + raise ValidationException( + f"{self.identifier} failed validation", + self.harvest_source.job_id, + self.identifier, + ) # def transform(self, metadata: dict): # """Transforms records""" @@ -648,11 +698,20 @@ def update_record(self) -> dict: self.status = "updated" def delete_record(self) -> None: - # purge returns nothing - ckan.action.dataset_purge(**{"id": self.identifier}) - self.status = "deleted" + # ruff: noqa: F841 + try: + ckan.action.dataset_purge(**{"id": self.identifier}) + self.status = "deleted" + except Exception as e: + # TODO: something with 'e' + raise SynchronizeException( + f"failed to delete {self.identifier}", + self.harvest_source.job_id, + self.identifier, + ) def sync(self) -> None: + # ruff: noqa: F841 if self.valid is False: logger.warning(f"{self.identifier} is invalid. 
bypassing {self.operation}") return @@ -661,10 +720,18 @@ def sync(self) -> None: start = datetime.now() - if self.operation == "create": - self.create_record() - if self.operation == "update": - self.update_record() + try: + if self.operation == "create": + self.create_record() + if self.operation == "update": + self.update_record() + except Exception as e: + # TODO: something with 'e' + raise SynchronizeException( + f"failed to {self.operation} for {self.identifier}", + self.harvest_source.job_id, + self.identifier, + ) logger.info( f"time to {self.operation} {self.identifier} {datetime.now()-start}" @@ -685,6 +752,7 @@ def prepare_for_s3_upload(self) -> dict: def upload_to_s3(self, s3handler: S3Handler) -> None: # ruff: noqa: E501 + # ruff: noqa: F841 try: out_path = f"{s3handler.endpoint_url}/{self.harvest_source.title}/job-id/{self.status}/{self.identifier}.json" s3handler.put_object(self.prepare_for_s3_upload(), out_path) @@ -693,11 +761,16 @@ def upload_to_s3(self, s3handler: S3Handler) -> None: ) return out_path except Exception as e: - logger.error( - f"error uploading harvest record ({self.identifier} of {self.harvest_source.title}) to s3 \n" - ) - logger.error( - "\n".join(traceback.format_exception(None, e, e.__traceback__)) - ) + # TODO: this exception + pass return False + + +def main(): + + pass + + +if __name__ == "__main__": + main() diff --git a/harvester/logger_config.py b/harvester/logger_config.py new file mode 100644 index 00000000..219d7575 --- /dev/null +++ b/harvester/logger_config.py @@ -0,0 +1,25 @@ +LOGGING_CONFIG = { + "version": 1, + "formatters": { + "standard": { + "format": ( + "[%(asctime)s] %(levelname)s " + "[%(name)s.%(funcName)s:%(lineno)d] %(message)s" + ) + }, + }, + "handlers": { + "console": { + "level": "INFO", + "formatter": "standard", + "class": "logging.StreamHandler", + "stream": "ext://sys.stdout", + }, + }, + "loggers": { + "harvest_runner": { + "handlers": ["console"], + "level": "INFO", + }, + }, +} diff --git a/harvester/utils.py b/harvester/utils.py index 042f86df..51614b0e 100644 --- a/harvester/utils.py +++ b/harvester/utils.py @@ -1,8 +1,6 @@ import hashlib import json import os -import string -import random import boto3 import sansjson @@ -10,16 +8,6 @@ # ruff: noqa: F841 -def create_random_text(str_len: int) -> str: - - alphabet = string.ascii_lowercase - output = "" - - for _ in range(str_len): - output += alphabet[random.randint(0, len(alphabet))] - return output - - def convert_set_to_list(obj): if isinstance(obj, set): return list(obj) @@ -31,6 +19,8 @@ def sort_dataset(d): def dataset_to_hash(d): + # TODO: check for sh1 or sha256? 
+ # https://github.com/GSA/ckanext-datajson/blob/a3bc214fa7585115b9ff911b105884ef209aa416/ckanext/datajson/datajson.py#L279 return hashlib.sha256(json.dumps(d, sort_keys=True).encode("utf-8")).hexdigest() diff --git a/migrations/versions/779c95895070_base_model.py b/migrations/versions/701baacbc2f2_base_models.py similarity index 93% rename from migrations/versions/779c95895070_base_model.py rename to migrations/versions/701baacbc2f2_base_models.py index 86b40f45..13c34ab3 100644 --- a/migrations/versions/779c95895070_base_model.py +++ b/migrations/versions/701baacbc2f2_base_models.py @@ -1,8 +1,8 @@ -"""base model +"""base models -Revision ID: 779c95895070 +Revision ID: 701baacbc2f2 Revises: -Create Date: 2024-03-19 15:45:18.672638 +Create Date: 2024-03-19 21:36:25.741447 """ from alembic import op @@ -10,7 +10,7 @@ from sqlalchemy.dialects import postgresql # revision identifiers, used by Alembic. -revision = '779c95895070' +revision = '701baacbc2f2' down_revision = None branch_labels = None depends_on = None @@ -21,8 +21,8 @@ def upgrade(): op.create_table('organization', sa.Column('name', sa.String(), nullable=False), sa.Column('logo', sa.String(), nullable=True), - sa.Column('id', sa.UUID(), - server_default=sa.text('gen_random_uuid()'), nullable=False), + sa.Column('id', sa.UUID(), server_default=sa.text('gen_random_uuid()'), + nullable=False), sa.PrimaryKeyConstraint('id') ) with op.batch_alter_table('organization', schema=None) as batch_op: @@ -31,7 +31,8 @@ def upgrade(): op.create_table('harvest_source', sa.Column('name', sa.String(), nullable=False), - sa.Column('notification_emails', postgresql.ARRAY(sa.String()), nullable=True), + sa.Column('notification_emails', postgresql.ARRAY(sa.String()), + nullable=True), sa.Column('organization_id', sa.UUID(), nullable=False), sa.Column('frequency', sa.String(), nullable=False), sa.Column('url', sa.String(), nullable=False), @@ -65,6 +66,23 @@ def upgrade(): batch_op.create_index(batch_op.f('ix_harvest_job_status'), ['status'], unique=False) + op.create_table('harvest_error', + sa.Column('harvest_job_id', sa.UUID(), nullable=False), + sa.Column('harvest_record_id', sa.String(), nullable=True), + sa.Column('date_created', sa.DateTime(), nullable=True), + sa.Column('type', sa.String(), nullable=True), + sa.Column('severity', sa.Enum('CRITICAL', 'ERROR', 'WARN', + name='error_serverity'), nullable=False), + sa.Column('message', sa.String(), nullable=True), + sa.Column('id', sa.UUID(), server_default=sa.text('gen_random_uuid()'), + nullable=False), + sa.ForeignKeyConstraint(['harvest_job_id'], ['harvest_job.id'], ), + sa.PrimaryKeyConstraint('id') + ) + with op.batch_alter_table('harvest_error', schema=None) as batch_op: + batch_op.create_index(batch_op.f('ix_harvest_error_severity'), + ['severity'], unique=False) + op.create_table('harvest_record', sa.Column('job_id', sa.UUID(), nullable=False), sa.Column('identifier', sa.String(), nullable=False), @@ -82,38 +100,20 @@ def upgrade(): batch_op.create_index('ix_job_id_identifier', ['job_id', 'identifier'], unique=False) - op.create_table('harvest_error', - sa.Column('harvest_job_id', sa.UUID(), nullable=False), - sa.Column('harvest_record_id', sa.UUID(), nullable=True), - sa.Column('date_created', sa.DateTime(), nullable=True), - sa.Column('type', sa.String(), nullable=True), - sa.Column('severity', sa.Enum('CRITICAL', 'ERROR', 'WARN', - name='error_serverity'), nullable=False), - sa.Column('message', sa.String(), nullable=True), - sa.Column('id', sa.UUID(), 
server_default=sa.text('gen_random_uuid()'), - nullable=False), - sa.ForeignKeyConstraint(['harvest_job_id'], ['harvest_job.id'], ), - sa.ForeignKeyConstraint(['harvest_record_id'], ['harvest_record.id'], ), - sa.PrimaryKeyConstraint('id') - ) - with op.batch_alter_table('harvest_error', schema=None) as batch_op: - batch_op.create_index(batch_op.f('ix_harvest_error_severity'), - ['severity'], unique=False) - # ### end Alembic commands ### def downgrade(): # ### commands auto generated by Alembic - please adjust! ### - with op.batch_alter_table('harvest_error', schema=None) as batch_op: - batch_op.drop_index(batch_op.f('ix_harvest_error_severity')) - - op.drop_table('harvest_error') with op.batch_alter_table('harvest_record', schema=None) as batch_op: batch_op.drop_index('ix_job_id_identifier') batch_op.drop_index(batch_op.f('ix_harvest_record_ckan_id')) op.drop_table('harvest_record') + with op.batch_alter_table('harvest_error', schema=None) as batch_op: + batch_op.drop_index(batch_op.f('ix_harvest_error_severity')) + + op.drop_table('harvest_error') with op.batch_alter_table('harvest_job', schema=None) as batch_op: batch_op.drop_index(batch_op.f('ix_harvest_job_status')) batch_op.drop_index(batch_op.f('ix_harvest_job_date_created')) diff --git a/pyproject.toml b/pyproject.toml index fdedf195..213f8a7f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "datagov-harvesting-logic" -version = "0.3.3" +version = "0.3.4" description = "" # authors = [ # {name = "Jin Sun", email = "jin.sun@gsa.gov"}, diff --git a/scripts/smoke-test.py b/scripts/smoke-test.py index d782c0b7..73a693fe 100755 --- a/scripts/smoke-test.py +++ b/scripts/smoke-test.py @@ -2,14 +2,14 @@ import os import sys +from harvester import HarvestSource, Record sys.path.insert(1, "/".join(os.path.realpath(__file__).split("/")[0:-2])) -from harvester import HarvestSource, Record -harvest_source = HarvestSource('a', 'a', 'a', 'a') +harvest_source = HarvestSource("a", "a", "a", "a") -record = Record('a', 'a') +record = Record("a", "a") print(harvest_source) diff --git a/tests/conftest.py b/tests/conftest.py index 204d31f6..1e0dbf1b 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,12 +1,62 @@ from pathlib import Path +import os import pytest +from dotenv import load_dotenv + +from sqlalchemy import create_engine, text +from sqlalchemy.orm import sessionmaker, scoped_session +from app.models import Base +from app.interface import HarvesterDBInterface from harvester.utils import open_json +load_dotenv() + HARVEST_SOURCES = Path(__file__).parents[0] / "harvest-sources" +@pytest.fixture(scope="session") +def db_session(): + DATABASE_SERVER = os.getenv("DATABASE_SERVER") + DATABASE_URI = os.getenv("DATABASE_URI") + TEST_SCHEMA = "test_schema" + modified_uri = DATABASE_URI.replace("@" + DATABASE_SERVER, "@localhost") + engine = create_engine(modified_uri) + + with engine.connect() as connection: + connection.execute(text(f"CREATE SCHEMA IF NOT EXISTS {TEST_SCHEMA};")) + connection.execute(text(f"SET search_path TO {TEST_SCHEMA};")) + + Base.metadata.create_all(engine) + SessionLocal = sessionmaker(bind=engine) + + session = scoped_session(SessionLocal) + yield session + + session.remove() + engine.dispose() + + with engine.begin() as connection: + connection.execute(text(f"DROP SCHEMA IF EXISTS {TEST_SCHEMA} CASCADE;")) + + +@pytest.fixture(scope="session") +def db_interface(db_session): + return HarvesterDBInterface(db_session) + + +@pytest.fixture +def bad_url_dcatus_config() -> dict: + 
return { + "_title": "test_harvest_source_title", + "_url": "http://localhost/dcatus/bad_url.json", + "_extract_type": "datajson", + "_owner_org": "example_organization", + "_job_id": "1db556ff-fb02-438b-b7d2-ad914e1f2531", + } + + @pytest.fixture def dcatus_config() -> dict: """example dcatus job payload""" @@ -15,6 +65,18 @@ def dcatus_config() -> dict: "_url": "http://localhost/dcatus/dcatus.json", "_extract_type": "datajson", "_owner_org": "example_organization", + "_job_id": "1db556ff-fb02-438b-b7d2-ad914e1f2531", + } + + +@pytest.fixture +def invalid_dcatus_config() -> dict: + return { + "_title": "test_harvest_source_name", + "_url": "http://localhost/dcatus/missing_title.json", + "_extract_type": "datajson", + "_owner_org": "example_organization", + "_job_id": "1db556ff-fb02-438b-b7d2-ad914e1f2531", } @@ -27,6 +89,7 @@ def waf_config() -> dict: "_extract_type": "waf-collection", "_owner_org": "example_organization", "_waf_config": {"filters": ["../", "dcatus/"]}, + "_job_id": "1db556ff-fb02-438b-b7d2-ad914e1f2531", } @@ -38,6 +101,7 @@ def dcatus_compare_config() -> dict: "_url": "http://localhost/dcatus/dcatus_compare.json", "_extract_type": "datajson", "_owner_org": "example_organization", + "_job_id": "1db556ff-fb02-438b-b7d2-ad914e1f2531", } diff --git a/tests/database/test_db.py b/tests/database/test_db.py index f0c5884f..a150bc4f 100644 --- a/tests/database/test_db.py +++ b/tests/database/test_db.py @@ -27,33 +27,38 @@ def db_session(): yield session session.remove() - engine.dispose() with engine.begin() as connection: connection.execute(text(f"DROP SCHEMA IF EXISTS {TEST_SCHEMA} CASCADE;")) + engine.dispose() + @pytest.fixture(scope='session') def db_interface(db_session): return HarvesterDBInterface(db_session) def test_add_harvest_source(db_interface): + org_data = db_interface.add_organization({'name': 'GSA', + 'logo': 'url for the logo'}) source_data = {'name': 'Test Source', 'frequency': 'daily', - 'url': "http://example.com", + 'url': "http://example-1.com", 'schema_type': 'strict', 'source_type': 'json'} - new_source = db_interface.add_harvest_source(source_data) + new_source = db_interface.add_harvest_source(source_data, str(org_data.id)) assert new_source.name == 'Test Source' def test_add_and_get_harvest_source(db_interface): + org_data = db_interface.add_organization({'name': 'GSA', + 'logo': 'url for the logo'}) new_source = db_interface.add_harvest_source({ 'name': 'Test Source', 'notification_emails': ['test@example.com'], 'frequency': 'daily', - 'url': "http://example.com", + 'url': "http://example-2.com", 'schema_type': 'strict', 'source_type': 'json' - }) + }, str(org_data.id)) assert new_source.name == 'Test Source' sources = db_interface.get_all_harvest_sources() @@ -61,14 +66,16 @@ def test_add_and_get_harvest_source(db_interface): def test_add_harvest_job(db_interface): + org_data = db_interface.add_organization({'name': 'GSA', + 'logo': 'url for the logo'}) new_source = db_interface.add_harvest_source({ 'name': 'Test Source', 'notification_emails': ['test@example.com'], 'frequency': 'daily', - 'url': "http://example.com", + 'url': "http://example-3.com", 'schema_type': 'strict', 'source_type': 'json' - }) + }, str(org_data.id)) job_data = { 'status': 'in_progress', diff --git a/tests/harvest-sources/dcatus/missing_title.json b/tests/harvest-sources/dcatus/missing_title.json new file mode 100644 index 00000000..faca3065 --- /dev/null +++ b/tests/harvest-sources/dcatus/missing_title.json @@ -0,0 +1,36 @@ +{ + "@type": "dcat:Catalog", + "describedBy": 
"https://project-open-data.cio.gov/v1.1/schema/catalog.json", + "conformsTo": "https://project-open-data.cio.gov/v1.1/schema", + "@context": "https://project-open-data.cio.gov/v1.1/schema/catalog.jsonld", + "dataset": [ + { + "identifier": "null-spatial", + "accessLevel": "public", + "contactPoint": { + "hasEmail": "mailto:Alexis.Graves@ocio.usda.gov", + "@type": "vcard:Contact", + "fn": "Alexi Graves" + }, + "programCode": ["005:059"], + "description": "Sample dataset. Spatial can be null", + "distribution": [ + { + "@type": "dcat:Distribution", + "downloadURL": "http://www.dm.usda.gov/foia/docs/Copy%20of%20ECM%20Congressional%20Logs%20FY14.xls", + "mediaType": "application/vnd.ms-excel", + "title": "Congressional Logs for Fiscal Year 2014" + } + ], + "license": "https://creativecommons.org/publicdomain/zero/1.0/", + "bureauCode": ["005:12"], + "modified": "2014-10-03", + "publisher": { + "@type": "org:Organization", + "name": "Department of Agriculture" + }, + "spatial": null, + "keyword": ["Congressional Logs"] + } + ] +} diff --git a/tests/unit/exception/test_exception_handling.py b/tests/unit/exception/test_exception_handling.py new file mode 100644 index 00000000..73e361df --- /dev/null +++ b/tests/unit/exception/test_exception_handling.py @@ -0,0 +1,99 @@ +from datetime import datetime +from unittest.mock import patch + +import pytest + +import harvester +from harvester.harvest import HarvestSource +from harvester.exceptions import ( + ExtractHarvestSourceException, + ExtractCKANSourceException, + ValidationException, + DCATUSToCKANException, + SynchronizeException, +) + +import ckanapi + +# ruff: noqa: F401 +# ruff: noqa: F841 + + +class TestExceptionHandling: + def test_add_harvest_source(self, db_interface): + organization = { + "id": "919bfb9e-89eb-4032-9abf-eee54be5a00c", + "logo": "url for the logo", + "name": "GSA" + } + + harvest_source = { + "id": "9347a852-2498-4bee-b817-90b8e93c9cec", + "name": "harvest_source_test", + "notification_emails": ["admin@example.com"], + "organization_id": "919bfb9e-89eb-4032-9abf-eee54be5a00c", + "frequency": "daily", + "url": "http://example.com", + "schema_type": "strict", + "source_type": "json", + } + + harvest_job = { + "harvest_source_id": "9347a852-2498-4bee-b817-90b8e93c9cec", + "id": "1db556ff-fb02-438b-b7d2-ad914e1f2531", + "status": "in_progress", + "date_created": datetime.utcnow(), + "date_finished": datetime.utcnow(), + "records_added": 0, + "records_updated": 0, + "records_deleted": 0, + "records_errored": 0, + "records_ignored": 0, + } + db_interface.add_organization(organization) + db_interface.add_harvest_source(harvest_source, + harvest_source["organization_id"]) + db_interface.add_harvest_job(harvest_job, harvest_job["harvest_source_id"]) + + def test_bad_harvest_source_url_exception(self, bad_url_dcatus_config): + harvest_source = HarvestSource(**bad_url_dcatus_config) + + with pytest.raises(ExtractHarvestSourceException) as e: + harvest_source.get_harvest_records_as_id_hash() + + @patch("harvester.harvest.ckan", ckanapi.RemoteCKAN("mock_address")) + def test_get_ckan_records_exception(self, bad_url_dcatus_config): + # using bad_url_dcatus_config just to populate required fields + harvest_source = HarvestSource(**bad_url_dcatus_config) + + with pytest.raises(ExtractCKANSourceException) as e: + harvest_source.get_ckan_records_as_id_hash() + + def test_validation_exception(self, invalid_dcatus_config): + harvest_source = HarvestSource(**invalid_dcatus_config) + harvest_source.get_harvest_records_as_id_hash() + + test_record 
= harvest_source.records["null-spatial"] + + with pytest.raises(ValidationException) as e: + test_record.validate() + + def test_dcatus_to_ckan_exception(self, invalid_dcatus_config): + harvest_source = HarvestSource(**invalid_dcatus_config) + harvest_source.get_harvest_records_as_id_hash() + + test_record = harvest_source.records["null-spatial"] + + with pytest.raises(DCATUSToCKANException) as e: + test_record.ckanify_dcatus() + + @patch("harvester.harvest.ckan", ckanapi.RemoteCKAN("mock_address")) + def test_synchronization_exception(self, dcatus_config): + harvest_source = HarvestSource(**dcatus_config) + harvest_source.get_harvest_records_as_id_hash() + + test_record = harvest_source.records["cftc-dc1"] + test_record.operation = "create" + + with pytest.raises(SynchronizeException) as e: + test_record.sync()
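Taken together, the exception classes, the "harvest_runner" logger, and the _job_id threaded through HarvestSource let a runner rely on exceptions for error reporting rather than ad-hoc traceback logging. A minimal sketch of that pattern, assuming a reachable database (each exception writes a harvest_error row when constructed) and reusing values from the dcatus_config test fixture; a real runner would pull these from the harvest_job table instead:

from harvester.harvest import HarvestSource
from harvester.exceptions import HarvestCriticalException

# fixture values borrowed from tests/conftest.py for illustration
source = HarvestSource(
    _title="test_harvest_source_title",
    _url="http://localhost/dcatus/dcatus.json",
    _owner_org="example_organization",
    _job_id="1db556ff-fb02-438b-b7d2-ad914e1f2531",
    _extract_type="datajson",
)

try:
    source.get_record_changes()    # extract + compare; re-raises the critical exceptions
    source.synchronize_records()   # per-record failures are caught and recorded internally
except HarvestCriticalException:
    # the exception has already written a CRITICAL, job-typed harvest_error row
    # and logged via the "harvest_runner" logger; the runner only needs to mark
    # the job itself as failed here
    pass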