From ff1aaeb338a351d4a3f8ec9d5d67a84ab62afc41 Mon Sep 17 00:00:00 2001 From: Eric Hanson Date: Thu, 30 May 2024 14:53:55 -0400 Subject: [PATCH] Add 2nd set of Datacite field methods (#181) * Add 2nd set of Datacite field methods Why these changes are being introduced: * Continue refactoring Datacite to use field methods How this addresses that need: * Add field methods and associated private methods for dates, edition, file_formats, format, funding_information, identifiers, languages, links, and locations * Update related_items code block to generate related_identifiers list that was moved into get_identifiers * Add unit tests for new field methods Side effects of this change: * None Relevant ticket(s): * https://mitlibraries.atlassian.net/browse/TIMX-284 * Updates based on discussion in PR #181 * Rename methods for clarity --- tests/sources/xml/test_datacite.py | 240 +++++++++++++++++++ transmogrifier/sources/xml/datacite.py | 318 ++++++++++++++++--------- 2 files changed, 442 insertions(+), 116 deletions(-) diff --git a/tests/sources/xml/test_datacite.py b/tests/sources/xml/test_datacite.py index a5d4971..18242bf 100644 --- a/tests/sources/xml/test_datacite.py +++ b/tests/sources/xml/test_datacite.py @@ -517,6 +517,246 @@ def test_get_contributors_transforms_correctly_if_fields_missing(): assert Datacite.get_contributors(source_record) is None +def test_get_dates_success(): + source_record = create_datacite_source_record_stub( + """ + 2017 + + 2017-02-27 + 2019-06-24 + 2007-01-01/2007-02-28 + + """ + ) + assert Datacite.get_dates(source_record) == [ + Date(kind="Publication date", value="2017"), + Date(kind="Submitted", value="2017-02-27"), + Date(kind="Updated", note="This was updated on this date", value="2019-06-24"), + Date( + kind="Collected", + range=DateRange(gte="2007-01-01", lte="2007-02-28"), + ), + ] + + +def test_get_dates_transforms_correctly_if_fields_blank(): + source_record = create_datacite_source_record_stub( + """ + + + + + """ + ) + assert Datacite.get_dates(source_record) is None + + +def test_get_dates_transforms_correctly_if_fields_missing(): + source_record = create_datacite_source_record_stub() + assert Datacite.get_dates(source_record) is None + + +def test_get_edition_success(): + source_record = create_datacite_source_record_stub("1.2") + assert Datacite.get_edition(source_record) == "1.2" + + +def test_get_edition_transforms_correctly_if_fields_blank(): + source_record = create_datacite_source_record_stub("") + assert Datacite.get_edition(source_record) is None + + +def test_get_edition_transforms_correctly_if_fields_missing(): + source_record = create_datacite_source_record_stub() + assert Datacite.get_edition(source_record) is None + + +def test_get_file_formats_success(): + source_record = create_datacite_source_record_stub( + """ + + application/vnd.openxmlformats-officedocument.spreadsheetml.sheet + application/pdf + application/pdf + application/vnd.openxmlformats-officedocument.spreadsheetml.sheet + application/pdf + application/x-stata-syntax + application/x-stata + application/x-stata + application/zip + application/pdf + application/pdf + + """ + ) + assert Datacite.get_file_formats(source_record) == [ + "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", + "application/pdf", + "application/pdf", + "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", + "application/pdf", + "application/x-stata-syntax", + "application/x-stata", + "application/x-stata", + "application/zip", + "application/pdf", + "application/pdf", + ] + + +def test_get_file_formats_transforms_correctly_if_fields_blank(): + source_record = create_datacite_source_record_stub("") + assert Datacite.get_file_formats(source_record) is None + + +def test_get_file_formats_transforms_correctly_if_fields_missing(): + source_record = create_datacite_source_record_stub() + assert Datacite.get_file_formats(source_record) is None + + +def test_get_format_success(): + assert Datacite.get_format() == "electronic resource" + + +def test_get_funding_information_success(): + source_record = create_datacite_source_record_stub( + """ + + + 3ie, Nike Foundation + 0987 + OW1/1012 (3ie) + + + """ + ) + assert Datacite.get_funding_information(source_record) == [ + Funder( + funder_name="3ie, Nike Foundation", + funder_identifier="0987", + funder_identifier_type="Crossref FunderID", + award_number="OW1/1012 (3ie)", + award_uri="http://awards.example/7689", + ) + ] + + +def test_get_funding_information_transforms_correctly_if_fields_blank(): + source_record = create_datacite_source_record_stub( + "" + ) + assert Datacite.get_funding_information(source_record) is None + + +def test_get_funding_information_transforms_correctly_if_fields_missing(): + source_record = create_datacite_source_record_stub() + assert Datacite.get_funding_information(source_record) is None + + +def test_get_identifiers_success(): + source_record = create_datacite_source_record_stub( + """ + 10.7910/DVN/19PPE7 + + https://zenodo.org/record/5524465 + + + + 10.1257/app.20150390 + 10.5281/zenodo.5524464 + + 1234567.5524464 + + 1234567.5524464 + + https://zenodo.org/communities/astronomy-general + + """ + ) + assert Datacite.get_identifiers(source_record) == [ + Identifier(value="10.7910/DVN/19PPE7", kind="DOI"), + Identifier(value="https://zenodo.org/record/5524465", kind="url"), + Identifier(value="1234567.5524464", kind="IsIdenticalTo"), + ] + + +def test_get_identifiers_transforms_correctly_if_fields_blank(): + source_record = create_datacite_source_record_stub( + """ + + + + + + + + """ + ) + assert Datacite.get_identifiers(source_record) is None + + +def test_get_identifiers_transforms_correctly_if_fields_missing(): + source_record = create_datacite_source_record_stub() + assert Datacite.get_identifiers(source_record) is None + + +def test_get_languages_success(): + source_record = create_datacite_source_record_stub("en_US") + assert Datacite.get_languages(source_record) == ["en_US"] + + +def test_get_languages_transforms_correctly_if_fields_blank(): + source_record = create_datacite_source_record_stub("") + assert Datacite.get_languages(source_record) is None + + +def test_get_languages_transforms_correctly_if_fields_missing(): + source_record = create_datacite_source_record_stub() + assert Datacite.get_languages(source_record) is None + + +def test_get_links_success(datacite_record_all_fields): + source_record = create_datacite_source_record_stub() + datacite_transformer = Datacite("jpal", datacite_record_all_fields) + assert datacite_transformer.get_links(source_record) == [ + Link( + url="https://dataverse.harvard.edu/dataset.xhtml?persistentId=abc123", + kind="Digital object URL", + text="Digital object URL", + ) + ] + + +def test_get_locations_success(): + source_record = create_datacite_source_record_stub( + """ + + + A point on the globe + + + """ + ) + assert Datacite.get_locations(source_record) == [ + Location(value="A point on the globe") + ] + + +def test_get_locations_transforms_correctly_if_fields_blank(): + source_record = create_datacite_source_record_stub( + "" + ) + assert Datacite.get_locations(source_record) is None + + +def test_get_locations_transforms_correctly_if_fields_missing(): + source_record = create_datacite_source_record_stub() + assert Datacite.get_locations(source_record) is None + + def test_generate_name_identifier_url_orcid_scheme(datacite_record_all_fields): assert next(datacite_record_all_fields).contributors[0].identifier == [ "https://orcid.org/0000-0000-0000-0000" diff --git a/transmogrifier/sources/xml/datacite.py b/transmogrifier/sources/xml/datacite.py index ec99f6b..e6e2339 100644 --- a/transmogrifier/sources/xml/datacite.py +++ b/transmogrifier/sources/xml/datacite.py @@ -37,134 +37,31 @@ def get_optional_fields(self, source_record: Tag) -> dict | None: fields["contributors"] = self.get_contributors(source_record) # dates - if publication_year := source_record.metadata.find( - "publicationYear", string=True - ): - publication_year = str(publication_year.string.strip()) - if validate_date( - publication_year, - source_record_id, - ): - fields["dates"] = [ - timdex.Date(kind="Publication date", value=publication_year) - ] - else: - logger.warning( - "Datacite record %s missing required Datacite field publicationYear", - source_record_id, - ) - - for date in source_record.metadata.find_all("date"): - d = timdex.Date() - if date_value := date.string: - date_value = str(date_value) - if "/" in date_value: - split = date_value.index("/") - gte_date = date_value[:split].strip() - lte_date = date_value[split + 1 :].strip() - if validate_date_range( - gte_date, - lte_date, - source_record_id, - ): - d.range = timdex.DateRange( - gte=gte_date, - lte=lte_date, - ) - else: - d.value = ( - date_value.strip() - if validate_date( - date_value, - source_record_id, - ) - else None - ) - d.note = date.get("dateInformation") or None - if any([d.note, d.range, d.value]): - d.kind = date.get("dateType") or None - fields.setdefault("dates", []).append(d) + fields["dates"] = self.get_dates(source_record) # edition - if edition := source_record.metadata.find("version", string=True): - fields["edition"] = edition.string + fields["edition"] = self.get_edition(source_record) # file_formats - fields["file_formats"] = [ - f.string for f in source_record.metadata.find_all("format", string=True) - ] or None + fields["file_formats"] = self.get_file_formats(source_record) # format - fields["format"] = "electronic resource" + fields["format"] = self.get_format() # funding_information - for funding_reference in source_record.metadata.find_all("fundingReference"): - f = timdex.Funder() - if funder_name := funding_reference.find("funderName", string=True): - f.funder_name = funder_name.string - if award_number := funding_reference.find("awardNumber"): - f.award_number = award_number.string or None - f.award_uri = award_number.get("awardURI") or None - if funder_identifier := funding_reference.find( - "funderIdentifier", string=True - ): - f.funder_identifier = funder_identifier.string - f.funder_identifier_type = ( - funder_identifier.get("funderIdentifierType") or None - ) - if f != timdex.Funder(): - fields.setdefault("funding_information", []).append(f) + fields["funding_information"] = self.get_funding_information(source_record) # identifiers - if identifier_xml := source_record.metadata.find("identifier", string=True): - fields.setdefault("identifiers", []).append( - timdex.Identifier( - value=identifier_xml.string, - kind=identifier_xml.get("identifierType") or "Not specified", - ) - ) - for alternate_identifier in source_record.metadata.find_all( - "alternateIdentifier", string=True - ): - fields.setdefault("identifiers", []).append( - timdex.Identifier( - value=alternate_identifier.string, - kind=alternate_identifier.get("alternateIdentifierType") - or "Not specified", - ) - ) - - related_identifiers = source_record.metadata.find_all( - "relatedIdentifier", string=True - ) - for related_identifier in [ - ri for ri in related_identifiers if ri.get("relationType") == "IsIdenticalTo" - ]: - fields.setdefault("identifiers", []).append( - timdex.Identifier( - value=self.generate_related_item_identifier_url(related_identifier), - kind=related_identifier["relationType"], - ) - ) + fields["identifiers"] = self.get_identifiers(source_record) - # language - if language := source_record.metadata.find("language", string=True): - fields["languages"] = [language.string] + # languages + fields["languages"] = self.get_languages(source_record) # links - fields["links"] = [ - timdex.Link( - kind="Digital object URL", - text="Digital object URL", - url=self.source_base_url + source_record_id, - ) - ] + fields["links"] = self.get_links(source_record) # locations - for location in source_record.metadata.find_all("geoLocationPlace", string=True): - fields.setdefault("locations", []).append( - timdex.Location(value=location.string) - ) + fields["locations"] = self.get_locations(source_record) # notes if resource_type := source_record.metadata.find("resourceType", string=True): @@ -201,7 +98,9 @@ def get_optional_fields(self, source_record: Tag) -> dict | None: # related_items, uses related_identifiers retrieved for identifiers for related_identifier in [ - ri for ri in related_identifiers if ri.get("relationType") != "IsIdenticalTo" + ri + for ri in source_record.metadata.find_all("relatedIdentifier", string=True) + if ri.get("relationType") != "IsIdenticalTo" ]: fields.setdefault("related_items", []).append( timdex.RelatedItem( @@ -292,7 +191,9 @@ def get_content_type(cls, source_record: Tag) -> list[str] | None: def get_contributors(cls, source_record: Tag) -> list[timdex.Contributor] | None: contributors = [] contributors.extend(list(cls._get_creators(source_record))) - contributors.extend(list(cls._get_contributors(source_record))) + contributors.extend( + list(cls._get_contributors_by_contributor_element(source_record)) + ) return contributors or None @classmethod @@ -317,7 +218,9 @@ def _get_creators(cls, source_record: Tag) -> Iterator[timdex.Contributor]: ) @classmethod - def _get_contributors(cls, source_record: Tag) -> Iterator[timdex.Contributor]: + def _get_contributors_by_contributor_element( + cls, source_record: Tag + ) -> Iterator[timdex.Contributor]: for contributor in source_record.metadata.find_all("contributor"): if contributor_name := contributor.find("contributorName", string=True): yield timdex.Contributor( @@ -339,6 +242,189 @@ def _get_contributors(cls, source_record: Tag) -> Iterator[timdex.Contributor]: kind=contributor.get("contributorType") or "Not specified", ) + @classmethod + def get_dates( + cls, + source_record: Tag, + ) -> list[timdex.Date] | None: + dates = [] + dates.extend(list(cls._get_publication_year(source_record))) + dates.extend(list(cls._get_dates_by_date_element(source_record))) + return dates or None + + @classmethod + def _get_publication_year(cls, source_record: Tag) -> Iterator[timdex.Date]: + if publication_year := source_record.metadata.find( + "publicationYear", string=True + ): + publication_year = str(publication_year.string.strip()) + if validate_date( + publication_year, + cls.get_source_record_id(source_record), + ): + yield timdex.Date(kind="Publication date", value=publication_year) + else: + logger.warning( + "Datacite record %s missing required Datacite field publicationYear", + cls.get_source_record_id(source_record), + ) + + @classmethod + def _get_dates_by_date_element(cls, source_record: Tag) -> Iterator[timdex.Date]: + for date_element in source_record.metadata.find_all("date"): + date_object = timdex.Date() + if date_value := date_element.string: + date_value = str(date_value) + if "/" in date_value: + date_object = cls._parse_date_range( + date_object, date_value, cls.get_source_record_id(source_record) + ) + else: + date_object.value = ( + date_value.strip() + if validate_date( + date_value, + cls.get_source_record_id(source_record), + ) + else None + ) + date_object.note = date_element.get("dateInformation") or None + if any([date_object.note, date_object.range, date_object.value]): + date_object.kind = date_element.get("dateType") or None + yield date_object + + @classmethod + def _parse_date_range( + cls, date_object: timdex.Date, date_value: str, source_record_id: str + ) -> timdex.Date: + split = date_value.index("/") + gte_date = date_value[:split].strip() + lte_date = date_value[split + 1 :].strip() + if validate_date_range( + gte_date, + lte_date, + source_record_id, + ): + date_object.range = timdex.DateRange( + gte=gte_date, + lte=lte_date, + ) + return date_object + + @classmethod + def get_edition(cls, source_record: Tag) -> str | None: + if edition := source_record.metadata.find("version", string=True): + return str(edition.string) + return None + + @classmethod + def get_file_formats(cls, source_record: Tag) -> list[str] | None: + return [ + str(file_format.string) + for file_format in source_record.metadata.find_all("format", string=True) + ] or None + + @classmethod + def get_format(cls) -> str: + return "electronic resource" + + @classmethod + def get_funding_information(cls, source_record: Tag) -> list[timdex.Funder] | None: + funding_information = [] + for funding_reference in source_record.metadata.find_all("fundingReference"): + funder = timdex.Funder() + if funder_name := funding_reference.find("funderName", string=True): + funder.funder_name = str(funder_name.string) + if award_number := funding_reference.find("awardNumber"): + funder.award_number = award_number.string or None + funder.award_uri = award_number.get("awardURI") or None + if funder_identifier := funding_reference.find( + "funderIdentifier", string=True + ): + funder.funder_identifier = str(funder_identifier.string) + funder.funder_identifier_type = ( + funder_identifier.get("funderIdentifierType") or None + ) + if funder != timdex.Funder(): + funding_information.append(funder) + return funding_information or None + + @classmethod + def get_identifiers( + cls, + source_record: Tag, + ) -> list[timdex.Identifier] | None: + identifiers = [] + if identifier_element := source_record.metadata.find("identifier", string=True): + identifiers.append( + timdex.Identifier( + value=str(identifier_element.string), + kind=identifier_element.get("identifierType") or "Not specified", + ) + ) + identifiers.extend(list(cls._get_alternate_identifiers(source_record))) + identifiers.extend(list(cls._get_related_identifiers(source_record))) + return identifiers or None + + @classmethod + def _get_alternate_identifiers( + cls, + source_record: Tag, + ) -> Iterator[timdex.Identifier]: + for alternate_identifier_element in source_record.metadata.find_all( + "alternateIdentifier", string=True + ): + yield timdex.Identifier( + value=str(alternate_identifier_element.string), + kind=alternate_identifier_element.get("alternateIdentifierType") + or "Not specified", + ) + + @classmethod + def _get_related_identifiers( + cls, + source_record: Tag, + ) -> Iterator[timdex.Identifier]: + related_identifier_elements = source_record.metadata.find_all( + "relatedIdentifier", string=True + ) + for related_identifier_element in [ + related_identifier_element + for related_identifier_element in related_identifier_elements + if related_identifier_element.get("relationType") == "IsIdenticalTo" + ]: + yield timdex.Identifier( + value=cls.generate_related_item_identifier_url( + related_identifier_element + ), + kind=str(related_identifier_element["relationType"]), + ) + + @classmethod + def get_languages(cls, source_record: Tag) -> list[str] | None: + languages = [] + if language := source_record.metadata.find("language", string=True): + languages.append(str(language.string)) + return languages or None + + def get_links(self, source_record: Tag) -> list[timdex.Link] | None: + return [ + timdex.Link( + kind="Digital object URL", + text="Digital object URL", + url=self.source_base_url + self.get_source_record_id(source_record), + ) + ] + + @classmethod + def get_locations(cls, source_record: Tag) -> list[timdex.Location] | None: + return [ + timdex.Location(value=str(location.string)) + for location in source_record.metadata.find_all( + "geoLocationPlace", string=True + ) + ] or None + @classmethod def get_main_titles(cls, source_record: Tag) -> list[str]: """