Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support validating nested list of dictionary as input, make base clas… #7

Merged
merged 8 commits into from
Sep 22, 2023
195 changes: 134 additions & 61 deletions plugins/module_utils/dnac.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
DNAC_SDK_IS_INSTALLED = True
from ansible.module_utils._text import to_native
from ansible.module_utils.common import validation
from abc import ABC, abstractmethod
try:
import logging
except ImportError:
Expand All @@ -25,15 +26,15 @@
import datetime
import inspect


class DnacBase:
class DnacBase(ABC):
"""Class contains members which can be reused for all intent modules"""

def __init__(self, module):
self.module = module
self.params = module.params
self.config = copy.deepcopy(module.params.get("config"))
self.have_create = {}
self.want_create = {}
self.have = {}
self.want = {}
self.validated_config = []
self.msg = ""
self.status = "success"
Expand All @@ -43,10 +44,17 @@ def __init__(self, module):
self.get_diff_state_apply = {'merged': self.get_diff_merged,
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we also write for other states?
"merged" : self.get_diff_merged
"replaced" : self.get_diff_replaced
and all..
Also write member functions for all the states..

'deleted': self.get_diff_deleted}
self.dnac_log = dnac_params.get("dnac_log")
self.log(str(dnac_params))
self.supported_states = ["merged", "deleted"]
log(str(dnac_params))
self.supported_states = ["merged", "deleted", "replaced", "overridden", "gathered", "rendered", "parsed"]
self.result = {"changed": False, "diff": [], "response": [], "warnings": []}

@abstractmethod
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to write function body for abstractmethod? What is the use of it? Anyhow we are going to override it in base class..

def validate_input(self):
if not self.config:
self.msg = "config not available in playbook for validattion"
self.status = "success"
return self

def log(self, message, frameIncrement=0):
"""Log messages into dnac.log file"""

Expand All @@ -57,7 +65,7 @@ def log(self, message, frameIncrement=0):
def check_return_status(self):
"""API to check the return status value and exit/fail the module"""

self.log("status: {0}, msg:{1}".format(self.status, self.msg), frameIncrement=1)
log("status: {0}, msg:{1}".format(self.status, self.msg), frameIncrement=1)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason for changing this into self.log? Because if dnac.log is true, then only we call log API to print.

if "failed" in self.status:
self.module.fail_json(msg=self.msg, response=[])
elif "exited" in self.status:
Expand Down Expand Up @@ -88,7 +96,7 @@ def get_task_details(self, task_id):
params={"task_id": task_id},
)

self.log(str(response))
log(str(response))
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason for changing this into self.log? Because if dnac.log is true, then only we call log API to print.

if isinstance(response, dict):
result = response.get("response")

Expand All @@ -97,8 +105,8 @@ def get_task_details(self, task_id):
def reset_values(self):
"""Reset all neccessary attributes to default values"""

self.have_create.clear()
self.want_create.clear()
self.have.clear()
self.want.clear()


def log(msg, frameIncrement=0):
Expand Down Expand Up @@ -202,6 +210,83 @@ def dnac_argument_spec():
)
return argument_spec

def validate_str(item, param_spec, param_name, invalid_params):
item = validation.check_type_str(item)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we write Doc string for this new API ?

if param_spec.get("length_max"):
if 1 <= len(item) <= param_spec.get("length_max"):
return item
else:
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need else here as we returning from if block...

invalid_params.append(
"{0}:{1} : The string exceeds the allowed "
"range of max {2} char".format(
param_name, item, param_spec.get("length_max")
)
)
return item

def validate_int(item, param_spec, param_name, invalid_params):
item = validation.check_type_int(item)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doc string for this new API

min_value = 1
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we pass this min_value as the last argument and set 1 as default.

def validate_int(item, param_spec, param_name, invalid_params, min_value = 1):

if param_spec.get("range_min") is not None:
min_value = param_spec.get("range_min")
if param_spec.get("range_max"):
if min_value <= item <= param_spec.get("range_max"):
return item
else:
invalid_params.append(
"{0}:{1} : The item exceeds the allowed "
"range of max {2}".format(
param_name, item, param_spec.get("range_max")
)
)
return item

def validate_bool(item, param_spec, param_name, invalid_params):
return validation.check_type_bool(item)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doc String for this API


def validate_list(item, param_spec, param_name, invalid_params):
try:
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doc String for this API ..

if param_spec.get("type") == type(item).__name__:
if param_spec:
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need of this check.. If param_spec is already there and get the type before we check it..
As we have lot more statements in this if block, we can check with not equal and return in the beginning itself.

keys_list = []
for dict_key in param_spec:
keys_list.append(dict_key)
if len(keys_list) == 1:
return validation.check_type_list(item)
else:
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as we are returning from if block unconditional, do we need else here?

temp_dict = {keys_list[1]: param_spec[keys_list[1]]}
try:
if param_spec['elements']:
get_spec_type = param_spec['type']
get_spec_element = param_spec['elements']
if type(item).__name__ == get_spec_type:
for element in item:
if type(element).__name__ != get_spec_element:
invalid_params.append(
"{0} is not of the same datatype as expected which is {1}".format(
element, get_spec_element
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we close the braces in the same line itself becuase adding multiple new lines only for braces looks little messier ?

)
)
else:
invalid_params.append(
"{0} is not of the same datatype as expected which is {1}".format(
item, get_spec_type
)
)
except Exception as e:
item, list_invalid_params = validate_list_of_dicts(item, temp_dict)
invalid_params.extend(list_invalid_params)
else:
invalid_params.append("{0} : is not a valid list".format(item))
except Exception as e:
invalid_params.append("{0} : comes into the exception".format(e))
return item

def validate_dict(item, param_spec, param_name, invalid_params):
if param_spec.get("type") != type(item).__name__:
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doc String for this new API

invalid_params.append("{0} : is not a valid dictionary".format(item))
return validation.check_type_dict(item)


def validate_list_of_dicts(param_list, spec, module=None):
"""Validate/Normalize playbook params. Will raise when invalid parameters found.
Expand All @@ -214,8 +299,13 @@ def validate_list_of_dicts(param_list, spec, module=None):
v = validation
normalized = []
invalid_params = []

for list_entry in param_list:
valid_params_dict = {}
if not spec:
# Handle the case when spec becomes empty but param list is still there
invalid_params.append("No more spec to validate, but parameters remain")
break
for param in spec:
item = list_entry.get(param)
log(str(item))
Expand All @@ -226,58 +316,41 @@ def validate_list_of_dicts(param_list, spec, module=None):
)
else:
item = spec[param].get("default")
valid_params_dict[param] = item
continue
data_type = spec[param].get("type")
switch = {
"str": validate_str,
"int": validate_int,
"bool": validate_bool,
"list": validate_list,
"dict": validate_dict,
}

validator = switch.get(data_type)
if validator:
item = validator(item, spec[param], param, invalid_params)
else:
type = spec[param].get("type")
if type == "str":
item = v.check_type_str(item)
if spec[param].get("length_max"):
if 1 <= len(item) <= spec[param].get("length_max"):
pass
else:
invalid_params.append(
"{0}:{1} : The string exceeds the allowed "
"range of max {2} char".format(
param, item, spec[param].get("length_max")
)
)
elif type == "int":
item = v.check_type_int(item)
min_value = 1
if spec[param].get("range_min") is not None:
min_value = spec[param].get("range_min")
if spec[param].get("range_max"):
if min_value <= item <= spec[param].get("range_max"):
pass
else:
invalid_params.append(
"{0}:{1} : The item exceeds the allowed "
"range of max {2}".format(
param, item, spec[param].get("range_max")
)
)
elif type == "bool":
item = v.check_type_bool(item)
elif type == "list":
item = v.check_type_list(item)
elif type == "dict":
item = v.check_type_dict(item)

choice = spec[param].get("choices")
if choice:
if item not in choice:
invalid_params.append(
"{0} : Invalid choice provided".format(item)
)

no_log = spec[param].get("no_log")
if no_log:
if module is not None:
module.no_log_values.add(item)
else:
msg = "\n\n'{0}' is a no_log parameter".format(param)
msg += "\nAnsible module object must be passed to this "
msg += "\nfunction to ensure it is not logged\n\n"
raise Exception(msg)
invalid_params.append(
"{0}:{1} : Unsupported data type {2}.".format(param, item, data_type)
)

choice = spec[param].get("choices")
if choice:
if item not in choice:
invalid_params.append(
"{0} : Invalid choice provided".format(item)
)

no_log = spec[param].get("no_log")
if no_log:
if module is not None:
module.no_log_values.add(item)
else:
msg = "\n\n'{0}' is a no_log parameter".format(param)
msg += "\nAnsible module object must be passed to this "
msg += "\nfunction to ensure it is not logged\n\n"
raise Exception(msg)

valid_params_dict[param] = item
normalized.append(valid_params_dict)
Expand Down
Loading