From 6ed12eca822f397072ad684b2abb562fc0596c89 Mon Sep 17 00:00:00 2001 From: waghswapnil Date: Wed, 8 Feb 2017 07:34:40 +0530 Subject: [PATCH] apis for ip blocking and ip filtering (#105) * apis for ip blocking and ip filtering Signed-off-by: Swapnil Wagh * added examples in docstrings and warnings for user Signed-off-by: Swapnil Wagh --- imcsdk/apis/admin/ip.py | 328 ++++++++++++++++++++++++++++++++++++++++ imcsdk/imcmo.py | 2 + tests/server/test_ip.py | 130 ++++++++++++++++ 3 files changed, 460 insertions(+) create mode 100644 imcsdk/apis/admin/ip.py create mode 100644 tests/server/test_ip.py diff --git a/imcsdk/apis/admin/ip.py b/imcsdk/apis/admin/ip.py new file mode 100644 index 00000000..8b53c449 --- /dev/null +++ b/imcsdk/apis/admin/ip.py @@ -0,0 +1,328 @@ +# Copyright 2016 Cisco Systems, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +""" +This module implements the APIs for IP Blocking and IP Filtering +""" +import logging +from imcsdk.mometa.ip.IpBlocking import IpBlocking +from imcsdk.mometa.ip.IpFiltering import IpFiltering, IpFilteringConsts +from imcsdk.apis.utils import _get_mo, _is_valid_arg, _is_invalid_value +from imcsdk.imccoreutils import get_server_dn, IMC_PLATFORM +from imcsdk.imcexception import ImcOperationError + +log = logging.getLogger('imc') + + +def _get_mgmt_if_dn(handle, id=1): + from imcsdk.mometa.mgmt.MgmtIf import MgmtIf + + if handle.platform == IMC_PLATFORM.TYPE_CLASSIC: + parent_dn = get_server_dn(handle) + '/mgmt' + elif handle.platform == IMC_PLATFORM.TYPE_MODULAR: + parent_dn = 'sys/chassis-1' + + mo = MgmtIf(parent_mo_or_dn=parent_dn) + return mo.dn + + +def ip_blocking_enable(handle, fail_count='5', fail_window='60', + penalty_time='300', **kwargs): + """ + Enables IP Blocking + + Args: + handle (ImcHandle) + fail_count (str): Number of times a user can attempt to log in + unsuccessfully, before the system locks that user out + Range [3-10] attempts + fail_window (str): Length of time, in seconds, in which the + unsuccessful login attempts must occur in order + for the user to be locked out. + Range [60-120] seconds + penalty_time (str): The number of seconds the user remains locked out. + Range [300-900] seconds + kwargs: Key-Value paired arguments for future use + + Returns: + IpBlocking object + + Examples: + ip_blocking_enable(handle, fail_count='6', + fail_window='120', penalty_time='800') + """ + + mo = IpBlocking(parent_mo_or_dn=_get_mgmt_if_dn(handle)) + + params = { + 'enable': 'yes', + 'fail_count': str(fail_count), + 'fail_window': str(fail_window), + 'penalty_time': str(penalty_time) + } + mo.set_prop_multiple(**params) + mo.set_prop_multiple(**kwargs) + handle.set_mo(mo) + return handle.query_dn(mo.dn) + + +def ip_blocking_disable(handle): + """ + Disables IP Blocking + + Args: + handle (ImcHandle) + + Returns: + None + + Examples: + ip_blocking_disable(handle) + """ + mo = IpBlocking(parent_mo_or_dn=_get_mgmt_if_dn(handle)) + mo.enable = 'no' + handle.set_mo(mo) + + +def is_ip_blocking_enabled(handle): + """ + Checks if IP Blocking is enabled + + Args: + handle (ImcHandle) + + Returns: + bool + + Examples: + is_ip_blocking_enabled(handle) + """ + mo = IpBlocking(parent_mo_or_dn=_get_mgmt_if_dn(handle)) + mo = handle.query_dn(mo.dn) + return mo.enable.lower() in ['yes', 'true'] + + +def ip_blocking_exists(handle, **kwargs): + """ + Checks if IP blocking settings match according to the parameters specified. + + Args: + handle (ImcHandle) + kwargs: Key-Value paired arguments relevant to IpBlocking object + + Returns: + (True, IpBlocking object) if exists, else (False, None) + + Examples: + ip_blocking_exists(handle, fail_count='6', + fail_window='120', penalty_time='800') + """ + mo = IpBlocking(parent_mo_or_dn=_get_mgmt_if_dn(handle)) + mo = _get_mo(handle, dn=mo.dn) + + if mo.check_prop_match(**kwargs): + return (True, mo) + + return (False, None) + + +_IP_FILTER_LIST = ['filter1', 'filter2', 'filter3', 'filter4'] + + +def _set_ip_filters(mo, filters): + if len(filters) > len(_IP_FILTER_LIST): + raise ImcOperationError("Set Ip Filters", + "Cannot specify more than %d filters" % + len(_IP_FILTER_LIST)) + + params = {'filter' + str(x['id']): str(x['filter']) for x in filters} + mo.set_prop_multiple(**params) + + +def ip_filtering_enable(handle, filters=[]): + """ + Enables IP filtering with the filters specified + Connection is expected to drop. + + Args: + handle (ImcHandle) + filters (list): List of dictionaries of the format + {'id': 1, 'filter': "10.10.10.10", + 'id': 2, 'filter': "2.2.2.2-3.3.3.3"} + + Returns: + IpFiltering object + + Examples: + ip_filtering_enable(handle, filters=[{"id": 1, "filter": "1.1.1.0-255.255.255.255"}, + {"id": 2, "filter": "2.2.2.2"}]) + """ + log.warning('Changes to IP Filtering will be applied immediately. ' + 'Connectivity to Cisco IMC will be lost and a re-login is required.') + mo = IpFiltering(parent_mo_or_dn=_get_mgmt_if_dn(handle)) + mo.enable = 'yes' + _set_ip_filters(mo, filters) + + handle.set_mo(mo) + # Not returning the server copy as the connection drops after the above action + return mo + + +def ip_filtering_disable(handle): + """ + Disables IP filtering + + Args: + handle (ImcHandle) + + Returns: + None + + Examples: + ip_filtering_disable(handle) + """ + mo = IpFiltering(parent_mo_or_dn=_get_mgmt_if_dn(handle)) + mo.enable = 'no' + handle.set_mo(mo) + + +def is_ip_filtering_enabled(handle): + """ + Checks if IP Filtering is enabled + + Args: + handle (ImcHandle) + + Returns: + bool + + Examples: + is_ip_filtering_enabled(handle) + """ + mo = IpFiltering(parent_mo_or_dn=_get_mgmt_if_dn(handle)) + mo = handle.query_dn(mo.dn) + return mo.enable.lower() in ['yes', 'true'] + + +def ip_filtering_modify(handle, filters=[]): + """ + Modifies IP filtering with the filters specified + + Args: + handle (ImcHandle) + filters (list): List of dictionaries of the format + [{'id': 1, 'filter': '10.10.10.10'}, + {'id': 2, 'filter': '2.2.2.2-3.3.3.3'}] + + Returns: + IpFiltering object + + Examples: + ip_filtering_modify(handle, filters=[{'id': 1, 'filter': '10.10.10.10'}]) + """ + log.warning('Changes to IP Filtering will be applied immediately. ' + 'Connectivity to Cisco IMC will be lost and a re-login is required.') + mo = IpFiltering(parent_mo_or_dn=_get_mgmt_if_dn(handle)) + _set_ip_filters(mo, filters) + + handle.set_mo(mo) + # Not returning the server copy as the connection drops after the above action + return mo + + +def ip_filtering_clear(handle, filter_id=''): + """ + Clears the IP filters specified by + + Args: + handle (ImcHandle) + filter_id (str): String representing the filter id + + Returns: + IpFiltering object + + Examples: + ip_filtering_clear(handle, filter_id='3') + ip_filtering_clear(handle, filter_id='all') + """ + log.warning('Changes to IP Filtering will be applied immediately. ' + 'Connectivity to Cisco IMC maybe lost based on the filter being cleared') + mo = IpFiltering(parent_mo_or_dn=_get_mgmt_if_dn(handle)) + + if filter_id == 'all': + mo.admin_action = IpFilteringConsts.ADMIN_ACTION_CLEAR_ALL + handle.set_mo(mo) + # Not returning the server copy as the connection drops after the above action + return mo + + if not filter_id.isdigit(): + raise ImcOperationError("Clear Ip Filter", + "Filter-id must be a digit(1-%d) or 'all'" % + len(_IP_FILTER_LIST)) + + if int(filter_id) > len(_IP_FILTER_LIST): + raise ImcOperationError("Clear Ip Filter", + "Filter-id cannot be more than %d" % + len(_IP_FILTER_LIST)) + + mo.admin_action = 'clearFilter' + filter_id + handle.set_mo(mo) + # Not returning the server copy as the connection drops after the above action + return mo + + +def _check_ip_filter_match(ip_mo, mo): + for prop in _IP_FILTER_LIST: + configured_value = getattr(ip_mo, prop) + in_value = getattr(mo, prop) + if _is_invalid_value(configured_value) and \ + _is_invalid_value(in_value): + continue + if configured_value != in_value: + return False + + return True + + +def ip_filtering_exists(handle, **kwargs): + """ + Checks if IP filtering settings match according to the parameters specified. + + Args: + Args: + handle (ImcHandle) + kwargs: Key-Value paired arguments relevant to IpFiltering object + + Returns: + (True, IpFiltering object) if exists, else (False, None) + + Examples: + ip_filtering_exists(handle, enable='yes', + filters=[{"id": 1, "filter": "1.1.1.0-255.255.255.255"}, + {"id": 2, "filter": "2.2.2.2"}]) + """ + ip_mo = IpFiltering(parent_mo_or_dn=_get_mgmt_if_dn(handle)) + ip_mo = handle.query_dn(ip_mo.dn) + + if _is_valid_arg('enable', kwargs): + if ip_mo.enable.lower() != kwargs.get('enable').lower(): + return False, None + + if _is_valid_arg('filters', kwargs): + mo = IpFiltering(parent_mo_or_dn=_get_mgmt_if_dn(handle)) + _set_ip_filters(mo, kwargs.get('filters')) + if not _check_ip_filter_match(ip_mo, mo): + return False, None + + return True, ip_mo diff --git a/imcsdk/imcmo.py b/imcsdk/imcmo.py index 62f53b4f..98756835 100644 --- a/imcsdk/imcmo.py +++ b/imcsdk/imcmo.py @@ -200,6 +200,8 @@ def __set_prop(self, name, value, mark_dirty=True, forced=False): def check_prop_match(self, **kwargs): for prop_name in kwargs: + if kwargs[prop_name] is None: + continue if not imccoreutils.prop_exists(self, prop_name): raise ValueError("Invalid Property Name Exception - " "Class [%s]: Prop <%s> " diff --git a/tests/server/test_ip.py b/tests/server/test_ip.py new file mode 100644 index 00000000..bca03945 --- /dev/null +++ b/tests/server/test_ip.py @@ -0,0 +1,130 @@ +# Copyright 2016 Cisco Systems, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import time +from nose.tools import assert_equal +from ..connection.info import custom_setup, custom_teardown + +from imcsdk.apis.admin.ip import ip_blocking_enable, ip_blocking_disable, \ + is_ip_blocking_enabled, ip_blocking_exists,\ + ip_filtering_enable, ip_filtering_disable, \ + ip_filtering_modify, ip_filtering_clear, is_ip_filtering_enabled, \ + ip_filtering_exists + +handle = None +SLEEP_TIME = 25 + + +def setup_module(): + global handle + handle = custom_setup() + + +def teardown_module(): + global handle + custom_teardown(handle) + + +def test_ip_blocking_enable(): + ip_blocking_enable(handle, fail_count='6', fail_window='120', penalty_time='800') + assert_equal(is_ip_blocking_enabled(handle), True) + + +def test_ip_blocking_exists(): + match, mo = ip_blocking_exists(handle, + fail_count='6', + fail_window='120', + penalty_time='800') + assert_equal(match, True) + match, mo = ip_blocking_exists(handle, + fail_count='8', + fail_window='120', + penalty_time='500') + assert_equal(match, False) + + +def test_ip_blocking_disable(): + ip_blocking_disable(handle) + assert_equal(is_ip_blocking_enabled(handle), False) + + +ip_filters = [{"id": 1, "filter": "1.1.1.1-255.255.255.255"}, + {"id": 2, "filter": "2.2.2.2"}, + ] + +ip_filters_2 = [{"id": 1, "filter": "1.1.1.0-255.255.255.255"}, + {"id": 3, "filter": "2.2.2.5"}, + ] + + +def test_ip_filtering_enable(): + ip_filtering_clear(handle, filter_id='all') + handle.logout() + time.sleep(SLEEP_TIME) + handle.login() + ip_filtering_enable(handle, filters=ip_filters) + handle.logout() + time.sleep(SLEEP_TIME) + handle.login() + assert_equal(is_ip_filtering_enabled(handle), True) + + +def test_ip_filtering_exists(): + match, mo = ip_filtering_exists(handle, enable='yes', filters=ip_filters) + assert_equal(match, True) + match, mo = ip_filtering_exists(handle, enable='yes', filters=ip_filters_2) + assert_equal(match, False) + + +def test_ip_filtering_modify(): + ip_filtering_modify(handle, filters=ip_filters_2) + handle.logout() + time.sleep(SLEEP_TIME) + handle.login() + match, mo = ip_filtering_exists(handle, enable='yes', + filters=[{"id": 1, "filter": "1.1.1.0-255.255.255.255"}, + {"id": 2, "filter": "2.2.2.2"}, + {"id": 3, "filter": "2.2.2.5"}, + ]) + assert_equal(match, True) + + +def test_ip_filtering_clear(): + ip_filtering_clear(handle, filter_id='3') + handle.logout() + time.sleep(SLEEP_TIME) + handle.login() + match, mo = ip_filtering_exists(handle, enable='yes', + filters=[{"id": 1, "filter": "1.1.1.0-255.255.255.255"}, + {"id": 2, "filter": "2.2.2.2"} + ]) + assert_equal(match, True) + + +def test_ip_filtering_disable(): + ip_filtering_disable(handle) + handle.logout() + time.sleep(SLEEP_TIME) + handle.login() + assert_equal(is_ip_blocking_enabled(handle), False) + + +def test_ip_filtering_clear_all(): + ip_filtering_clear(handle, filter_id='all') + match, mo = ip_filtering_exists(handle, enable='no', + filters=[{"id": 1, "filter": ""}, + {"id": 2, "filter": ""} + ]) + assert_equal(match, True) + +