diff --git a/tests/sources/xml/test_datacite.py b/tests/sources/xml/test_datacite.py
index a5d4971..18242bf 100644
--- a/tests/sources/xml/test_datacite.py
+++ b/tests/sources/xml/test_datacite.py
@@ -517,6 +517,246 @@ def test_get_contributors_transforms_correctly_if_fields_missing():
assert Datacite.get_contributors(source_record) is None
+def test_get_dates_success():
+ source_record = create_datacite_source_record_stub(
+ """
+ 2017
+ 2017-02-27
+ 2019-06-24
+ 2007-01-01/2007-02-28
+ """
+ )
+ assert Datacite.get_dates(source_record) == [
+ Date(kind="Publication date", value="2017"),
+ Date(kind="Submitted", value="2017-02-27"),
+ Date(kind="Updated", note="This was updated on this date", value="2019-06-24"),
+ Date(
+ kind="Collected",
+ range=DateRange(gte="2007-01-01", lte="2007-02-28"),
+ ),
+ ]
+def test_get_dates_transforms_correctly_if_fields_blank():
+ source_record = create_datacite_source_record_stub(
+ """
+ """
+ )
+ assert Datacite.get_dates(source_record) is None
+def test_get_dates_transforms_correctly_if_fields_missing():
+ source_record = create_datacite_source_record_stub()
+ assert Datacite.get_dates(source_record) is None
+def test_get_edition_success():
+ source_record = create_datacite_source_record_stub("1.2")
+ assert Datacite.get_edition(source_record) == "1.2"
+def test_get_edition_transforms_correctly_if_fields_blank():
+ source_record = create_datacite_source_record_stub("")
+ assert Datacite.get_edition(source_record) is None
+def test_get_edition_transforms_correctly_if_fields_missing():
+ source_record = create_datacite_source_record_stub()
+ assert Datacite.get_edition(source_record) is None
+def test_get_file_formats_success():
+ source_record = create_datacite_source_record_stub(
+ """
+ application/vnd.openxmlformats-officedocument.spreadsheetml.sheet
+ application/pdf
+ application/pdf
+ application/vnd.openxmlformats-officedocument.spreadsheetml.sheet
+ application/pdf
+ application/x-stata-syntax
+ application/x-stata
+ application/x-stata
+ application/zip
+ application/pdf
+ application/pdf
+ """
+ )
+ assert Datacite.get_file_formats(source_record) == [
+ "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
+ "application/pdf",
+ "application/pdf",
+ "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
+ "application/pdf",
+ "application/x-stata-syntax",
+ "application/x-stata",
+ "application/x-stata",
+ "application/zip",
+ "application/pdf",
+ "application/pdf",
+ ]
+def test_get_file_formats_transforms_correctly_if_fields_blank():
+ source_record = create_datacite_source_record_stub("")
+ assert Datacite.get_file_formats(source_record) is None
+def test_get_file_formats_transforms_correctly_if_fields_missing():
+ source_record = create_datacite_source_record_stub()
+ assert Datacite.get_file_formats(source_record) is None
+def test_get_format_success():
+ assert Datacite.get_format() == "electronic resource"
+def test_get_funding_information_success():
+ source_record = create_datacite_source_record_stub(
+ """
+ 3ie, Nike Foundation
+ 0987
+ OW1/1012 (3ie)
+ """
+ )
+ assert Datacite.get_funding_information(source_record) == [
+ Funder(
+ funder_name="3ie, Nike Foundation",
+ funder_identifier="0987",
+ funder_identifier_type="Crossref FunderID",
+ award_number="OW1/1012 (3ie)",
+ award_uri="http://awards.example/7689",
+ )
+ ]
+def test_get_funding_information_transforms_correctly_if_fields_blank():
+ source_record = create_datacite_source_record_stub(
+ ""
+ )
+ assert Datacite.get_funding_information(source_record) is None
+def test_get_funding_information_transforms_correctly_if_fields_missing():
+ source_record = create_datacite_source_record_stub()
+ assert Datacite.get_funding_information(source_record) is None
+def test_get_identifiers_success():
+ source_record = create_datacite_source_record_stub(
+ """
+ 10.7910/DVN/19PPE7
+ https://zenodo.org/record/5524465
+ 10.1257/app.20150390
+ 10.5281/zenodo.5524464
+ 1234567.5524464
+ 1234567.5524464
+ https://zenodo.org/communities/astronomy-general
+ """
+ )
+ assert Datacite.get_identifiers(source_record) == [
+ Identifier(value="10.7910/DVN/19PPE7", kind="DOI"),
+ Identifier(value="https://zenodo.org/record/5524465", kind="url"),
+ Identifier(value="1234567.5524464", kind="IsIdenticalTo"),
+ ]
+def test_get_identifiers_transforms_correctly_if_fields_blank():
+ source_record = create_datacite_source_record_stub(
+ """
+ """
+ )
+ assert Datacite.get_identifiers(source_record) is None
+def test_get_identifiers_transforms_correctly_if_fields_missing():
+ source_record = create_datacite_source_record_stub()
+ assert Datacite.get_identifiers(source_record) is None
+def test_get_languages_success():
+ source_record = create_datacite_source_record_stub("en_US")
+ assert Datacite.get_languages(source_record) == ["en_US"]
+def test_get_languages_transforms_correctly_if_fields_blank():
+ source_record = create_datacite_source_record_stub("")
+ assert Datacite.get_languages(source_record) is None
+def test_get_languages_transforms_correctly_if_fields_missing():
+ source_record = create_datacite_source_record_stub()
+ assert Datacite.get_languages(source_record) is None
+def test_get_links_success(datacite_record_all_fields):
+ source_record = create_datacite_source_record_stub()
+ datacite_transformer = Datacite("jpal", datacite_record_all_fields)
+ assert datacite_transformer.get_links(source_record) == [
+ Link(
+ url="https://dataverse.harvard.edu/dataset.xhtml?persistentId=abc123",
+ kind="Digital object URL",
+ text="Digital object URL",
+ )
+ ]
+def test_get_locations_success():
+ source_record = create_datacite_source_record_stub(
+ """
+ A point on the globe
+ """
+ )
+ assert Datacite.get_locations(source_record) == [
+ Location(value="A point on the globe")
+ ]
+def test_get_locations_transforms_correctly_if_fields_blank():
+ source_record = create_datacite_source_record_stub(
+ ""
+ )
+ assert Datacite.get_locations(source_record) is None
+def test_get_locations_transforms_correctly_if_fields_missing():
+ source_record = create_datacite_source_record_stub()
+ assert Datacite.get_locations(source_record) is None
def test_generate_name_identifier_url_orcid_scheme(datacite_record_all_fields):
assert next(datacite_record_all_fields).contributors[0].identifier == [
diff --git a/transmogrifier/sources/xml/datacite.py b/transmogrifier/sources/xml/datacite.py
index ec99f6b..e6e2339 100644
--- a/transmogrifier/sources/xml/datacite.py
+++ b/transmogrifier/sources/xml/datacite.py
@@ -37,134 +37,31 @@ def get_optional_fields(self, source_record: Tag) -> dict | None:
fields["contributors"] = self.get_contributors(source_record)
# dates
- if publication_year := source_record.metadata.find(
- "publicationYear", string=True
- ):
- publication_year = str(publication_year.string.strip())
- if validate_date(
- publication_year,
- source_record_id,
- ):
- fields["dates"] = [
- timdex.Date(kind="Publication date", value=publication_year)
- ]
- else:
- logger.warning(
- "Datacite record %s missing required Datacite field publicationYear",
- source_record_id,
- )
- for date in source_record.metadata.find_all("date"):
- d = timdex.Date()
- if date_value := date.string:
- date_value = str(date_value)
- if "/" in date_value:
- split = date_value.index("/")
- gte_date = date_value[:split].strip()
- lte_date = date_value[split + 1 :].strip()
- if validate_date_range(
- gte_date,
- lte_date,
- source_record_id,
- ):
- d.range = timdex.DateRange(
- gte=gte_date,
- lte=lte_date,
- )
- else:
- d.value = (
- date_value.strip()
- if validate_date(
- date_value,
- source_record_id,
- )
- else None
- )
- d.note = date.get("dateInformation") or None
- if any([d.note, d.range, d.value]):
- d.kind = date.get("dateType") or None
- fields.setdefault("dates", []).append(d)
+ fields["dates"] = self.get_dates(source_record)
# edition
- if edition := source_record.metadata.find("version", string=True):
- fields["edition"] = edition.string
+ fields["edition"] = self.get_edition(source_record)
# file_formats
- fields["file_formats"] = [
- f.string for f in source_record.metadata.find_all("format", string=True)
- ] or None
+ fields["file_formats"] = self.get_file_formats(source_record)
# format
- fields["format"] = "electronic resource"
+ fields["format"] = self.get_format()
# funding_information
- for funding_reference in source_record.metadata.find_all("fundingReference"):
- f = timdex.Funder()
- if funder_name := funding_reference.find("funderName", string=True):
- f.funder_name = funder_name.string
- if award_number := funding_reference.find("awardNumber"):
- f.award_number = award_number.string or None
- f.award_uri = award_number.get("awardURI") or None
- if funder_identifier := funding_reference.find(
- "funderIdentifier", string=True
- ):
- f.funder_identifier = funder_identifier.string
- f.funder_identifier_type = (
- funder_identifier.get("funderIdentifierType") or None
- )
- if f != timdex.Funder():
- fields.setdefault("funding_information", []).append(f)
+ fields["funding_information"] = self.get_funding_information(source_record)
# identifiers
- if identifier_xml := source_record.metadata.find("identifier", string=True):
- fields.setdefault("identifiers", []).append(
- timdex.Identifier(
- value=identifier_xml.string,
- kind=identifier_xml.get("identifierType") or "Not specified",
- )
- )
- for alternate_identifier in source_record.metadata.find_all(
- "alternateIdentifier", string=True
- ):
- fields.setdefault("identifiers", []).append(
- timdex.Identifier(
- value=alternate_identifier.string,
- kind=alternate_identifier.get("alternateIdentifierType")
- or "Not specified",
- )
- )
- related_identifiers = source_record.metadata.find_all(
- "relatedIdentifier", string=True
- )
- for related_identifier in [
- ri for ri in related_identifiers if ri.get("relationType") == "IsIdenticalTo"
- ]:
- fields.setdefault("identifiers", []).append(
- timdex.Identifier(
- value=self.generate_related_item_identifier_url(related_identifier),
- kind=related_identifier["relationType"],
- )
- )
+ fields["identifiers"] = self.get_identifiers(source_record)
- # language
- if language := source_record.metadata.find("language", string=True):
- fields["languages"] = [language.string]
+ # languages
+ fields["languages"] = self.get_languages(source_record)
# links
- fields["links"] = [
- timdex.Link(
- kind="Digital object URL",
- text="Digital object URL",
- url=self.source_base_url + source_record_id,
- )
- ]
+ fields["links"] = self.get_links(source_record)
# locations
- for location in source_record.metadata.find_all("geoLocationPlace", string=True):
- fields.setdefault("locations", []).append(
- timdex.Location(value=location.string)
- )
+ fields["locations"] = self.get_locations(source_record)
# notes
if resource_type := source_record.metadata.find("resourceType", string=True):
@@ -201,7 +98,9 @@ def get_optional_fields(self, source_record: Tag) -> dict | None:
# related_items, uses related_identifiers retrieved for identifiers
for related_identifier in [
- ri for ri in related_identifiers if ri.get("relationType") != "IsIdenticalTo"
+ ri
+ for ri in source_record.metadata.find_all("relatedIdentifier", string=True)
+ if ri.get("relationType") != "IsIdenticalTo"
fields.setdefault("related_items", []).append(
@@ -292,7 +191,9 @@ def get_content_type(cls, source_record: Tag) -> list[str] | None:
def get_contributors(cls, source_record: Tag) -> list[timdex.Contributor] | None:
contributors = []
- contributors.extend(list(cls._get_contributors(source_record)))
+ contributors.extend(
+ list(cls._get_contributors_by_contributor_element(source_record))
+ )
return contributors or None
@@ -317,7 +218,9 @@ def _get_creators(cls, source_record: Tag) -> Iterator[timdex.Contributor]:
- def _get_contributors(cls, source_record: Tag) -> Iterator[timdex.Contributor]:
+ def _get_contributors_by_contributor_element(
+ cls, source_record: Tag
+ ) -> Iterator[timdex.Contributor]:
for contributor in source_record.metadata.find_all("contributor"):
if contributor_name := contributor.find("contributorName", string=True):
yield timdex.Contributor(
@@ -339,6 +242,189 @@ def _get_contributors(cls, source_record: Tag) -> Iterator[timdex.Contributor]:
kind=contributor.get("contributorType") or "Not specified",
+ @classmethod
+ def get_dates(
+ cls,
+ source_record: Tag,
+ ) -> list[timdex.Date] | None:
+ dates = []
+ dates.extend(list(cls._get_publication_year(source_record)))
+ dates.extend(list(cls._get_dates_by_date_element(source_record)))
+ return dates or None
+ @classmethod
+ def _get_publication_year(cls, source_record: Tag) -> Iterator[timdex.Date]:
+ if publication_year := source_record.metadata.find(
+ "publicationYear", string=True
+ ):
+ publication_year = str(publication_year.string.strip())
+ if validate_date(
+ publication_year,
+ cls.get_source_record_id(source_record),
+ ):
+ yield timdex.Date(kind="Publication date", value=publication_year)
+ else:
+ logger.warning(
+ "Datacite record %s missing required Datacite field publicationYear",
+ cls.get_source_record_id(source_record),
+ )
+ @classmethod
+ def _get_dates_by_date_element(cls, source_record: Tag) -> Iterator[timdex.Date]:
+ for date_element in source_record.metadata.find_all("date"):
+ date_object = timdex.Date()
+ if date_value := date_element.string:
+ date_value = str(date_value)
+ if "/" in date_value:
+ date_object = cls._parse_date_range(
+ date_object, date_value, cls.get_source_record_id(source_record)
+ )
+ else:
+ date_object.value = (
+ date_value.strip()
+ if validate_date(
+ date_value,
+ cls.get_source_record_id(source_record),
+ )
+ else None
+ )
+ date_object.note = date_element.get("dateInformation") or None
+ if any([date_object.note, date_object.range, date_object.value]):
+ date_object.kind = date_element.get("dateType") or None
+ yield date_object
+ @classmethod
+ def _parse_date_range(
+ cls, date_object: timdex.Date, date_value: str, source_record_id: str
+ ) -> timdex.Date:
+ split = date_value.index("/")
+ gte_date = date_value[:split].strip()
+ lte_date = date_value[split + 1 :].strip()
+ if validate_date_range(
+ gte_date,
+ lte_date,
+ source_record_id,
+ ):
+ date_object.range = timdex.DateRange(
+ gte=gte_date,
+ lte=lte_date,
+ )
+ return date_object
+ @classmethod
+ def get_edition(cls, source_record: Tag) -> str | None:
+ if edition := source_record.metadata.find("version", string=True):
+ return str(edition.string)
+ return None
+ @classmethod
+ def get_file_formats(cls, source_record: Tag) -> list[str] | None:
+ return [
+ str(file_format.string)
+ for file_format in source_record.metadata.find_all("format", string=True)
+ ] or None
+ @classmethod
+ def get_format(cls) -> str:
+ return "electronic resource"
+ @classmethod
+ def get_funding_information(cls, source_record: Tag) -> list[timdex.Funder] | None:
+ funding_information = []
+ for funding_reference in source_record.metadata.find_all("fundingReference"):
+ funder = timdex.Funder()
+ if funder_name := funding_reference.find("funderName", string=True):
+ funder.funder_name = str(funder_name.string)
+ if award_number := funding_reference.find("awardNumber"):
+ funder.award_number = award_number.string or None
+ funder.award_uri = award_number.get("awardURI") or None
+ if funder_identifier := funding_reference.find(
+ "funderIdentifier", string=True
+ ):
+ funder.funder_identifier = str(funder_identifier.string)
+ funder.funder_identifier_type = (
+ funder_identifier.get("funderIdentifierType") or None
+ )
+ if funder != timdex.Funder():
+ funding_information.append(funder)
+ return funding_information or None
+ @classmethod
+ def get_identifiers(
+ cls,
+ source_record: Tag,
+ ) -> list[timdex.Identifier] | None:
+ identifiers = []
+ if identifier_element := source_record.metadata.find("identifier", string=True):
+ identifiers.append(
+ timdex.Identifier(
+ value=str(identifier_element.string),
+ kind=identifier_element.get("identifierType") or "Not specified",
+ )
+ )
+ identifiers.extend(list(cls._get_alternate_identifiers(source_record)))
+ identifiers.extend(list(cls._get_related_identifiers(source_record)))
+ return identifiers or None
+ @classmethod
+ def _get_alternate_identifiers(
+ cls,
+ source_record: Tag,
+ ) -> Iterator[timdex.Identifier]:
+ for alternate_identifier_element in source_record.metadata.find_all(
+ "alternateIdentifier", string=True
+ ):
+ yield timdex.Identifier(
+ value=str(alternate_identifier_element.string),
+ kind=alternate_identifier_element.get("alternateIdentifierType")
+ or "Not specified",
+ )
+ @classmethod
+ def _get_related_identifiers(
+ cls,
+ source_record: Tag,
+ ) -> Iterator[timdex.Identifier]:
+ related_identifier_elements = source_record.metadata.find_all(
+ "relatedIdentifier", string=True
+ )
+ for related_identifier_element in [
+ related_identifier_element
+ for related_identifier_element in related_identifier_elements
+ if related_identifier_element.get("relationType") == "IsIdenticalTo"
+ ]:
+ yield timdex.Identifier(
+ value=cls.generate_related_item_identifier_url(
+ related_identifier_element
+ ),
+ kind=str(related_identifier_element["relationType"]),
+ )
+ @classmethod
+ def get_languages(cls, source_record: Tag) -> list[str] | None:
+ languages = []
+ if language := source_record.metadata.find("language", string=True):
+ languages.append(str(language.string))
+ return languages or None
+ def get_links(self, source_record: Tag) -> list[timdex.Link] | None:
+ return [
+ timdex.Link(
+ kind="Digital object URL",
+ text="Digital object URL",
+ url=self.source_base_url + self.get_source_record_id(source_record),
+ )
+ ]
+ @classmethod
+ def get_locations(cls, source_record: Tag) -> list[timdex.Location] | None:
+ return [
+ timdex.Location(value=str(location.string))
+ for location in source_record.metadata.find_all(
+ "geoLocationPlace", string=True
+ )
+ ] or None
def get_main_titles(cls, source_record: Tag) -> list[str]: