diff --git a/tests/sources/xml/test_marc.py b/tests/sources/xml/test_marc.py
index 5ec8cac..b9b4ef8 100644
--- a/tests/sources/xml/test_marc.py
+++ b/tests/sources/xml/test_marc.py
@@ -1,17 +1,30 @@
-# ruff: noqa: E501
+# ruff: noqa: E501, SLF001
import logging
+import pytest
from bs4 import BeautifulSoup # type: ignore[import-untyped]
import transmogrifier.models as timdex
+from transmogrifier.exceptions import SkippedRecordEvent
from transmogrifier.sources.xml.marc import Marc
-def create_marc_source_record_stub(datafield_insert: str = ""):
+def create_marc_source_record_stub(
+ leader_field_insert: str = "03282nam 2200721Ki 4500",
+ control_field_general_info_insert: str = (
+ '170906s2016 fr mun| o e zxx d'
+ ),
+ datafield_insert: str = "",
+):
"""
Create source record for unit tests.
Args:
+ leader_field_insert (str): A string representing a MARC fixed length 'leader'
+ XML element. Defaults to a dummy value.
+ control_field_general_info_insert (str): A string representing a MARC fixed length
+ 'general info control field' (i.e., code 008) XML element.
+ Defaults to a dummy value.
datafield_insert (str): A string representing a MARC 'datafield' XML element.
Note: A source record for "missing" field method tests can be created by
@@ -20,15 +33,20 @@ def create_marc_source_record_stub(datafield_insert: str = ""):
xml_string = """
- 03282nam 2200721Ki 4500
- 170906s2016 fr mun| o e zxx d
+ {leader_field_insert}
+ {control_field_general_info_insert}
990027185640106761
{datafield_insert}
"""
+
return BeautifulSoup(
- xml_string.format(datafield_insert=datafield_insert),
+ xml_string.format(
+ leader_field_insert=leader_field_insert,
+ control_field_general_info_insert=control_field_general_info_insert,
+ datafield_insert=datafield_insert,
+ ),
"xml",
)
@@ -775,6 +793,62 @@ def test_marc_record_with_missing_optional_fields_transforms_correctly():
)
+def test_get_leader_field_success():
+    """The default stub record yields its leader text unchanged."""
+    leader = Marc._get_leader_field(create_marc_source_record_stub())
+    assert leader == "03282nam 2200721Ki 4500"
+
+
+def test_get_leader_field_raises_error_if_field_blank():
+    """A leader element with no text makes `Marc._get_leader_field` skip the record."""
+    # Insert an empty <leader> element instead of an empty string: an empty string
+    # is the same input the missing-field test uses, which would leave the
+    # "element present but blank" case untested.
+    source_record = create_marc_source_record_stub(
+        leader_field_insert="<leader></leader>"
+    )
+    with pytest.raises(
+        SkippedRecordEvent,
+        match=("Record skipped because key information is missing: ."),
+    ):
+        Marc._get_leader_field(source_record)
+
+
+def test_get_leader_data_raises_error_if_field_missing():
+    """An empty leader insert makes `Marc._get_leader_field` skip the record."""
+    expected_message = "Record skipped because key information is missing: ."
+    record_without_leader = create_marc_source_record_stub(leader_field_insert="")
+    with pytest.raises(SkippedRecordEvent, match=expected_message):
+        Marc._get_leader_field(record_without_leader)
+
+
+def test_get_control_field_general_info_success():
+    """The default stub record yields its general info control field text unchanged."""
+    expected = "170906s2016 fr mun| o e zxx d"
+    general_info = Marc._get_control_field_general_info(
+        create_marc_source_record_stub()
+    )
+    assert general_info == expected
+
+
+def test_get_control_field_general_info_if_field_blank():
+    """An 008 control field with no text makes the record get skipped."""
+    # Insert an empty 008 element instead of an empty string: an empty string is
+    # the same input the missing-field test uses, which would leave the
+    # "element present but blank" case untested.
+    source_record = create_marc_source_record_stub(
+        control_field_general_info_insert='<controlfield tag="008"></controlfield>'
+    )
+    with pytest.raises(
+        SkippedRecordEvent,
+        match=(
+            'Record skipped because key information is missing: .'
+        ),
+    ):
+        Marc._get_control_field_general_info(source_record)
+
+
+def test_get_control_field_general_info_if_field_missing():
+    """An empty general info insert makes the record get skipped."""
+    expected_message = "Record skipped because key information is missing: ."
+    record_without_008 = create_marc_source_record_stub(
+        control_field_general_info_insert=""
+    )
+    with pytest.raises(SkippedRecordEvent, match=expected_message):
+        Marc._get_control_field_general_info(record_without_008)
+
+
def test_get_alternate_titles_success():
source_record = create_marc_source_record_stub(
datafield_insert=(
@@ -894,11 +968,7 @@ def test_marc_record_missing_leader_logs_error(caplog):
output_records = Marc("alma", marc_xml_records)
assert len(list(output_records)) == 0
assert output_records.processed_record_count == 1
- assert (
- "transmogrifier.sources.xml.marc",
- logging.ERROR,
- "Record ID 990027185640106761 is missing MARC leader",
- ) in caplog.record_tuples
+ assert output_records.skipped_record_count == 1
def test_marc_record_missing_008_logs_error(caplog):
@@ -908,11 +978,7 @@ def test_marc_record_missing_008_logs_error(caplog):
output_records = Marc("alma", marc_xml_records)
assert len(list(output_records)) == 0
assert output_records.processed_record_count == 1
- assert (
- "transmogrifier.sources.xml.marc",
- logging.ERROR,
- "Record ID 990027185640106761 is missing MARC 008 field",
- ) in caplog.record_tuples
+ assert output_records.skipped_record_count == 1
def test_create_subfield_value_list_from_datafield_with_values():
diff --git a/transmogrifier/sources/xml/marc.py b/transmogrifier/sources/xml/marc.py
index 0573004..688ebd0 100644
--- a/transmogrifier/sources/xml/marc.py
+++ b/transmogrifier/sources/xml/marc.py
@@ -4,6 +4,7 @@
import transmogrifier.models as timdex
from transmogrifier.config import load_external_config
+from transmogrifier.exceptions import SkippedRecordEvent
from transmogrifier.helpers import validate_date
from transmogrifier.sources.xmltransformer import XMLTransformer
@@ -45,17 +46,8 @@ def get_optional_fields(self, source_record: Tag) -> dict | None:
source_record_id = Marc.get_source_record_id(source_record)
- fixed_length_data = source_record.find("controlfield", tag="008", string=True)
- if fixed_length_data is None:
- message = f"Record ID {source_record_id} is missing MARC 008 field"
- logger.error(message)
- return None
-
- leader = source_record.find("leader", string=True)
- if leader is None:
- message = f"Record ID {source_record_id} is missing MARC leader"
- logger.error(message)
- return None
+ leader_field = Marc._get_leader_field(source_record)
+ control_field_general_info = Marc._get_control_field_general_info(source_record)
# alternate titles
fields["alternate_titles"] = self.get_alternate_titles(source_record)
@@ -67,7 +59,7 @@ def get_optional_fields(self, source_record: Tag) -> dict | None:
# content_type
if content_type := Marc.json_crosswalk_code_to_name(
- str(leader.string)[6:7],
+ leader_field[6:7],
marc_content_type_crosswalk,
source_record_id,
"Leader/06",
@@ -146,7 +138,7 @@ def get_optional_fields(self, source_record: Tag) -> dict | None:
fields["contributors"] = contributor_values or None
# dates
- publication_year = str(fixed_length_data.string)[7:11].strip()
+ publication_year = control_field_general_info[7:11].strip()
if validate_date(publication_year, source_record_id):
fields["dates"] = [
timdex.Date(kind="Publication date", value=publication_year)
@@ -295,7 +287,7 @@ def get_optional_fields(self, source_record: Tag) -> dict | None:
# Get language codes
language_codes = []
- if fixed_language_value := str(fixed_length_data.string)[35:38]:
+ if fixed_language_value := control_field_general_info[35:38]:
language_codes.append(fixed_language_value)
for field_041 in source_record.find_all("datafield", tag="041"):
language_codes.extend(
@@ -345,16 +337,16 @@ def get_optional_fields(self, source_record: Tag) -> dict | None:
# by leader "Type of Record" position = "Language Material" or "Manuscript
# language material" and "Bibliographic level" position =
# "Monographic component part," "Collection," "Subunit," or "Monograph/Item."
- if leader.string[6:7] in "at" and leader.string[7:8] in "acdm":
- if fixed_length_data.string[33:34] in "0se":
+ if leader_field[6:7] in "at" and leader_field[7:8] in "acdm":
+ if control_field_general_info[33:34] in "0se":
fields["literary_form"] = "Nonfiction"
- elif fixed_length_data.string[33:34]:
+ elif control_field_general_info[33:34]:
fields["literary_form"] = "Fiction"
# locations
# Get place of publication from 008 field code
- if fixed_location_code := str(fixed_length_data.string)[15:17]: # noqa: SIM102
+ if fixed_location_code := control_field_general_info[15:17]: # noqa: SIM102
if location_name := Marc.loc_crosswalk_code_to_name(
fixed_location_code, country_code_crosswalk, source_record_id, "country"
):
@@ -738,6 +730,25 @@ def loc_crosswalk_code_to_name(
)
return str(code_element.parent.find("name").string)
+    @classmethod
+    def _get_leader_field(cls, source_record: Tag) -> str:
+        """Return the text of the source record's MARC 'leader' element.
+
+        Args:
+            source_record (Tag): The parsed source record to search.
+
+        Returns:
+            str: The string content of the first matching 'leader' element.
+
+        Raises:
+            SkippedRecordEvent: If no 'leader' element with string content is found.
+        """
+        # string=True restricts the match to an element that has string content,
+        # so an absent leader and an empty leader both fall through to the skip.
+        if leader := source_record.find("leader", string=True):
+            return str(leader.string)
+        # Name the missing element so the skip reason is actionable.
+        message = "Record skipped because key information is missing: <leader>."
+        raise SkippedRecordEvent(message)
+
+    @classmethod
+    def _get_control_field_general_info(cls, source_record: Tag) -> str:
+        """Return the text of the source record's MARC 008 'controlfield' element.
+
+        Args:
+            source_record (Tag): The parsed source record to search.
+
+        Returns:
+            str: The string content of the first 'controlfield' element whose
+                'tag' attribute is "008".
+
+        Raises:
+            SkippedRecordEvent: If no such element with string content is found.
+        """
+        # string=True restricts the match to an element that has string content,
+        # so an absent 008 field and an empty 008 field both fall through to the skip.
+        if control_field_general_info := source_record.find(
+            "controlfield", tag="008", string=True
+        ):
+            return str(control_field_general_info.string)
+        # Name the missing element so the skip reason is actionable.
+        message = (
+            "Record skipped because key information is missing: "
+            '<controlfield tag="008">.'
+        )
+        raise SkippedRecordEvent(message)
+
@classmethod
def get_alternate_titles(
cls, source_record: Tag
@@ -795,7 +806,7 @@ def get_alternate_titles(
@classmethod
def get_call_numbers(cls, source_record: Tag) -> list[str] | None:
- call_numbers = []
+ call_numbers: list = []
call_number_marc_fields = [
{
"tag": "050",
@@ -811,8 +822,8 @@ def get_call_numbers(cls, source_record: Tag) -> list[str] | None:
"datafield", tag=call_number_marc_field["tag"]
):
call_numbers.extend(
- call_number_value
- for call_number_value in cls.create_subfield_value_list_from_datafield(
+ call_number
+ for call_number in cls.create_subfield_value_list_from_datafield(
datafield, call_number_marc_field["subfields"]
)
)