diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 0000000..1e85a34 --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,34 @@ +## Version 0.2.16 +**Release Date:** 02-12-2020 + +### BugFixes + + - Some configuration lines were missing from `CiscoIosInterface.get_unprocessed()` + - `utils.common_utils.get_logger` used Formatter which did not show logger name + +### New Functions + +#### CiscoIosParser.vlan_groups +Returns list of VLAN groups, such as +```python +[ + { + "group": "GROUP01", + "vlan_id": "1" + }, + { + "group": "GROUP02", + "vlan_id": "2" + } +] +``` +#### CiscoIosInterface.dhcp_snooping +*Note: This is just a quick one for specific use case. Currently only checks if interface is trusted (containing `ip dhcp snooping trus` child)* + +In the future will return all info regarding DHCP Snooping on interface level. Currently only returns dict +```python +{ + "trust": True +} +``` +or `None` diff --git a/MANIFEST.in b/MANIFEST.in index 7a036d5..748b428 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -1,4 +1,4 @@ include requirements.txt -include CHANGELOG.txt +include CHANGELOG.md recursive-include ccutils/templates * recursive-include docs * diff --git a/ccutils/ccparser/BaseConfigParser.py b/ccutils/ccparser/BaseConfigParser.py index 88fc812..7a543f7 100644 --- a/ccutils/ccparser/BaseConfigParser.py +++ b/ccutils/ccparser/BaseConfigParser.py @@ -24,7 +24,7 @@ class BaseConfigParser(object): _vlan_configuration_regex = re.compile(pattern=r"^vlan configuration (?P[\d\-,]+)", flags=re.MULTILINE) _device_tracking_attach_policy_regex = re.compile(pattern=r"^ device-tracking attach-policy (?P\S+)") - def __init__(self, config=None, verbosity=4, **kwargs): + def __init__(self, config=None, verbosity=4, name="BaseConfigParser", **kwargs): """ Base class for parsing Cisco-like configs @@ -71,7 +71,7 @@ def __init__(self, config=None, verbosity=4, **kwargs): """ self.verbosity = verbosity - self.logger = get_logger(name="ConfigParser", verbosity=verbosity) + self.logger = get_logger(name=name, verbosity=verbosity) self.config = config self.path = self._check_path(kwargs.get("filepath", None)) if kwargs.get("filepath", None) else None diff --git a/ccutils/ccparser/CiscoIosInterfaceLine.py b/ccutils/ccparser/CiscoIosInterfaceLine.py index 0840951..2c894c2 100644 --- a/ccutils/ccparser/CiscoIosInterfaceLine.py +++ b/ccutils/ccparser/CiscoIosInterfaceLine.py @@ -25,6 +25,7 @@ class CiscoIosInterfaceLine(BaseInterfaceLine): _helper_address_regex = re.compile(pattern=r"^\sip\shelper-address\s(?P(?:\d{1,3}\.){3}\d{1,3})", flags=re.MULTILINE) + _ip_dhcp_snooping_trust_regex = re.compile(r"^\sip dhcp snooping trust$", flags=re.MULTILINE) _native_vlan_regex = re.compile(pattern=r"^ switchport trunk native vlan (?P\d+)", flags=re.MULTILINE) _trunk_encapsulation_regex = re.compile(pattern=r"^ switchport trunk encapsulation (?Pdot1q|isl|negotiate)", flags=re.MULTILINE) @@ -108,7 +109,8 @@ def get_unprocessed(self, return_type=None): list: List of unprocessed config lines """ - unprocessed_children = [] + unprocessed_children = self.get_children() + regexes = [ self._description_regex, self._ip_addr_regex, @@ -168,21 +170,20 @@ def get_unprocessed(self, return_type=None): self._service_instance_service_policy_regex, self._ip_unnumbered_interface_regex, self._negotiation_regex, + self._ip_dhcp_snooping_trust_regex, re.compile(pattern=r"^\s*!.*", flags=re.MULTILINE), re.compile(pattern=r"^\sno\sip\saddress", flags=re.MULTILINE), re.compile(pattern=r"^ (no )?switchport$", flags=re.MULTILINE), re.compile(pattern=r"^ spanning-tree portfast") ] - for child in self.re_search_children(regex=r"^ \S"): - is_processed = False - for regex in regexes: - match = child.re_search(regex=regex) - if match: - is_processed = True - break - if not is_processed: - unprocessed_children.append(child) - + for regex in regexes: + for child in self.re_search_children(regex=regex): + unprocessed_children.remove(child) + if return_type == "text": + return [x.text for x in unprocessed_children] + elif return_type == "obj": + return unprocessed_children + else: return [x.text for x in unprocessed_children] def _val_to_bool(self, entry, key): @@ -1129,3 +1130,19 @@ def device_tracking_policy(self): if len(candidates): device_tracking_policy = candidates[0] return device_tracking_policy + @property + @functools.lru_cache() + def dhcp_snooping(self): + dhcp_snooping = {"trust": None} + trust_candidates = self.re_search_children(regex=self._ip_dhcp_snooping_trust_regex) + if len(trust_candidates): + dhcp_snooping["trust"] = True + + if self.config.minimal_results: + if not any(dhcp_snooping.values()): + dhcp_snooping = None + + return dhcp_snooping + + + diff --git a/ccutils/ccparser/CiscoIosParser.py b/ccutils/ccparser/CiscoIosParser.py index 833923d..18b4b02 100644 --- a/ccutils/ccparser/CiscoIosParser.py +++ b/ccutils/ccparser/CiscoIosParser.py @@ -72,7 +72,7 @@ class CiscoIosParser(BaseConfigParser): _vrf_afi_rt_regex = re.compile(pattern=r"route-target (?Pimport|export) (?P\d+:\d+)") def __init__(self, config=None, verbosity=4, **kwargs): - super(CiscoIosParser, self).__init__(config=config, verbosity=verbosity, **kwargs) + super(CiscoIosParser, self).__init__(config=config, verbosity=verbosity, name="CiscoIosParser", **kwargs) @property @functools.lru_cache() @@ -535,8 +535,7 @@ def vlans(self): @property def vlan_groups(self): - vlan_group_regex = re.compile(pattern=r"^vlan group (?P\S+) vlan-list (?P\d+)", - flags=re.MULTILINE) + vlan_group_regex = re.compile(pattern=r"^vlan group (?P\S+) vlan-list (?P\d+)", flags=re.MULTILINE) candidates = self.find_objects(regex=vlan_group_regex) return [x.re_search(regex=vlan_group_regex, group="ALL") for x in candidates] diff --git a/ccutils/utils/CustomAnsibleDumper.py b/ccutils/utils/CustomAnsibleDumper.py index e69de29..ef6ac92 100644 --- a/ccutils/utils/CustomAnsibleDumper.py +++ b/ccutils/utils/CustomAnsibleDumper.py @@ -0,0 +1,49 @@ +import yaml +from yaml.representer import Representer +from yaml.dumper import Dumper +from yaml.emitter import Emitter +from yaml.serializer import Serializer +from yaml.resolver import Resolver +from collections import OrderedDict + + +def represent_ordereddict(dumper, data): + value = [] + + for item_key, item_value in data.items(): + node_key = dumper.represent_data(item_key) + node_value = dumper.represent_data(item_value) + + value.append((node_key, node_value)) + + return yaml.nodes.MappingNode(u'tag:yaml.org,2002:map', value) + +class CustomAnsibleRepresenter(Representer): + + def represent_none(self, data): + return self.represent_scalar(u'tag:yaml.org,2002:null', u'') + + +class CustomAnsibleDumper(Emitter, Serializer, CustomAnsibleRepresenter, Resolver): + def __init__(self, stream, + default_style=None, default_flow_style=None, + canonical=None, indent=None, width=None, + allow_unicode=None, line_break=None, + encoding=None, explicit_start=None, explicit_end=None, sort_keys=False, + version=None, tags=None): + Emitter.__init__(self, stream, canonical=canonical, + indent=indent, width=width, + allow_unicode=allow_unicode, line_break=line_break) + Serializer.__init__(self, encoding=encoding, + explicit_start=explicit_start, explicit_end=explicit_end, + version=version, tags=tags) + CustomAnsibleRepresenter.__init__(self, default_style=default_style, + default_flow_style=default_flow_style) + Resolver.__init__(self) + + CustomAnsibleRepresenter.add_representer(type(None), CustomAnsibleRepresenter.represent_none) + CustomAnsibleRepresenter.add_representer(OrderedDict, represent_ordereddict) + + + def increase_indent(self, flow=False, indentless=False): + return super(CustomAnsibleDumper, self).increase_indent(flow=flow, indentless=False) \ No newline at end of file diff --git a/ccutils/utils/ExcelInventory.py b/ccutils/utils/ExcelInventory.py index e69de29..50ba6e0 100644 --- a/ccutils/utils/ExcelInventory.py +++ b/ccutils/utils/ExcelInventory.py @@ -0,0 +1,165 @@ +import pathlib +import unicodedata +import yaml +import re +from ccutils.utils.common_utils import get_logger, interface_sort +from ccutils.utils.CiscoRange import CiscoRange +from ccutils.utils.CustomAnsibleDumper import CustomAnsibleDumper +from collections import OrderedDict +from pprint import pprint + + +try: + import pandas as pd +except ImportError: + print("To use 'ExcelInventory' function you need to have 'pandas' installed.") + pd = None + +class GroupDoesNotExist(Exception): + pass + +class HostDoesNotExist(Exception): + pass + +class ObjectNotFound(object): + pass + + def __repr__(self): + return "[ObjectNotFound]" + + def __str__(self): + return "[ObjectNotFound]" + + def __bool__(self): + return False + +class ExcelInventory(object): + pass + + def __init__(self, input_file, output_dir, verbosity=4): + self.logger = get_logger(name="ExcelInventory", verbosity=verbosity) + self.input_file = self.check_path(path=input_file, mode="file") + self.output_dir = self.check_path(path=output_dir, mode="directory") + self.host_vars = {} + self.group_vars = {} + self.hosts = {} + + def check_path(self, path, mode): + """ + """ + self.logger.info("Checking Path: '{}'".format(path)) + try: + if not isinstance(path, pathlib.Path): + path = pathlib.Path(path) + if path.exists(): + if path.is_file() and mode == "file": + self.logger.info("Path: '{}' Exists: File.".format(path)) + elif path.is_file() and mode == "directory": + self.logger.critical("Path: '{}' Exists but is not a file!".format(path)) + path = None + elif not path.is_file() and mode == "directory": + self.logger.info("Path: '{}' Exists: Directory.".format(path)) + elif not path.is_file() and mode == "file": + self.logger.critical("Path: '{}' Exists but is not a directory!".format(path)) + path = None + else: + self.logger.critical("Path: '{}' Unhandled error!".format(path)) + else: + self.logger.critical("Path: '{}' Does not exist!".format(path)) + except Exception as e: + self.logger.critical("Could not determine valid path for '{}'. Exception: {}".format(path, repr(e))) + finally: + return path + + def load_excel(self, path, sheet_name, index_column=None, columns_rename=None, **kwargs): + self.logger.info("Loading file: '{}' Sheet: '{}' as DF".format(path, sheet_name)) + df = pd.read_excel(io=path, sheet_name=sheet_name, index_col=index_column, **kwargs) + df = df.where(pd.notnull(df), None) + if columns_rename is not None: + df = df.rename(columns=columns_rename) + return df + + def duplicates_check(self, df, columns): + results = [] + for column_name in columns: + duplicates = df.duplicated(subset=[column_name]) + results.append(any(duplicates)) + if results[-1]: + self.logger.warning( + "Found duplicated values in column '{0}': {1}".format(column_name, df[duplicates][column_name])) + return results + + @staticmethod + def replace_cz_chars(line): + line = unicodedata.normalize('NFKD', line) + output = '' + for c in line: + if not unicodedata.combining(c): + output += c + return output + + def _finditem(self, obj, key): + if key in obj: + return obj + for k, v in obj.items(): + if isinstance(v, dict): + return self._finditem(v, key) # added return statement + return ObjectNotFound() + + def recursive_parent_lookup(self, key, obj): + if key in obj: + return obj + for v in obj.values(): + if isinstance(v, dict): + a = self.recursive_parent_lookup(key=key, obj=v) + if not isinstance(a, ObjectNotFound): + return a + return ObjectNotFound() + + def get_ordered_interfaces(self, host): + """ + Return interfaces as OrderedDict + + Returns: + (:obj:`OrderedDict`): Interface section as OrderedDict + + """ + if host not in self.host_vars.keys(): + msg = "Host '{}' does not exist".format(host) + raise HostDoesNotExist(msg) + if "interfaces" not in self.host_vars[host].keys(): + return OrderedDict() + interfaces_crange = CiscoRange(list(self.host_vars[host]["interfaces"].keys())) + ordered_interfaces = OrderedDict(sorted(self.host_vars[host]["interfaces"].items(), key=lambda x: interface_sort(crange=interfaces_crange, name=x[0]))) + return ordered_interfaces + + def dump_hosts(self, outputfile): + self.logger.info("Storing hosts as YAML file.") + with self.output_dir.joinpath(outputfile).open(mode="w") as f: + yaml.dump(data=self.hosts, stream=f, Dumper=CustomAnsibleDumper) + + def dump_hostvars(self): + self.logger.info("Storing host_vars as YAML files.") + if self.output_dir is not None: + host_vars_path = self.output_dir.joinpath("host_vars") + host_vars_path.mkdir(exist_ok=True) + for hostname, host_vars in self.host_vars.items(): + path = host_vars_path.joinpath("{}.yml".format(hostname)) + with path.open(mode="w") as f: + data = host_vars + data["interfaces"] = self.get_ordered_interfaces(host=hostname) + yaml_string = yaml.dump(data=data, Dumper=CustomAnsibleDumper) + yaml_string = re.sub("'\"(.*)\"'", '"\\1"', yaml_string) + f.write(yaml_string) + # yaml.dump(data=host_vars, stream=f, Dumper=CustomAnsibleDumper) + + def dump_groupvars(self): + self.logger.info("Storing group_vars as YAML files.") + if self.output_dir is not None: + group_vars_path = self.output_dir.joinpath("group_vars") + group_vars_path.mkdir(exist_ok=True) + for groupname, group_vars in self.group_vars.items(): + path = group_vars_path.joinpath("{}.yml".format(groupname)) + with path.open(mode="w") as f: + yaml.dump(data=group_vars, stream=f, Dumper=CustomAnsibleDumper) + diff --git a/ccutils/utils/common_utils.py b/ccutils/utils/common_utils.py index adcf14b..2c6e046 100644 --- a/ccutils/utils/common_utils.py +++ b/ccutils/utils/common_utils.py @@ -5,6 +5,28 @@ import re from collections import OrderedDict +INTERFACE_FLAGS_SUBSTRING_MAP = { + "physical": [ + "Ethernet", + "GigE" + ], + "virtual": [ + "Loopback", + "Vlan" + ], + "port-channel": [ + "Port-channel" + ] +} + +PHYSICAL_INTERFACE_SUBSTRINGS = [ + "Ethernet", + "GigE" +] +VIRTUAL_INTERFACE_SUBSTRINGS = [ + "Loopback", + "Vlan" +] class UnsortableList(list): def sort(self, *args, **kwargs): @@ -28,9 +50,12 @@ def get_logger(name, verbosity=4): 5: logging.DEBUG } + threading_formatter_string = '[%(asctime)s] [%(levelname)s]\t[%(name)s][%(threadName)s][%(module)s][%(funcName)s]\t%(message)s' + single_formatter_string = '[%(asctime)s] [%(levelname)s]\t[%(name)s][%(module)s][%(funcName)s]\t%(message)s' + logger = logging.getLogger(name) handler = logging.StreamHandler(sys.stdout) - formatter = logging.Formatter('[%(asctime)s] [%(levelname)s]\t[%(module)s][%(funcName)s]\t%(message)s') + formatter = logging.Formatter(single_formatter_string) handler.setFormatter(formatter) if not len(logger.handlers): logger.addHandler(handler) @@ -254,6 +279,16 @@ def has_old_pyyaml(): return version.parse(yaml.__version__) < version.parse("5.1") +def get_flags_from_interface_name(interface): + flags = set() + for flag, subs in INTERFACE_FLAGS_SUBSTRING_MAP.items(): + for sub in subs: + if sub in interface: + flags.add(flag) + if re.search(pattern=r"\d+\.d+", string=interface): + flags.add("subinterface") + return list(flags) + def load_excel_sheet(file, sheet_name): try: import pandas as pd diff --git a/docs/conf.py b/docs/conf.py index ee76f18..795611f 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -26,9 +26,9 @@ add_module_names = False # The short X.Y version -version = '0.2.15' +version = '0.2.16' # The full version, including alpha/beta/rc tags -release = '0.2.15-beta' +release = '0.2.16-beta' # -- General configuration --------------------------------------------------- diff --git a/setup.py b/setup.py index b6f8ae1..3568c9e 100644 --- a/setup.py +++ b/setup.py @@ -5,7 +5,7 @@ setup( name='ccutils', - version='0.2.15', + version='0.2.16', packages=find_packages(exclude=["test", "examples"]), url='https://github.org/mihudec/ccutils', license='', diff --git a/tests/resources/cisco_ios_interface_l2_tests.txt b/tests/resources/cisco_ios_interface_l2_tests.txt new file mode 100644 index 0000000..81a76db --- /dev/null +++ b/tests/resources/cisco_ios_interface_l2_tests.txt @@ -0,0 +1,3 @@ +interface TenGigabitEthernet1/0/1 + ip dhcp snooping trust +! \ No newline at end of file diff --git a/tests/resources/cisco_ios_vlan_groups_tests.txt b/tests/resources/cisco_ios_vlan_groups_tests.txt new file mode 100644 index 0000000..bed4784 --- /dev/null +++ b/tests/resources/cisco_ios_vlan_groups_tests.txt @@ -0,0 +1,2 @@ +vlan group GROUP01 vlan-list 1 +vlan group GROUP02 vlan-list 2 \ No newline at end of file diff --git a/tests/resources/interface_l2_test.txt b/tests/resources/interface_l2_test.txt index ba57443..f03c5ef 100644 --- a/tests/resources/interface_l2_test.txt +++ b/tests/resources/interface_l2_test.txt @@ -11,4 +11,4 @@ interface Ethernet0/1 switchport mode access switchport access vlan 10 switchport voice vlan 20 -! +! \ No newline at end of file diff --git a/tests/results/cisco_ios_interface_l2_tests.json b/tests/results/cisco_ios_interface_l2_tests.json new file mode 100644 index 0000000..76b234a --- /dev/null +++ b/tests/results/cisco_ios_interface_l2_tests.json @@ -0,0 +1,12 @@ +{ + "interfaces": { + "TenGigabitEthernet1/0/1": { + "dhcp_snooping": { + "trust": true + }, + "unprocessed_lines": [ + " ip dhcp snooping trust" + ] + } + } +} \ No newline at end of file diff --git a/tests/results/cisco_ios_vlan_groups_tests.json b/tests/results/cisco_ios_vlan_groups_tests.json new file mode 100644 index 0000000..310e33d --- /dev/null +++ b/tests/results/cisco_ios_vlan_groups_tests.json @@ -0,0 +1,12 @@ +{ + "vlan_groups": [ + { + "group": "GROUP01", + "vlan_id": "1" + }, + { + "group": "GROUP02", + "vlan_id": "2" + } + ] +} \ No newline at end of file diff --git a/tests/test_CiscoIosInterface.py b/tests/test_CiscoIosInterface.py index 12e5da5..4f8eaac 100644 --- a/tests/test_CiscoIosInterface.py +++ b/tests/test_CiscoIosInterface.py @@ -35,3 +35,23 @@ def test_bdf(self): want = self.results["interfaces"][interface]["bfd"] have = interface_line.bfd self.assertEqual(want, have) + + +class TestL2Interface(unittest.TestCase): + test_file_base = "cisco_ios_interface_l2_tests" + test_file_path = pathlib.Path(__file__).parent.joinpath("resources/{}.txt".format(test_file_base)) + result_file_path = pathlib.Path(__file__).parent.joinpath("results/{}.json".format(test_file_base)) + config = ConfigParser(config=test_file_path, device_type="ios", verbosity=3) + config.minimal_results = True + results = json.loads(result_file_path.read_text()) + + def test_dhcp_snooping(self): + tested_interfaces = ["TenGigabitEthernet1/0/1"] + for interface in tested_interfaces: + with self.subTest(msg=interface): + interface_line = [x for x in self.config.interface_lines if x.name == interface][0] + print(interface_line) + want = self.results["interfaces"][interface]["dhcp_snooping"] + have = interface_line.dhcp_snooping + self.assertEqual(want, have) + diff --git a/tests/test_CiscoIosParser.py b/tests/test_CiscoIosParser.py index d14dbbe..a3b40d6 100644 --- a/tests/test_CiscoIosParser.py +++ b/tests/test_CiscoIosParser.py @@ -67,3 +67,14 @@ def test_vrf_ipv4_standby_addresses(self): self.assertEqual(want, have) + def test_vlan_groups(self): + test_file_base = "cisco_ios_vlan_groups_tests" + with self.subTest(msg=test_file_base): + config = self.get_config(test_file_name=test_file_base) + want = self.get_results(results_file_name=test_file_base)["vlan_groups"] + have = config.vlan_groups + jprint(have) + + self.assertEqual(want, have) + +