diff --git a/clinlp/qualifier/__init__.py b/clinlp/qualifier/__init__.py
index dd31a27..c0430a2 100644
--- a/clinlp/qualifier/__init__.py
+++ b/clinlp/qualifier/__init__.py
@@ -11,4 +11,4 @@
 )
 
 if importlib.util.find_spec("transformers") is not None:
-    from .transformer import NegationTransformer
+    from .transformer import ExperiencerTransformer, NegationTransformer
diff --git a/clinlp/qualifier/transformer.py b/clinlp/qualifier/transformer.py
index 30ff326..35d6f4a 100644
--- a/clinlp/qualifier/transformer.py
+++ b/clinlp/qualifier/transformer.py
@@ -13,7 +13,8 @@
 )
 from clinlp.util import clinlp_autocomponent
 
-TRANSFORMER_REPO = "UMCU/MedRoBERTa.nl_NegationDetection"
+TRANSFORMER_NEGATION_REPO = "UMCU/MedRoBERTa.nl_NegationDetection"
+TRANSFORMER_EXPERIENCER_REPO = "UMCU/MedRoBERTa.nl_Experiencer"
 
 _defaults_negation_transformer = {
     "token_window": 32,
@@ -23,7 +24,16 @@
     "negation_threshold": 0.5,
     "affirmed_threshold": 0.5,
 }
+
+_defaults_experiencer_transformer = {
+    "token_window": 64,
+    "strip_entities": True,
+    "placeholder": None,
+    "probas_aggregator": statistics.mean,
+    "patient_threshold": 0.5,
+    "other_threshold": 0.5,
+}
 
 
 @Language.factory(
     name="clinlp_negation_transformer",
@@ -57,8 +67,10 @@ def __init__(
         self.negation_threshold = negation_threshold
         self.affirmed_threshold = affirmed_threshold
 
-        self.tokenizer = AutoTokenizer.from_pretrained(TRANSFORMER_REPO)
-        self.model = RobertaForTokenClassification.from_pretrained(TRANSFORMER_REPO)
+        self.tokenizer = AutoTokenizer.from_pretrained(TRANSFORMER_NEGATION_REPO)
+        self.model = RobertaForTokenClassification.from_pretrained(
+            TRANSFORMER_NEGATION_REPO
+        )
 
     @property
     def qualifier_factories(self) -> dict[str, QualifierFactory]:
@@ -151,3 +163,137 @@ def _detect_qualifiers(self, doc: Doc):
                     ent,
                     self.qualifier_factories["Negation"].create("Unknown", prob=prob),
                 )
+
+
+@Language.factory(
+    name="clinlp_experiencer_transformer",
+    requires=["doc.ents"],
+    assigns=[f"span._.{ATTR_QUALIFIERS}"],
+    default_config=_defaults_experiencer_transformer,
+)
+@clinlp_autocomponent
+class ExperiencerTransformer(QualifierDetector):
+    def __init__(
+        self,
+        nlp: Language,
+        token_window: int = _defaults_experiencer_transformer["token_window"],
+        strip_entities: bool = _defaults_experiencer_transformer["strip_entities"],
+        placeholder: Optional[str] = _defaults_experiencer_transformer["placeholder"],
+        probas_aggregator: Callable = _defaults_experiencer_transformer[
+            "probas_aggregator"
+        ],
+        patient_threshold: float = _defaults_experiencer_transformer[
+            "patient_threshold"
+        ],
+        other_threshold: float = _defaults_experiencer_transformer[
+            "other_threshold"
+        ],
+    ) -> None:
+        self.nlp = nlp
+        self.token_window = token_window
+        self.strip_entities = strip_entities
+        self.placeholder = placeholder
+        self.probas_aggregator = probas_aggregator
+        self.patient_threshold = patient_threshold
+        self.other_threshold = other_threshold
+
+        self.tokenizer = AutoTokenizer.from_pretrained(TRANSFORMER_EXPERIENCER_REPO)
+        self.model = RobertaForTokenClassification.from_pretrained(
+            TRANSFORMER_EXPERIENCER_REPO
+        )
+
+    @property
+    def qualifier_factories(self) -> dict[str, QualifierFactory]:
+        return {
+            "Experiencer": QualifierFactory(
+                "Experiencer", ["Patient", "Unknown", "Other"], default="Other"
+            )
+        }
+
+    @staticmethod
+    def _get_ent_window(ent: Span, token_window: int) -> Tuple[str, int, int]:
+        start_token_i = max(0, ent.start - token_window)
+        end_token_i = min(len(ent.doc), ent.end + token_window)
+
+        text_span = ent.doc[start_token_i:end_token_i]
+
+        ent_start_char = ent.start_char - text_span.start_char
+        ent_end_char = ent.end_char - text_span.start_char
+
+        return str(text_span), ent_start_char, ent_end_char
+
+    @staticmethod
+    def _trim_ent_boundaries(
+        text: str, ent_start_char: int, ent_end_char: int
+    ) -> Tuple[str, int, int]:
+        entity = text[ent_start_char:ent_end_char]
+
+        ent_start_char += len(entity) - len(entity.lstrip())
+        ent_end_char -= len(entity) - len(entity.rstrip())
+
+        return text, ent_start_char, ent_end_char
+
+    @staticmethod
+    def _fill_ent_placeholder(
+        text: str, ent_start_char: int, ent_end_char: int, placeholder: str
+    ) -> Tuple[str, int, int]:
+        text = text[0:ent_start_char] + placeholder + text[ent_end_char:]
+        ent_end_char = ent_start_char + len(placeholder)
+
+        return text, ent_start_char, ent_end_char
+
+    def _get_experiencer_prob(
+        self,
+        text: str,
+        ent_start_char: int,
+        ent_end_char: int,
+        probas_aggregator: Callable,
+    ) -> float:
+        inputs = self.tokenizer(text, return_tensors="pt")
+        output = self.model.forward(inputs["input_ids"])
+        probas = torch.nn.functional.softmax(output.logits[0], dim=1).detach().numpy()
+
+        start_token = inputs.char_to_token(ent_start_char)
+        end_token = inputs.char_to_token(ent_end_char - 1)
+
+        return probas_aggregator(
+            pos[0] + pos[2] for pos in probas[start_token : end_token + 1]
+        )
+
+    def _detect_qualifiers(self, doc: Doc):
+        for ent in doc.ents:
+            text, ent_start_char, ent_end_char = self._get_ent_window(
+                ent, token_window=self.token_window
+            )
+
+            if self.strip_entities:
+                text, ent_start_char, ent_end_char = self._trim_ent_boundaries(
+                    text, ent_start_char, ent_end_char
+                )
+
+            if self.placeholder is not None:
+                text, ent_start_char, ent_end_char = self._fill_ent_placeholder(
+                    text, ent_start_char, ent_end_char, placeholder=self.placeholder
+                )
+
+            prob = self._get_experiencer_prob(
+                text,
+                ent_start_char,
+                ent_end_char,
+                probas_aggregator=self.probas_aggregator,
+            )
+
+            if prob > self.patient_threshold:
+                self.add_qualifier_to_ent(
+                    ent,
+                    self.qualifier_factories["Experiencer"].create(
+                        "Patient", prob=prob
+                    ),
+                )
+            elif prob > self.other_threshold:
+                self.add_qualifier_to_ent(
+                    ent,
+                    self.qualifier_factories["Experiencer"].create(
+                        "Unknown", prob=prob
+                    ),
+                )
diff --git a/tests/data/qualifier_cases.json b/tests/data/qualifier_cases.json
index be7820b..526a687 100644
--- a/tests/data/qualifier_cases.json
+++ b/tests/data/qualifier_cases.json
@@ -501,7 +501,7 @@
                 "Plausibility.Hypothetical",
                 "Negation.Affirmed",
                 "Temporality.Current",
-                "Experiencer.Patient"
+                "Experiencer.Other"
             ]
         }
     ]
diff --git a/tests/regression/qualifier/test_regression_transformer_experiencer.py b/tests/regression/qualifier/test_regression_transformer_experiencer.py
new file mode 100644
index 0000000..ae5fb9b
--- /dev/null
+++ b/tests/regression/qualifier/test_regression_transformer_experiencer.py
@@ -0,0 +1,58 @@
+import json
+
+import pytest
+import spacy
+
+import clinlp  # noqa: F401
+from clinlp.qualifier.qualifier import ATTR_QUALIFIERS_STR
+
+
+@pytest.fixture()
+def nlp():
+    nlp = spacy.blank("clinlp")
+    nlp.add_pipe("clinlp_sentencizer")
+
+    # ruler
+    ruler = nlp.add_pipe("entity_ruler")
+    ruler.add_patterns([{"label": "named_entity", "pattern": "ENTITY"}])
+
+    # recognizer
+    _ = nlp.add_pipe(
+        "clinlp_experiencer_transformer",
+        config={"token_window": 32, "placeholder": "X"},
+    )
+
+    return nlp
+
+
+class TestRegressionExperiencerTransformer:
+    def test_qualifier_cases(self, nlp):
+        with open("tests/data/qualifier_cases.json", "rb") as file:
+            data = json.load(file)
+
+        incorrect_ents = set()
+
+        for example in data["examples"]:
+            doc = nlp(example["text"])
+
+            assert len(example["ents"]) == len(doc.ents)
+
+            for predicted_ent, example_ent in zip(doc.ents, example["ents"]):
+                try:
+                    assert predicted_ent.start == example_ent["start"]
+                    assert predicted_ent.end == example_ent["end"]
+                    assert str(predicted_ent) == example_ent["text"]
+                    assert getattr(predicted_ent._, ATTR_QUALIFIERS_STR).issubset(
+                        example_ent["qualifiers"]
+                    )
+                except AssertionError:
+                    print(
+                        f"Incorrect (#{example_ent['ent_id']}): "
+                        f"text={example['text']}, "
+                        f"example_ent={example_ent}, "
+                        f"predicted qualifiers="
+                        f"{getattr(predicted_ent._, ATTR_QUALIFIERS_STR)}"
+                    )
+                    incorrect_ents.add(example_ent["ent_id"])
+
+        assert incorrect_ents == {9}
diff --git a/tests/regression/qualifier/test_regression_transformer.py b/tests/regression/qualifier/test_regression_transformer_negation.py
similarity index 100%
rename from tests/regression/qualifier/test_regression_transformer.py
rename to tests/regression/qualifier/test_regression_transformer_negation.py
diff --git a/tests/unit/qualifier/test_transformer.py b/tests/unit/qualifier/test_transformer.py
index d70f252..6aba31e 100644
--- a/tests/unit/qualifier/test_transformer.py
+++ b/tests/unit/qualifier/test_transformer.py
@@ -4,7 +4,7 @@
 import spacy
 
 import clinlp  # noqa
-from clinlp.qualifier import NegationTransformer
+from clinlp.qualifier import ExperiencerTransformer, NegationTransformer
 from clinlp.qualifier.qualifier import ATTR_QUALIFIERS_STR
 
 
@@ -121,3 +121,104 @@ def test_detect_qualifiers_without_negation(self, nlp):
 
         assert len(doc.ents) == 1
         assert getattr(doc.ents[0]._, ATTR_QUALIFIERS_STR) == {"Negation.Affirmed"}
+
+
+class TestExperiencerTransformer:
+    def test_get_ent_window(self, nlp, text):
+        doc = nlp(text)
+        span = doc.ents[0]
+        n = ExperiencerTransformer(nlp=nlp)
+
+        assert n._get_ent_window(span, token_window=1) == ("geen SYMPTOOM,", 5, 13)
+        assert n._get_ent_window(span, token_window=2) == (
+            "had geen SYMPTOOM, ondanks",
+            9,
+            17,
+        )
+        assert n._get_ent_window(span, token_window=32) == (
+            "De patient had geen SYMPTOOM, ondanks dat zij dit eerder wel had.",
+            20,
+            28,
+        )
+
+    def test_trim_ent_boundaries(self, nlp):
+        n = ExperiencerTransformer(nlp=nlp)
+
+        assert n._trim_ent_boundaries("geen SYMPTOOM,", 5, 13) == (
+            "geen SYMPTOOM,",
+            5,
+            13,
+        )
+        assert n._trim_ent_boundaries("geen SYMPTOOM,", 4, 13) == (
+            "geen SYMPTOOM,",
+            5,
+            13,
+        )
+        assert n._trim_ent_boundaries("had geen SYMPTOOM, ondanks", 8, 17) == (
+            "had geen SYMPTOOM, ondanks",
+            9,
+            17,
+        )
+        assert n._trim_ent_boundaries("had geen SYMPTOOM, ondanks", 8, 19) == (
+            "had geen SYMPTOOM, ondanks",
+            9,
+            18,
+        )
+
+    def test_fill_ent_placeholder(self, nlp):
+        n = ExperiencerTransformer(nlp=nlp)
+
+        assert n._fill_ent_placeholder(
+            "geen SYMPTOOM,", 5, 13, placeholder="SYMPTOOM"
+        ) == ("geen SYMPTOOM,", 5, 13)
+        assert n._fill_ent_placeholder("geen SYMPTOOM,", 5, 13, placeholder="X") == (
+            "geen X,",
+            5,
+            6,
+        )
+
+    def test_get_experiencer_prob(self, nlp):
+        n = ExperiencerTransformer(nlp=nlp)
+
+        assert (
+            n._get_experiencer_prob(
+                text="familiaire aandoening,",
+                ent_start_char=11,
+                ent_end_char=21,
+                probas_aggregator=statistics.mean,
+            )
+            < 0.1
+        )
+        assert (
+            n._get_experiencer_prob(
+                text="patient heeft aandoening,",
+                ent_start_char=14,
+                ent_end_char=24,
+                probas_aggregator=statistics.mean,
+            )
+            > 0.9
+        )
+
+    def test_detect_qualifiers_patient(self, nlp):
+        n = ExperiencerTransformer(nlp=nlp, token_window=32, placeholder="X")
+        doc = nlp("De patient had geen last van SYMPTOOM.")
+        n(doc)
+
+        assert len(doc.ents) == 1
+        assert getattr(doc.ents[0]._, ATTR_QUALIFIERS_STR) == {"Experiencer.Patient"}
+
+    def test_detect_qualifiers_small_window(self, nlp):
+        n = ExperiencerTransformer(nlp=nlp, token_window=1, placeholder="X")
+        doc = nlp("De patient had geen last van SYMPTOOM.")
+        n(doc)
+
+        assert len(doc.ents) == 1
+        assert getattr(doc.ents[0]._, ATTR_QUALIFIERS_STR) == {"Experiencer.Patient"}
+
+    def test_detect_qualifiers_referring_to_other(self, nlp):
+        n = ExperiencerTransformer(nlp=nlp, token_window=32, placeholder="X")
+        doc = nlp("De broer van de patient had last van SYMPTOOM.")
+        n(doc)
+
+        assert len(doc.ents) == 1
+        assert getattr(doc.ents[0]._, ATTR_QUALIFIERS_STR) == {"Experiencer.Other"}
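Reviewer note: a minimal usage sketch of the new clinlp_experiencer_transformer pipe, stitched together from the test fixtures in this diff. The pipeline setup, pipe config, entity pattern, example sentence, and the ATTR_QUALIFIERS_STR span extension all come from the unit and regression tests above; treat the printed qualifier set as indicative, since exact probabilities depend on the UMCU/MedRoBERTa.nl_Experiencer weights.

```python
import spacy

import clinlp  # noqa: F401, registers the "clinlp" language and pipe factories
from clinlp.qualifier.qualifier import ATTR_QUALIFIERS_STR

# Blank clinlp pipeline with sentence splitting and a toy entity ruler,
# mirroring the regression test fixture.
nlp = spacy.blank("clinlp")
nlp.add_pipe("clinlp_sentencizer")
ruler = nlp.add_pipe("entity_ruler")
ruler.add_patterns([{"label": "named_entity", "pattern": "SYMPTOOM"}])

# The new pipe; config keys correspond to _defaults_experiencer_transformer.
nlp.add_pipe(
    "clinlp_experiencer_transformer",
    config={"token_window": 32, "placeholder": "X"},
)

# Sentence from the unit tests: the symptom is experienced by the brother,
# so the expected qualifier is "Experiencer.Other".
doc = nlp("De broer van de patient had last van SYMPTOOM.")

for ent in doc.ents:
    print(ent, getattr(ent._, ATTR_QUALIFIERS_STR))
```

With the 0.5/0.5 default thresholds, only the Patient branch of _detect_qualifiers can fire; entities below the patient threshold fall through to the factory default "Other", and the Unknown branch only becomes reachable when other_threshold is set below patient_threshold.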