Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added exception #1172

Merged
merged 1 commit into from
Nov 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
289 changes: 148 additions & 141 deletions app/apps/permits/api_queries_decos_join.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ def map_data_on_conf_keys(self, data, conf):
)

def datestring_to_timestamp(self, datestring):
if type(datestring) == str and re.match(
if type(datestring) is str and re.match(
r"(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2})", datestring
):
return datetime.timestamp(
Expand Down Expand Up @@ -241,11 +241,8 @@ def _get_decos_folder(self, decos_object):
else:
return get_decos_join_mock_folder_fields_address_a()

def get_decos_entry_by_bag_id(self, bag_id, dt):
"""Get simple view of the important permits"""

response = {}

def initialize_decos_join_conf(self):
# Setup decos_join_conf_object
decos_join_conf_object = DecosJoinConf()
decos_join_conf_object.set_default_expression(
settings.DECOS_JOIN_DEFAULT_PERMIT_VALID_EXPRESSION
Expand All @@ -256,161 +253,171 @@ def get_decos_entry_by_bag_id(self, bag_id, dt):
decos_join_conf_object.set_default_field_mapping(
settings.DECOS_JOIN_DEFAULT_FIELD_MAPPING
)

decos_join_conf_object.add_conf(settings.DECOS_JOIN_DEFAULT_PERMIT_VALID_CONF)
return decos_join_conf_object

def extract_permits(self, decos_join_conf_object):
# Create a permits list
permits = [
{
"permit_granted": "UNKNOWN",
"permit_type": v.get(DecosJoinConf.PERMIT_TYPE),
}
for v in decos_join_conf_object
]
return permits

# A "Omzettingsvergunning" should NOT have an end date.
# If DATE_VALID_UNTIL is given from Decos, then this has been entered incorrectly.
def update_date_valid_until_for_omzettingsvergunning(self, permits):
for item in permits:
if (
item.get("permit_type") == "Omzettingsvergunning"
and item.get("details")
and item["details"].get("DATE_VALID_UNTIL")
):
item["details"]["DATE_VALID_UNTIL"] = None

def log_error_info(self, parent_key, folder, decos_join_conf_object):
logger.error("DECOS JOIN parent key not found in config")
logger.info("book key: %s" % parent_key)
logger.info("permit name: %s" % folder.get("fields", {}).get("text45"))
logger.info("permit result: %s" % folder.get("fields", {}).get("dfunction"))
logger.info("Config keys: %s" % decos_join_conf_object.get_book_keys())

def get_decos_entry_by_bag_id(self, bag_id, dt):
"""Get simple view of the important permits"""
response = {}
decos_join_conf_object = self.initialize_decos_join_conf()
permits = self.extract_permits(decos_join_conf_object)

# Initialize an empty list to hold new items
new_permits = []

response_decos_obj = self.get_decos_object_with_bag_id(bag_id)
response_decos_folder = (
self._get_decos_folder(response_decos_obj) if response_decos_obj else {}
)

response_decos_folder = {}
if response_decos_obj:
response_decos_folder = self._get_decos_folder(response_decos_obj)
if response_decos_folder:

# permits
for folder in response_decos_folder["content"]:

parent_key = folder.get("fields", {}).get("parentKey")
if parent_key in decos_join_conf_object.get_book_keys():
data = {}
conf = decos_join_conf_object.get_conf_by_book_key(parent_key)
permit_granted = decos_join_conf_object.expression_is_valid(
folder["fields"], conf, dt
)
data.update(
{
"permit_granted": "GRANTED"
if permit_granted
else "NOT_GRANTED",
"permit_type": conf.get(DecosJoinConf.PERMIT_TYPE),
"raw_data": folder["fields"],
"details": decos_join_conf_object.map_data_on_conf_keys(
folder["fields"], conf
),
}
)
permit_serializer = PermitSerializer(data=data)
if permit_serializer.is_valid():
for d in permits:
if d.get("permit_type") == conf.get(
DecosJoinConf.PERMIT_TYPE
):
current_permit_raw_data = d.get("raw_data", {})
# Check all Decos folders. Multiple permits are possible. Which one do you want to show?
# Check if there's already a permit saved with the same permit type AND raw data. Else: save this permit.
if current_permit_raw_data:
# Current permit data found so check which one is valid now.
now = datetime.now().isoformat()
next_permit_raw_data = (
permit_serializer.data.get("raw_data", {})
)
next_permit_valid_from = (
next_permit_raw_data.get("date6")
)
next_permit_valid_until = (
next_permit_raw_data.get("date7")
)
current_permit_valid_from = (
current_permit_raw_data.get("date6")
)
current_permit_valid_until = (
current_permit_raw_data.get("date7")
)

# Wrap datetime validation in a try-catch to prevent breaking code.
try:
# Is the current permit valid/active? => now must be between start and enddate.
# Check if the variables are not None before comparing.
if (
current_permit_valid_from is not None
and current_permit_valid_until
is not None
):
is_current_permit_valid = (
current_permit_valid_from <= now
and now
<= current_permit_valid_until
if response_decos_folder:
for folder in response_decos_folder["content"]:
parent_key = folder.get("fields", {}).get("parentKey")
if parent_key in decos_join_conf_object.get_book_keys():
data = {}
conf = decos_join_conf_object.get_conf_by_book_key(parent_key)
permit_granted = decos_join_conf_object.expression_is_valid(
folder["fields"], conf, dt
)
data.update(
{
"permit_granted": "GRANTED"
if permit_granted
else "NOT_GRANTED",
"permit_type": conf.get(DecosJoinConf.PERMIT_TYPE),
"raw_data": folder["fields"],
"details": decos_join_conf_object.map_data_on_conf_keys(
folder["fields"], conf
),
}
)
permit_serializer = PermitSerializer(data=data)
if permit_serializer.is_valid():
for d in permits:
if d.get("permit_type") == conf.get(
DecosJoinConf.PERMIT_TYPE
):
current_permit_raw_data = d.get("raw_data", {})
# Check all Decos folders. Multiple permits are possible. Which one do you want to show?
# Check if there's already a permit saved with the same permit type AND raw data. Else: save this permit.
if current_permit_raw_data:
# Current permit data found so check which one is valid now.
now = datetime.now().isoformat()
next_permit_raw_data = permit_serializer.data.get(
"raw_data", {}
)
next_permit_valid_from = next_permit_raw_data.get(
"date6"
)
next_permit_valid_until = next_permit_raw_data.get(
"date7"
)
current_permit_valid_from = (
current_permit_raw_data.get("date6")
)
current_permit_valid_until = (
current_permit_raw_data.get("date7")
)

# Wrap datetime validation in a try-catch to prevent breaking code.
try:
# Is the current permit valid/active? => now must be between start and enddate.
# Check if the variables are not None before comparing.
if (
current_permit_valid_from is not None
and current_permit_valid_until is not None
):
is_current_permit_valid = (
current_permit_valid_from <= now
and now <= current_permit_valid_until
)
else:
# Handle the case where one or both variables are None
is_current_permit_valid = False

# Is the next permit valid/active? => now must be between start and enddate.
# Check if the variables are not None before comparing.
if (
next_permit_valid_from is not None
and next_permit_valid_until is not None
):
is_next_permit_valid = (
next_permit_valid_from <= now
and now <= next_permit_valid_until
)
else:
# Handle the case where one or both variables are None
is_next_permit_valid = False

if is_next_permit_valid:
# Next permit is valid so this is the one users would like to see. Update permit data.
if is_current_permit_valid:
# Both current and next permits are valid.
# Show both permits so the Toezichthouder can decide.
# Add the next permit to the new_permits list for merging later.
new_permits.append(
permit_serializer.data
)
else:
# Handle the case where one or both variables are None
is_current_permit_valid = False

# Is the next permit valid/active? => now must be between start and enddate.
# Check if the variables are not None before comparing.
if (
next_permit_valid_from is not None
and next_permit_valid_until is not None
# Only the next permit is valid, update the data.
d.update(permit_serializer.data)

elif not is_current_permit_valid:
# Current permit and next permit are not valid.
if next_permit_valid_from > now:
# There's a future permit so show this to the user.
# This does not cover a use case with both current permit and next permit in the future.
# This is not a realistic use case because multiple future permits will not be issued.
d.update(permit_serializer.data)
elif (
next_permit_valid_from
> current_permit_valid_from
):
is_next_permit_valid = (
next_permit_valid_from <= now
and now <= next_permit_valid_until
)
else:
# Handle the case where one or both variables are None
is_next_permit_valid = False

if is_next_permit_valid:
# Next permit is valid so this is the one users would like to see. Update permit data.
if is_current_permit_valid:
# Both current and next permits are valid.
# Show both permits so the Toezichthouder can decide.
# Add the next permit to the new_permits list for merging later.
new_permits.append(
permit_serializer.data
)
else:
# Only the next permit is valid, update the data.
d.update(permit_serializer.data)

elif not is_current_permit_valid:
# Current permit and next permit are not valid.
if next_permit_valid_from > now:
# There's a future permit so show this to the user.
# This does not cover a use case with both current permit and next permit in the future.
# This is not a realistic use case because multiple future permits will not be issued.
d.update(permit_serializer.data)
elif (
next_permit_valid_from
> current_permit_valid_from
):
# Both permits are expired so show most recent one.
d.update(permit_serializer.data)

except Exception as e:
logger.error(
f"Decos permits could not validate datetimes: {e}"
)
# Both permits are expired so show most recent one.
d.update(permit_serializer.data)

else:
# There's NO permit with raw data so update.
d.update(permit_serializer.data)

else:
logger.error("DECOS JOIN parent key not found in config")
logger.info("book key: %s" % parent_key)
logger.info(
"permit name: %s" % folder.get("fields", {}).get("text45")
)
logger.info(
"permit result: %s"
% folder.get("fields", {}).get("dfunction")
)
logger.info(
"Config keys: %s" % decos_join_conf_object.get_book_keys()
)
except Exception as e:
logger.error(
f"Decos permits could not validate datetimes: {e}"
)
else:
# There's NO permit with raw data so update.
d.update(permit_serializer.data)
else:
self.log_error_info(parent_key, folder, decos_join_conf_object)

# Extend the original permits list with new items
permits.extend(new_permits)

# A "Omzettingsvergunning" should NOT have an end date.
self.update_date_valid_until_for_omzettingsvergunning(permits)
# Sort the list based on the "permit_type" key
sorted_permits = sorted(permits, key=lambda x: x["permit_type"])

Expand Down
1 change: 1 addition & 0 deletions app/apps/permits/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,3 +144,4 @@ class PowerbrowserSerializer(serializers.Serializer):
einddatum = serializers.DateTimeField(allow_null=True)
vergunninghouder = serializers.CharField(allow_null=True)
initator = serializers.CharField(allow_null=True)
datuM_TOT = serializers.DateTimeField(allow_null=True)
Loading