Skip to content

Commit

Permalink
Add private methods for key MARC elements (leader and control field '…
Browse files Browse the repository at this point in the history
…008')
  • Loading branch information
jonavellecuerdo committed Jul 22, 2024
1 parent 166f1a9 commit cdad188
Show file tree
Hide file tree
Showing 2 changed files with 113 additions and 36 deletions.
96 changes: 81 additions & 15 deletions tests/sources/xml/test_marc.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,30 @@
# ruff: noqa: E501
# ruff: noqa: E501, SLF001
import logging

import pytest
from bs4 import BeautifulSoup # type: ignore[import-untyped]

import transmogrifier.models as timdex
from transmogrifier.exceptions import SkippedRecordEvent
from transmogrifier.sources.xml.marc import Marc


def create_marc_source_record_stub(datafield_insert: str = ""):
def create_marc_source_record_stub(
leader_field_insert: str = "<leader>03282nam 2200721Ki 4500</leader>",
control_field_general_info_insert: str = (
'<controlfield tag="008">170906s2016 fr mun| o e zxx d</controlfield>'
),
datafield_insert: str = "",
):
"""
Create source record for unit tests.
Args:
leader_field_insert (str): A string representing a MARC fixed length 'leader'
XML element. Defaults to a dummy value.
control_field_general_info_insert (str): A string representing a MARC fixed length
'general info control field' (i.e., code 008) XML element.
Defaults to a dummy value.
datafield_insert (str): A string representing a MARC 'datafield' XML element.
Note: A source record for "missing" field method tests can be created by
Expand All @@ -20,15 +33,20 @@ def create_marc_source_record_stub(datafield_insert: str = ""):
xml_string = """
<collection>
<record>
<leader>03282nam 2200721Ki 4500</leader>
<controlfield tag="008">170906s2016 fr mun| o e zxx d</controlfield>
{leader_field_insert}
{control_field_general_info_insert}
<controlfield tag="001">990027185640106761</controlfield>
{datafield_insert}
</record>
</collection>
"""

return BeautifulSoup(
xml_string.format(datafield_insert=datafield_insert),
xml_string.format(
leader_field_insert=leader_field_insert,
control_field_general_info_insert=control_field_general_info_insert,
datafield_insert=datafield_insert,
),
"xml",
)

Expand Down Expand Up @@ -775,6 +793,62 @@ def test_marc_record_with_missing_optional_fields_transforms_correctly():
)


def test_get_leader_field_success():
    """The leader helper returns the stub's default <leader> text as a string."""
    expected_leader = "03282nam 2200721Ki 4500"
    record_stub = create_marc_source_record_stub()
    assert Marc._get_leader_field(record_stub) == expected_leader


def test_get_leader_field_raises_error_if_field_blank():
    """An empty <leader> element makes the leader helper raise SkippedRecordEvent."""
    expected_message = "Record skipped because key information is missing: <leader>."
    blank_leader_record = create_marc_source_record_stub(
        leader_field_insert="<leader></leader>"
    )
    with pytest.raises(SkippedRecordEvent, match=expected_message):
        Marc._get_leader_field(blank_leader_record)


def test_get_leader_data_raises_error_if_field_missing():
    """A record built without any <leader> element raises SkippedRecordEvent."""
    expected_message = "Record skipped because key information is missing: <leader>."
    # An empty insert leaves the <leader> element out of the stub record entirely.
    record_without_leader = create_marc_source_record_stub(leader_field_insert="")
    with pytest.raises(SkippedRecordEvent, match=expected_message):
        Marc._get_leader_field(record_without_leader)


def test_get_control_field_general_info_success():
    """The 008 helper returns the stub's default control field text as a string."""
    expected_general_info = "170906s2016 fr mun| o e zxx d"
    record_stub = create_marc_source_record_stub()
    actual_general_info = Marc._get_control_field_general_info(record_stub)
    assert actual_general_info == expected_general_info


def test_get_control_field_general_info_if_field_blank():
    """An empty 008 control field makes the helper raise SkippedRecordEvent."""
    expected_message = (
        'Record skipped because key information is missing: <controlfield tag="008">.'
    )
    blank_008_record = create_marc_source_record_stub(
        control_field_general_info_insert='<controlfield tag="008"></controlfield>'
    )
    with pytest.raises(SkippedRecordEvent, match=expected_message):
        Marc._get_control_field_general_info(blank_008_record)


def test_get_control_field_general_info_if_field_missing():
    """A record built without an 008 control field raises SkippedRecordEvent."""
    expected_message = (
        'Record skipped because key information is missing: <controlfield tag="008">.'
    )
    # An empty insert leaves the 008 control field out of the stub record entirely.
    record_without_008 = create_marc_source_record_stub(
        control_field_general_info_insert=""
    )
    with pytest.raises(SkippedRecordEvent, match=expected_message):
        Marc._get_control_field_general_info(record_without_008)


def test_get_alternate_titles_success():
source_record = create_marc_source_record_stub(
datafield_insert=(
Expand Down Expand Up @@ -894,11 +968,7 @@ def test_marc_record_missing_leader_logs_error(caplog):
output_records = Marc("alma", marc_xml_records)
assert len(list(output_records)) == 0
assert output_records.processed_record_count == 1
assert (
"transmogrifier.sources.xml.marc",
logging.ERROR,
"Record ID 990027185640106761 is missing MARC leader",
) in caplog.record_tuples
assert output_records.skipped_record_count == 1


def test_marc_record_missing_008_logs_error(caplog):
Expand All @@ -908,11 +978,7 @@ def test_marc_record_missing_008_logs_error(caplog):
output_records = Marc("alma", marc_xml_records)
assert len(list(output_records)) == 0
assert output_records.processed_record_count == 1
assert (
"transmogrifier.sources.xml.marc",
logging.ERROR,
"Record ID 990027185640106761 is missing MARC 008 field",
) in caplog.record_tuples
assert output_records.skipped_record_count == 1


def test_create_subfield_value_list_from_datafield_with_values():
Expand Down
53 changes: 32 additions & 21 deletions transmogrifier/sources/xml/marc.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import transmogrifier.models as timdex
from transmogrifier.config import load_external_config
from transmogrifier.exceptions import SkippedRecordEvent
from transmogrifier.helpers import validate_date
from transmogrifier.sources.xmltransformer import XMLTransformer

Expand Down Expand Up @@ -45,17 +46,8 @@ def get_optional_fields(self, source_record: Tag) -> dict | None:

source_record_id = Marc.get_source_record_id(source_record)

fixed_length_data = source_record.find("controlfield", tag="008", string=True)
if fixed_length_data is None:
message = f"Record ID {source_record_id} is missing MARC 008 field"
logger.error(message)
return None

leader = source_record.find("leader", string=True)
if leader is None:
message = f"Record ID {source_record_id} is missing MARC leader"
logger.error(message)
return None
leader_field = Marc._get_leader_field(source_record)
control_field_general_info = Marc._get_control_field_general_info(source_record)

# alternate titles
fields["alternate_titles"] = self.get_alternate_titles(source_record)
Expand All @@ -67,7 +59,7 @@ def get_optional_fields(self, source_record: Tag) -> dict | None:

# content_type
if content_type := Marc.json_crosswalk_code_to_name(
str(leader.string)[6:7],
leader_field[6:7],
marc_content_type_crosswalk,
source_record_id,
"Leader/06",
Expand Down Expand Up @@ -146,7 +138,7 @@ def get_optional_fields(self, source_record: Tag) -> dict | None:
fields["contributors"] = contributor_values or None

# dates
publication_year = str(fixed_length_data.string)[7:11].strip()
publication_year = control_field_general_info[7:11].strip()
if validate_date(publication_year, source_record_id):
fields["dates"] = [
timdex.Date(kind="Publication date", value=publication_year)
Expand Down Expand Up @@ -295,7 +287,7 @@ def get_optional_fields(self, source_record: Tag) -> dict | None:

# Get language codes
language_codes = []
if fixed_language_value := str(fixed_length_data.string)[35:38]:
if fixed_language_value := control_field_general_info[35:38]:
language_codes.append(fixed_language_value)
for field_041 in source_record.find_all("datafield", tag="041"):
language_codes.extend(
Expand Down Expand Up @@ -345,16 +337,16 @@ def get_optional_fields(self, source_record: Tag) -> dict | None:
# by leader "Type of Record" position = "Language Material" or "Manuscript
# language material" and "Bibliographic level" position =
# "Monographic component part," "Collection," "Subunit," or "Monograph/Item."
if leader.string[6:7] in "at" and leader.string[7:8] in "acdm":
if fixed_length_data.string[33:34] in "0se":
if leader_field[6:7] in "at" and leader_field[7:8] in "acdm":
if control_field_general_info[33:34] in "0se":
fields["literary_form"] = "Nonfiction"
elif fixed_length_data.string[33:34]:
elif control_field_general_info[33:34]:
fields["literary_form"] = "Fiction"

# locations

# Get place of publication from 008 field code
if fixed_location_code := str(fixed_length_data.string)[15:17]: # noqa: SIM102
if fixed_location_code := control_field_general_info[15:17]: # noqa: SIM102
if location_name := Marc.loc_crosswalk_code_to_name(
fixed_location_code, country_code_crosswalk, source_record_id, "country"
):
Expand Down Expand Up @@ -738,6 +730,25 @@ def loc_crosswalk_code_to_name(
)
return str(code_element.parent.find("name").string)

@classmethod
def _get_leader_field(cls, source_record: Tag) -> str:
    """Return the text of the source record's <leader> element.

    Args:
        source_record: The record to search for a <leader> element.

    Raises:
        SkippedRecordEvent: If no <leader> element with string content is found.
    """
    leader_element = source_record.find("leader", string=True)
    if not leader_element:
        message = "Record skipped because key information is missing: <leader>."
        raise SkippedRecordEvent(message)
    return str(leader_element.string)

@classmethod
def _get_control_field_general_info(cls, source_record: Tag) -> str:
    """Return the text of the source record's 008 control field.

    Args:
        source_record: The record to search for <controlfield tag="008">.

    Raises:
        SkippedRecordEvent: If no 008 control field with string content is found.
    """
    field_008 = source_record.find("controlfield", tag="008", string=True)
    if not field_008:
        message = (
            "Record skipped because key information is missing: "
            '<controlfield tag="008">.'
        )
        raise SkippedRecordEvent(message)
    return str(field_008.string)

@classmethod
def get_alternate_titles(
cls, source_record: Tag
Expand Down Expand Up @@ -795,7 +806,7 @@ def get_alternate_titles(

@classmethod
def get_call_numbers(cls, source_record: Tag) -> list[str] | None:
call_numbers = []
call_numbers: list = []
call_number_marc_fields = [
{
"tag": "050",
Expand All @@ -811,8 +822,8 @@ def get_call_numbers(cls, source_record: Tag) -> list[str] | None:
"datafield", tag=call_number_marc_field["tag"]
):
call_numbers.extend(
call_number_value
for call_number_value in cls.create_subfield_value_list_from_datafield(
call_number
for call_number in cls.create_subfield_value_list_from_datafield(
datafield, call_number_marc_field["subfields"]
)
)
Expand Down

0 comments on commit cdad188

Please sign in to comment.