diff --git a/clear/main.py b/clear/main.py index 38dca2737f..cb1e3243b7 100755 --- a/clear/main.py +++ b/clear/main.py @@ -12,7 +12,7 @@ from show.plugins.pbh import read_pbh_counters from config.plugins.pbh import serialize_pbh_counters from . import plugins - +from . import stp # This is from the aliases example: # https://github.com/pallets/click/blob/57c6f09611fc47ca80db0bd010f05998b3c0aa95/examples/aliases/aliases.py class Config(object): @@ -145,6 +145,10 @@ def ipv6(): pass +# 'STP' +# +cli.add_command(stp.spanning_tree) + # # Inserting BGP functionality into cli's clear parse-chain. # BGP commands are determined by the routing-stack being elected. diff --git a/clear/stp.py b/clear/stp.py new file mode 100644 index 0000000000..c3e3a4b098 --- /dev/null +++ b/clear/stp.py @@ -0,0 +1,46 @@ +import click +import utilities_common.cli as clicommon + +# +# This group houses Spanning_tree commands and subgroups +# + + +@click.group(cls=clicommon.AliasedGroup) +@click.pass_context +def spanning_tree(ctx): + '''Clear Spanning-tree counters''' + pass + + +@spanning_tree.group('statistics', cls=clicommon.AliasedGroup, invoke_without_command=True) +@click.pass_context +def stp_clr_stats(ctx): + if ctx.invoked_subcommand is None: + command = 'sudo stpctl clrstsall' + clicommon.run_command(command) + + +@stp_clr_stats.command('interface') +@click.argument('interface_name', metavar='', required=True) +@click.pass_context +def stp_clr_stats_intf(ctx, interface_name): + command = 'sudo stpctl clrstsintf ' + interface_name + clicommon.run_command(command) + + +@stp_clr_stats.command('vlan') +@click.argument('vlan_id', metavar='', required=True) +@click.pass_context +def stp_clr_stats_vlan(ctx, vlan_id): + command = 'sudo stpctl clrstsvlan ' + vlan_id + clicommon.run_command(command) + + +@stp_clr_stats.command('vlan-interface') +@click.argument('vlan_id', metavar='', required=True) +@click.argument('interface_name', metavar='', required=True) +@click.pass_context +def stp_clr_stats_vlan_intf(ctx, vlan_id, interface_name): + command = 'sudo stpctl clrstsvlanintf ' + vlan_id + ' ' + interface_name + clicommon.run_command(command) diff --git a/config/main.py b/config/main.py index 9a71f1078b..26e7cc9554 100644 --- a/config/main.py +++ b/config/main.py @@ -66,7 +66,7 @@ from . import switchport from . import dns from . import bgp_cli - +from . import stp # mock masic APIs for unit test try: @@ -1434,7 +1434,10 @@ def config(ctx): config.add_command(vlan.vlan) config.add_command(vxlan.vxlan) -#add mclag commands +# add stp commands +config.add_command(stp.spanning_tree) + +# add mclag commands config.add_command(mclag.mclag) config.add_command(mclag.mclag_member) config.add_command(mclag.mclag_unique_ip) diff --git a/config/stp.py b/config/stp.py new file mode 100644 index 0000000000..85d7041847 --- /dev/null +++ b/config/stp.py @@ -0,0 +1,917 @@ + +# +# 'spanning-tree' group ('config spanning-tree ...') +# + +import click +import utilities_common.cli as clicommon +from natsort import natsorted +import logging + +STP_MIN_ROOT_GUARD_TIMEOUT = 5 +STP_MAX_ROOT_GUARD_TIMEOUT = 600 +STP_DEFAULT_ROOT_GUARD_TIMEOUT = 30 + +STP_MIN_FORWARD_DELAY = 4 +STP_MAX_FORWARD_DELAY = 30 +STP_DEFAULT_FORWARD_DELAY = 15 + +STP_MIN_HELLO_INTERVAL = 1 +STP_MAX_HELLO_INTERVAL = 10 +STP_DEFAULT_HELLO_INTERVAL = 2 + +STP_MIN_MAX_AGE = 6 +STP_MAX_MAX_AGE = 40 +STP_DEFAULT_MAX_AGE = 20 + +STP_MIN_BRIDGE_PRIORITY = 0 +STP_MAX_BRIDGE_PRIORITY = 61440 +STP_DEFAULT_BRIDGE_PRIORITY = 32768 + +PVST_MAX_INSTANCES = 255 + + +def get_intf_list_in_vlan_member_table(config_db): + """ + Get info from REDIS ConfigDB and create interface to vlan mapping + """ + get_int_vlan_configdb_info = config_db.get_table('VLAN_MEMBER') + int_list = [] + for key in get_int_vlan_configdb_info: + interface = key[1] + if interface not in int_list: + int_list.append(interface) + return int_list + +################################## +# STP parameter validations +################################## + + +def is_valid_root_guard_timeout(ctx, root_guard_timeout): + if root_guard_timeout not in range(STP_MIN_ROOT_GUARD_TIMEOUT, STP_MAX_ROOT_GUARD_TIMEOUT + 1): + ctx.fail("STP root guard timeout must be in range 5-600") + + +def is_valid_forward_delay(ctx, forward_delay): + if forward_delay not in range(STP_MIN_FORWARD_DELAY, STP_MAX_FORWARD_DELAY + 1): + ctx.fail("STP forward delay value must be in range 4-30") + + +def is_valid_hello_interval(ctx, hello_interval): + if hello_interval not in range(STP_MIN_HELLO_INTERVAL, STP_MAX_HELLO_INTERVAL + 1): + ctx.fail("STP hello timer must be in range 1-10") + + +def is_valid_max_age(ctx, max_age): + if max_age not in range(STP_MIN_MAX_AGE, STP_MAX_MAX_AGE + 1): + ctx.fail("STP max age value must be in range 6-40") + + +def is_valid_bridge_priority(ctx, priority): + if priority % 4096 != 0: + ctx.fail("STP bridge priority must be multiple of 4096") + if priority not in range(STP_MIN_BRIDGE_PRIORITY, STP_MAX_BRIDGE_PRIORITY + 1): + ctx.fail("STP bridge priority must be in range 0-61440") + + +def validate_params(forward_delay, max_age, hello_time): + if (2 * (int(forward_delay) - 1)) >= int(max_age) >= (2 * (int(hello_time) + 1)): + return True + else: + return False + + +def is_valid_stp_vlan_parameters(ctx, db, vlan_name, param_type, new_value): + stp_vlan_entry = db.get_entry('STP_VLAN', vlan_name) + cfg_vlan_forward_delay = stp_vlan_entry.get("forward_delay") + cfg_vlan_max_age = stp_vlan_entry.get("max_age") + cfg_vlan_hello_time = stp_vlan_entry.get("hello_time") + ret_val = False + if param_type == "forward_delay": + ret_val = validate_params(new_value, cfg_vlan_max_age, cfg_vlan_hello_time) + elif param_type == "max_age": + ret_val = validate_params(cfg_vlan_forward_delay, new_value, cfg_vlan_hello_time) + elif param_type == "hello_time": + ret_val = validate_params(cfg_vlan_forward_delay, cfg_vlan_max_age, new_value) + + if ret_val is not True: + ctx.fail("2*(forward_delay-1) >= max_age >= 2*(hello_time +1 ) not met for VLAN") + + +def is_valid_stp_global_parameters(ctx, db, param_type, new_value): + stp_global_entry = db.get_entry('STP', "GLOBAL") + cfg_forward_delay = stp_global_entry.get("forward_delay") + cfg_max_age = stp_global_entry.get("max_age") + cfg_hello_time = stp_global_entry.get("hello_time") + ret_val = False + if param_type == "forward_delay": + ret_val = validate_params(new_value, cfg_max_age, cfg_hello_time) + elif param_type == "max_age": + ret_val = validate_params(cfg_forward_delay, new_value, cfg_hello_time) + elif param_type == "hello_time": + ret_val = validate_params(cfg_forward_delay, cfg_max_age, new_value) + + if ret_val is not True: + ctx.fail("2*(forward_delay-1) >= max_age >= 2*(hello_time +1 ) not met") + + +def get_max_stp_instances(): + return PVST_MAX_INSTANCES + + +def update_stp_vlan_parameter(ctx, db, param_type, new_value): + stp_global_entry = db.get_entry('STP', "GLOBAL") + + allowed_params = {"priority", "max_age", "hello_time", "forward_delay"} + if param_type not in allowed_params: + ctx.fail("Invalid parameter") + + current_global_value = stp_global_entry.get("forward_delay") + + vlan_dict = db.get_table('STP_VLAN') + for vlan in vlan_dict.keys(): + vlan_entry = db.get_entry('STP_VLAN', vlan) + current_vlan_value = vlan_entry.get(param_type) + if current_global_value == current_vlan_value: + db.mod_entry('STP_VLAN', vlan, {param_type: new_value}) + + +def check_if_vlan_exist_in_db(db, ctx, vid): + vlan_name = 'Vlan{}'.format(vid) + vlan = db.get_entry('VLAN', vlan_name) + if len(vlan) == 0: + ctx.fail("{} doesn't exist".format(vlan_name)) + + +def enable_stp_for_vlans(db): + vlan_count = 0 + fvs = {'enabled': 'true', + 'forward_delay': get_global_stp_forward_delay(db), + 'hello_time': get_global_stp_hello_time(db), + 'max_age': get_global_stp_max_age(db), + 'priority': get_global_stp_priority(db) + } + vlan_dict = natsorted(db.get_table('VLAN')) + max_stp_instances = get_max_stp_instances() + for vlan_key in vlan_dict: + if vlan_count >= max_stp_instances: + logging.warning("Exceeded maximum STP configurable VLAN instances for {}".format(vlan_key)) + break + db.set_entry('STP_VLAN', vlan_key, fvs) + vlan_count += 1 + + +def get_stp_enabled_vlan_count(db): + count = 0 + stp_vlan_keys = db.get_table('STP_VLAN').keys() + for key in stp_vlan_keys: + if db.get_entry('STP_VLAN', key).get('enabled') == 'true': + count += 1 + return count + + +def vlan_enable_stp(db, vlan_name): + fvs = {'enabled': 'true', + 'forward_delay': get_global_stp_forward_delay(db), + 'hello_time': get_global_stp_hello_time(db), + 'max_age': get_global_stp_max_age(db), + 'priority': get_global_stp_priority(db) + } + if is_global_stp_enabled(db): + if get_stp_enabled_vlan_count(db) < get_max_stp_instances(): + db.set_entry('STP_VLAN', vlan_name, fvs) + else: + logging.warning("Exceeded maximum STP configurable VLAN instances for {}".format(vlan_name)) + + +def interface_enable_stp(db, interface_name): + fvs = {'enabled': 'true', + 'root_guard': 'false', + 'bpdu_guard': 'false', + 'bpdu_guard_do_disable': 'false', + 'portfast': 'false', + 'uplink_fast': 'false' + } + if is_global_stp_enabled(db): + db.set_entry('STP_PORT', interface_name, fvs) + + +def is_vlan_configured_interface(db, interface_name): + intf_to_vlan_list = get_vlan_list_for_interface(db, interface_name) + if intf_to_vlan_list: # if empty + return True + else: + return False + + +def is_interface_vlan_member(db, vlan_name, interface_name): + ctx = click.get_current_context() + key = vlan_name + '|' + interface_name + entry = db.get_entry('VLAN_MEMBER', key) + if len(entry) == 0: # if empty + ctx.fail("{} is not member of {}".format(interface_name, vlan_name)) + + +def get_vlan_list_for_interface(db, interface_name): + vlan_intf_info = db.get_table('VLAN_MEMBER') + vlan_list = [] + for line in vlan_intf_info: + if interface_name == line[1]: + vlan_name = line[0] + vlan_list.append(vlan_name) + return vlan_list + + +def get_pc_member_port_list(db): + pc_member_info = db.get_table('PORTCHANNEL_MEMBER') + pc_member_port_list = [] + for line in pc_member_info: + intf_name = line[1] + pc_member_port_list.append(intf_name) + return pc_member_port_list + + +def get_vlan_list_from_stp_vlan_intf_table(db, intf_name): + stp_vlan_intf_info = db.get_table('STP_VLAN_PORT') + vlan_list = [] + for line in stp_vlan_intf_info: + if line[1] == intf_name: + vlan_list.append(line[0]) + return vlan_list + + +def get_intf_list_from_stp_vlan_intf_table(db, vlan_name): + stp_vlan_intf_info = db.get_table('STP_VLAN_PORT') + intf_list = [] + for line in stp_vlan_intf_info: + if line[0] == vlan_name: + intf_list.append(line[1]) + return intf_list + + +def is_portchannel_member_port(db, interface_name): + return interface_name in get_pc_member_port_list(db) + + +def enable_stp_for_interfaces(db): + fvs = {'enabled': 'true', + 'root_guard': 'false', + 'bpdu_guard': 'false', + 'bpdu_guard_do_disable': 'false', + 'portfast': 'false', + 'uplink_fast': 'false' + } + port_dict = natsorted(db.get_table('PORT')) + intf_list_in_vlan_member_table = get_intf_list_in_vlan_member_table(db) + + for port_key in port_dict: + if port_key in intf_list_in_vlan_member_table: + db.set_entry('STP_PORT', port_key, fvs) + + po_ch_dict = natsorted(db.get_table('PORTCHANNEL')) + for po_ch_key in po_ch_dict: + if po_ch_key in intf_list_in_vlan_member_table: + db.set_entry('STP_PORT', po_ch_key, fvs) + + +def is_global_stp_enabled(db): + stp_entry = db.get_entry('STP', "GLOBAL") + mode = stp_entry.get("mode") + if mode: + return True + else: + return False + + +def check_if_global_stp_enabled(db, ctx): + if not is_global_stp_enabled(db): + ctx.fail("Global STP is not enabled - first configure STP mode") + + +def get_global_stp_mode(db): + stp_entry = db.get_entry('STP', "GLOBAL") + mode = stp_entry.get("mode") + return mode + + +def get_global_stp_forward_delay(db): + stp_entry = db.get_entry('STP', "GLOBAL") + forward_delay = stp_entry.get("forward_delay") + return forward_delay + + +def get_global_stp_hello_time(db): + stp_entry = db.get_entry('STP', "GLOBAL") + hello_time = stp_entry.get("hello_time") + return hello_time + + +def get_global_stp_max_age(db): + stp_entry = db.get_entry('STP', "GLOBAL") + max_age = stp_entry.get("max_age") + return max_age + + +def get_global_stp_priority(db): + stp_entry = db.get_entry('STP', "GLOBAL") + priority = stp_entry.get("priority") + return priority + + +@click.group() +@clicommon.pass_db +def spanning_tree(_db): + """STP command line""" + pass + + +############################################### +# STP Global commands implementation +############################################### + +# cmd: STP enable +@spanning_tree.command('enable') +@click.argument('mode', metavar='', required=True, type=click.Choice(["pvst"])) +@clicommon.pass_db +def spanning_tree_enable(_db, mode): + """enable STP """ + ctx = click.get_current_context() + db = _db.cfgdb + if mode == "pvst" and get_global_stp_mode(db) == "pvst": + ctx.fail("PVST is already configured") + fvs = {'mode': mode, + 'rootguard_timeout': STP_DEFAULT_ROOT_GUARD_TIMEOUT, + 'forward_delay': STP_DEFAULT_FORWARD_DELAY, + 'hello_time': STP_DEFAULT_HELLO_INTERVAL, + 'max_age': STP_DEFAULT_MAX_AGE, + 'priority': STP_DEFAULT_BRIDGE_PRIORITY + } + db.set_entry('STP', "GLOBAL", fvs) + # Enable STP for VLAN by default + enable_stp_for_interfaces(db) + enable_stp_for_vlans(db) + + +# cmd: STP disable +@spanning_tree.command('disable') +@click.argument('mode', metavar='', required=True, type=click.Choice(["pvst"])) +@clicommon.pass_db +def stp_disable(_db, mode): + """disable STP """ + db = _db.cfgdb + db.set_entry('STP', "GLOBAL", None) + # Disable STP for all VLANs and interfaces + db.delete_table('STP_VLAN') + db.delete_table('STP_PORT') + db.delete_table('STP_VLAN_PORT') + if get_global_stp_mode(db) == "pvst": + print("Error PVST disable failed") + + +# cmd: STP global root guard timeout +@spanning_tree.command('root_guard_timeout') +@click.argument('root_guard_timeout', metavar='<5-600 seconds>', required=True, type=int) +@clicommon.pass_db +def stp_global_root_guard_timeout(_db, root_guard_timeout): + """Configure STP global root guard timeout value""" + ctx = click.get_current_context() + db = _db.cfgdb + check_if_global_stp_enabled(db, ctx) + is_valid_root_guard_timeout(ctx, root_guard_timeout) + db.mod_entry('STP', "GLOBAL", {'rootguard_timeout': root_guard_timeout}) + + +# cmd: STP global forward delay +@spanning_tree.command('forward_delay') +@click.argument('forward_delay', metavar='<4-30 seconds>', required=True, type=int) +@clicommon.pass_db +def stp_global_forward_delay(_db, forward_delay): + """Configure STP global forward delay""" + ctx = click.get_current_context() + db = _db.cfgdb + check_if_global_stp_enabled(db, ctx) + is_valid_forward_delay(ctx, forward_delay) + is_valid_stp_global_parameters(ctx, db, "forward_delay", forward_delay) + update_stp_vlan_parameter(ctx, db, "forward_delay", forward_delay) + db.mod_entry('STP', "GLOBAL", {'forward_delay': forward_delay}) + + +# cmd: STP global hello interval +@spanning_tree.command('hello') +@click.argument('hello_interval', metavar='<1-10 seconds>', required=True, type=int) +@clicommon.pass_db +def stp_global_hello_interval(_db, hello_interval): + """Configure STP global hello interval""" + ctx = click.get_current_context() + db = _db.cfgdb + check_if_global_stp_enabled(db, ctx) + is_valid_hello_interval(ctx, hello_interval) + is_valid_stp_global_parameters(ctx, db, "hello_time", hello_interval) + update_stp_vlan_parameter(ctx, db, "hello_time", hello_interval) + db.mod_entry('STP', "GLOBAL", {'hello_time': hello_interval}) + + +# cmd: STP global max age +@spanning_tree.command('max_age') +@click.argument('max_age', metavar='<6-40 seconds>', required=True, type=int) +@clicommon.pass_db +def stp_global_max_age(_db, max_age): + """Configure STP global max_age""" + ctx = click.get_current_context() + db = _db.cfgdb + check_if_global_stp_enabled(db, ctx) + is_valid_max_age(ctx, max_age) + is_valid_stp_global_parameters(ctx, db, "max_age", max_age) + update_stp_vlan_parameter(ctx, db, "max_age", max_age) + db.mod_entry('STP', "GLOBAL", {'max_age': max_age}) + + +# cmd: STP global bridge priority +@spanning_tree.command('priority') +@click.argument('priority', metavar='<0-61440>', required=True, type=int) +@clicommon.pass_db +def stp_global_priority(_db, priority): + """Configure STP global bridge priority""" + ctx = click.get_current_context() + db = _db.cfgdb + check_if_global_stp_enabled(db, ctx) + is_valid_bridge_priority(ctx, priority) + update_stp_vlan_parameter(ctx, db, "priority", priority) + db.mod_entry('STP', "GLOBAL", {'priority': priority}) + + +############################################### +# STP VLAN commands implementation +############################################### +@spanning_tree.group('vlan') +@clicommon.pass_db +def spanning_tree_vlan(_db): + """Configure STP for a VLAN""" + pass + + +def is_stp_enabled_for_vlan(db, vlan_name): + stp_entry = db.get_entry('STP_VLAN', vlan_name) + stp_enabled = stp_entry.get("enabled") + if stp_enabled == "true": + return True + else: + return False + + +def check_if_stp_enabled_for_vlan(ctx, db, vlan_name): + if not is_stp_enabled_for_vlan(db, vlan_name): + ctx.fail("STP is not enabled for VLAN") + + +@spanning_tree_vlan.command('enable') +@click.argument('vid', metavar='', required=True, type=int) +@clicommon.pass_db +def stp_vlan_enable(_db, vid): + """Enable STP for a VLAN""" + ctx = click.get_current_context() + db = _db.cfgdb + check_if_vlan_exist_in_db(db, ctx, vid) + vlan_name = 'Vlan{}'.format(vid) + if is_stp_enabled_for_vlan(db, vlan_name): + ctx.fail("STP is already enabled for " + vlan_name) + if get_stp_enabled_vlan_count(db) >= get_max_stp_instances(): + ctx.fail("Exceeded maximum STP configurable VLAN instances") + check_if_global_stp_enabled(db, ctx) + # when enabled for first time, create VLAN entry with + # global values - else update only VLAN STP state + stp_vlan_entry = db.get_entry('STP_VLAN', vlan_name) + if len(stp_vlan_entry) == 0: + fvs = {'enabled': 'true', + 'forward_delay': get_global_stp_forward_delay(db), + 'hello_time': get_global_stp_hello_time(db), + 'max_age': get_global_stp_max_age(db), + 'priority': get_global_stp_priority(db) + } + db.set_entry('STP_VLAN', vlan_name, fvs) + else: + db.mod_entry('STP_VLAN', vlan_name, {'enabled': 'true'}) + # Refresh stp_vlan_intf entry for vlan + for vlan, intf in db.get_table('STP_VLAN_PORT'): + if vlan == vlan_name: + vlan_intf_key = "{}|{}".format(vlan_name, intf) + vlan_intf_entry = db.get_entry('STP_VLAN_PORT', vlan_intf_key) + db.mod_entry('STP_VLAN_PORT', vlan_intf_key, vlan_intf_entry) + + +@spanning_tree_vlan.command('disable') +@click.argument('vid', metavar='', required=True, type=int) +@clicommon.pass_db +def stp_vlan_disable(_db, vid): + """Disable STP for a VLAN""" + ctx = click.get_current_context() + db = _db.cfgdb + check_if_vlan_exist_in_db(db, ctx, vid) + vlan_name = 'Vlan{}'.format(vid) + db.mod_entry('STP_VLAN', vlan_name, {'enabled': 'false'}) + + +@spanning_tree_vlan.command('forward_delay') +@click.argument('vid', metavar='', required=True, type=int) +@click.argument('forward_delay', metavar='<4-30 seconds>', required=True, type=int) +@clicommon.pass_db +def stp_vlan_forward_delay(_db, vid, forward_delay): + """Configure STP forward delay for VLAN""" + ctx = click.get_current_context() + db = _db.cfgdb + check_if_vlan_exist_in_db(db, ctx, vid) + vlan_name = 'Vlan{}'.format(vid) + check_if_stp_enabled_for_vlan(ctx, db, vlan_name) + is_valid_forward_delay(ctx, forward_delay) + is_valid_stp_vlan_parameters(ctx, db, vlan_name, "forward_delay", forward_delay) + db.mod_entry('STP_VLAN', vlan_name, {'forward_delay': forward_delay}) + + +@spanning_tree_vlan.command('hello') +@click.argument('vid', metavar='', required=True, type=int) +@click.argument('hello_interval', metavar='<1-10 seconds>', required=True, type=int) +@clicommon.pass_db +def stp_vlan_hello_interval(_db, vid, hello_interval): + """Configure STP hello interval for VLAN""" + ctx = click.get_current_context() + db = _db.cfgdb + check_if_vlan_exist_in_db(db, ctx, vid) + vlan_name = 'Vlan{}'.format(vid) + check_if_stp_enabled_for_vlan(ctx, db, vlan_name) + is_valid_hello_interval(ctx, hello_interval) + is_valid_stp_vlan_parameters(ctx, db, vlan_name, "hello_time", hello_interval) + db.mod_entry('STP_VLAN', vlan_name, {'hello_time': hello_interval}) + + +@spanning_tree_vlan.command('max_age') +@click.argument('vid', metavar='', required=True, type=int) +@click.argument('max_age', metavar='<6-40 seconds>', required=True, type=int) +@clicommon.pass_db +def stp_vlan_max_age(_db, vid, max_age): + """Configure STP max age for VLAN""" + ctx = click.get_current_context() + db = _db.cfgdb + check_if_vlan_exist_in_db(db, ctx, vid) + vlan_name = 'Vlan{}'.format(vid) + check_if_stp_enabled_for_vlan(ctx, db, vlan_name) + is_valid_max_age(ctx, max_age) + is_valid_stp_vlan_parameters(ctx, db, vlan_name, "max_age", max_age) + db.mod_entry('STP_VLAN', vlan_name, {'max_age': max_age}) + + +@spanning_tree_vlan.command('priority') +@click.argument('vid', metavar='', required=True, type=int) +@click.argument('priority', metavar='<0-61440>', required=True, type=int) +@clicommon.pass_db +def stp_vlan_priority(_db, vid, priority): + """Configure STP bridge priority for VLAN""" + ctx = click.get_current_context() + db = _db.cfgdb + check_if_vlan_exist_in_db(db, ctx, vid) + vlan_name = 'Vlan{}'.format(vid) + check_if_stp_enabled_for_vlan(ctx, db, vlan_name) + is_valid_bridge_priority(ctx, priority) + db.mod_entry('STP_VLAN', vlan_name, {'priority': priority}) + + +############################################### +# STP interface commands implementation +############################################### + + +def is_stp_enabled_for_interface(db, intf_name): + stp_entry = db.get_entry('STP_PORT', intf_name) + stp_enabled = stp_entry.get("enabled") + if stp_enabled == "true": + return True + else: + return False + + +def check_if_stp_enabled_for_interface(ctx, db, intf_name): + if not is_stp_enabled_for_interface(db, intf_name): + ctx.fail("STP is not enabled for interface {}".format(intf_name)) + + +def check_if_interface_is_valid(ctx, db, interface_name): + from config.main import interface_name_is_valid + if interface_name_is_valid(db, interface_name) is False: + ctx.fail("Interface name is invalid. Please enter a valid interface name!!") + for key in db.get_table('INTERFACE'): + if type(key) != tuple: + continue + if key[0] == interface_name: + ctx.fail(" {} has ip address {} configured - It's not a L2 interface".format(interface_name, key[1])) + if is_portchannel_member_port(db, interface_name): + ctx.fail(" {} is a portchannel member port - STP can't be configured".format(interface_name)) + if not is_vlan_configured_interface(db, interface_name): + ctx.fail(" {} has no VLAN configured - It's not a L2 interface".format(interface_name)) + + +@spanning_tree.group('interface') +@clicommon.pass_db +def spanning_tree_interface(_db): + """Configure STP for interface""" + pass + + +@spanning_tree_interface.command('enable') +@click.argument('interface_name', metavar='', required=True) +@clicommon.pass_db +def stp_interface_enable(_db, interface_name): + """Enable STP for interface""" + ctx = click.get_current_context() + db = _db.cfgdb + check_if_global_stp_enabled(db, ctx) + if is_stp_enabled_for_interface(db, interface_name): + ctx.fail("STP is already enabled for " + interface_name) + check_if_interface_is_valid(ctx, db, interface_name) + stp_intf_entry = db.get_entry('STP_PORT', interface_name) + if len(stp_intf_entry) == 0: + fvs = {'enabled': 'true', + 'root_guard': 'false', + 'bpdu_guard': 'false', + 'bpdu_guard_do_disable': 'false', + 'portfast': 'false', + 'uplink_fast': 'false'} + db.set_entry('STP_PORT', interface_name, fvs) + else: + db.mod_entry('STP_PORT', interface_name, {'enabled': 'true'}) + + +@spanning_tree_interface.command('disable') +@click.argument('interface_name', metavar='', required=True) +@clicommon.pass_db +def stp_interface_disable(_db, interface_name): + """Disable STP for interface""" + ctx = click.get_current_context() + db = _db.cfgdb + check_if_global_stp_enabled(db, ctx) + check_if_interface_is_valid(ctx, db, interface_name) + db.mod_entry('STP_PORT', interface_name, {'enabled': 'false'}) + + +# STP interface port priority +STP_INTERFACE_MIN_PRIORITY = 0 +STP_INTERFACE_MAX_PRIORITY = 240 +STP_INTERFACE_DEFAULT_PRIORITY = 128 + + +def is_valid_interface_priority(ctx, intf_priority): + if intf_priority not in range(STP_INTERFACE_MIN_PRIORITY, STP_INTERFACE_MAX_PRIORITY + 1): + ctx.fail("STP interface priority must be in range 0-240") + + +@spanning_tree_interface.command('priority') +@click.argument('interface_name', metavar='', required=True) +@click.argument('priority', metavar='<0-240>', required=True, type=int) +@clicommon.pass_db +def stp_interface_priority(_db, interface_name, priority): + """Configure STP port priority for interface""" + ctx = click.get_current_context() + db = _db.cfgdb + check_if_stp_enabled_for_interface(ctx, db, interface_name) + check_if_interface_is_valid(ctx, db, interface_name) + is_valid_interface_priority(ctx, priority) + curr_intf_proirty = db.get_entry('STP_PORT', interface_name).get('priority') + db.mod_entry('STP_PORT', interface_name, {'priority': priority}) + # update interface priority in all stp_vlan_intf entries if entry exists + for vlan, intf in db.get_table('STP_VLAN_PORT'): + if intf == interface_name: + vlan_intf_key = "{}|{}".format(vlan, interface_name) + vlan_intf_entry = db.get_entry('STP_VLAN_PORT', vlan_intf_key) + if len(vlan_intf_entry) != 0: + vlan_intf_priority = vlan_intf_entry.get('priority') + if curr_intf_proirty == vlan_intf_priority: + db.mod_entry('STP_VLAN_PORT', vlan_intf_key, {'priority': priority}) + # end + + +# STP interface port path cost +STP_INTERFACE_MIN_PATH_COST = 1 +STP_INTERFACE_MAX_PATH_COST = 200000000 + + +def is_valid_interface_path_cost(ctx, intf_path_cost): + if intf_path_cost < STP_INTERFACE_MIN_PATH_COST or intf_path_cost > STP_INTERFACE_MAX_PATH_COST: + ctx.fail("STP interface path cost must be in range 1-200000000") + + +@spanning_tree_interface.command('cost') +@click.argument('interface_name', metavar='', required=True) +@click.argument('cost', metavar='<1-200000000>', required=True, type=int) +@clicommon.pass_db +def stp_interface_path_cost(_db, interface_name, cost): + """Configure STP path cost for interface""" + ctx = click.get_current_context() + db = _db.cfgdb + check_if_stp_enabled_for_interface(ctx, db, interface_name) + check_if_interface_is_valid(ctx, db, interface_name) + is_valid_interface_path_cost(ctx, cost) + curr_intf_cost = db.get_entry('STP_PORT', interface_name).get('path_cost') + db.mod_entry('STP_PORT', interface_name, {'path_cost': cost}) + # update interface path_cost in all stp_vlan_intf entries if entry exists + for vlan, intf in db.get_table('STP_VLAN_PORT'): + if intf == interface_name: + vlan_intf_key = "{}|{}".format(vlan, interface_name) + vlan_intf_entry = db.get_entry('STP_VLAN_PORT', vlan_intf_key) + if len(vlan_intf_entry) != 0: + vlan_intf_cost = vlan_intf_entry.get('path_cost') + if curr_intf_cost == vlan_intf_cost: + db.mod_entry('STP_VLAN_PORT', vlan_intf_key, {'path_cost': cost}) + # end + + +# STP interface root guard +@spanning_tree_interface.group('root_guard') +@clicommon.pass_db +def spanning_tree_interface_root_guard(_db): + """Configure STP root guard for interface""" + pass + + +@spanning_tree_interface_root_guard.command('enable') +@click.argument('interface_name', metavar='', required=True) +@clicommon.pass_db +def stp_interface_root_guard_enable(_db, interface_name): + """Enable STP root guard for interface""" + ctx = click.get_current_context() + db = _db.cfgdb + check_if_stp_enabled_for_interface(ctx, db, interface_name) + check_if_interface_is_valid(ctx, db, interface_name) + db.mod_entry('STP_PORT', interface_name, {'root_guard': 'true'}) + + +@spanning_tree_interface_root_guard.command('disable') +@click.argument('interface_name', metavar='', required=True) +@clicommon.pass_db +def stp_interface_root_guard_disable(_db, interface_name): + """Disable STP root guard for interface""" + ctx = click.get_current_context() + db = _db.cfgdb + check_if_stp_enabled_for_interface(ctx, db, interface_name) + check_if_interface_is_valid(ctx, db, interface_name) + db.mod_entry('STP_PORT', interface_name, {'root_guard': 'false'}) + + +# STP interface bpdu guard +@spanning_tree_interface.group('bpdu_guard') +@clicommon.pass_db +def spanning_tree_interface_bpdu_guard(_db): + """Configure STP bpdu guard for interface""" + pass + + +@spanning_tree_interface_bpdu_guard.command('enable') +@click.argument('interface_name', metavar='', required=True) +@click.option('-s', '--shutdown', is_flag=True) +@clicommon.pass_db +def stp_interface_bpdu_guard_enable(_db, interface_name, shutdown): + """Enable STP bpdu guard for interface""" + ctx = click.get_current_context() + db = _db.cfgdb + check_if_stp_enabled_for_interface(ctx, db, interface_name) + check_if_interface_is_valid(ctx, db, interface_name) + if shutdown is True: + bpdu_guard_do_disable = 'true' + else: + bpdu_guard_do_disable = 'false' + fvs = {'bpdu_guard': 'true', + 'bpdu_guard_do_disable': bpdu_guard_do_disable} + db.mod_entry('STP_PORT', interface_name, fvs) + + +@spanning_tree_interface_bpdu_guard.command('disable') +@click.argument('interface_name', metavar='', required=True) +@clicommon.pass_db +def stp_interface_bpdu_guard_disable(_db, interface_name): + """Disable STP bpdu guard for interface""" + ctx = click.get_current_context() + db = _db.cfgdb + check_if_stp_enabled_for_interface(ctx, db, interface_name) + check_if_interface_is_valid(ctx, db, interface_name) + db.mod_entry('STP_PORT', interface_name, {'bpdu_guard': 'false'}) + + +# STP interface portfast +@spanning_tree_interface.group('portfast') +@clicommon.pass_db +def spanning_tree_interface_portfast(_db): + """Configure STP portfast for interface""" + pass + + +@spanning_tree_interface_portfast.command('enable') +@click.argument('interface_name', metavar='', required=True) +@clicommon.pass_db +def stp_interface_portfast_enable(_db, interface_name): + """Enable STP portfast for interface""" + ctx = click.get_current_context() + db = _db.cfgdb + check_if_stp_enabled_for_interface(ctx, db, interface_name) + check_if_interface_is_valid(ctx, db, interface_name) + db.mod_entry('STP_PORT', interface_name, {'portfast': 'true'}) + + +@spanning_tree_interface_portfast.command('disable') +@click.argument('interface_name', metavar='', required=True) +@clicommon.pass_db +def stp_interface_portfast_disable(_db, interface_name): + """Disable STP portfast for interface""" + ctx = click.get_current_context() + db = _db.cfgdb + check_if_stp_enabled_for_interface(ctx, db, interface_name) + check_if_interface_is_valid(ctx, db, interface_name) + db.mod_entry('STP_PORT', interface_name, {'portfast': 'false'}) + + +# STP interface root uplink_fast +@spanning_tree_interface.group('uplink_fast') +@clicommon.pass_db +def spanning_tree_interface_uplink_fast(_db): + """Configure STP uplink fast for interface""" + pass + + +@spanning_tree_interface_uplink_fast.command('enable') +@click.argument('interface_name', metavar='', required=True) +@clicommon.pass_db +def stp_interface_uplink_fast_enable(_db, interface_name): + """Enable STP uplink fast for interface""" + ctx = click.get_current_context() + db = _db.cfgdb + check_if_stp_enabled_for_interface(ctx, db, interface_name) + check_if_interface_is_valid(ctx, db, interface_name) + db.mod_entry('STP_PORT', interface_name, {'uplink_fast': 'true'}) + + +@spanning_tree_interface_uplink_fast.command('disable') +@click.argument('interface_name', metavar='', required=True) +@clicommon.pass_db +def stp_interface_uplink_fast_disable(_db, interface_name): + """Disable STP uplink fast for interface""" + ctx = click.get_current_context() + db = _db.cfgdb + check_if_stp_enabled_for_interface(ctx, db, interface_name) + check_if_interface_is_valid(ctx, db, interface_name) + db.mod_entry('STP_PORT', interface_name, {'uplink_fast': 'false'}) + + +############################################### +# STP interface per VLAN commands implementation +############################################### +@spanning_tree_vlan.group('interface') +@clicommon.pass_db +def spanning_tree_vlan_interface(_db): + """Configure STP parameters for interface per VLAN""" + pass + + +# STP interface per vlan port priority +def is_valid_vlan_interface_priority(ctx, priority): + if priority not in range(STP_INTERFACE_MIN_PRIORITY, STP_INTERFACE_MAX_PRIORITY + 1): + ctx.fail("STP per vlan port priority must be in range 0-240") + + +@spanning_tree_vlan_interface.command('priority') +@click.argument('vid', metavar='', required=True, type=int) +@click.argument('interface_name', metavar='', required=True) +@click.argument('priority', metavar='<0-240>', required=True, type=int) +@clicommon.pass_db +def stp_vlan_interface_priority(_db, vid, interface_name, priority): + """Configure STP per vlan port priority for interface""" + ctx = click.get_current_context() + db = _db.cfgdb + vlan_name = 'Vlan{}'.format(vid) + check_if_stp_enabled_for_vlan(ctx, db, vlan_name) + check_if_stp_enabled_for_interface(ctx, db, interface_name) + check_if_vlan_exist_in_db(db, ctx, vid) + is_interface_vlan_member(db, vlan_name, interface_name) + is_valid_vlan_interface_priority(ctx, priority) + vlan_interface = str(vlan_name) + "|" + interface_name + db.mod_entry('STP_VLAN_PORT', vlan_interface, {'priority': priority}) + + +@spanning_tree_vlan_interface.command('cost') +@click.argument('vid', metavar='', required=True, type=int) +@click.argument('interface_name', metavar='', required=True) +@click.argument('cost', metavar='<1-200000000>', required=True, type=int) +@clicommon.pass_db +def stp_vlan_interface_cost(_db, vid, interface_name, cost): + """Configure STP per vlan path cost for interface""" + ctx = click.get_current_context() + db = _db.cfgdb + vlan_name = 'Vlan{}'.format(vid) + check_if_stp_enabled_for_vlan(ctx, db, vlan_name) + check_if_stp_enabled_for_interface(ctx, db, interface_name) + check_if_vlan_exist_in_db(db, ctx, vid) + is_interface_vlan_member(db, vlan_name, interface_name) + is_valid_interface_path_cost(ctx, cost) + vlan_interface = str(vlan_name) + "|" + interface_name + db.mod_entry('STP_VLAN_PORT', vlan_interface, {'path_cost': cost}) + + +# Invoke main() +# if __name__ == '__main__': +# spanning_tree() diff --git a/config/vlan.py b/config/vlan.py index 98cc95757e..eae51eb312 100644 --- a/config/vlan.py +++ b/config/vlan.py @@ -6,6 +6,7 @@ from time import sleep from .utils import log from .validated_config_db_connector import ValidatedConfigDBConnector +from . import stp ADHOC_VALIDATION = True DHCP_RELAY_TABLE = "DHCP_RELAY" @@ -76,6 +77,9 @@ def add_vlan(db, vid, multiple): if clicommon.check_if_vlanid_exist(db.cfgdb, vlan, "DHCP_RELAY"): ctx.fail("DHCPv6 relay config for {} already exists".format(vlan)) + # Enable STP on VLAN if PVST is enabled globally + stp.vlan_enable_stp(db.cfgdb, vlan) + # set dhcpv4_relay table set_dhcp_relay_table('VLAN', config_db, vlan, {'vlanid': str(vid)}) @@ -97,6 +101,29 @@ def delete_db_entry(entry_name, db_connector, db_name): db_connector.delete(db_name, entry_name) +def enable_stp_on_port(db, port): + if stp.is_global_stp_enabled(db) is True: + vlan_list_for_intf = stp.get_vlan_list_for_interface(db, port) + if len(vlan_list_for_intf) == 0: + stp.interface_enable_stp(db, port) + + +def disable_stp_on_vlan_port(db, vlan, port): + if stp.is_global_stp_enabled(db) is True: + vlan_interface = str(vlan) + "|" + port + db.set_entry('STP_VLAN_PORT', vlan_interface, None) + vlan_list_for_intf = stp.get_vlan_list_for_interface(db, port) + if len(vlan_list_for_intf) == 0: + db.set_entry('STP_PORT', port, None) + + +def disable_stp_on_vlan(db, vlan_interface): + db.set_entry('STP_VLAN', vlan_interface, None) + stp_intf_list = stp.get_intf_list_from_stp_vlan_intf_table(db, vlan_interface) + for intf_name in stp_intf_list: + key = vlan_interface + "|" + intf_name + db.set_entry('STP_VLAN_PORT', key, None) + @vlan.command('del') @click.argument('vid', metavar='', required=True) @click.option('-m', '--multiple', is_flag=True, help="Add Multiple Vlan(s) in Range or in Comma separated list") @@ -154,7 +181,8 @@ def del_vlan(db, vid, multiple, no_restart_dhcp_relay): for vxmap_key, vxmap_data in vxlan_table.items(): if vxmap_data['vlan'] == 'Vlan{}'.format(vid): ctx.fail("vlan: {} can not be removed. " - "First remove vxlan mapping '{}' assigned to VLAN".format(vid, '|'.join(vxmap_key))) + "First remove vxlan mapping '{}' assigned to VLAN".format( + vid, '|'.join(vxmap_key))) # set dhcpv4_relay table set_dhcp_relay_table('VLAN', config_db, vlan, None) @@ -169,6 +197,9 @@ def del_vlan(db, vid, multiple, no_restart_dhcp_relay): delete_db_entry("DHCPv6_COUNTER_TABLE|{}".format(vlan), db.db, db.db.STATE_DB) delete_db_entry("DHCP_COUNTER_TABLE|{}".format(vlan), db.db, db.db.STATE_DB) + # Delete STP_VLAN & STP_VLAN_PORT entries when VLAN is deleted. + disable_stp_on_vlan(db.cfgdb, 'Vlan{}'.format(vid)) + vlans = db.cfgdb.get_keys('VLAN') if not vlans: docker_exec_cmd = ['docker', 'exec', '-i', 'swss'] @@ -312,6 +343,10 @@ def add_vlan_member(db, vid, port, untagged, multiple, except_flag): ctx.fail("{} is in access mode! Tagged Members cannot be added".format(port)) elif existing_mode == mode_type or (existing_mode == "trunk" and mode_type == "access"): pass + + # If port is being made L2 port, enable STP + enable_stp_on_port(db.cfgdb, port) + try: config_db.set_entry('VLAN_MEMBER', (vlan, port), {'tagging_mode': "untagged" if untagged else "tagged"}) except ValueError: @@ -356,6 +391,9 @@ def del_vlan_member(db, vid, port, multiple, except_flag): if not clicommon.is_port_vlan_member(db.cfgdb, port, vlan): # TODO: MISSING CONSTRAINT IN YANG MODEL ctx.fail("{} is not a member of {}".format(port, vlan)) + # If port is being made non-L2 port, disable STP + disable_stp_on_vlan_port(db.cfgdb, vlan, port) + try: config_db.set_entry('VLAN_MEMBER', (vlan, port), None) delete_db_entry("DHCPv6_COUNTER_TABLE|{}".format(port), db.db, db.db.STATE_DB) diff --git a/debug/main.py b/debug/main.py index 069159fc75..1c12dffe85 100755 --- a/debug/main.py +++ b/debug/main.py @@ -4,6 +4,7 @@ import subprocess from shlex import join + def run_command(command, pager=False): command_str = join(command) click.echo(click.style("Command: ", fg='cyan') + click.style(command_str, fg='green')) @@ -25,6 +26,7 @@ def cli(): """SONiC command line - 'debug' command""" pass + prefix_pattern = '^[A-Za-z0-9.:/]*$' p = subprocess.check_output(['sudo', 'vtysh', '-c', 'show version'], text=True) if 'FRRouting' in p: diff --git a/debug/stp.py b/debug/stp.py new file mode 100644 index 0000000000..c154537e2a --- /dev/null +++ b/debug/stp.py @@ -0,0 +1,92 @@ +import click +import utilities_common.cli as clicommon + + +# +# This group houses Spanning_tree commands and subgroups +# +@click.group(cls=clicommon.AliasedGroup, default_if_no_args=False, invoke_without_command=True) +@click.pass_context +def spanning_tree(ctx): + '''debug spanning_tree commands''' + if ctx.invoked_subcommand is None: + command = 'sudo stpctl dbg enable' + clicommon.run_command(command) + + +@spanning_tree.group('dump', cls=clicommon.AliasedGroup, default_if_no_args=False, invoke_without_command=True) +def stp_debug_dump(): + pass + + +@stp_debug_dump.command('global') +def stp_debug_dump_global(): + command = 'sudo stpctl global' + clicommon.run_command(command) + + +@stp_debug_dump.command('vlan') +@click.argument('vlan_id', metavar='', required=True) +def stp_debug_dump_vlan(vlan_id): + command = 'sudo stpctl vlan ' + vlan_id + clicommon.run_command(command) + + +@stp_debug_dump.command('interface') +@click.argument('vlan_id', metavar='', required=True) +@click.argument('interface_name', metavar='', required=True) +def stp_debug_dump_vlan_intf(vlan_id, interface_name): + command = 'sudo stpctl port ' + vlan_id + " " + interface_name + clicommon.run_command(command) + + +@spanning_tree.command('show') +def stp_debug_show(): + command = 'sudo stpctl dbg show' + clicommon.run_command(command) + + +@spanning_tree.command('reset') +def stp_debug_reset(): + command = 'sudo stpctl dbg disable' + clicommon.run_command(command) + + +@spanning_tree.command('bpdu') +@click.argument('mode', metavar='{rx|tx}', required=False) +@click.option('-d', '--disable', is_flag=True) +def stp_debug_bpdu(mode, disable): + command = 'sudo stpctl dbg bpdu {}{}'.format( + ('rx-' if mode == 'rx' else 'tx-' if mode == 'tx' else ''), + ('off' if disable else 'on')) + clicommon.run_command(command) + + +@spanning_tree.command('verbose') +@click.option('-d', '--disable', is_flag=True) +def stp_debug_verbose(disable): + command = 'sudo stpctl dbg verbose {}'.format("off" if disable else "on") + clicommon.run_command(command) + + +@spanning_tree.command('event') +@click.option('-d', '--disable', is_flag=True) +def stp_debug_event(disable): + command = 'sudo stpctl dbg event {}'.format("off" if disable else "on") + clicommon.run_command(command) + + +@spanning_tree.command('vlan') +@click.argument('vlan_id', metavar='', required=True) +@click.option('-d', '--disable', is_flag=True) +def stp_debug_vlan(vlan_id, disable): + command = 'sudo stpctl dbg vlan {} {}'.format(vlan_id, "off" if disable else "on") + clicommon.run_command(command) + + +@spanning_tree.command('interface') +@click.argument('interface_name', metavar='', required=True) +@click.option('-d', '--disable', is_flag=True) +def stp_debug_intf(interface_name, disable): + command = 'sudo stpctl dbg port {} {}'.format(interface_name, "off" if disable else "on") + clicommon.run_command(command) diff --git a/scripts/generate_dump b/scripts/generate_dump index a0051bca94..7357de30a3 100755 --- a/scripts/generate_dump +++ b/scripts/generate_dump @@ -2085,6 +2085,12 @@ main() { fi wait + save_cmd "stpctl all" "stp.log" + save_cmd "show spanning_tree" "stp.show" + save_cmd "show spanning_tree statistics" "stp.stats" + save_cmd "show spanning_tree bpdu_guard" "stp.bg" + save_cmd "show spanning_tree root_guard" "stp.rg" + save_cmd "ps aux" "ps.aux" & save_cmd "top -b -n 1" "top" & save_cmd "free" "free" & diff --git a/show/main.py b/show/main.py index 3151e4d61b..370798d219 100755 --- a/show/main.py +++ b/show/main.py @@ -67,6 +67,7 @@ from . import syslog from . import dns from . import bgp_cli +from . import stp # Global Variables PLATFORM_JSON = 'platform.json' @@ -318,6 +319,7 @@ def cli(ctx): cli.add_command(system_health.system_health) cli.add_command(warm_restart.warm_restart) cli.add_command(dns.dns) +cli.add_command(stp.spanning_tree) # syslog module cli.add_command(syslog.syslog) @@ -1887,6 +1889,16 @@ def syslog(verbose): click.echo(tabulate(body, header, tablefmt="simple", stralign="left", missingval="")) +# 'spanning-tree' subcommand ("show runningconfiguration spanning_tree") +@runningconfiguration.command() +@click.option('--verbose', is_flag=True, help="Enable verbose output") +def spanning_tree(verbose): + """Show spanning_tree running configuration""" + stp_list = ["STP", "STP_PORT", "STP_VLAN", "STP_VLAN_PORT"] + for key in stp_list: + cmd = ['sudo', 'sonic-cfggen', '-d', '--var-json', key] + run_command(cmd, display_cmd=verbose) + # # 'startupconfiguration' group ("show startupconfiguration ...") # diff --git a/show/stp.py b/show/stp.py new file mode 100644 index 0000000000..a64d9764f5 --- /dev/null +++ b/show/stp.py @@ -0,0 +1,403 @@ +import re +import click +# import subprocess +import utilities_common.cli as clicommon +from swsscommon.swsscommon import SonicV2Connector, ConfigDBConnector + + +############################################################################## +# 'spanning_tree' group ("show spanning_tree ...") +############################################################################### +# STP show commands:- +# show spanning_tree +# show spanning_tree vlan +# show spanning_tree vlan interface +# show spanning_tree bpdu_guard +# show spanning_tree statistics +# show spanning_tree statistics vlan +# +############################################################################### +g_stp_vlanid = 0 +# +# Utility API's +# + + +def is_stp_docker_running(): + return True +# running_docker = subprocess.check_output('docker ps', shell=True) +# if running_docker.find("docker-stp".encode()) == -1: +# return False +# else: +# return True + + +def connect_to_cfg_db(): + config_db = ConfigDBConnector() + config_db.connect() + return config_db + + +def connect_to_appl_db(): + appl_db = SonicV2Connector(host="127.0.0.1") + appl_db.connect(appl_db.APPL_DB) + return appl_db + + +# Redis DB only supports limiter pattern search wildcards. +# check https://redis.io/commands/KEYS before using this api +# Redis-db uses glob-style patterns not regex +def stp_get_key_from_pattern(db_connect, db, pattern): + keys = db_connect.keys(db, pattern) + if keys: + return keys[0] + else: + return None + + +# get_all doesnt accept regex patterns, it requires exact key +def stp_get_all_from_pattern(db_connect, db, pattern): + key = stp_get_key_from_pattern(db_connect, db, pattern) + if key: + entry = db_connect.get_all(db, key) + return entry + + +def stp_is_port_fast_enabled(ifname): + app_db_entry = stp_get_all_from_pattern( + g_stp_appl_db, g_stp_appl_db.APPL_DB, "*STP_PORT_TABLE:{}".format(ifname)) + if (not app_db_entry or not ('port_fast' in app_db_entry) or app_db_entry['port_fast'] == 'no'): + return False + return True + + +def stp_is_uplink_fast_enabled(ifname): + entry = g_stp_cfg_db.get_entry("STP_PORT", ifname) + if (entry and ('uplink_fast' in entry) and entry['uplink_fast'] == 'true'): + return True + return False + + +def stp_get_entry_from_vlan_tb(db, vlanid): + entry = stp_get_all_from_pattern(db, db.APPL_DB, "*STP_VLAN_TABLE:Vlan{}".format(vlanid)) + if not entry: + return entry + + if 'bridge_id' not in entry: + entry['bridge_id'] = 'NA' + if 'max_age' not in entry: + entry['max_age'] = '0' + if 'hello_time' not in entry: + entry['hello_time'] = '0' + if 'forward_delay' not in entry: + entry['forward_delay'] = '0' + if 'hold_time' not in entry: + entry['hold_time'] = '0' + if 'last_topology_change' not in entry: + entry['last_topology_change'] = '0' + if 'topology_change_count' not in entry: + entry['topology_change_count'] = '0' + if 'root_bridge_id' not in entry: + entry['root_bridge_id'] = 'NA' + if 'root_path_cost' not in entry: + entry['root_path_cost'] = '0' + if 'desig_bridge_id' not in entry: + entry['desig_bridge_id'] = 'NA' + if 'root_port' not in entry: + entry['root_port'] = 'NA' + if 'root_max_age' not in entry: + entry['root_max_age'] = '0' + if 'root_hello_time' not in entry: + entry['root_hello_time'] = '0' + if 'root_forward_delay' not in entry: + entry['root_forward_delay'] = '0' + if 'stp_instance' not in entry: + entry['stp_instance'] = '65535' + + return entry + + +def stp_get_entry_from_vlan_intf_tb(db, vlanid, ifname): + entry = stp_get_all_from_pattern(db, db.APPL_DB, "*STP_VLAN_PORT_TABLE:Vlan{}:{}".format(vlanid, ifname)) + if not entry: + return entry + + if 'port_num' not in entry: + entry['port_num'] = 'NA' + if 'priority' not in entry: + entry['priority'] = '0' + if 'path_cost' not in entry: + entry['path_cost'] = '0' + if 'root_guard' not in entry: + entry['root_guard'] = 'NA' + if 'bpdu_guard' not in entry: + entry['bpdu_guard'] = 'NA' + if 'port_state' not in entry: + entry['port_state'] = 'NA' + if 'desig_cost' not in entry: + entry['desig_cost'] = '0' + if 'desig_root' not in entry: + entry['desig_root'] = 'NA' + if 'desig_bridge' not in entry: + entry['desig_bridge'] = 'NA' + + return entry + + +# +# This group houses Spanning_tree commands and subgroups +@click.group(cls=clicommon.AliasedGroup, invoke_without_command=True) +@click.pass_context +def spanning_tree(ctx): + """Show spanning_tree commands""" + global g_stp_appl_db + global g_stp_cfg_db + + if not is_stp_docker_running(): + ctx.fail("STP docker is not running") + + g_stp_appl_db = connect_to_appl_db() + g_stp_cfg_db = connect_to_cfg_db() + + global_cfg = g_stp_cfg_db.get_entry("STP", "GLOBAL") + if not global_cfg: + click.echo("Spanning-tree is not configured") + return + + global g_stp_mode + if 'pvst' == global_cfg['mode']: + g_stp_mode = 'PVST' + + if ctx.invoked_subcommand is None: + keys = g_stp_appl_db.keys(g_stp_appl_db.APPL_DB, "*STP_VLAN_TABLE:Vlan*") + if not keys: + return + vlan_list = [] + for key in keys: + result = re.search('.STP_VLAN_TABLE:Vlan(.*)', key) + vlanid = result.group(1) + vlan_list.append(int(vlanid)) + vlan_list.sort() + for vlanid in vlan_list: + ctx.invoke(show_stp_vlan, vlanid=vlanid) + + +@spanning_tree.group('vlan', cls=clicommon.AliasedGroup, invoke_without_command=True) +@click.argument('vlanid', metavar='', required=True, type=int) +@click.pass_context +def show_stp_vlan(ctx, vlanid): + """Show spanning_tree vlan information""" + global g_stp_vlanid + g_stp_vlanid = vlanid + + vlan_tb_entry = stp_get_entry_from_vlan_tb(g_stp_appl_db, g_stp_vlanid) + if not vlan_tb_entry: + return + + global g_stp_mode + if g_stp_mode: + click.echo("Spanning-tree Mode: {}".format(g_stp_mode)) + # reset so we dont print again + g_stp_mode = '' + + click.echo("") + click.echo("VLAN {} - STP instance {}".format(g_stp_vlanid, vlan_tb_entry['stp_instance'])) + click.echo("--------------------------------------------------------------------") + click.echo("STP Bridge Parameters:") + + click.echo("{:17}{:7}{:7}{:7}{:6}{:13}{}".format( + "Bridge", "Bridge", "Bridge", "Bridge", "Hold", "LastTopology", "Topology")) + click.echo("{:17}{:7}{:7}{:7}{:6}{:13}{}".format( + "Identifier", "MaxAge", "Hello", "FwdDly", "Time", "Change", "Change")) + click.echo("{:17}{:7}{:7}{:7}{:6}{:13}{}".format("hex", "sec", "sec", "sec", "sec", "sec", "cnt")) + click.echo("{:17}{:7}{:7}{:7}{:6}{:13}{}".format( + vlan_tb_entry['bridge_id'], + vlan_tb_entry['max_age'], + vlan_tb_entry['hello_time'], + vlan_tb_entry['forward_delay'], + vlan_tb_entry['hold_time'], + vlan_tb_entry['last_topology_change'], + vlan_tb_entry['topology_change_count'])) + + click.echo("") + click.echo("{:17}{:10}{:18}{:19}{:4}{:4}{}".format( + "RootBridge", "RootPath", "DesignatedBridge", "RootPort", "Max", "Hel", "Fwd")) + click.echo("{:17}{:10}{:18}{:19}{:4}{:4}{}".format("Identifier", "Cost", "Identifier", "", "Age", "lo", "Dly")) + click.echo("{:17}{:10}{:18}{:19}{:4}{:4}{}".format("hex", "", "hex", "", "sec", "sec", "sec")) + click.echo("{:17}{:10}{:18}{:19}{:4}{:4}{}".format( + vlan_tb_entry['root_bridge_id'], + vlan_tb_entry['root_path_cost'], + vlan_tb_entry['desig_bridge_id'], + vlan_tb_entry['root_port'], + vlan_tb_entry['root_max_age'], + vlan_tb_entry['root_hello_time'], + vlan_tb_entry['root_forward_delay'])) + + click.echo("") + click.echo("STP Port Parameters:") + click.echo("{:17}{:5}{:10}{:5}{:7}{:14}{:12}{:17}{}".format( + "Port", "Prio", "Path", "Port", "Uplink", "State", "Designated", "Designated", "Designated")) + click.echo("{:17}{:5}{:10}{:5}{:7}{:14}{:12}{:17}{}".format( + "Name", "rity", "Cost", "Fast", "Fast", "", "Cost", "Root", "Bridge")) + if ctx.invoked_subcommand is None: + keys = g_stp_appl_db.keys(g_stp_appl_db.APPL_DB, "*STP_VLAN_PORT_TABLE:Vlan{}:*".format(vlanid)) + if not keys: + return + intf_list = [] + for key in keys: + result = re.search('.STP_VLAN_PORT_TABLE:Vlan{}:(.*)'.format(vlanid), key) + ifname = result.group(1) + intf_list.append(ifname) + eth_list = [ifname[len("Ethernet"):] for ifname in intf_list if ifname.startswith("Ethernet")] + po_list = [ifname[len("PortChannel"):] for ifname in intf_list if ifname.startswith("PortChannel")] + + eth_list.sort() + po_list.sort() + for port_num in eth_list: + ctx.invoke(show_stp_interface, ifname="Ethernet"+str(port_num)) + for port_num in po_list: + ctx.invoke(show_stp_interface, ifname="PortChannel"+port_num) + + +@show_stp_vlan.command('interface') +@click.argument('ifname', metavar='', required=True) +@click.pass_context +def show_stp_interface(ctx, ifname): + """Show spanning_tree vlan interface information""" + + vlan_intf_tb_entry = stp_get_entry_from_vlan_intf_tb(g_stp_appl_db, g_stp_vlanid, ifname) + if not vlan_intf_tb_entry: + return + + click.echo("{:17}{:5}{:10}{:5}{:7}{:14}{:12}{:17}{}".format( + ifname, + vlan_intf_tb_entry['priority'], + vlan_intf_tb_entry['path_cost'], + 'Y' if (stp_is_port_fast_enabled(ifname)) else 'N', + 'Y' if (stp_is_uplink_fast_enabled(ifname)) else 'N', + vlan_intf_tb_entry['port_state'], + vlan_intf_tb_entry['desig_cost'], + vlan_intf_tb_entry['desig_root'], + vlan_intf_tb_entry['desig_bridge'] + )) + + +@spanning_tree.command('bpdu_guard') +@click.pass_context +def show_stp_bpdu_guard(ctx): + """Show spanning_tree bpdu_guard""" + + print_header = 1 + ifname_all = g_stp_cfg_db.get_keys("STP_PORT") + for ifname in ifname_all: + cfg_entry = g_stp_cfg_db.get_entry("STP_PORT", ifname) + if cfg_entry['bpdu_guard'] == 'true' and cfg_entry['enabled'] == 'true': + if print_header: + click.echo("{:17}{:13}{}".format("PortNum", "Shutdown", "Port Shut")) + click.echo("{:17}{:13}{}".format("", "Configured", "due to BPDU guard")) + click.echo("-------------------------------------------") + print_header = 0 + + if cfg_entry['bpdu_guard_do_disable'] == 'true': + disabled = 'No' + keys = g_stp_appl_db.keys(g_stp_appl_db.APPL_DB, "*STP_PORT_TABLE:{}".format(ifname)) + # only 1 key per ifname is expected in BPDU_GUARD_TABLE. + if keys: + appdb_entry = g_stp_appl_db.get_all(g_stp_appl_db.APPL_DB, keys[0]) + if appdb_entry and 'bpdu_guard_shutdown' in appdb_entry: + if appdb_entry['bpdu_guard_shutdown'] == 'yes': + disabled = 'Yes' + click.echo("{:17}{:13}{}".format(ifname, "Yes", disabled)) + else: + click.echo("{:17}{:13}{}".format(ifname, "No", "NA")) + + +@spanning_tree.command('root_guard') +@click.pass_context +def show_stp_root_guard(ctx): + """Show spanning_tree root_guard""" + + print_header = 1 + ifname_all = g_stp_cfg_db.get_keys("STP_PORT") + for ifname in ifname_all: + entry = g_stp_cfg_db.get_entry("STP_PORT", ifname) + if entry['root_guard'] == 'true' and entry['enabled'] == 'true': + if print_header: + global_entry = g_stp_cfg_db.get_entry("STP", "GLOBAL") + click.echo("Root guard timeout: {} secs".format(global_entry['rootguard_timeout'])) + click.echo("") + click.echo("{:17}{:7}{}".format("Port", "VLAN", "Current State")) + click.echo("-------------------------------------------") + print_header = 0 + + state = '' + vlanid = '' + keys = g_stp_appl_db.keys(g_stp_appl_db.APPL_DB, "*STP_VLAN_PORT_TABLE:*:{}".format(ifname)) + if keys: + for key in keys: + entry = g_stp_appl_db.get_all(g_stp_appl_db.APPL_DB, key) + if entry and 'root_guard_timer' in entry: + if entry['root_guard_timer'] == '0': + state = 'Consistent state' + else: + state = 'Inconsistent state ({} seconds left on timer)'.format(entry['root_guard_timer']) + + vlanid = re.search(':Vlan(.*):', key) + if vlanid: + click.echo("{:17}{:7}{}".format(ifname, vlanid.group(1), state)) + else: + click.echo("{:17}{:7}{}".format(ifname, vlanid, state)) + + +@spanning_tree.group('statistics', cls=clicommon.AliasedGroup, invoke_without_command=True) +@click.pass_context +def show_stp_statistics(ctx): + """Show spanning_tree statistics""" + + if ctx.invoked_subcommand is None: + keys = g_stp_appl_db.keys(g_stp_appl_db.APPL_DB, "*STP_VLAN_TABLE:Vlan*") + if not keys: + return + + vlan_list = [] + for key in keys: + result = re.search('.STP_VLAN_TABLE:Vlan(.*)', key) + vlanid = result.group(1) + vlan_list.append(int(vlanid)) + vlan_list.sort() + for vlanid in vlan_list: + ctx.invoke(show_stp_vlan_statistics, vlanid=vlanid) + + +@show_stp_statistics.command('vlan') +@click.argument('vlanid', metavar='', required=True, type=int) +@click.pass_context +def show_stp_vlan_statistics(ctx, vlanid): + """Show spanning_tree statistics vlan""" + + stp_inst_entry = stp_get_all_from_pattern( + g_stp_appl_db, g_stp_appl_db.APPL_DB, "*STP_VLAN_TABLE:Vlan{}".format(vlanid)) + if not stp_inst_entry: + return + + click.echo("VLAN {} - STP instance {}".format(vlanid, stp_inst_entry['stp_instance'])) + click.echo("--------------------------------------------------------------------") + click.echo("{:17}{:15}{:15}{:15}{}".format("PortNum", "BPDU Tx", "BPDU Rx", "TCN Tx", "TCN Rx")) + keys = g_stp_appl_db.keys(g_stp_appl_db.APPL_DB, "*STP_VLAN_PORT_TABLE:Vlan{}:*".format(vlanid)) + if keys: + for key in keys: + result = re.search('.STP_VLAN_PORT_TABLE:Vlan(.*):(.*)', key) + ifname = result.group(2) + entry = g_stp_appl_db.get_all(g_stp_appl_db.APPL_DB, key) + if entry: + if 'bpdu_sent' not in entry: + entry['bpdu_sent'] = '-' + if 'bpdu_received' not in entry: + entry['bpdu_received'] = '-' + if 'tc_sent' not in entry: + entry['tc_sent'] = '-' + if 'tc_received' not in entry: + entry['tc_received'] = '-' + + click.echo("{:17}{:15}{:15}{:15}{}".format( + ifname, entry['bpdu_sent'], entry['bpdu_received'], entry['tc_sent'], entry['tc_received'])) diff --git a/sonic_installer/main.py b/sonic_installer/main.py index c5d3a256f2..d85e3731aa 100644 --- a/sonic_installer/main.py +++ b/sonic_installer/main.py @@ -763,6 +763,7 @@ def cleanup(): "radv", "restapi", "sflow", + "stp", "snmp", "swss", "syncd", diff --git a/tests/mock_tables/appl_db.json b/tests/mock_tables/appl_db.json index e967caa758..d755f46428 100644 --- a/tests/mock_tables/appl_db.json +++ b/tests/mock_tables/appl_db.json @@ -305,6 +305,40 @@ "type": "dynamic", "vni": "200" }, + "_STP_VLAN_TABLE:Vlan500": { + "bridge_id": "8064b86a97e24e9c", + "max_age": "20", + "hello_time": "2", + "forward_delay": "15", + "hold_time": "1", + "root_bridge_id": "0064b86a97e24e9c", + "root_path_cost": "600", + "desig_bridge_id": "806480a235f281ec", + "root_port": "Root", + "root_max_age": "20", + "root_hello_time": "2", + "root_forward_delay": "15", + "stp_instance": "0", + "topology_change_count": "1", + "last_topology_change": "0" + }, + "_STP_VLAN_PORT_TABLE:Vlan500:Ethernet4": { + "port_num": "4", + "priority": "128", + "path_cost": "200", + "port_state": "FORWARDING", + "desig_cost": "400", + "desig_root": "0064b86a97e24e9c", + "desig_bridge": "806480a235f281ec", + "desig_port": "4", + "bpdu_sent": "10", + "bpdu_received": "15", + "config_bpdu_sent": "10", + "config_bpdu_received": "2", + "tc_sent": "15", + "tc_received": "5", + "root_guard_timer": "0" + }, "MUX_CABLE_TABLE:Ethernet32": { "state": "active" }, diff --git a/tests/mock_tables/config_db.json b/tests/mock_tables/config_db.json index 187efed553..3deca74255 100644 --- a/tests/mock_tables/config_db.json +++ b/tests/mock_tables/config_db.json @@ -2798,5 +2798,28 @@ "dhcpv6_servers": [ "fc02:2000::1" ] - } + }, + "STP|GLOBAL": { + "forward_delay": "15", + "hello_time": "2", + "max_age": "20", + "mode": "pvst", + "priority": "32768", + "rootguard_timeout": "30" + }, + "STP_PORT|Ethernet4": { + "bpdu_guard": "true", + "bpdu_guard_do_disable": "false", + "enabled": "true", + "portfast": "true", + "root_guard": "true", + "uplink_fast": "false" + }, + "STP_VLAN|Vlan500": { + "enabled": "true", + "forward_delay": "15", + "hello_time": "2", + "max_age": "20", + "priority": "32768" + } } diff --git a/tests/mock_tables/state_db.json b/tests/mock_tables/state_db.json index 49ffaeedd8..bad7882cb6 100644 --- a/tests/mock_tables/state_db.json +++ b/tests/mock_tables/state_db.json @@ -1681,5 +1681,8 @@ }, "PORT_CAPACITY_TABLE|PORT_CAPACITY_DATA" : { "capacity": "80000" + }, + "STP_TABLE|GLOBAL": { + "max_stp_inst": "510" } } diff --git a/tests/stp_test.py b/tests/stp_test.py new file mode 100644 index 0000000000..44a93065cc --- /dev/null +++ b/tests/stp_test.py @@ -0,0 +1,414 @@ +import os +import re +import pytest +from click.testing import CliRunner + +import config.main as config +import show.main as show +from utilities_common.db import Db +from .mock_tables import dbconnector + + +EXPECTED_SHOW_SPANNING_TREE_OUTPUT = """\ +Spanning-tree Mode: PVST + +VLAN 500 - STP instance 0 +-------------------------------------------------------------------- +STP Bridge Parameters: +Bridge Bridge Bridge Bridge Hold LastTopology Topology +Identifier MaxAge Hello FwdDly Time Change Change +hex sec sec sec sec sec cnt +8064b86a97e24e9c 20 2 15 1 0 1 + +RootBridge RootPath DesignatedBridge RootPort Max Hel Fwd +Identifier Cost Identifier Age lo Dly +hex hex sec sec sec +0064b86a97e24e9c 600 806480a235f281ec Root 20 2 15 + +STP Port Parameters: +Port Prio Path Port Uplink State Designated Designated Designated +Name rity Cost Fast Fast Cost Root Bridge +Ethernet4 128 200 N N FORWARDING 400 0064b86a97e24e9c 806480a235f281ec +""" + +EXPECTED_SHOW_SPANNING_TREE_VLAN_OUTPUT = """\ + +VLAN 500 - STP instance 0 +-------------------------------------------------------------------- +STP Bridge Parameters: +Bridge Bridge Bridge Bridge Hold LastTopology Topology +Identifier MaxAge Hello FwdDly Time Change Change +hex sec sec sec sec sec cnt +8064b86a97e24e9c 20 2 15 1 0 1 + +RootBridge RootPath DesignatedBridge RootPort Max Hel Fwd +Identifier Cost Identifier Age lo Dly +hex hex sec sec sec +0064b86a97e24e9c 600 806480a235f281ec Root 20 2 15 + +STP Port Parameters: +Port Prio Path Port Uplink State Designated Designated Designated +Name rity Cost Fast Fast Cost Root Bridge +Ethernet4 128 200 N N FORWARDING 400 0064b86a97e24e9c 806480a235f281ec +""" + +EXPECTED_SHOW_SPANNING_TREE_STATISTICS_OUTPUT = """\ +VLAN 500 - STP instance 0 +-------------------------------------------------------------------- +PortNum BPDU Tx BPDU Rx TCN Tx TCN Rx +Ethernet4 10 15 15 5 +""" + +EXPECTED_SHOW_SPANNING_TREE_BPDU_GUARD_OUTPUT = """\ +PortNum Shutdown Port Shut + Configured due to BPDU guard +------------------------------------------- +Ethernet4 No NA +""" + +EXPECTED_SHOW_SPANNING_TREE_ROOT_GUARD_OUTPUT = """\ +Root guard timeout: 30 secs + +Port VLAN Current State +------------------------------------------- +Ethernet4 500 Consistent state +""" + + +class TestStp(object): + @classmethod + def setup_class(cls): + os.environ['UTILITIES_UNIT_TESTING'] = "1" + print("SETUP") + + # Fixture for initializing the CliRunner + @pytest.fixture(scope="module") + def runner(self): + return CliRunner() + + # Fixture for initializing the Db + @pytest.fixture(scope="module") + def db(self): + return Db() + + def test_show_spanning_tree(self, runner, db): + result = runner.invoke(show.cli.commands["spanning-tree"], [], obj=db) + print(result.exit_code) + print(result.output) + assert result.exit_code == 0 + assert (re.sub(r'\s+', ' ', result.output.strip())) == (re.sub( + r'\s+', ' ', EXPECTED_SHOW_SPANNING_TREE_OUTPUT.strip())) + + def test_show_spanning_tree_vlan(self, runner, db): + result = runner.invoke(show.cli.commands["spanning-tree"].commands["vlan"], ["500"], obj=db) + print(result.exit_code) + print(result.output) + assert result.exit_code == 0 + assert re.sub(r'\s+', ' ', result.output.strip()) == re.sub( + r'\s+', ' ', EXPECTED_SHOW_SPANNING_TREE_VLAN_OUTPUT.strip()) + + def test_show_spanning_tree_statistics(self, runner, db): + result = runner.invoke(show.cli.commands["spanning-tree"].commands["statistics"], [], obj=db) + print(result.exit_code) + print(result.output) + assert result.exit_code == 0 + assert re.sub(r'\s+', ' ', result.output.strip()) == re.sub( + r'\s+', ' ', EXPECTED_SHOW_SPANNING_TREE_STATISTICS_OUTPUT.strip()) + + def test_show_spanning_tree_statistics_vlan(self, runner, db): + result = runner.invoke( + show.cli.commands["spanning-tree"].commands["statistics"].commands["vlan"], ["500"], obj=db) + print(result.exit_code) + print(result.output) + assert result.exit_code == 0 + assert re.sub(r'\s+', ' ', result.output.strip()) == re.sub( + r'\s+', ' ', EXPECTED_SHOW_SPANNING_TREE_STATISTICS_OUTPUT.strip()) + + def test_show_spanning_tree_bpdu_guard(self, runner, db): + result = runner.invoke(show.cli.commands["spanning-tree"].commands["bpdu_guard"], [], obj=db) + print(result.exit_code) + print(result.output) + assert result.exit_code == 0 + assert re.sub(r'\s+', ' ', result.output.strip()) == re.sub( + r'\s+', ' ', EXPECTED_SHOW_SPANNING_TREE_BPDU_GUARD_OUTPUT.strip()) + + def test_show_spanning_tree_root_guard(self, runner, db): + result = runner.invoke(show.cli.commands["spanning-tree"].commands["root_guard"], [], obj=db) + print(result.exit_code) + print(result.output) + assert result.exit_code == 0 + assert re.sub(r'\s+', ' ', result.output.strip()) == re.sub( + r'\s+', ' ', EXPECTED_SHOW_SPANNING_TREE_ROOT_GUARD_OUTPUT.strip()) + + @pytest.mark.parametrize("command, args, expected_exit_code, expected_output", [ + # Disable PVST + (config.config.commands["spanning-tree"].commands["disable"], ["pvst"], 0, None), + # Enable PVST + (config.config.commands["spanning-tree"].commands["enable"], ["pvst"], 0, None), + # Add VLAN and member + (config.config.commands["vlan"].commands["add"], ["500"], 0, None), + (config.config.commands["vlan"].commands["member"].commands["add"], ["500", "Ethernet4"], 0, None), + # Attempt to enable PVST when it is already enabled + (config.config.commands["spanning-tree"].commands["enable"], ["pvst"], 2, "PVST is already configured") + ]) + def test_disable_enable_global_pvst(self, runner, db, command, args, expected_exit_code, expected_output): + # Execute the command + result = runner.invoke(command, args, obj=db) + + # Print for debugging + print(result.exit_code) + print(result.output) + + # Check the exit code + assert result.exit_code == expected_exit_code + + # Check the output if an expected output is defined + if expected_output: + assert expected_output in result.output + + @pytest.mark.parametrize("command, args, expected_exit_code, expected_output", [ + # Disable pvst + (config.config.commands["spanning-tree"].commands["disable"], ["pvst"], 0, None), + # Attempt enabling STP interface without global STP enabled + (config.config.commands["spanning-tree"].commands["interface"].commands["enable"], + ["Ethernet4"], 2, "Global STP is not enabled"), + # Enable pvst + (config.config.commands["spanning-tree"].commands["enable"], ["pvst"], 0, None), + # Configure interface priority and cost + (config.config.commands["spanning-tree"].commands["interface"].commands["priority"], + ["Ethernet4", "16"], 0, None), + (config.config.commands["spanning-tree"].commands["interface"].commands["cost"], + ["Ethernet4", "500"], 0, None), + # Disable and enable interface spanning tree + (config.config.commands["spanning-tree"].commands["interface"].commands["disable"], ["Ethernet4"], 0, None), + (config.config.commands["spanning-tree"].commands["interface"].commands["enable"], ["Ethernet4"], 0, None), + # Configure portfast disable and enable + (config.config.commands["spanning-tree"].commands["interface"].commands["portfast"].commands["disable"], + ["Ethernet4"], 0, None), + (config.config.commands["spanning-tree"].commands["interface"].commands["portfast"].commands["enable"], + ["Ethernet4"], 0, None), + # Configure uplink fast disable and enable + (config.config.commands["spanning-tree"].commands["interface"].commands["uplink_fast"].commands["disable"], + ["Ethernet4"], 0, None), + (config.config.commands["spanning-tree"].commands["interface"].commands["uplink_fast"].commands["enable"], + ["Ethernet4"], 0, None), + # Configure BPDU guard enable and disable with shutdown + (config.config.commands["spanning-tree"].commands["interface"].commands["bpdu_guard"].commands["enable"], + ["Ethernet4"], 0, None), + (config.config.commands["spanning-tree"].commands["interface"].commands["bpdu_guard"].commands["disable"], + ["Ethernet4"], 0, None), + (config.config.commands["spanning-tree"].commands["interface"].commands["bpdu_guard"].commands["enable"], + ["Ethernet4", "--shutdown"], 0, None), + (config.config.commands["spanning-tree"].commands["interface"].commands["bpdu_guard"].commands["disable"], + ["Ethernet4"], 0, None), + # Configure root guard enable and disable + (config.config.commands["spanning-tree"].commands["interface"].commands["root_guard"].commands["enable"], + ["Ethernet4"], 0, None), + (config.config.commands["spanning-tree"].commands["interface"].commands["root_guard"].commands["disable"], + ["Ethernet4"], 0, None), + # Invalid cost and priority values + (config.config.commands["spanning-tree"].commands["interface"].commands["cost"], ["Ethernet4", "0"], + 2, "STP interface path cost must be in range 1-200000000"), + (config.config.commands["spanning-tree"].commands["interface"].commands["cost"], ["Ethernet4", "2000000000"], + 2, "STP interface path cost must be in range 1-200000000"), + (config.config.commands["spanning-tree"].commands["interface"].commands["priority"], ["Ethernet4", "1000"], + 2, "STP interface priority must be in range 0-240"), + # Attempt to enable STP on interface with various conflicts + (config.config.commands["spanning-tree"].commands["interface"].commands["enable"], ["Ethernet4"], + 2, "STP is already enabled for"), + (config.config.commands["spanning-tree"].commands["interface"].commands["enable"], ["Ethernet0"], + 2, "has ip address"), + (config.config.commands["spanning-tree"].commands["interface"].commands["enable"], ["Ethernet120"], + 2, "is a portchannel member port"), + (config.config.commands["spanning-tree"].commands["interface"].commands["enable"], ["Ethernet20"], + 2, "has no VLAN configured") + ]) + def test_stp_validate_interface_params(self, runner, db, command, args, expected_exit_code, expected_output): + # Execute the command + result = runner.invoke(command, args, obj=db) + + # Print for debugging + print(result.exit_code) + print(result.output) + + # Check the exit code + assert result.exit_code == expected_exit_code + + # Check the output if an expected output is defined + if expected_output: + assert expected_output in result.output + + @pytest.mark.parametrize("command, args, expected_exit_code, expected_output", [ + (config.config.commands["spanning-tree"].commands["disable"], ["pvst"], 0, None), + (config.config.commands["spanning-tree"].commands["enable"], ["pvst"], 0, None), + (config.config.commands["spanning-tree"].commands["vlan"].commands["interface"].commands["cost"], + ["500", "Ethernet4", "200"], 0, None), + (config.config.commands["spanning-tree"].commands["vlan"].commands["interface"].commands["priority"], + ["500", "Ethernet4", "32"], 0, None), + (config.config.commands["spanning-tree"].commands["vlan"].commands["interface"].commands["cost"], + ["500", "Ethernet4", "0"], 2, "STP interface path cost must be in range 1-200000000"), + (config.config.commands["spanning-tree"].commands["vlan"].commands["interface"].commands["cost"], + ["500", "Ethernet4", "2000000000"], 2, "STP interface path cost must be in range 1-200000000"), + (config.config.commands["spanning-tree"].commands["vlan"].commands["interface"].commands["priority"], + ["500", "Ethernet4", "1000"], 2, "STP per vlan port priority must be in range 0-240"), + (config.config.commands["vlan"].commands["add"], ["99"], 0, None), + (config.config.commands["spanning-tree"].commands["vlan"].commands["interface"].commands["priority"], + ["99", "Ethernet4", "16"], 2, "is not member of"), + (config.config.commands["vlan"].commands["del"], ["99"], 0, None), + (config.config.commands["vlan"].commands["member"].commands["del"], ["500", "Ethernet4"], 0, None), + (config.config.commands["vlan"].commands["del"], ["500"], 0, None) + ]) + def test_stp_validate_vlan_interface_params(self, runner, db, command, args, expected_exit_code, expected_output): + # Execute the command + result = runner.invoke(command, args, obj=db) + # Output result information + print(result.exit_code) + print(result.output) + + # Check exit code + assert result.exit_code == expected_exit_code + + # If an expected output is defined, check that as well + if expected_output is not None: + assert expected_output in result.output + + @pytest.mark.parametrize("command, args, expected_exit_code, expected_output", [ + (config.config.commands["spanning-tree"].commands["disable"], ["pvst"], 0, None), + (config.config.commands["spanning-tree"].commands["enable"], ["pvst"], 0, None), + # Add VLAN and member + (config.config.commands["vlan"].commands["add"], ["500"], 0, None), + (config.config.commands["spanning-tree"].commands["vlan"].commands["hello"], ["500", "3"], 0, None), + (config.config.commands["spanning-tree"].commands["vlan"].commands["max_age"], ["500", "21"], 0, None), + (config.config.commands["spanning-tree"].commands["vlan"].commands["forward_delay"], ["500", "16"], 0, None), + (config.config.commands["spanning-tree"].commands["vlan"].commands["priority"], ["500", "4096"], 0, None), + (config.config.commands["spanning-tree"].commands["vlan"].commands["hello"], ["500", "0"], + 2, "STP hello timer must be in range 1-10"), + (config.config.commands["spanning-tree"].commands["vlan"].commands["hello"], ["500", "20"], + 2, "STP hello timer must be in range 1-10"), + (config.config.commands["spanning-tree"].commands["vlan"].commands["forward_delay"], ["500", "2"], + 2, "STP forward delay value must be in range 4-30"), + (config.config.commands["spanning-tree"].commands["vlan"].commands["forward_delay"], ["500", "42"], + 2, "STP forward delay value must be in range 4-30"), + (config.config.commands["spanning-tree"].commands["vlan"].commands["max_age"], ["500", "4"], + 2, "STP max age value must be in range 6-40"), + (config.config.commands["spanning-tree"].commands["vlan"].commands["max_age"], ["500", "45"], + 2, "STP max age value must be in range 6-40"), + (config.config.commands["spanning-tree"].commands["vlan"].commands["forward_delay"], ["500", "4"], + 2, "2*(forward_delay-1) >= max_age >= 2*(hello_time +1 )"), + (config.config.commands["spanning-tree"].commands["vlan"].commands["priority"], ["500", "65536"], + 2, "STP bridge priority must be in range 0-61440"), + (config.config.commands["spanning-tree"].commands["vlan"].commands["priority"], ["500", "8000"], + 2, "STP bridge priority must be multiple of 4096"), + (config.config.commands["vlan"].commands["del"], ["500"], 0, None) + ]) + def test_stp_validate_vlan_timer_and_priority_params(self, runner, db, + command, args, expected_exit_code, expected_output): + # Execute the command + result = runner.invoke(command, args, obj=db) + + # Print for debugging + print(result.exit_code) + print(result.output) + + # Check the exit code + assert result.exit_code == expected_exit_code + + # Check the output if there's an expected output + if expected_output: + assert expected_output in result.output + + @pytest.mark.parametrize("command, args, expected_exit_code, expected_output", [ + # Disable PVST globally + (config.config.commands["spanning-tree"].commands["disable"], ["pvst"], 0, None), + # Add VLAN 500 and assign a member port + (config.config.commands["vlan"].commands["add"], ["500"], 0, None), + (config.config.commands["vlan"].commands["member"].commands["add"], ["500", "Ethernet4"], 0, None), + # Enable PVST globally + (config.config.commands["spanning-tree"].commands["enable"], ["pvst"], 0, None), + # Add VLAN 600 + (config.config.commands["vlan"].commands["add"], ["600"], 0, None), + # Disable and then enable spanning-tree on VLAN 600 + (config.config.commands["spanning-tree"].commands["vlan"].commands["disable"], ["600"], 0, None), + (config.config.commands["spanning-tree"].commands["vlan"].commands["enable"], ["600"], 0, None), + # Attempt to delete VLAN 600 while STP is enabled + (config.config.commands["vlan"].commands["del"], ["600"], 0, None), + # Enable STP on non-existing VLAN 1010 + (config.config.commands["spanning-tree"].commands["vlan"].commands["enable"], ["1010"], 2, "doesn't exist"), + # Disable STP on non-existing VLAN 1010 + (config.config.commands["spanning-tree"].commands["vlan"].commands["disable"], ["1010"], 2, "doesn't exist"), + ]) + def test_add_vlan_enable_pvst(self, runner, db, command, args, expected_exit_code, expected_output): + # Execute the command + result = runner.invoke(command, args, obj=db) + + # Print for debugging + print(result.exit_code) + print(result.output) + + # Check the exit code + assert result.exit_code == expected_exit_code + + # Check the output if an expected output is defined + if expected_output: + assert expected_output in result.output + + @pytest.mark.parametrize("command, args, expected_exit_code, expected_output", [ + # Valid cases + (config.config.commands["spanning-tree"].commands["hello"], ["3"], 0, None), + (config.config.commands["spanning-tree"].commands["forward_delay"], ["16"], 0, None), + (config.config.commands["spanning-tree"].commands["max_age"], ["22"], 0, None), + (config.config.commands["spanning-tree"].commands["priority"], ["8192"], 0, None), + (config.config.commands["spanning-tree"].commands["root_guard_timeout"], ["500"], 0, None), + # Invalid hello timer values + (config.config.commands["spanning-tree"].commands["hello"], ["0"], 2, + "STP hello timer must be in range 1-10"), + (config.config.commands["spanning-tree"].commands["hello"], ["20"], 2, + "STP hello timer must be in range 1-10"), + # Invalid forward delay values + (config.config.commands["spanning-tree"].commands["forward_delay"], ["2"], 2, + "STP forward delay value must be in range 4-30"), + (config.config.commands["spanning-tree"].commands["forward_delay"], ["50"], 2, + "STP forward delay value must be in range 4-30"), + # Invalid max age values + (config.config.commands["spanning-tree"].commands["max_age"], ["5"], 2, + "STP max age value must be in range 6-40"), + (config.config.commands["spanning-tree"].commands["max_age"], ["45"], 2, + "STP max age value must be in range 6-40"), + # Consistency check for forward delay and max age + (config.config.commands["spanning-tree"].commands["forward_delay"], ["4"], 2, + "2*(forward_delay-1) >= max_age >= 2*(hello_time +1 )"), + # Invalid root guard timeout values + (config.config.commands["spanning-tree"].commands["root_guard_timeout"], ["4"], 2, + "STP root guard timeout must be in range 5-600"), + (config.config.commands["spanning-tree"].commands["root_guard_timeout"], ["700"], 2, + "STP root guard timeout must be in range 5-600"), + # Invalid priority values + (config.config.commands["spanning-tree"].commands["priority"], ["65536"], 2, + "STP bridge priority must be in range 0-61440"), + (config.config.commands["spanning-tree"].commands["priority"], ["8000"], 2, + "STP bridge priority must be multiple of 4096"), + (config.config.commands["vlan"].commands["member"].commands["del"], ["500", "Ethernet4"], 0, None), + (config.config.commands["vlan"].commands["del"], ["500"], 0, None) + ]) + def test_stp_validate_global_timer_and_priority_params(self, runner, db, command, + args, expected_exit_code, expected_output): + # Execute the command + result = runner.invoke(command, args, obj=db) + + # Print for debugging + print(result.exit_code) + print(result.output) + + # Check the exit code + assert result.exit_code == expected_exit_code + + # Check the output if an expected output is defined + if expected_output: + assert expected_output in result.output + + @classmethod + def teardown_class(cls): + os.environ['UTILITIES_UNIT_TESTING'] = "0" + print("TEARDOWN") + dbconnector.load_namespace_config() + dbconnector.dedicated_dbs.clear()