From 00ee21d1bf0489b344fb58d092cc57154a3983e3 Mon Sep 17 00:00:00 2001 From: Matthew Richards Date: Tue, 4 Jan 2022 10:38:59 +0000 Subject: [PATCH] refactor: move `get_icat_mapping()` into `PaNOSCMappings` #261 - I've moved the code which deals with the parameter value field name list away from this function to make the function more generic/reusable. That code has been moved to applying the WHERE filter, where it'll be used. The mapping will not be reached by an include filter (under a valid API request) because the parameter values are for a field name/attribute, not a related entity --- datagateway_api/src/search_api/filters.py | 83 +++++-------------- .../src/search_api/panosc_mappings.py | 46 +++++++++- 2 files changed, 65 insertions(+), 64 deletions(-) diff --git a/datagateway_api/src/search_api/filters.py b/datagateway_api/src/search_api/filters.py index e206084f..793e9956 100644 --- a/datagateway_api/src/search_api/filters.py +++ b/datagateway_api/src/search_api/filters.py @@ -42,9 +42,28 @@ def apply_filter(self, query): # Convert PaNOSC field names to ICAT field names for field_name in panosc_field_names: - panosc_mapping_name, icat_field_name = self.get_icat_mapping( + panosc_mapping_name, icat_field_name = mappings.get_icat_mapping( panosc_mapping_name, field_name, ) + + # An edge case for ICAT has been somewhat hardcoded here, to deal with + # ICAT's different parameter value field names. The following mapping is + # assumed (where order matters): + # {"Parameter": {"value": ["numericValue", "stringValue", "dateTimeValue"]}} + if isinstance(icat_field_name, list): + if isinstance(self.value, int) or isinstance(self.value, float): + icat_field_name = icat_field_name[0] + elif isinstance(self.value, datetime): + icat_field_name = icat_field_name[2] + elif isinstance(self.value, str): + if DateHandler.is_str_a_date(self.value): + icat_field_name = icat_field_name[2] + else: + icat_field_name = icat_field_name[1] + else: + self.value = str(self.value) + icat_field_name = icat_field_name[1] + icat_field_names.append(icat_field_name) log.debug( @@ -60,68 +79,6 @@ def apply_filter(self, query): return super().apply_filter(query.icat_query.query) - def get_icat_mapping(self, panosc_entity_name, field_name): - """ - This function searches the PaNOSC mappings (from `search_api_mapping.json`, - maintained/stored by :class:`PaNOSCMappings`) and retrieves the ICAT translation - from the PaNOSC input. Fields in the same entity can be found, as well as fields - from related entities (e.g. Dataset.files.path) via recursion inside this - function. - - An edge case for ICAT has been somewhat hardcoded into this function to dea - with ICAT's different parameter value field names. The following mapping is - assumed (where order matters): - {"Parameter": {"value": ["numericValue", "stringValue", "dateTimeValue"]}} - - :param panosc_entity_name: A PaNOSC entity name e.g. "Dataset" - :type panosc_entity_name: :class:`str` - :param field_name: PaNOSC field name to fetch the ICAT version of e.g. "name" - :type field_name: :class:`str` - :return: Tuple containing the PaNOSC entity name (which will change from the - input if a related entity is found) and the ICAT field name - mapping/translation from the PaNOSC data model - :raises FilterError: If a valid mapping cannot be found - """ - - log.info( - "Searching mapping file to find ICAT translation for %s", - f"{panosc_entity_name}.{field_name}", - ) - - try: - icat_mapping = mappings.mappings[panosc_entity_name][field_name] - log.debug("ICAT mapping/translation found: %s", icat_mapping) - except KeyError as e: - raise FilterError(f"Bad PaNOSC to ICAT mapping: {e.args}") - - if isinstance(icat_mapping, str): - # Field name - icat_field_name = icat_mapping - elif isinstance(icat_mapping, dict): - # Relation - JSON format: {PaNOSC entity name: ICAT related field name} - panosc_entity_name = list(icat_mapping.keys())[0] - icat_field_name = icat_mapping[panosc_entity_name] - elif isinstance(icat_mapping, list): - # Edge case for ICAT's different parameter value field names - if isinstance(self.value, int) or isinstance(self.value, float): - icat_field_name = icat_mapping[0] - elif isinstance(self.value, datetime): - icat_field_name = icat_mapping[2] - elif isinstance(self.value, str): - if DateHandler.is_str_a_date(self.value): - icat_field_name = icat_mapping[2] - else: - icat_field_name = icat_mapping[1] - else: - self.value = str(self.value) - icat_field_name = icat_mapping[1] - - log.debug( - "Output of get_icat_mapping(): %s, %s", panosc_entity_name, icat_field_name, - ) - - return (panosc_entity_name, icat_field_name) - def __str__(self): """ String representation which is also used to apply WHERE filters that are inside diff --git a/datagateway_api/src/search_api/panosc_mappings.py b/datagateway_api/src/search_api/panosc_mappings.py index 60e6b5ba..2b812b84 100644 --- a/datagateway_api/src/search_api/panosc_mappings.py +++ b/datagateway_api/src/search_api/panosc_mappings.py @@ -2,7 +2,7 @@ import logging from pathlib import Path -from datagateway_api.src.common.exceptions import SearchAPIError +from datagateway_api.src.common.exceptions import FilterError, SearchAPIError log = logging.getLogger() @@ -19,6 +19,50 @@ def __init__( except IOError as e: raise SearchAPIError(e) + def get_icat_mapping(self, panosc_entity_name, field_name): + """ + This function searches the PaNOSC mappings and retrieves the ICAT translation + from the PaNOSC input. Fields in the same entity can be found, as well as fields + from related entities (e.g. Dataset.files.path) via recursion inside this + function. + + :param panosc_entity_name: A PaNOSC entity name e.g. "Dataset" + :type panosc_entity_name: :class:`str` + :param field_name: PaNOSC field name to fetch the ICAT version of e.g. "name" + :type field_name: :class:`str` + :return: Tuple containing the PaNOSC entity name (which will change from the + input if a related entity is found) and the ICAT field name + mapping/translation from the PaNOSC data model + :raises FilterError: If a valid mapping cannot be found + """ + + log.info( + "Searching mapping file to find ICAT translation for %s", + f"{panosc_entity_name}.{field_name}", + ) + + try: + icat_mapping = mappings.mappings[panosc_entity_name][field_name] + log.debug("ICAT mapping/translation found: %s", icat_mapping) + except KeyError as e: + # TODO - do we want to change the exception? + raise FilterError(f"Bad PaNOSC to ICAT mapping: {e.args}") + + if isinstance(icat_mapping, str): + # Field name + icat_field_name = icat_mapping + elif isinstance(icat_mapping, dict): + # Relation - JSON format: {PaNOSC entity name: ICAT related field name} + panosc_entity_name = list(icat_mapping.keys())[0] + icat_field_name = icat_mapping[panosc_entity_name] + elif isinstance(icat_mapping, list): + # If a list of ICAT field names are found, this is likely to be a specific + # need for that entity (e.g. parameter values). Dealing with this should be + # delegated to other code in this repo so the entire list is returned here + icat_field_name = icat_mapping + + return (panosc_entity_name, icat_field_name) + def get_panosc_related_entity_name( self, panosc_entity_name, panosc_related_field_name, ):