Skip to content

Commit

Permalink
Version 0.2.16
Browse files Browse the repository at this point in the history
  • Loading branch information
mihudec committed Dec 2, 2020
1 parent 547a3b0 commit 8a83da1
Show file tree
Hide file tree
Showing 17 changed files with 381 additions and 22 deletions.
34 changes: 34 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
## Version 0.2.16
**Release Date:** 02-12-2020

### BugFixes

- Some configuration lines were missing from `CiscoIosInterface.get_unprocessed()`
- `utils.common_utils.get_logger` used Formatter which did not show logger name

### New Functions

#### CiscoIosParser.vlan_groups
Returns a list of VLAN groups, such as
```python
[
{
"group": "GROUP01",
"vlan_id": "1"
},
{
"group": "GROUP02",
"vlan_id": "2"
}
]
```
#### CiscoIosInterface.dhcp_snooping
*Note: This is just a quick one for a specific use case. Currently it only checks whether the interface is trusted (contains an `ip dhcp snooping trust` child)*

In the future this will return all info regarding DHCP Snooping at the interface level. Currently it only returns the dict
```python
{
"trust": True
}
```
or `None`
2 changes: 1 addition & 1 deletion MANIFEST.in
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
include requirements.txt
include CHANGELOG.txt
include CHANGELOG.md
recursive-include ccutils/templates *
recursive-include docs *
4 changes: 2 additions & 2 deletions ccutils/ccparser/BaseConfigParser.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class BaseConfigParser(object):
_vlan_configuration_regex = re.compile(pattern=r"^vlan configuration (?P<vlan_range>[\d\-,]+)", flags=re.MULTILINE)
_device_tracking_attach_policy_regex = re.compile(pattern=r"^ device-tracking attach-policy (?P<policy>\S+)")

def __init__(self, config=None, verbosity=4, **kwargs):
def __init__(self, config=None, verbosity=4, name="BaseConfigParser", **kwargs):
"""
Base class for parsing Cisco-like configs
Expand Down Expand Up @@ -71,7 +71,7 @@ def __init__(self, config=None, verbosity=4, **kwargs):
"""
self.verbosity = verbosity
self.logger = get_logger(name="ConfigParser", verbosity=verbosity)
self.logger = get_logger(name=name, verbosity=verbosity)
self.config = config
self.path = self._check_path(kwargs.get("filepath", None)) if kwargs.get("filepath", None) else None

Expand Down
39 changes: 28 additions & 11 deletions ccutils/ccparser/CiscoIosInterfaceLine.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ class CiscoIosInterfaceLine(BaseInterfaceLine):

_helper_address_regex = re.compile(pattern=r"^\sip\shelper-address\s(?P<helper_address>(?:\d{1,3}\.){3}\d{1,3})", flags=re.MULTILINE)

_ip_dhcp_snooping_trust_regex = re.compile(r"^\sip dhcp snooping trust$", flags=re.MULTILINE)

_native_vlan_regex = re.compile(pattern=r"^ switchport trunk native vlan (?P<native_vlan>\d+)", flags=re.MULTILINE)
_trunk_encapsulation_regex = re.compile(pattern=r"^ switchport trunk encapsulation (?P<encapsulation>dot1q|isl|negotiate)", flags=re.MULTILINE)
Expand Down Expand Up @@ -108,7 +109,8 @@ def get_unprocessed(self, return_type=None):
list: List of unprocessed config lines
"""
unprocessed_children = []
unprocessed_children = self.get_children()

regexes = [
self._description_regex,
self._ip_addr_regex,
Expand Down Expand Up @@ -168,21 +170,20 @@ def get_unprocessed(self, return_type=None):
self._service_instance_service_policy_regex,
self._ip_unnumbered_interface_regex,
self._negotiation_regex,
self._ip_dhcp_snooping_trust_regex,
re.compile(pattern=r"^\s*!.*", flags=re.MULTILINE),
re.compile(pattern=r"^\sno\sip\saddress", flags=re.MULTILINE),
re.compile(pattern=r"^ (no )?switchport$", flags=re.MULTILINE),
re.compile(pattern=r"^ spanning-tree portfast")
]
for child in self.re_search_children(regex=r"^ \S"):
is_processed = False
for regex in regexes:
match = child.re_search(regex=regex)
if match:
is_processed = True
break
if not is_processed:
unprocessed_children.append(child)

for regex in regexes:
for child in self.re_search_children(regex=regex):
unprocessed_children.remove(child)
if return_type == "text":
return [x.text for x in unprocessed_children]
elif return_type == "obj":
return unprocessed_children
else:
return [x.text for x in unprocessed_children]

def _val_to_bool(self, entry, key):
Expand Down Expand Up @@ -1129,3 +1130,19 @@ def device_tracking_policy(self):
if len(candidates):
device_tracking_policy = candidates[0]
return device_tracking_policy
@property
@functools.lru_cache()
def dhcp_snooping(self):
    """Interface-level DHCP Snooping information.

    Currently this only detects the trust state: ``{"trust": True}`` when the
    interface has an ``ip dhcp snooping trust`` child line, otherwise
    ``{"trust": None}``. When the parent parser runs with
    ``minimal_results`` enabled and nothing was found, ``None`` is returned
    instead of the dict.
    """
    result = {"trust": None}
    if self.re_search_children(regex=self._ip_dhcp_snooping_trust_regex):
        result["trust"] = True

    # Collapse an all-empty result to None when minimal output is requested.
    if self.config.minimal_results and not any(result.values()):
        return None

    return result



5 changes: 2 additions & 3 deletions ccutils/ccparser/CiscoIosParser.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ class CiscoIosParser(BaseConfigParser):
_vrf_afi_rt_regex = re.compile(pattern=r"route-target (?P<action>import|export) (?P<rt>\d+:\d+)")

def __init__(self, config=None, verbosity=4, **kwargs):
super(CiscoIosParser, self).__init__(config=config, verbosity=verbosity, **kwargs)
super(CiscoIosParser, self).__init__(config=config, verbosity=verbosity, name="CiscoIosParser", **kwargs)

@property
@functools.lru_cache()
Expand Down Expand Up @@ -535,8 +535,7 @@ def vlans(self):

@property
def vlan_groups(self):
vlan_group_regex = re.compile(pattern=r"^vlan group (?P<group>\S+) vlan-list (?P<vlan_id>\d+)",
flags=re.MULTILINE)
vlan_group_regex = re.compile(pattern=r"^vlan group (?P<group>\S+) vlan-list (?P<vlan_id>\d+)", flags=re.MULTILINE)
candidates = self.find_objects(regex=vlan_group_regex)
return [x.re_search(regex=vlan_group_regex, group="ALL") for x in candidates]

Expand Down
49 changes: 49 additions & 0 deletions ccutils/utils/CustomAnsibleDumper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import yaml
from yaml.representer import Representer
from yaml.dumper import Dumper
from yaml.emitter import Emitter
from yaml.serializer import Serializer
from yaml.resolver import Resolver
from collections import OrderedDict


def represent_ordereddict(dumper, data):
    """Represent an ``OrderedDict`` as a plain YAML mapping, keeping key order.

    Registered with PyYAML so ``OrderedDict`` values are emitted as ordinary
    mappings instead of a python-specific ``!!python/object`` tag.
    """
    pairs = [
        (dumper.represent_data(item_key), dumper.represent_data(item_value))
        for item_key, item_value in data.items()
    ]
    return yaml.nodes.MappingNode(u'tag:yaml.org,2002:map', pairs)

class CustomAnsibleRepresenter(Representer):
    """PyYAML representer tweaks for Ansible-friendly output."""

    def represent_none(self, value):
        """Emit ``None`` as an empty scalar (``key:``) instead of ``key: null``."""
        return self.represent_scalar(u'tag:yaml.org,2002:null', u'')


class CustomAnsibleDumper(Emitter, Serializer, CustomAnsibleRepresenter, Resolver):
    """YAML dumper producing Ansible-style output.

    Combines the four standard PyYAML mixins (mirroring
    ``yaml.dumper.Dumper``) but uses :class:`CustomAnsibleRepresenter` so
    that ``None`` becomes an empty scalar and ``OrderedDict`` is written as a
    plain ordered mapping, and forces indented block sequences.
    """

    def __init__(self, stream,
                 default_style=None, default_flow_style=None,
                 canonical=None, indent=None, width=None,
                 allow_unicode=None, line_break=None,
                 encoding=None, explicit_start=None, explicit_end=None, sort_keys=False,
                 version=None, tags=None):
        # NOTE(review): ``sort_keys`` is accepted for API compatibility with
        # yaml.dump() but is not forwarded anywhere — confirm this is intended.
        Emitter.__init__(self, stream, canonical=canonical, indent=indent,
                         width=width, allow_unicode=allow_unicode,
                         line_break=line_break)
        Serializer.__init__(self, encoding=encoding,
                            explicit_start=explicit_start,
                            explicit_end=explicit_end,
                            version=version, tags=tags)
        CustomAnsibleRepresenter.__init__(self, default_style=default_style,
                                          default_flow_style=default_flow_style)
        Resolver.__init__(self)

        # Register the custom representers: None -> empty scalar,
        # OrderedDict -> ordered plain mapping.
        CustomAnsibleRepresenter.add_representer(type(None), CustomAnsibleRepresenter.represent_none)
        CustomAnsibleRepresenter.add_representer(OrderedDict, represent_ordereddict)

    def increase_indent(self, flow=False, indentless=False):
        # Always indent block sequences under their parent key (``indentless``
        # is forced to False), matching Ansible's YAML formatting style.
        return super(CustomAnsibleDumper, self).increase_indent(flow=flow, indentless=False)
165 changes: 165 additions & 0 deletions ccutils/utils/ExcelInventory.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
import pathlib
import unicodedata
import yaml
import re
from ccutils.utils.common_utils import get_logger, interface_sort
from ccutils.utils.CiscoRange import CiscoRange
from ccutils.utils.CustomAnsibleDumper import CustomAnsibleDumper
from collections import OrderedDict
from pprint import pprint


try:
import pandas as pd
except ImportError:
print("To use 'ExcelInventory' function you need to have 'pandas' installed.")
pd = None

class GroupDoesNotExist(Exception):
    """Raised when a referenced inventory group is not defined."""

class HostDoesNotExist(Exception):
    """Raised when a referenced inventory host is not defined."""

class ObjectNotFound(object):
    """Falsy sentinel returned by lookup helpers when a key is absent."""

    def __repr__(self):
        return "[ObjectNotFound]"

    def __str__(self):
        return "[ObjectNotFound]"

    def __bool__(self):
        # Lets callers write ``if not result:`` to detect a failed lookup.
        return False

class ExcelInventory(object):
    """Build Ansible-style inventory files from an Excel workbook.

    Reads worksheets via pandas (optional import at module level) and writes
    ``hosts``, ``host_vars`` and ``group_vars`` YAML files into *output_dir*
    using :class:`CustomAnsibleDumper`.
    """

    def __init__(self, input_file, output_dir, verbosity=4):
        """
        Args:
            input_file: Path to the source Excel file.
            output_dir: Directory where YAML output will be written.
            verbosity (int): Verbosity level passed to ``get_logger``.
        """
        self.logger = get_logger(name="ExcelInventory", verbosity=verbosity)
        self.input_file = self.check_path(path=input_file, mode="file")
        self.output_dir = self.check_path(path=output_dir, mode="directory")
        self.host_vars = {}
        self.group_vars = {}
        self.hosts = {}

    def check_path(self, path, mode):
        """Validate that *path* exists and matches *mode*.

        Args:
            path: Path-like object or string.
            mode (str): Either ``"file"`` or ``"directory"``.

        Returns:
            (:obj:`pathlib.Path`): The path on success, ``None`` when the
            path exists but is of the wrong type. On unexpected errors the
            (possibly unconverted) input is logged and returned as-is.
        """
        self.logger.info("Checking Path: '{}'".format(path))
        try:
            if not isinstance(path, pathlib.Path):
                path = pathlib.Path(path)
            if path.exists():
                if path.is_file() and mode == "file":
                    self.logger.info("Path: '{}' Exists: File.".format(path))
                elif path.is_file() and mode == "directory":
                    # BUGFIX: message previously said "is not a file" here.
                    self.logger.critical("Path: '{}' Exists but is not a directory!".format(path))
                    path = None
                elif not path.is_file() and mode == "directory":
                    self.logger.info("Path: '{}' Exists: Directory.".format(path))
                elif not path.is_file() and mode == "file":
                    # BUGFIX: message previously said "is not a directory" here.
                    self.logger.critical("Path: '{}' Exists but is not a file!".format(path))
                    path = None
                else:
                    self.logger.critical("Path: '{}' Unhandled error!".format(path))
            else:
                self.logger.critical("Path: '{}' Does not exist!".format(path))
        except Exception as e:
            self.logger.critical("Could not determine valid path for '{}'. Exception: {}".format(path, repr(e)))
        # BUGFIX: previously this return lived in a ``finally`` block, which
        # silently swallowed any exception raised above.
        return path

    def load_excel(self, path, sheet_name, index_column=None, columns_rename=None, **kwargs):
        """Load one worksheet as a DataFrame, with NaN cells replaced by None.

        Args:
            path: Excel file to read.
            sheet_name (str): Worksheet name.
            index_column: Optional column to use as the index.
            columns_rename (dict): Optional ``{old: new}`` column renaming map.
            **kwargs: Passed through to ``pandas.read_excel``.

        Returns:
            (:obj:`pandas.DataFrame`): The loaded sheet.
        """
        self.logger.info("Loading file: '{}' Sheet: '{}' as DF".format(path, sheet_name))
        df = pd.read_excel(io=path, sheet_name=sheet_name, index_col=index_column, **kwargs)
        # Normalize NaN -> None so downstream YAML output is clean.
        df = df.where(pd.notnull(df), None)
        if columns_rename is not None:
            df = df.rename(columns=columns_rename)
        return df

    def duplicates_check(self, df, columns):
        """Check the given *columns* of *df* for duplicated values.

        Duplicates are logged as warnings.

        Returns:
            list: One bool per column, True when duplicates were found.
        """
        results = []
        for column_name in columns:
            duplicates = df.duplicated(subset=[column_name])
            results.append(any(duplicates))
            if results[-1]:
                self.logger.warning(
                    "Found duplicated values in column '{0}': {1}".format(column_name, df[duplicates][column_name]))
        return results

    @staticmethod
    def replace_cz_chars(line):
        """Strip diacritics from *line* (e.g. Czech accents): 'ž' -> 'z'."""
        line = unicodedata.normalize('NFKD', line)
        output = ''
        for c in line:
            # Drop the combining marks left over after NFKD decomposition.
            if not unicodedata.combining(c):
                output += c
        return output

    def _finditem(self, obj, key):
        """Depth-first search for *key* in a nested dict structure.

        Returns:
            The (sub-)dict that contains *key*, or an ``ObjectNotFound``
            sentinel when the key is absent.
        """
        if key in obj:
            return obj
        for v in obj.values():
            if isinstance(v, dict):
                # BUGFIX: previously the first recursion's result was returned
                # unconditionally, so sibling branches were never searched.
                found = self._finditem(v, key)
                if not isinstance(found, ObjectNotFound):
                    return found
        return ObjectNotFound()

    def recursive_parent_lookup(self, key, obj):
        """Return the innermost dict containing *key*, searching all branches.

        Returns an ``ObjectNotFound`` sentinel when the key is absent.
        """
        if key in obj:
            return obj
        for v in obj.values():
            if isinstance(v, dict):
                a = self.recursive_parent_lookup(key=key, obj=v)
                if not isinstance(a, ObjectNotFound):
                    return a
        return ObjectNotFound()

    def get_ordered_interfaces(self, host):
        """
        Return interfaces as OrderedDict

        Raises:
            HostDoesNotExist: When *host* is not present in ``host_vars``.

        Returns:
            (:obj:`OrderedDict`): Interface section as OrderedDict, sorted in
            natural interface order; empty when the host has no interfaces.
        """
        if host not in self.host_vars.keys():
            msg = "Host '{}' does not exist".format(host)
            raise HostDoesNotExist(msg)
        if "interfaces" not in self.host_vars[host].keys():
            return OrderedDict()
        interfaces_crange = CiscoRange(list(self.host_vars[host]["interfaces"].keys()))
        ordered_interfaces = OrderedDict(sorted(self.host_vars[host]["interfaces"].items(), key=lambda x: interface_sort(crange=interfaces_crange, name=x[0])))
        return ordered_interfaces

    def dump_hosts(self, outputfile):
        """Write the ``hosts`` structure to *outputfile* inside output_dir."""
        self.logger.info("Storing hosts as YAML file.")
        with self.output_dir.joinpath(outputfile).open(mode="w") as f:
            yaml.dump(data=self.hosts, stream=f, Dumper=CustomAnsibleDumper)

    def dump_hostvars(self):
        """Write one ``host_vars/<hostname>.yml`` file per host."""
        self.logger.info("Storing host_vars as YAML files.")
        if self.output_dir is not None:
            host_vars_path = self.output_dir.joinpath("host_vars")
            host_vars_path.mkdir(exist_ok=True)
            for hostname, host_vars in self.host_vars.items():
                path = host_vars_path.joinpath("{}.yml".format(hostname))
                with path.open(mode="w") as f:
                    data = host_vars
                    data["interfaces"] = self.get_ordered_interfaces(host=hostname)
                    yaml_string = yaml.dump(data=data, Dumper=CustomAnsibleDumper)
                    # Strip the extra quoting around already double-quoted values.
                    yaml_string = re.sub("'\"(.*)\"'", '"\\1"', yaml_string)
                    f.write(yaml_string)

    def dump_groupvars(self):
        """Write one ``group_vars/<groupname>.yml`` file per group."""
        self.logger.info("Storing group_vars as YAML files.")
        if self.output_dir is not None:
            group_vars_path = self.output_dir.joinpath("group_vars")
            group_vars_path.mkdir(exist_ok=True)
            for groupname, group_vars in self.group_vars.items():
                path = group_vars_path.joinpath("{}.yml".format(groupname))
                with path.open(mode="w") as f:
                    yaml.dump(data=group_vars, stream=f, Dumper=CustomAnsibleDumper)

Loading

0 comments on commit 8a83da1

Please sign in to comment.