Skip to content

Commit

Permalink
fix: lint
Browse files Browse the repository at this point in the history
  • Loading branch information
soleksy-splunk committed Feb 22, 2024
1 parent 8240afb commit 747e938
Showing 1 changed file with 113 additions and 33 deletions.
146 changes: 113 additions & 33 deletions splunk_add_on_ucc_framework/global_config_validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -560,92 +560,172 @@ def _validate_groups(self) -> None:
f"Service {service['name']} uses group field {group_field} which is not defined in entity"
)

def _is_circular(self, mods, visited, all_entity_fields, current_field):
visited[current_field] = 'visited'
current_field_mods = next((mod for mod in mods if mod["fieldId"] == current_field), None)
def _is_circular(
self,
mods: List[Any],
visited: Dict[str, str],
all_entity_fields: List[Any],
current_field: str,
) -> Dict[str, str]:
"""
Checks if there is circular modification based on visited list and DFS algorithm
"""
DEAD_END = "dead_end"
VISITING = "visited"
visited[current_field] = VISITING

current_field_mods = next(
(mod for mod in mods if mod["fieldId"] == current_field), None
)
if current_field_mods is None:
# no more dependent modification fields
visited[current_field] = 'dead_end'
visited[current_field] = DEAD_END
return visited
else:
for influenced_field in current_field_mods["influenced_fields"]:
if influenced_field not in all_entity_fields:
raise GlobalConfigValidatorException(
"""Modification in field '{0}' for not existing field '{1}'""".format(current_field, influenced_field)
f"""Modification in field '{current_field}' for not existing field '{influenced_field}'"""
)
if influenced_field == current_field:
raise GlobalConfigValidatorException(
"""Field '{0}' tries to modify itself""".format(current_field)
f"""Field '{current_field}' tries to modify itself"""
)
if visited[influenced_field] == 'visited':
if visited[influenced_field] == VISITING:
raise GlobalConfigValidatorException(
"""Circular modifications for field '{0}' included in modifications in field '{1}'""".format(influenced_field, current_field)
f"""Circular modifications for field '{influenced_field}' included in modifications in field '{current_field}'"""
)
else:
visited = self._is_circular(mods, visited, all_entity_fields,influenced_field)
visited = self._is_circular(
mods, visited, all_entity_fields, influenced_field
)
# all of dependent modifications fields are dead_end
visited[current_field] = 'dead_end'
visited[current_field] = DEAD_END
return visited

def _check_if_cilcular(self, all_entity_fields, fields_with_mods, modification_dict):
def _check_if_cilcular(
self,
all_entity_fields: List[Any],
fields_with_mods: List[Any],
modifications: List[Any],
) -> None:
visited = {}

for field in all_entity_fields:
visited[field] = 'not_visited'
visited[field] = "not_visited"

for start_field in fields_with_mods:
#DFS algorithm for all fields with modifications
visited = self._is_circular(modification_dict, visited, all_entity_fields,start_field)
for start_field in fields_with_mods:
# DFS algorithm for all fields with modifications
visited = self._is_circular(
modifications, visited, all_entity_fields, start_field
)

def _get_mods_data_for_single_entity(self,all_fields,fields_with_mods,all_modifications,entity):
def _get_mods_data_for_single_entity(
self,
all_fields: List[Any],
fields_with_mods: List[Any],
all_modifications: List[Any],
entity: Dict[str, Any],
) -> List[Any]:
"""
Add modification entity data to lists and returns them
"""
all_fields.append(entity["field"])
if "modifyFieldsOnValue" in entity:
influenced_fields = set()
fields_with_mods.append(entity["field"])
for mods in entity["modifyFieldsOnValue"]:
for mod in mods["fieldsToModify"]:
influenced_fields.add(mod['fieldId'])
all_modifications.append({"fieldId":entity["field"], "influenced_fields":influenced_fields})
influenced_fields.add(mod["fieldId"])
all_modifications.append(
{"fieldId": entity["field"], "influenced_fields": influenced_fields}
)
return [all_fields, fields_with_mods, all_modifications]

def _get_all_modifiction_data(self, all_fields, fields_with_mods, all_modifications, entityCollector):
def _get_all_modifiction_data(
self,
all_fields: List[Any],
fields_with_mods: List[Any],
all_modifications: List[Any],
entityCollector: Dict[str, Any],
) -> List[Any]:
entities = entityCollector["entity"]
for entity in entities:
if entity['type'] == "oauth":
for type in entity['options']['auth_type']:
for oauthEntity in entity['options'][type]:
[all_fields, fields_with_mods, all_modifications] = self._get_mods_data_for_single_entity(all_fields, fields_with_mods, all_modifications, oauthEntity)
for entity in entities:
if entity["type"] == "oauth":
for oauthType in entity["options"]["auth_type"]:
for oauthEntity in entity["options"][oauthType]:
[
all_fields,
fields_with_mods,
all_modifications,
] = self._get_mods_data_for_single_entity(
all_fields, fields_with_mods, all_modifications, oauthEntity
)
else:
[all_fields, fields_with_mods, all_modifications] = self._get_mods_data_for_single_entity(all_fields, fields_with_mods, all_modifications, entity)
[
all_fields,
fields_with_mods,
all_modifications,
] = self._get_mods_data_for_single_entity(
all_fields, fields_with_mods, all_modifications, entity
)

return [fields_with_mods, all_modifications, all_fields]

def _validate_field_modifications(self) -> None:
"""
Validates:
Circular dependencies
If fields try modify itself
If fields try modify unexisting field
"""
pages = self._config["pages"]
configuration = pages["configuration"]
tabs = configuration["tabs"]

# lists initialised here as fields need to be unique across configuration page
fields_with_mods_config = []
all_modifications_config = []
all_fields_config = []
fields_with_mods_config: List[Any] = []
all_modifications_config: List[Any] = []
all_fields_config: List[str] = []

for tab in tabs:
[fields_with_mods_config, all_modifications_config, all_fields_config] = self._get_all_modifiction_data(all_fields_config,fields_with_mods_config,all_modifications_config,tab)
[
fields_with_mods_config,
all_modifications_config,
all_fields_config,
] = self._get_all_modifiction_data(
all_fields_config,
fields_with_mods_config,
all_modifications_config,
tab,
)

self._check_if_cilcular(all_fields_config, fields_with_mods_config, all_modifications_config)
self._check_if_cilcular(
all_fields_config, fields_with_mods_config, all_modifications_config
)

inputs = pages['inputs']
inputs = pages["inputs"]
services = inputs["services"]

# lists initialised here as fields need to be unique across inputs page
fields_with_mods_inputs = []
all_modifications_inputs = []
all_fields_inputs = []
for service in services:
[fields_with_mods_inputs, all_modifications_inputs, all_fields_inputs] = self._get_all_modifiction_data(all_fields_config,fields_with_mods_config,all_modifications_config,service)
[
fields_with_mods_inputs,
all_modifications_inputs,
all_fields_inputs,
] = self._get_all_modifiction_data(
all_fields_config,
fields_with_mods_config,
all_modifications_config,
service,
)

self._check_if_cilcular(all_fields_inputs, fields_with_mods_inputs, all_modifications_inputs)
self._check_if_cilcular(
all_fields_inputs, fields_with_mods_inputs, all_modifications_inputs
)

def validate(self) -> None:
self._validate_config_against_schema()
Expand Down

0 comments on commit 747e938

Please sign in to comment.