diff --git a/imcsdk/apis/admin/snmp.py b/imcsdk/apis/admin/snmp.py index 4108e6ff..26618d60 100644 --- a/imcsdk/apis/admin/snmp.py +++ b/imcsdk/apis/admin/snmp.py @@ -16,11 +16,14 @@ """ from imcsdk.imcexception import ImcOperationError +from imcsdk.apis.utils import _get_mo + +SNMP_DN = 'sys/svc-ext/snmp-svc' def snmp_enable(handle, community=None, privilege="disabled", trap_community=None, - sys_contact=None, sys_location=None, port="161"): + sys_contact=None, sys_location=None, port="161", **kwargs): """ Enables SNMP. @@ -32,6 +35,7 @@ def snmp_enable(handle, community=None, sys_contact (string): sys_contact sys_location (string): sys_location port (string): port on which SNMP agent runs + kwargs: key-value paired arguments for future use Returns: CommSnmp: Managed object @@ -48,27 +52,22 @@ def snmp_enable(handle, community=None, from imcsdk.mometa.comm.CommSnmp import CommSnmpConsts - dn = "sys/svc-ext/snmp-svc" - mo = handle.query_dn(dn) - if not mo: - raise ImcOperationError("snmp enable", "snmp config does not exist") - - mo.admin_state = CommSnmpConsts.ADMIN_STATE_ENABLED - - if community: - mo.community = community - if privilege: - mo.com2_sec = privilege - if trap_community: - mo.trap_community = trap_community - if sys_contact: - mo.sys_contact = sys_contact - if sys_location: - mo.sys_location = sys_location - mo.port = port + mo = _get_mo(handle, dn=SNMP_DN) + + params = { + 'admin_state': CommSnmpConsts.ADMIN_STATE_ENABLED, + 'community': community, + 'com2_sec': privilege, + 'trap_community': trap_community, + 'sys_contact': sys_contact, + 'sys_location': sys_location, + 'port': port, + } + mo.set_prop_multiple(**params) + mo.set_prop_multiple(**kwargs) handle.set_mo(mo) - return mo + return handle.query_dn(mo.dn) def snmp_disable(handle): @@ -90,19 +89,17 @@ def snmp_disable(handle): from imcsdk.mometa.comm.CommSnmp import CommSnmpConsts - dn = "sys/svc-ext/snmp-svc" - mo = handle.query_dn(dn) - if not mo: - raise ImcOperationError("snmp enable", "snmp config does not exist") + mo = _get_mo(handle, dn=SNMP_DN) mo.admin_state = CommSnmpConsts.ADMIN_STATE_DISABLED handle.set_mo(mo) return mo -def snmp_enabled(handle): +def is_snmp_enabled(handle): """ - This method checks if snmp is enabled or not + Checks if snmp is enabled or not + Args: handle (ImcHandle) @@ -111,27 +108,14 @@ def snmp_enabled(handle): """ from imcsdk.mometa.comm.CommSnmp import CommSnmpConsts - dn = "sys/svc-ext/snmp-svc" - mo = handle.query_dn(dn) - if not mo: - raise ImcOperationError("snmp enable", "snmp config does not exist") - + mo = _get_mo(handle, dn=SNMP_DN) return (mo.admin_state == CommSnmpConsts.ADMIN_STATE_ENABLED) def _get_free_snmp_trap_obj(handle): - """ - This is internally used by trap add to get a free, unconfigured trap object - - Args: - handle (ImcHandle) - - Returns: - CommSnmpTrap object - """ from imcsdk.mometa.comm.CommSnmpTrap import CommSnmpTrapConsts - traps = handle.query_children(in_dn="sys/svc-ext/snmp-svc", + traps = handle.query_children(in_dn=SNMP_DN, class_id="CommSnmpTrap") for trap in traps: if trap.admin_state == CommSnmpTrapConsts.ADMIN_STATE_DISABLED: @@ -141,7 +125,8 @@ def _get_free_snmp_trap_obj(handle): def snmp_trap_add(handle, hostname, port, version="v3", - notification_type="traps", user=None): + notification_type="traps", user=None, + **kwargs): """ Adds snmp trap. @@ -153,6 +138,7 @@ def snmp_trap_add(handle, hostname, port, version="v3", notification_type (string): "informs", "traps" Required only for version "v2c" and "v3" user (string): send traps for a specific user + kwargs: Key-Value paired arguments for future use Returns: CommSnmpTrap: Managed Object @@ -175,34 +161,34 @@ def snmp_trap_add(handle, hostname, port, version="v3", raise ImcOperationError("Snmp Trap Add", "No free traps available to configure") - mo.admin_state = CommSnmpTrapConsts.ADMIN_STATE_ENABLED - mo.hostname = hostname - mo.port = port - mo.version = version - mo.notification_type = notification_type + params = { + 'admin_state': CommSnmpTrapConsts.ADMIN_STATE_ENABLED, + 'hostname': hostname, + 'port': port, + 'version': version, + 'notification_type': notification_type, + } + + mo.set_prop_multiple(**params) + mo.set_prop_multiple(**kwargs) + if version != "v2c": mo.user = user handle.set_mo(mo) - return mo + return handle.query_dn(mo.dn) -def snmp_trap_exists(handle, hostname, port, version="v3", - notification_type="traps", user="unknown"): +def snmp_trap_exists(handle, **kwargs): """ checks if snmp trap exists Args: handle (ImcHandle) - hostname (string): ip address - port (number): port - version (string): "v2c", "v3" - notification_type (string): "informs", "traps" - Required only for version "v2c" and "v3" - user (string): username + kwargs: Key-Value paired arguments relevant to CommSnmpTrap object Returns: - Int: 0 if trap not found, else returns the trap-id. Range is (1,15) + True, trap-id(int) if found, else False, 0 Example: snmp_trap_exists(handle, hostname="10.10.10.10", @@ -213,35 +199,24 @@ def snmp_trap_exists(handle, hostname, port, version="v3", """ - traps = handle.query_children(in_dn="sys/svc-ext/snmp-svc", + traps = handle.query_children(in_dn=SNMP_DN, class_id="CommSnmpTrap") - if version == "v2c": - user = "unknown" for trap in traps: - if ((trap.hostname == hostname) and - (trap.port == port) and - (trap.version == version) and - (trap.notification_type == notification_type) and - (trap.user == user)): - return int(trap.id) - return 0 + if trap.check_prop_match(**kwargs): + return True, int(trap.id) + + return False, 0 -def snmp_trap_modify(handle, trap_id=0, hostname=None, port=None, - version=None, notification_type=None, user=None): +def snmp_trap_modify(handle, trap_id, **kwargs): """ Modifies snmp trap referred to by id Args: handle (ImcHandle) trap_id (int) : Range is (1,15) - hostname (string): ip address - port (number): port - version (string): "v2c", "v3" - notification_type (string): "informs", "traps" - Required only for version "v2c" and "v3" - user (string): username + kwargs : Key-Value paired arguments relevant to CommSnmpTrap object Returns: CommSnmpTrap: Managed Object @@ -257,27 +232,15 @@ def snmp_trap_modify(handle, trap_id=0, hostname=None, port=None, user="username") """ - dn = "sys/svc-ext/snmp-svc/snmp-trap-" + str(trap_id) - mo = handle.query_dn(dn) - if not mo: - raise ImcOperationError("Snmp Trap Modify", "Trap not found") - - if hostname: - mo.hostname = hostname - if port: - mo.port = port - if version: - mo.version = version - if notification_type: - mo.notification_type = notification_type - if user: - mo.user = user + dn = SNMP_DN + '/snmp-trap-' + str(trap_id) + mo = _get_mo(handle, dn=dn) + mo.set_prop_multiple(**kwargs) handle.set_mo(mo) - return mo + return handle.query_dn(mo.dn) -def snmp_trap_remove(handle, trap_id=0): +def snmp_trap_remove(handle, trap_id): """ Modifies snmp trap. @@ -297,10 +260,8 @@ def snmp_trap_remove(handle, trap_id=0): from imcsdk.mometa.comm.CommSnmpTrap import CommSnmpTrapConsts - dn = "sys/svc-ext/snmp-svc/snmp-trap-" + str(trap_id) - mo = handle.query_dn(dn) - if not mo: - raise ImcOperationError("Snmp Trap Remove", "Trap not found") + dn = SNMP_DN + '/snmp-trap-' + str(trap_id) + mo = _get_mo(handle, dn=dn) mo.admin_state = CommSnmpTrapConsts.ADMIN_STATE_DISABLED mo.admin_action = CommSnmpTrapConsts.ADMIN_ACTION_CLEAR @@ -308,24 +269,19 @@ def snmp_trap_remove(handle, trap_id=0): def _get_free_snmp_user(handle): - users = handle.query_children(in_dn="sys/svc-ext/snmp-svc", + users = handle.query_children(in_dn=SNMP_DN, class_id="CommSnmpUser") - free_user = None for user in users: if user.name == "": - free_user = user - break + return user - if free_user is None: - raise ImcOperationError("Snmp User Add", - "Maximum number of users already configured") - - return free_user + raise ImcOperationError("Snmp User Add", + "Maximum number of users already configured") def snmp_user_add(handle, name, security_level="authpriv", auth_pwd=None, auth="MD5", - priv_pwd=None, priv="AES"): + privacy_pwd=None, privacy="AES", **kwargs): """ Adds snmp user. @@ -335,8 +291,8 @@ def snmp_user_add(handle, name, security_level="authpriv", security_level (string): "authpriv", "authnopriv", "noauthnopriv" auth_pwd (string): password auth (string): "MD5", "SHA" - priv_pwd (string): privacy password - priv (string): "AES", "DES" + privacy_pwd (string): privacy password + privacy (string): "AES", "DES" Returns: CommSnmpUser: Managed Object @@ -347,23 +303,60 @@ def snmp_user_add(handle, name, security_level="authpriv", Example: snmp_user_add(handle, name="snmpuser", security_level="authpriv", auth_pwd="abcd", - auth="MD5", priv_pwd="xyz", priv="DES") + auth="MD5", privacy_pwd="xyz", privacy="DES") """ from imcsdk.mometa.comm.CommSnmpUser import CommSnmpUserConsts - free_user = _get_free_snmp_user(handle) + mo = _get_free_snmp_user(handle) + + params = { + 'name': name, + 'security_level': security_level + } + mo.set_prop_multiple(**params) + params = {} + if security_level == CommSnmpUserConsts.SECURITY_LEVEL_AUTHNOPRIV: + params = { + 'auth': auth, + 'auth_pwd': auth_pwd + } + + if security_level == CommSnmpUserConsts.SECURITY_LEVEL_AUTHPRIV: + params = { + 'auth': auth, + 'auth_pwd': auth_pwd, + 'privacy': privacy, + 'privacy_pwd': privacy_pwd + } + + mo.set_prop_multiple(**params) + mo.set_prop_multiple(**kwargs) + handle.set_mo(mo) + return handle.query_dn(mo.dn) + + +def snmp_user_get(handle, name): + """ + Gets the snmp user + + Args: + handle (ImcHandle) + name (str): snmp username - free_user.name = name - free_user.security_level = security_level - if security_level != CommSnmpUserConsts.SECURITY_LEVEL_NOAUTHNOPRIV: - free_user.auth_pwd = auth_pwd - free_user.auth = auth - free_user.privacy_pwd = priv_pwd - free_user.privacy = priv + Returns: + CommSnmpUser object - handle.set_mo(free_user) - return free_user + Examples: + user = snmp_user_get(handle, name = "snmp-user") + """ + users = handle.query_children(in_dn=SNMP_DN, + class_id="CommSnmpUser") + for user in users: + if user.name == name: + return user + + return None def snmp_user_exists(handle, name): @@ -375,35 +368,26 @@ def snmp_user_exists(handle, name): name (string): snmp username Returns: - Int: 0 is user does not exist, user-id(1-15) if user exists + True, user_id(int) if found, else False, 0 Example: snmp_user_exists(handle, name="snmpuser") """ - users = handle.query_children(in_dn="sys/svc-ext/snmp-svc", - class_id="CommSnmpUser") - for user in users: - if user.name == name: - return int(user.id) - return 0 + user = snmp_user_get(handle, name=name) + if user: + return (True, int(user.id)) + return (False, 0) -def snmp_user_modify(handle, user_id, username=None, security_level=None, - auth_pwd=None, auth=None, - priv_pwd=None, priv=None): +def snmp_user_modify(handle, user_id, **kwargs): """ Modifies snmp user. Use this after getting the id from snmp_user_exists Args: handle (ImcHandle) user_id (int) : unique id for the user - username (string): snmp username - security_level (string): "authpriv", "authnopriv", "noauthnopriv" - auth_pwd (string): password - auth (string): "MD5", "SHA" - priv_pwd (string): privacy password - priv (string): "AES", "DES" + kwargs: Key-Value paired arguments relevant to CommSnmpUser object Returns: CommSnmpUser: Managed Object @@ -412,31 +396,17 @@ def snmp_user_modify(handle, user_id, username=None, security_level=None, ImcOperationError: If user is not present Example: - snmp_user_modify(handle, user_id=1, username="snmpuser", + snmp_user_modify(handle, user_id=1, name="snmpuser", security_level="authpriv", auth_pwd="password", - auth="MD5", priv="AES", priv_pwd="password") + auth="MD5", privacy="AES", privacy_pwd="password") """ - dn = "sys/svc-ext/snmp-svc/snmpv3-user-" + str(user_id) - mo = handle.query_dn(dn) - if not mo: - raise ImcOperationError("Snmp User Modify", "User does not exist") - - if username: - mo.name = username - if security_level: - mo.security_level = security_level - if auth_pwd: - mo.auth_pwd = auth_pwd - if auth: - mo.auth = auth - if priv_pwd: - mo.privacy_pwd = priv_pwd - if priv: - mo.privacy = priv + dn = SNMP_DN + "/snmpv3-user-" + str(user_id) + mo = _get_mo(handle, dn=dn) + mo.set_prop_multiple(**kwargs) handle.set_mo(mo) - return mo + return handle.query_dn(mo.dn) def snmp_user_remove(handle, name): @@ -460,14 +430,7 @@ def snmp_user_remove(handle, name): from imcsdk.mometa.comm.CommSnmpUser import CommSnmpUserConsts - users = handle.query_children(in_dn="sys/svc-ext/snmp-svc", - class_id="CommSnmpUser") - found_user = None - for user in users: - if user.name == name: - found_user = user - break - + found_user = snmp_user_get(handle, name=name) if found_user is None: raise ImcOperationError("Snmp User Delete", "User does not exist") diff --git a/tests/server/test_user.py b/tests/server/test_user.py index 1ab9818b..bbc194c0 100644 --- a/tests/server/test_user.py +++ b/tests/server/test_user.py @@ -16,7 +16,7 @@ from imcsdk.apis.admin.user import local_user_create, local_user_delete, \ local_user_exists, strong_password_set, is_strong_password_set, \ password_expiration_set, password_expiration_exists -from imcsdk.apis.admin.snmp import snmp_enable, snmp_disable, snmp_enabled, \ +from imcsdk.apis.admin.snmp import snmp_enable, snmp_disable, is_snmp_enabled,\ snmp_user_add, snmp_user_exists, snmp_user_remove, snmp_user_modify, \ snmp_trap_add, snmp_trap_exists, snmp_trap_remove @@ -73,20 +73,34 @@ def test_snmp_enable(): snmp_enable(handle, community="test", privilege="full", trap_community="test-trap", sys_contact="abcd@pqrs.com", sys_location="somewhere") - assert_equal(snmp_enabled(handle), True) + assert_equal(is_snmp_enabled(handle), True) def test_snmp_user_create(): snmp_user = snmp_user_add(handle, name="test-snmp-user", security_level="authpriv", auth_pwd="Nbv-12345", auth="MD5", - priv_pwd="Nbv-12345", priv="AES") - assert_equal(snmp_user_exists( - handle, name="test-snmp-user"), int(snmp_user.id)) + privacy_pwd="Nbv-12345", privacy="AES") + match, user_id = snmp_user_exists(handle, name="test-snmp-user") + assert_equal(user_id, int(snmp_user.id)) + + snmp_user = snmp_user_add(handle, name="test-snmp-user2", + security_level="authnopriv", + auth_pwd="Nbv-12345", auth="MD5", + privacy_pwd="Nbv-12345", privacy="AES") + match, user_id = snmp_user_exists(handle, name="test-snmp-user2") + assert_equal(user_id, int(snmp_user.id)) + + snmp_user = snmp_user_add(handle, name="test-snmp-user3", + security_level="noauthnopriv", + auth_pwd="Nbv-12345", auth="MD5", + privacy_pwd="Nbv-12345", privacy="AES") + match, user_id = snmp_user_exists(handle, name="test-snmp-user3") + assert_equal(user_id, int(snmp_user.id)) def test_snmp_user_modify(): - snmp_user_id = snmp_user_exists(handle, name="test-snmp-user") + match, snmp_user_id = snmp_user_exists(handle, name="test-snmp-user") snmp_user = snmp_user_modify(handle, user_id=snmp_user_id, auth_pwd="Nbv-56789", security_level="authnopriv") @@ -99,24 +113,34 @@ def test_snmp_trap_create(): version="v3", notification_type="traps", user="test-snmp-user") snmp_trap_id = int(snmp_trap.id) - assert_equal(snmp_trap_exists(handle, hostname="2.2.2.2", port="3000", - version="v3", notification_type="traps", - user="test-snmp-user"), snmp_trap_id) + match, trap_id = snmp_trap_exists(handle, hostname="2.2.2.2", port="3000", + version="v3", notification_type="traps", + user="test-snmp-user") + assert_equal(trap_id, snmp_trap_id) def test_snmp_trap_delete(): - global snmp_trap_id snmp_trap_remove(handle, snmp_trap_id) - assert_equal(snmp_trap_exists(handle, hostname="2.2.2.2", port="3000", - version="v3", notification_type="traps", - user="test-snmp-user"), 0) + match, trap_id = snmp_trap_exists(handle, hostname="2.2.2.2", port="3000", + version="v3", notification_type="traps", + user="test-snmp-user") + assert_equal(trap_id, 0) def test_snmp_user_delete(): snmp_user_remove(handle, name="test-snmp-user") - assert_equal(snmp_user_exists(handle, name="test-snmp-user"), 0) + match, user_id = snmp_user_exists(handle, name="test-snmp-user") + assert_equal(user_id, 0) + + snmp_user_remove(handle, name="test-snmp-user2") + match, user_id = snmp_user_exists(handle, name="test-snmp-user2") + assert_equal(user_id, 0) + + snmp_user_remove(handle, name="test-snmp-user3") + match, user_id = snmp_user_exists(handle, name="test-snmp-user3") + assert_equal(user_id, 0) def test_snmp_disable(): snmp_disable(handle) - assert_equal(snmp_enabled(handle), False) + assert_equal(is_snmp_enabled(handle), False)