-
Notifications
You must be signed in to change notification settings - Fork 9
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support validating nested list of dictionary as input, make base clas… #7
Changes from 6 commits
bc26b6c
7f60c80
058ff5e
2f3e77f
12f2e05
e0969b8
4e9e768
7a9b695
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -14,6 +14,7 @@ | |
DNAC_SDK_IS_INSTALLED = True | ||
from ansible.module_utils._text import to_native | ||
from ansible.module_utils.common import validation | ||
from abc import ABC, abstractmethod | ||
try: | ||
import logging | ||
except ImportError: | ||
|
@@ -26,27 +27,74 @@ | |
import inspect | ||
|
||
|
||
class DnacBase: | ||
class DnacBase(ABC): | ||
|
||
"""Class contains members which can be reused for all intent modules""" | ||
|
||
def __init__(self, module): | ||
self.module = module | ||
self.params = module.params | ||
self.config = copy.deepcopy(module.params.get("config")) | ||
self.have_create = {} | ||
self.want_create = {} | ||
self.have = {} | ||
self.want = {} | ||
self.validated_config = [] | ||
self.msg = "" | ||
self.status = "success" | ||
dnac_params = self.get_dnac_params(self.params) | ||
self.dnac = DNACSDK(params=dnac_params) | ||
self.dnac_apply = {'exec': self.dnac._exec} | ||
self.get_diff_state_apply = {'merged': self.get_diff_merged, | ||
'deleted': self.get_diff_deleted} | ||
'deleted': self.get_diff_deleted, | ||
'replaced': self.get_diff_replaced, | ||
'overridden': self.get_diff_overridden, | ||
'gathered': self.get_diff_gathered, | ||
'rendered': self.get_diff_rendered, | ||
'parsed': self.get_diff_parsed | ||
} | ||
self.dnac_log = dnac_params.get("dnac_log") | ||
self.log(str(dnac_params)) | ||
self.supported_states = ["merged", "deleted"] | ||
log(str(dnac_params)) | ||
self.supported_states = ["merged", "deleted", "replaced", "overridden", "gathered", "rendered", "parsed"] | ||
self.result = {"changed": False, "diff": [], "response": [], "warnings": []} | ||
|
||
@abstractmethod | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we need to write function body for abstractmethod? What is the use of it? Anyhow we are going to override it in base class.. |
||
def validate_input(self): | ||
pass | ||
|
||
def get_diff_merged(self): | ||
# Implement logic to merge the resource configuration | ||
self.merged = True | ||
return self | ||
|
||
def get_diff_deleted(self): | ||
# Implement logic to delete the resource | ||
self.deleted = True | ||
return self | ||
|
||
def get_diff_replaced(self): | ||
# Implement logic to replace the resource | ||
self.replaced = True | ||
return self | ||
|
||
def get_diff_overridden(self): | ||
# Implement logic to overwrite the resource | ||
self.overridden = True | ||
return self | ||
|
||
def get_diff_gathered(self): | ||
# Implement logic to gather data about the resource | ||
self.gathered = True | ||
return self | ||
|
||
def get_diff_rendered(self): | ||
# Implement logic to render a configuration template | ||
self.rendered = True | ||
return self | ||
|
||
def get_diff_parsed(self): | ||
# Implement logic to parse a configuration file | ||
self.parsed = True | ||
return True | ||
|
||
def log(self, message, frameIncrement=0): | ||
"""Log messages into dnac.log file""" | ||
|
||
|
@@ -97,8 +145,8 @@ def get_task_details(self, task_id): | |
def reset_values(self): | ||
"""Reset all neccessary attributes to default values""" | ||
|
||
self.have_create.clear() | ||
self.want_create.clear() | ||
self.have.clear() | ||
self.want.clear() | ||
|
||
|
||
def log(msg, frameIncrement=0): | ||
|
@@ -203,6 +251,167 @@ def dnac_argument_spec(): | |
return argument_spec | ||
|
||
|
||
def validate_str(item, param_spec, param_name, invalid_params): | ||
""" | ||
This function checks that the input `item` is a valid string and confirms to | ||
the constraints specified in `param_spec`. If the string is not valid or does | ||
not meet the constraints, an error message is added to `invalid_params`. | ||
|
||
Args: | ||
item (str): The input string to be validated. | ||
param_spec (dict): The parameter's specification, including validation constraints. | ||
param_name (str): The name of the parameter being validated. | ||
invalid_params (list): A list to collect validation error messages. | ||
|
||
Returns: | ||
str: The validated and possibly normalized string. | ||
|
||
Example `param_spec`: | ||
{ | ||
"type": "str", | ||
"length_max": 255 # Optional: maximum allowed length | ||
} | ||
""" | ||
|
||
item = validation.check_type_str(item) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we write Doc string for this new API ? |
||
if param_spec.get("length_max"): | ||
if 1 <= len(item) <= param_spec.get("length_max"): | ||
return item | ||
else: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. do we need else here as we returning from if block... |
||
invalid_params.append( | ||
"{0}:{1} : The string exceeds the allowed " | ||
"range of max {2} char".format(param_name, item, param_spec.get("length_max")) | ||
) | ||
return item | ||
|
||
|
||
def validate_int(item, param_spec, param_name, invalid_params): | ||
""" | ||
This function checks that the input `item` is a valid integer and conforms to | ||
the constraints specified in `param_spec`. If the integer is not valid or does | ||
not meet the constraints, an error message is added to `invalid_params`. | ||
|
||
Args: | ||
item (int): The input integer to be validated. | ||
param_spec (dict): The parameter's specification, including validation constraints. | ||
param_name (str): The name of the parameter being validated. | ||
invalid_params (list): A list to collect validation error messages. | ||
|
||
Returns: | ||
int: The validated integer. | ||
|
||
Example `param_spec`: | ||
{ | ||
"type": "int", | ||
"range_min": 1, # Optional: minimum allowed value | ||
"range_max": 100 # Optional: maximum allowed value | ||
} | ||
""" | ||
|
||
item = validation.check_type_int(item) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Doc string for this new API |
||
min_value = 1 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we pass this min_value as the last argument and set 1 as default. def validate_int(item, param_spec, param_name, invalid_params, min_value = 1): |
||
if param_spec.get("range_min") is not None: | ||
min_value = param_spec.get("range_min") | ||
if param_spec.get("range_max"): | ||
if min_value <= item <= param_spec.get("range_max"): | ||
return item | ||
else: | ||
invalid_params.append( | ||
"{0}:{1} : The item exceeds the allowed " | ||
"range of max {2}".format(param_name, item, param_spec.get("range_max")) | ||
) | ||
return item | ||
|
||
|
||
def validate_bool(item, param_spec, param_name, invalid_params): | ||
""" | ||
This function checks that the input `item` is a valid boolean value. If it does | ||
not represent a valid boolean value, an error message is added to `invalid_params`. | ||
|
||
Args: | ||
item (bool): The input boolean value to be validated. | ||
param_spec (dict): The parameter's specification, including validation constraints. | ||
param_name (str): The name of the parameter being validated. | ||
invalid_params (list): A list to collect validation error messages. | ||
|
||
Returns: | ||
bool: The validated boolean value. | ||
""" | ||
|
||
return validation.check_type_bool(item) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Doc String for this API |
||
|
||
|
||
def validate_list(item, param_spec, param_name, invalid_params): | ||
""" | ||
This function checks if the input `item` is a valid list based on the specified `param_spec`. | ||
It also verifies that the elements of the list match the expected data type specified in the | ||
`param_spec`. If any validation errors occur, they are appended to the `invalid_params` list. | ||
|
||
Args: | ||
item (list): The input list to be validated. | ||
param_spec (dict): The parameter's specification, including validation constraints. | ||
param_name (str): The name of the parameter being validated. | ||
invalid_params (list): A list to collect validation error messages. | ||
|
||
Returns: | ||
list: The validated list, potentially normalized based on the specification. | ||
""" | ||
|
||
try: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Doc String for this API .. |
||
if param_spec.get("type") == type(item).__name__: | ||
keys_list = [] | ||
for dict_key in param_spec: | ||
keys_list.append(dict_key) | ||
if len(keys_list) == 1: | ||
return validation.check_type_list(item) | ||
|
||
temp_dict = {keys_list[1]: param_spec[keys_list[1]]} | ||
try: | ||
if param_spec['elements']: | ||
get_spec_type = param_spec['type'] | ||
get_spec_element = param_spec['elements'] | ||
if type(item).__name__ == get_spec_type: | ||
for element in item: | ||
if type(element).__name__ != get_spec_element: | ||
invalid_params.append( | ||
"{0} is not of the same datatype as expected which is {1}".format(element, get_spec_element) | ||
) | ||
else: | ||
invalid_params.append( | ||
"{0} is not of the same datatype as expected which is {1}".format(item, get_spec_type) | ||
) | ||
except Exception as e: | ||
item, list_invalid_params = validate_list_of_dicts(item, temp_dict) | ||
invalid_params.extend(list_invalid_params) | ||
else: | ||
invalid_params.append("{0} : is not a valid list".format(item)) | ||
except Exception as e: | ||
invalid_params.append("{0} : comes into the exception".format(e)) | ||
|
||
return item | ||
|
||
|
||
def validate_dict(item, param_spec, param_name, invalid_params): | ||
""" | ||
This function checks if the input `item` is a valid dictionary based on the specified `param_spec`. | ||
If the dictionary does not match the expected data type specified in the `param_spec`, | ||
a validation error is appended to the `invalid_params` list. | ||
|
||
Args: | ||
item (dict): The input dictionary to be validated. | ||
param_spec (dict): The parameter's specification, including validation constraints. | ||
param_name (str): The name of the parameter being validated. | ||
invalid_params (list): A list to collect validation error messages. | ||
|
||
Returns: | ||
dict: The validated dictionary. | ||
""" | ||
|
||
if param_spec.get("type") != type(item).__name__: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Doc String for this new API |
||
invalid_params.append("{0} : is not a valid dictionary".format(item)) | ||
return validation.check_type_dict(item) | ||
|
||
|
||
def validate_list_of_dicts(param_list, spec, module=None): | ||
"""Validate/Normalize playbook params. Will raise when invalid parameters found. | ||
param_list: a playbook parameter list of dicts | ||
|
@@ -211,11 +420,17 @@ def validate_list_of_dicts(param_list, spec, module=None): | |
foo=dict(type='str', default='bar')) | ||
return: list of normalized input data | ||
""" | ||
|
||
v = validation | ||
normalized = [] | ||
invalid_params = [] | ||
|
||
for list_entry in param_list: | ||
valid_params_dict = {} | ||
if not spec: | ||
# Handle the case when spec becomes empty but param list is still there | ||
invalid_params.append("No more spec to validate, but parameters remain") | ||
break | ||
for param in spec: | ||
item = list_entry.get(param) | ||
log(str(item)) | ||
|
@@ -226,58 +441,41 @@ def validate_list_of_dicts(param_list, spec, module=None): | |
) | ||
else: | ||
item = spec[param].get("default") | ||
valid_params_dict[param] = item | ||
continue | ||
data_type = spec[param].get("type") | ||
switch = { | ||
"str": validate_str, | ||
"int": validate_int, | ||
"bool": validate_bool, | ||
"list": validate_list, | ||
"dict": validate_dict, | ||
} | ||
|
||
validator = switch.get(data_type) | ||
if validator: | ||
item = validator(item, spec[param], param, invalid_params) | ||
else: | ||
type = spec[param].get("type") | ||
if type == "str": | ||
item = v.check_type_str(item) | ||
if spec[param].get("length_max"): | ||
if 1 <= len(item) <= spec[param].get("length_max"): | ||
pass | ||
else: | ||
invalid_params.append( | ||
"{0}:{1} : The string exceeds the allowed " | ||
"range of max {2} char".format( | ||
param, item, spec[param].get("length_max") | ||
) | ||
) | ||
elif type == "int": | ||
item = v.check_type_int(item) | ||
min_value = 1 | ||
if spec[param].get("range_min") is not None: | ||
min_value = spec[param].get("range_min") | ||
if spec[param].get("range_max"): | ||
if min_value <= item <= spec[param].get("range_max"): | ||
pass | ||
else: | ||
invalid_params.append( | ||
"{0}:{1} : The item exceeds the allowed " | ||
"range of max {2}".format( | ||
param, item, spec[param].get("range_max") | ||
) | ||
) | ||
elif type == "bool": | ||
item = v.check_type_bool(item) | ||
elif type == "list": | ||
item = v.check_type_list(item) | ||
elif type == "dict": | ||
item = v.check_type_dict(item) | ||
|
||
choice = spec[param].get("choices") | ||
if choice: | ||
if item not in choice: | ||
invalid_params.append( | ||
"{0} : Invalid choice provided".format(item) | ||
) | ||
invalid_params.append( | ||
"{0}:{1} : Unsupported data type {2}.".format(param, item, data_type) | ||
) | ||
|
||
no_log = spec[param].get("no_log") | ||
if no_log: | ||
if module is not None: | ||
module.no_log_values.add(item) | ||
else: | ||
msg = "\n\n'{0}' is a no_log parameter".format(param) | ||
msg += "\nAnsible module object must be passed to this " | ||
msg += "\nfunction to ensure it is not logged\n\n" | ||
raise Exception(msg) | ||
choice = spec[param].get("choices") | ||
if choice: | ||
if item not in choice: | ||
invalid_params.append( | ||
"{0} : Invalid choice provided".format(item) | ||
) | ||
|
||
no_log = spec[param].get("no_log") | ||
if no_log: | ||
if module is not None: | ||
module.no_log_values.add(item) | ||
else: | ||
msg = "\n\n'{0}' is a no_log parameter".format(param) | ||
msg += "\nAnsible module object must be passed to this " | ||
msg += "\nfunction to ensure it is not logged\n\n" | ||
raise Exception(msg) | ||
|
||
valid_params_dict[param] = item | ||
normalized.append(valid_params_dict) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we also write for other states?
"merged" : self.get_diff_merged
"replaced" : self.get_diff_replaced
and all..
Also write member functions for all the states..