From 9799471858f2c49e111c8e07062d31ba4fe85598 Mon Sep 17 00:00:00 2001 From: Danny Allen Date: Wed, 11 Mar 2020 09:14:46 -0700 Subject: [PATCH] [dvs] Refactor ACL tests to use polling interface to access redis (#1213) * Introduce new DVSDatabase abstraction and SONiC redis fixtures * Refactor ACL tests to use redis fixtures Signed-off-by: Danny Allen --- tests/conftest.py | 5 + tests/dvslib/__init__.py | 0 tests/dvslib/dvs_database.py | 378 ++++++++ tests/test_acl.py | 1694 +++++++++------------------------- 4 files changed, 839 insertions(+), 1238 deletions(-) create mode 100644 tests/dvslib/__init__.py create mode 100644 tests/dvslib/dvs_database.py diff --git a/tests/conftest.py b/tests/conftest.py index a85c9596cc..7ce4dacfb3 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -10,9 +10,14 @@ import tarfile import StringIO import subprocess + from datetime import datetime from swsscommon import swsscommon +pytest_plugins = [ + "dvslib.dvs_database" +] + def ensure_system(cmd): (rc, output) = commands.getstatusoutput(cmd) if rc: diff --git a/tests/dvslib/__init__.py b/tests/dvslib/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/tests/dvslib/dvs_database.py b/tests/dvslib/dvs_database.py new file mode 100644 index 0000000000..09d09122d8 --- /dev/null +++ b/tests/dvslib/dvs_database.py @@ -0,0 +1,378 @@ +""" + dvs_database contains utilities for interacting with redis when writing + tests for the virtual switch. +""" +from __future__ import print_function + +import time +import collections +import pytest + +from swsscommon import swsscommon + +APP_DB_ID = 0 +ASIC_DB_ID = 1 +COUNTERS_DB_ID = 2 +CONFIG_DB_ID = 4 +FLEX_COUNTER_DB_ID = 5 +STATE_DB_ID = 6 + + +# PollingConfig provides parameters that are used to control polling behavior +# when accessing redis: +# - polling_interval: how often to check for updates in redis +# - timeout: the max amount of time to wait for updates in redis +# - strict: if the strict flag is set, failure to receive updates will cause +# the polling method to cause tests to fail (e.g. assert False) +PollingConfig = collections.namedtuple('PollingConfig', 'polling_interval timeout strict') + + +class DVSDatabase(object): + """ + DVSDatabase provides access to redis databases on the virtual switch. + + By default, database operations are configured to use + `DEFAULT_POLLING_CONFIG`. Users can specify their own PollingConfig, + but this shouldn't typically be necessary. + """ + DEFAULT_POLLING_CONFIG = PollingConfig(polling_interval=0.01, timeout=5, strict=True) + + def __init__(self, db_id, connector): + """ + Initializes a DVSDatabase instance. + + Args: + db_id (int): The integer ID used to identify the given database + instance in redis. + connector (str): The I/O connection used to communicate with + redis (e.g. unix socket, tcp socket, etc.). + + NOTE: Currently it's most convenient to let the user specify the + connector since it's set up in the dvs fixture. We may abstract + this further in the future as we refactor dvs. + """ + + self.db_connection = swsscommon.DBConnector(db_id, connector, 0) + + def create_entry(self, table_name, key, entry): + """ + Adds the mapping {`key` -> `entry`} to the specified table. + + Args: + table_name (str): The name of the table to add the entry to. + key (str): The key that maps to the entry. + entry (Dict[str, str]): A set of key-value pairs to be stored. + """ + + table = swsscommon.Table(self.db_connection, table_name) + formatted_entry = swsscommon.FieldValuePairs(entry.items()) + table.set(key, formatted_entry) + + def wait_for_entry(self, table_name, key, + polling_config=DEFAULT_POLLING_CONFIG): + """ + Gets the entry stored at `key` in the specified table. This method + will wait for the entry to exist. + + Args: + table_name (str): The name of the table where the entry is + stored. + key (str): The key that maps to the entry being retrieved. + polling_config (PollingConfig): The parameters to use to poll + the db. + + Returns: + Dict[str, str]: The entry stored at `key`. If no entry is found, + then an empty Dict will be returned. + + """ + + access_function = self._get_entry_access_function(table_name, key, True) + return self._db_poll(polling_config, access_function) + + def delete_entry(self, table_name, key): + """ + Removes the entry stored at `key` in the specified table. + + Args: + table_name (str): The name of the table where the entry is + being removed. + key (str): The key that maps to the entry being removed. + """ + + table = swsscommon.Table(self.db_connection, table_name) + table._del(key) # pylint: disable=protected-access + + def wait_for_empty_entry(self, + table_name, + key, + polling_config=DEFAULT_POLLING_CONFIG): + """ + Checks if there is any entry stored at `key` in the specified + table. This method will wait for the entry to be empty. + + Args: + table_name (str): The name of the table being checked. + key (str): The key to be checked. + polling_config (PollingConfig): The parameters to use to poll + the db. + + Returns: + bool: True if no entry exists at `key`, False otherwise. + """ + + access_function = self._get_entry_access_function(table_name, key, False) + return not self._db_poll(polling_config, access_function) + + def wait_for_n_keys(self, + table_name, + num_keys, + polling_config=DEFAULT_POLLING_CONFIG): + """ + Gets all of the keys stored in the specified table. This method + will wait for the specified number of keys. + + Args: + table_name (str): The name of the table from which to fetch + the keys. + num_keys (int): The expected number of keys to retrieve from + the table. + polling_config (PollingConfig): The parameters to use to poll + the db. + + Returns: + List[str]: The keys stored in the table. If no keys are found, + then an empty List will be returned. + """ + + access_function = self._get_keys_access_function(table_name, num_keys) + return self._db_poll(polling_config, access_function) + + def _get_keys_access_function(self, table_name, num_keys): + """ + Generates an access function to check for `num_keys` in the given + table and return the list of keys if successful. + + Args: + table_name (str): The name of the table from which to fetch + the keys. + num_keys (int): The number of keys to check for in the table. + If this is set to None, then this function will just return + whatever keys are in the table. + + Returns: + Callable([[], (bool, List[str])]): A function that can be + called to access the database. + + If `num_keys` keys are found in the given table, or left + unspecified, then the function will return True along with + the list of keys that were found. Otherwise, the function will + return False and some undefined list of keys. + """ + + table = swsscommon.Table(self.db_connection, table_name) + + def _accessor(): + keys = table.getKeys() + if not keys: + keys = [] + + if not num_keys and num_keys != 0: + status = True + else: + status = len(keys) == num_keys + + return (status, keys) + + return _accessor + + def _get_entry_access_function(self, table_name, key, expect_entry): + """ + Generates an access function to check for existence of an entry + at `key` and return it if successful. + + Args: + table_name (str): The name of the table from which to fetch + the entry. + key (str): The key that maps to the entry being retrieved. + expect_entry (bool): Whether or not we expect to see an entry + at `key`. + + Returns: + Callable([[], (bool, Dict[str, str])]): A function that can be + called to access the database. + + If `expect_entry` is set and an entry is found, then the + function will return True along with the entry that was found. + + If `expect_entry` is not set and no entry is found, then the + function will return True along with an empty Dict. + + In all other cases, the function will return False with some + undefined Dict. + """ + + table = swsscommon.Table(self.db_connection, table_name) + + def _accessor(): + (status, fv_pairs) = table.get(key) + + status = expect_entry == status + + if fv_pairs: + entry = dict(fv_pairs) + else: + entry = {} + + return (status, entry) + + return _accessor + + @staticmethod + def _db_poll(polling_config, access_function): + """ + _db_poll will periodically run `access_function` on the database + using the parameters described in `polling_config` and return the + output of the access function. + + Args: + polling_config (PollingConfig): The parameters to use to poll + the db. + access_function (Callable[[], (bool, Any)]): The function used + for polling the db. Note that the function must return a + status which indicates if the function was succesful or + not, as well as some return value. + + Returns: + Any: The output of the access function, if it is succesful, + None otherwise. + """ + if polling_config.polling_interval == 0: + iterations = 1 + else: + iterations = int(polling_config.timeout // polling_config.polling_interval) + 1 + + for _ in range(iterations): + (status, result) = access_function() + + if status: + return result + + time.sleep(polling_config.polling_interval) + + if polling_config.strict: + assert False + + return None + + +@pytest.fixture +def app_db(dvs): + """ + Provides access to the SONiC APP DB. + + Args: + dvs (DockerVirtualSwitch): The dvs fixture, automatically injected + by pytest. + + Returns: + DVSDatabase: An instance of APP DB + """ + + return DVSDatabase(APP_DB_ID, dvs.redis_sock) + + +@pytest.fixture +def asic_db(dvs): + """ + Provides access to the SONiC ASIC DB. + + Args: + dvs (DockerVirtualSwitch): The dvs fixture, automatically injected + by pytest. + + Attributes: + default_acl_tables (List[str]): IDs for the ACL tables that are + configured in SONiC by default + default_acl_entries (List[str]): IDs for the ACL rules that are + configured in SONiC by default + port_name_map (Dict[str, str]): A mapping from interface names + (e.g. Ethernet0) to port IDs + + Returns: + DVSDatabase: An instance of ASIC DB + """ + + db = DVSDatabase(ASIC_DB_ID, dvs.redis_sock) # pylint: disable=invalid-name + + # NOTE: This is an ugly hack to emulate the current asic db behavior, + # this will be refactored along with the dvs fixture. + db.default_acl_tables = dvs.asicdb.default_acl_tables # pylint: disable=attribute-defined-outside-init + db.default_acl_entries = dvs.asicdb.default_acl_entries # pylint: disable=attribute-defined-outside-init + db.port_name_map = dvs.asicdb.portnamemap # pylint: disable=attribute-defined-outside-init + + return db + + +@pytest.fixture +def counters_db(dvs): + """ + Provides access to the SONiC Counters DB. + + Args: + dvs (DockerVirtualSwitch): The dvs fixture, automatically injected + by pytest. + + Returns: + DVSDatabase: An instance of Counters DB + """ + + return DVSDatabase(COUNTERS_DB_ID, dvs.redis_sock) + + +@pytest.fixture +def config_db(dvs): + """ + Provides access to the SONiC Config DB. + + Args: + dvs (DockerVirtualSwitch): The dvs fixture, automatically injected + by pytest. + + Returns: + DVSDatabase: An instance of Config DB + """ + + return DVSDatabase(CONFIG_DB_ID, dvs.redis_sock) + + +@pytest.fixture +def flex_counter_db(dvs): + """ + Provides access to the SONiC Flex Counter DB. + + Args: + dvs (DockerVirtualSwitch): The dvs fixture, automatically injected + by pytest. + + Returns: + DVSDatabase: An instance of Flex Counter DB + """ + + return DVSDatabase(FLEX_COUNTER_DB_ID, dvs.redis_sock) + + +@pytest.fixture +def state_db(dvs): + """ + Provides access to the SONiC State DB. + + Args: + dvs (DockerVirtualSwitch): The dvs fixture, automatically injected + by pytest. + + Returns: + DVSDatabase: An instance of State DB + """ + + return DVSDatabase(STATE_DB_ID, dvs.redis_sock) diff --git a/tests/test_acl.py b/tests/test_acl.py index fa571aed11..f46b171f3d 100644 --- a/tests/test_acl.py +++ b/tests/test_acl.py @@ -1,1409 +1,632 @@ import time -import re -import json -import pytest - -from swsscommon import swsscommon class BaseTestAcl(object): - """ base class with helpers for Test classes """ - def setup_db(self, dvs): - self.pdb = swsscommon.DBConnector(0, dvs.redis_sock, 0) - self.adb = swsscommon.DBConnector(1, dvs.redis_sock, 0) - self.cdb = swsscommon.DBConnector(4, dvs.redis_sock, 0) - self.sdb = swsscommon.DBConnector(6, dvs.redis_sock, 0) - - def create_acl_table(self, table, type, ports, stage=None): - tbl = swsscommon.Table(self.cdb, "ACL_TABLE") - table_props = [("policy_desc", "test"), - ("type", type), - ("ports", ",".join(ports))] - - if stage is not None: - table_props += [("stage", stage)] - - fvs = swsscommon.FieldValuePairs(table_props) - tbl.set(table, fvs) - time.sleep(1) + def setup_db(self, asic_db, config_db, state_db): + self.asic_db = asic_db + self.config_db = config_db + self.state_db = state_db + + def create_acl_table(self, table_name, table_type, ports, stage=None): + table_attrs = { + "policy_desc": "DVS acl table test", + "type": table_type, + "ports": ",".join(ports) + } - def remove_acl_table(self, table): - tbl = swsscommon.Table(self.cdb, "ACL_TABLE") - tbl._del(table) - time.sleep(1) + if stage: + table_attrs["stage"] = stage - def get_acl_table_id(self, dvs): - tbl = swsscommon.Table(self.adb, "ASIC_STATE:SAI_OBJECT_TYPE_ACL_TABLE") - keys = tbl.getKeys() + self.config_db.create_entry("ACL_TABLE", table_name, table_attrs) - for k in dvs.asicdb.default_acl_tables: - assert k in keys + def remove_acl_table(self, table_name): + self.config_db.delete_entry("ACL_TABLE", table_name) - acl_tables = [k for k in keys if k not in dvs.asicdb.default_acl_tables] - assert len(acl_tables) == 1 + def get_acl_table_id(self): + num_keys = len(self.asic_db.default_acl_tables) + 1 + keys = self.asic_db.wait_for_n_keys("ASIC_STATE:SAI_OBJECT_TYPE_ACL_TABLE", num_keys) + acl_tables = [k for k in keys if k not in self.asic_db.default_acl_tables] return acl_tables[0] - def verify_if_any_acl_table_created(self, dvs, adb): - atbl = swsscommon.Table(adb, "ASIC_STATE:SAI_OBJECT_TYPE_ACL_TABLE") - keys = atbl.getKeys() - for k in dvs.asicdb.default_acl_tables: - assert k in keys - acl_tables = [k for k in keys if k not in dvs.asicdb.default_acl_tables] - - if len(acl_tables) != 0: - return True - - return False - - def clean_up_left_over(self, dvs): - adb = swsscommon.DBConnector(1, dvs.redis_sock, 0) - atbl = swsscommon.Table(adb, "ASIC_STATE:SAI_OBJECT_TYPE_ACL_TABLE_GROUP") - keys = atbl.getKeys() - for key in keys: - atbl._del(key) - - atbl = swsscommon.Table(adb, "ASIC_STATE:SAI_OBJECT_TYPE_ACL_TABLE_GROUP") - keys = atbl.getKeys() - assert len(keys) == 0 - - atbl = swsscommon.Table(adb, "ASIC_STATE:SAI_OBJECT_TYPE_ACL_TABLE_GROUP_MEMBER") - keys = atbl.getKeys() - for key in keys: - atbl._del(key) - - atbl = swsscommon.Table(adb, "ASIC_STATE:SAI_OBJECT_TYPE_ACL_TABLE_GROUP_MEMBER") - keys = atbl.getKeys() - assert len(keys) == 0 - - def verify_acl_group_num(self, adb, expt): - atbl = swsscommon.Table(adb, "ASIC_STATE:SAI_OBJECT_TYPE_ACL_TABLE_GROUP") - acl_table_groups = atbl.getKeys() - assert len(acl_table_groups) == expt - - for k in acl_table_groups: - (status, fvs) = atbl.get(k) - assert status == True - for fv in fvs: - if fv[0] == "SAI_ACL_TABLE_GROUP_ATTR_ACL_STAGE": - assert fv[1] == "SAI_ACL_STAGE_INGRESS" - elif fv[0] == "SAI_ACL_TABLE_GROUP_ATTR_ACL_BIND_POINT_TYPE_LIST": - assert fv[1] == "1:SAI_ACL_BIND_POINT_TYPE_PORT" - elif fv[0] == "SAI_ACL_TABLE_GROUP_ATTR_TYPE": - assert fv[1] == "SAI_ACL_TABLE_GROUP_TYPE_PARALLEL" + def verify_no_acl_tables(self): + num_keys = len(self.asic_db.default_acl_tables) + keys = self.asic_db.wait_for_n_keys("ASIC_STATE:SAI_OBJECT_TYPE_ACL_TABLE", num_keys) + assert set(keys) == set(self.asic_db.default_acl_tables) + + def verify_acl_group_num(self, expt): + acl_table_groups = self.asic_db.wait_for_n_keys("ASIC_STATE:SAI_OBJECT_TYPE_ACL_TABLE_GROUP", expt) + + for group in acl_table_groups: + fvs = self.asic_db.wait_for_entry("ASIC_STATE:SAI_OBJECT_TYPE_ACL_TABLE_GROUP", group) + for k, v in fvs.items(): + if k == "SAI_ACL_TABLE_GROUP_ATTR_ACL_STAGE": + assert v == "SAI_ACL_STAGE_INGRESS" + elif k == "SAI_ACL_TABLE_GROUP_ATTR_ACL_BIND_POINT_TYPE_LIST": + assert v == "1:SAI_ACL_BIND_POINT_TYPE_PORT" + elif k == "SAI_ACL_TABLE_GROUP_ATTR_TYPE": + assert v == "SAI_ACL_TABLE_GROUP_TYPE_PARALLEL" else: assert False - def verify_acl_group_member(self, adb, acl_group_ids, acl_table_id): - atbl = swsscommon.Table(adb, "ASIC_STATE:SAI_OBJECT_TYPE_ACL_TABLE_GROUP_MEMBER") - keys = atbl.getKeys() + def verify_acl_group_member(self, acl_group_ids, acl_table_id): + members = self.asic_db.wait_for_n_keys("ASIC_STATE:SAI_OBJECT_TYPE_ACL_TABLE_GROUP_MEMBER", len(acl_group_ids)) member_groups = [] - for k in keys: - (status, fvs) = atbl.get(k) - assert status == True - assert len(fvs) == 3 - for fv in fvs: - if fv[0] == "SAI_ACL_TABLE_GROUP_MEMBER_ATTR_ACL_TABLE_GROUP_ID": - assert fv[1] in acl_group_ids - member_groups.append(fv[1]) - elif fv[0] == "SAI_ACL_TABLE_GROUP_MEMBER_ATTR_ACL_TABLE_ID": - assert fv[1] == acl_table_id - elif fv[0] == "SAI_ACL_TABLE_GROUP_MEMBER_ATTR_PRIORITY": + for member in members: + fvs = self.asic_db.wait_for_entry("ASIC_STATE:SAI_OBJECT_TYPE_ACL_TABLE_GROUP_MEMBER", member) + for k, v in fvs.items(): + if k == "SAI_ACL_TABLE_GROUP_MEMBER_ATTR_ACL_TABLE_GROUP_ID": + assert v in acl_group_ids + member_groups.append(v) + elif k == "SAI_ACL_TABLE_GROUP_MEMBER_ATTR_ACL_TABLE_ID": + assert v == acl_table_id + elif k == "SAI_ACL_TABLE_GROUP_MEMBER_ATTR_PRIORITY": assert True else: assert False assert set(member_groups) == set(acl_group_ids) - def verify_acl_port_binding(self, dvs, adb, bind_ports): - atbl = swsscommon.Table(adb, "ASIC_STATE:SAI_OBJECT_TYPE_ACL_TABLE_GROUP") - acl_table_groups = atbl.getKeys() - assert len(acl_table_groups) == len(bind_ports) + def verify_acl_port_binding(self, bind_ports): + acl_table_groups = self.asic_db.wait_for_n_keys("ASIC_STATE:SAI_OBJECT_TYPE_ACL_TABLE_GROUP", len(bind_ports)) - atbl = swsscommon.Table(adb, "ASIC_STATE:SAI_OBJECT_TYPE_PORT") port_groups = [] - for p in [dvs.asicdb.portnamemap[portname] for portname in bind_ports]: - (status, fvs) = atbl.get(p) - for fv in fvs: - if fv[0] == "SAI_PORT_ATTR_INGRESS_ACL": - assert fv[1] in acl_table_groups - port_groups.append(fv[1]) + for port in [self.asic_db.port_name_map[p] for p in bind_ports]: + fvs = self.asic_db.wait_for_entry("ASIC_STATE:SAI_OBJECT_TYPE_PORT", port) + acl_table_group = fvs.pop("SAI_PORT_ATTR_INGRESS_ACL", None) + assert acl_table_group in acl_table_groups + port_groups.append(acl_table_group) assert len(port_groups) == len(bind_ports) assert set(port_groups) == set(acl_table_groups) - def check_rule_existence(self, entry, rules, verifs): - """ helper function to verify if rule exists """ - - for rule in rules: - ruleD = dict(rule) - # find the rule to match with based on priority - if ruleD["PRIORITY"] == entry['SAI_ACL_ENTRY_ATTR_PRIORITY']: - ruleIndex = rules.index(rule) - # use verification dictionary to match entry to rule - for key in verifs[ruleIndex]: - assert verifs[ruleIndex][key] == entry[key] - return True - return False - - def create_acl_rule(self, table, rule, field, value): - tbl = swsscommon.Table(self.cdb, "ACL_RULE") - fvs = swsscommon.FieldValuePairs([("priority", "666"), - ("PACKET_ACTION", "FORWARD"), - (field, value)]) - tbl.set(table + "|" + rule, fvs) - time.sleep(1) - - def remove_acl_rule(self, table, rule): - tbl = swsscommon.Table(self.cdb, "ACL_RULE") - tbl._del(table + "|" + rule) - time.sleep(1) - - def verify_acl_rule(self, dvs, field, value): - acl_table_id = self.get_acl_table_id(dvs) - - tbl = swsscommon.Table(self.adb, "ASIC_STATE:SAI_OBJECT_TYPE_ACL_ENTRY") - acl_entries = [k for k in tbl.getKeys() if k not in dvs.asicdb.default_acl_entries] - assert len(acl_entries) == 1 - - (status, fvs) = tbl.get(acl_entries[0]) - assert status == True - assert len(fvs) == 6 - for fv in fvs: - if fv[0] == "SAI_ACL_ENTRY_ATTR_TABLE_ID": - assert fv[1] == acl_table_id - elif fv[0] == "SAI_ACL_ENTRY_ATTR_ADMIN_STATE": - assert fv[1] == "true" - elif fv[0] == "SAI_ACL_ENTRY_ATTR_PRIORITY": - assert fv[1] == "666" - elif fv[0] == "SAI_ACL_ENTRY_ATTR_ACTION_COUNTER": - assert True - elif fv[0] == "SAI_ACL_ENTRY_ATTR_ACTION_PACKET_ACTION": - assert fv[1] == "SAI_PACKET_ACTION_FORWARD" - elif fv[0] == field: - assert fv[1] == value - else: - assert False + def create_acl_rule(self, table_name, rule_name, qualifiers, action="FORWARD", priority="2020"): + fvs = { + "priority": priority, + "PACKET_ACTION": action + } + for k, v in qualifiers.items(): + fvs[k] = v -class TestAcl(BaseTestAcl): - def test_AclTableCreation(self, dvs, testlog): - self.setup_db(dvs) - db = swsscommon.DBConnector(4, dvs.redis_sock, 0) - adb = swsscommon.DBConnector(1, dvs.redis_sock, 0) + self.config_db.create_entry("ACL_RULE", "{}|{}".format(table_name, rule_name), fvs) - # create ACL_TABLE in config db - bind_ports = ["Ethernet0", "Ethernet4"] - tbl = swsscommon.Table(db, "ACL_TABLE") - fvs = swsscommon.FieldValuePairs([("policy_desc", "test"), ("type", "L3"), ("ports", ",".join(bind_ports))]) - tbl.set("test", fvs) - time.sleep(1) + def remove_acl_rule(self, table_name, rule_name): + self.config_db.delete_entry("ACL_RULE", "{}|{}".format(table_name, rule_name)) - # check acl table in asic db - test_acl_table_id = self.get_acl_table_id(dvs) - assert test_acl_table_id + def get_acl_rule_id(self): + num_keys = len(self.asic_db.default_acl_entries) + 1 + keys = self.asic_db.wait_for_n_keys("ASIC_STATE:SAI_OBJECT_TYPE_ACL_ENTRY", num_keys) - # check acl table group in asic db - self.verify_acl_group_num(adb, 2) + acl_entries = [k for k in keys if k not in self.asic_db.default_acl_entries] + return acl_entries[0] - # get acl table group ids and verify the id numbers - atbl = swsscommon.Table(adb, "ASIC_STATE:SAI_OBJECT_TYPE_ACL_TABLE_GROUP") - acl_group_ids = atbl.getKeys() - assert len(acl_group_ids) == 2 + def verify_no_acl_rules(self): + num_keys = len(self.asic_db.default_acl_entries) + keys = self.asic_db.wait_for_n_keys("ASIC_STATE:SAI_OBJECT_TYPE_ACL_ENTRY", num_keys) + assert set(keys) == set(self.asic_db.default_acl_entries) - # check acl table group member - self.verify_acl_group_member(adb, acl_group_ids, test_acl_table_id) + def verify_acl_rule(self, qualifiers, action="FORWARD", priority="2020"): + acl_rule_id = self.get_acl_rule_id() - # check port binding - self.verify_acl_port_binding(dvs, adb, bind_ports) + fvs = self.asic_db.wait_for_entry("ASIC_STATE:SAI_OBJECT_TYPE_ACL_ENTRY", acl_rule_id) + self._check_acl_entry(fvs, qualifiers, action, priority) - def test_AclRuleL4SrcPort(self, dvs, testlog): - """ - hmset ACL_RULE|test|acl_test_rule priority 55 PACKET_ACTION FORWARD L4_SRC_PORT 65000 - """ + def verify_acl_rule_set(self, priorities, in_actions, expected): + num_keys = len(self.asic_db.default_acl_entries) + len(priorities) + keys = self.asic_db.wait_for_n_keys("ASIC_STATE:SAI_OBJECT_TYPE_ACL_ENTRY", num_keys) + + acl_entries = [k for k in keys if k not in self.asic_db.default_acl_entries] + for entry in acl_entries: + rule = self.asic_db.wait_for_entry("ASIC_STATE:SAI_OBJECT_TYPE_ACL_ENTRY", entry) + priority = rule.get("SAI_ACL_ENTRY_ATTR_PRIORITY", None) + assert priority in priorities + self._check_acl_entry(rule, expected[priority], + action=in_actions[priority], priority=priority) - self.setup_db(dvs) - db = swsscommon.DBConnector(4, dvs.redis_sock, 0) - adb = swsscommon.DBConnector(1, dvs.redis_sock, 0) - - # create acl rule - tbl = swsscommon.Table(db, "ACL_RULE") - fvs = swsscommon.FieldValuePairs([("priority", "55"), ("PACKET_ACTION", "FORWARD"), ("L4_SRC_PORT", "65000")]) - tbl.set("test|acl_test_rule", fvs) - - time.sleep(1) - - test_acl_table_id = self.get_acl_table_id(dvs) - - # check acl table in asic db - atbl = swsscommon.Table(adb, "ASIC_STATE:SAI_OBJECT_TYPE_ACL_ENTRY") - keys = atbl.getKeys() - - acl_entry = [k for k in keys if k not in dvs.asicdb.default_acl_entries] - assert len(acl_entry) == 1 - - (status, fvs) = atbl.get(acl_entry[0]) - assert status == True - assert len(fvs) == 6 - for fv in fvs: - if fv[0] == "SAI_ACL_ENTRY_ATTR_TABLE_ID": - assert fv[1] == test_acl_table_id - elif fv[0] == "SAI_ACL_ENTRY_ATTR_ADMIN_STATE": - assert fv[1] == "true" - elif fv[0] == "SAI_ACL_ENTRY_ATTR_PRIORITY": - assert fv[1] == "55" - elif fv[0] == "SAI_ACL_ENTRY_ATTR_ACTION_COUNTER": + def _check_acl_entry(self, entry, qualifiers, action, priority): + acl_table_id = self.get_acl_table_id() + + for k, v in entry.items(): + if k == "SAI_ACL_ENTRY_ATTR_TABLE_ID": + assert v == acl_table_id + elif k == "SAI_ACL_ENTRY_ATTR_ADMIN_STATE": + assert v == "true" + elif k == "SAI_ACL_ENTRY_ATTR_PRIORITY": + assert v == priority + elif k == "SAI_ACL_ENTRY_ATTR_ACTION_COUNTER": assert True - elif fv[0] == "SAI_ACL_ENTRY_ATTR_FIELD_L4_SRC_PORT": - assert fv[1] == "65000&mask:0xffff" - elif fv[0] == "SAI_ACL_ENTRY_ATTR_ACTION_PACKET_ACTION": - assert fv[1] == "SAI_PACKET_ACTION_FORWARD" + elif k == "SAI_ACL_ENTRY_ATTR_ACTION_PACKET_ACTION": + if action == "FORWARD": + assert v == "SAI_PACKET_ACTION_FORWARD" + elif action == "DROP": + assert v == "SAI_PACKET_ACTION_DROP" + else: + assert False + elif k == "SAI_ACL_ENTRY_ATTR_ACTION_REDIRECT": + if "REDIRECT" not in action: + assert False + elif k in qualifiers: + assert qualifiers[k](v) else: assert False - # remove acl rule - tbl._del("test|acl_test_rule") + def get_simple_qualifier_comparator(self, expected_qualifier): + def _match_qualifier(sai_qualifier): + return expected_qualifier == sai_qualifier - time.sleep(1) + return _match_qualifier - (status, fvs) = atbl.get(acl_entry[0]) - assert status == False + def get_port_list_comparator(self, expected_ports): + def _match_port_list(sai_port_list): + if not sai_port_list.startswith("{}:".format(len(expected_ports))): + return False + for port in expected_ports: + if self.asic_db.port_name_map[port] not in sai_port_list: + return False - def test_AclRuleInOutPorts(self, dvs, testlog): - """ - hmset ACL_RULE|test|acl_test_rule priority 55 PACKET_ACTION FORWARD IN_PORTS Ethernet0,Ethernet4 OUT_PORTS Ethernet8,Ethernet12 - """ + return True - self.setup_db(dvs) - db = swsscommon.DBConnector(4, dvs.redis_sock, 0) - adb = swsscommon.DBConnector(1, dvs.redis_sock, 0) - - # create acl rule - tbl = swsscommon.Table(db, "ACL_RULE") - fvs = swsscommon.FieldValuePairs([("priority", "55"), - ("PACKET_ACTION", "FORWARD"), - ("IN_PORTS", "Ethernet0,Ethernet4"), - ("OUT_PORTS", "Ethernet8,Ethernet12")]) - tbl.set("test|acl_test_rule", fvs) - - time.sleep(1) - - test_acl_table_id = self.get_acl_table_id(dvs) - - # check acl table in asic db - atbl = swsscommon.Table(adb, "ASIC_STATE:SAI_OBJECT_TYPE_ACL_ENTRY") - keys = atbl.getKeys() - - acl_entry = [k for k in keys if k not in dvs.asicdb.default_acl_entries] - assert len(acl_entry) == 1 - - (status, fvs) = atbl.get(acl_entry[0]) - assert status == True - assert len(fvs) == 7 - for fv in fvs: - if fv[0] == "SAI_ACL_ENTRY_ATTR_TABLE_ID": - assert fv[1] == test_acl_table_id - elif fv[0] == "SAI_ACL_ENTRY_ATTR_ADMIN_STATE": - assert fv[1] == "true" - elif fv[0] == "SAI_ACL_ENTRY_ATTR_PRIORITY": - assert fv[1] == "55" - elif fv[0] == "SAI_ACL_ENTRY_ATTR_ACTION_COUNTER": - assert True - elif fv[0] == "SAI_ACL_ENTRY_ATTR_FIELD_IN_PORTS": - assert fv[1].startswith("2:") - assert dvs.asicdb.portnamemap["Ethernet0"] in fv[1] - assert dvs.asicdb.portnamemap["Ethernet4"] in fv[1] - elif fv[0] == "SAI_ACL_ENTRY_ATTR_FIELD_OUT_PORTS": - assert fv[1].startswith("2:") - assert dvs.asicdb.portnamemap["Ethernet8"] in fv[1] - assert dvs.asicdb.portnamemap["Ethernet12"] in fv[1] - elif fv[0] == "SAI_ACL_ENTRY_ATTR_ACTION_PACKET_ACTION": - assert fv[1] == "SAI_PACKET_ACTION_FORWARD" - else: - assert False + return _match_port_list + + def get_acl_range_comparator(self, expected_type, expected_ports): + def _match_acl_range(sai_acl_range): + range_id = sai_acl_range.split(":", 1)[1] + fvs = self.asic_db.wait_for_entry("ASIC_STATE:SAI_OBJECT_TYPE_ACL_RANGE", range_id) + for k, v in fvs.items(): + if k == "SAI_ACL_RANGE_ATTR_TYPE" and v == expected_type: + continue + elif k == "SAI_ACL_RANGE_ATTR_LIMIT" and v == expected_ports: + continue + else: + return False - # remove acl rule - tbl._del("test|acl_test_rule") - - time.sleep(1) - - (status, fvs) = atbl.get(acl_entry[0]) - assert status == False - - def test_AclRuleInPortsNonExistingInterface(self, dvs, testlog): - self.setup_db(dvs) - - # Create ACL rule with a completely wrong interface - tbl = swsscommon.Table(self.cdb, "ACL_RULE") - fvs = swsscommon.FieldValuePairs([("priority", "55"), - ("PACKET_ACTION", "FORWARD"), - ("IN_PORTS", "FOO_BAR_BAZ")]) - tbl.set("test|foo_bar_baz", fvs) - time.sleep(1) - - # Make sure no rules were created - atbl = swsscommon.Table(self.adb, "ASIC_STATE:SAI_OBJECT_TYPE_ACL_ENTRY") - keys = atbl.getKeys() - acl_entry = [k for k in keys if k not in dvs.asicdb.default_acl_entries] - assert len(acl_entry) == 0 - - # Create ACL rule with a correct interface and a completely wrong interface - tbl = swsscommon.Table(self.cdb, "ACL_RULE") - fvs = swsscommon.FieldValuePairs([("priority", "55"), - ("PACKET_ACTION", "FORWARD"), - ("IN_PORTS", "Ethernet0,FOO_BAR_BAZ")]) - tbl.set("test|foo_bar_baz", fvs) - time.sleep(1) - - # Make sure no rules were created - atbl = swsscommon.Table(self.adb, "ASIC_STATE:SAI_OBJECT_TYPE_ACL_ENTRY") - keys = atbl.getKeys() - acl_entry = [k for k in keys if k not in dvs.asicdb.default_acl_entries] - assert len(acl_entry) == 0 - - # Delete rule - tbl._del("test|foo_bar_baz") - time.sleep(1) - - def test_AclRuleOutPortsNonExistingInterface(self, dvs, testlog): - self.setup_db(dvs) - - # Create ACL rule with a completely wrong interface - tbl = swsscommon.Table(self.cdb, "ACL_RULE") - fvs = swsscommon.FieldValuePairs([("priority", "55"), - ("PACKET_ACTION", "FORWARD"), - ("OUT_PORTS", "FOO_BAR_BAZ")]) - tbl.set("test|foo_bar_baz", fvs) - time.sleep(1) - - # Make sure no rules were created - atbl = swsscommon.Table(self.adb, "ASIC_STATE:SAI_OBJECT_TYPE_ACL_ENTRY") - keys = atbl.getKeys() - acl_entry = [k for k in keys if k not in dvs.asicdb.default_acl_entries] - assert len(acl_entry) == 0 - - # Create ACL rule with a correct interface and a completely wrong interface - tbl = swsscommon.Table(self.cdb, "ACL_RULE") - fvs = swsscommon.FieldValuePairs([("priority", "55"), - ("PACKET_ACTION", "FORWARD"), - ("OUT_PORTS", "Ethernet0,FOO_BAR_BAZ")]) - tbl.set("test|foo_bar_baz", fvs) - time.sleep(1) - - # Make sure no rules were created - atbl = swsscommon.Table(self.adb, "ASIC_STATE:SAI_OBJECT_TYPE_ACL_ENTRY") - keys = atbl.getKeys() - acl_entry = [k for k in keys if k not in dvs.asicdb.default_acl_entries] - assert len(acl_entry) == 0 - - # Delete rule - tbl._del("test|foo_bar_baz") - time.sleep(1) - - def test_AclTableDeletion(self, dvs, testlog): - self.setup_db(dvs) - - tbl = swsscommon.Table(self.cdb, "ACL_TABLE") - tbl._del("test") - - time.sleep(1) - - atbl = swsscommon.Table(self.adb, "ASIC_STATE:SAI_OBJECT_TYPE_ACL_TABLE") - keys = atbl.getKeys() - # only the default table was left along with DTel tables - assert len(keys) >= 1 - - def test_V6AclTableCreation(self, dvs, testlog): - - self.setup_db(dvs) - db = swsscommon.DBConnector(4, dvs.redis_sock, 0) - adb = swsscommon.DBConnector(1, dvs.redis_sock, 0) + return True - bind_ports = ["Ethernet0", "Ethernet4", "Ethernet8"] - # create ACL_TABLE in config db - tbl = swsscommon.Table(db, "ACL_TABLE") - fvs = swsscommon.FieldValuePairs([("policy_desc", "testv6"), ("type", "L3V6"), ("ports", ",".join(bind_ports))]) - tbl.set("test-aclv6", fvs) - - time.sleep(1) - - # check acl table in asic db - test_acl_table_id = self.get_acl_table_id(dvs) - - # check acl table group in asic db - atbl = swsscommon.Table(adb, "ASIC_STATE:SAI_OBJECT_TYPE_ACL_TABLE_GROUP") - acl_table_groups = atbl.getKeys() - assert len(acl_table_groups) == 3 - - for k in acl_table_groups: - (status, fvs) = atbl.get(k) - assert status == True - - for fv in fvs: - if fv[0] == "SAI_ACL_TABLE_GROUP_ATTR_ACL_STAGE": - assert fv[1] == "SAI_ACL_STAGE_INGRESS" - elif fv[0] == "SAI_ACL_TABLE_GROUP_ATTR_ACL_BIND_POINT_TYPE_LIST": - assert fv[1] == "1:SAI_ACL_BIND_POINT_TYPE_PORT" - elif fv[0] == "SAI_ACL_TABLE_GROUP_ATTR_TYPE": - assert fv[1] == "SAI_ACL_TABLE_GROUP_TYPE_PARALLEL" - else: - assert False + return _match_acl_range - # check acl table group member - atbl = swsscommon.Table(adb, "ASIC_STATE:SAI_OBJECT_TYPE_ACL_TABLE_GROUP_MEMBER") - keys = atbl.getKeys() - # three ports - assert len(keys) == 3 +class TestAcl(BaseTestAcl): + def test_AclTableCreation(self, asic_db, config_db, state_db): + self.setup_db(asic_db, config_db, state_db) - member_groups = [] - for k in keys: - (status, fvs) = atbl.get(k) - assert status == True - - assert len(fvs) == 3 - for fv in fvs: - if fv[0] == "SAI_ACL_TABLE_GROUP_MEMBER_ATTR_ACL_TABLE_GROUP_ID": - assert fv[1] in acl_table_groups - member_groups.append(fv[1]) - elif fv[0] == "SAI_ACL_TABLE_GROUP_MEMBER_ATTR_ACL_TABLE_ID": - assert fv[1] == test_acl_table_id - elif fv[0] == "SAI_ACL_TABLE_GROUP_MEMBER_ATTR_PRIORITY": - assert True - else: - assert False + bind_ports = ["Ethernet0", "Ethernet4"] + self.create_acl_table("test", "L3", bind_ports) - assert set(member_groups) == set(acl_table_groups) + self.verify_acl_group_num(len(bind_ports)) + acl_group_ids = self.asic_db.wait_for_n_keys("ASIC_STATE:SAI_OBJECT_TYPE_ACL_TABLE_GROUP", len(bind_ports)) + self.verify_acl_group_member(acl_group_ids, self.get_acl_table_id()) + self.verify_acl_port_binding(bind_ports) - # check port binding - atbl = swsscommon.Table(adb, "ASIC_STATE:SAI_OBJECT_TYPE_PORT") + def test_AclRuleL4SrcPort(self, asic_db, config_db, state_db): + self.setup_db(asic_db, config_db, state_db) - port_groups = [] - for p in [dvs.asicdb.portnamemap[portname] for portname in bind_ports]: - (status, fvs) = atbl.get(p) - for fv in fvs: - if fv[0] == "SAI_PORT_ATTR_INGRESS_ACL": - assert fv[1] in acl_table_groups - port_groups.append(fv[1]) + config_qualifiers = {"L4_SRC_PORT": "65000"} + expected_sai_qualifiers = {"SAI_ACL_ENTRY_ATTR_FIELD_L4_SRC_PORT": self.get_simple_qualifier_comparator("65000&mask:0xffff")} - assert set(port_groups) == set(acl_table_groups) + self.create_acl_rule("test", "acl_test_rule", config_qualifiers) + self.verify_acl_rule(expected_sai_qualifiers) - def test_V6AclRuleIPv6Any(self, dvs, testlog): - """ - hmset ACL_RULE|test-aclv6|test_rule1 priority 1000 PACKET_ACTION FORWARD IPv6Any - """ + self.remove_acl_rule("test", "acl_test_rule") + self.verify_no_acl_rules() - self.setup_db(dvs) - db = swsscommon.DBConnector(4, dvs.redis_sock, 0) - adb = swsscommon.DBConnector(1, dvs.redis_sock, 0) - - # create acl rule - tbl = swsscommon.Table(db, "ACL_RULE") - fvs = swsscommon.FieldValuePairs([("priority", "1001"), ("PACKET_ACTION", "FORWARD"), ("IP_TYPE", "IPv6ANY")]) - tbl.set("test-aclv6|test_rule1", fvs) - - time.sleep(1) - - test_acl_table_id = self.get_acl_table_id(dvs) - - # check acl table in asic db - atbl = swsscommon.Table(adb, "ASIC_STATE:SAI_OBJECT_TYPE_ACL_ENTRY") - keys = atbl.getKeys() - - acl_entry = [k for k in keys if k not in dvs.asicdb.default_acl_entries] - assert len(acl_entry) == 1 - - (status, fvs) = atbl.get(acl_entry[0]) - assert status == True - assert len(fvs) == 6 - for fv in fvs: - if fv[0] == "SAI_ACL_ENTRY_ATTR_TABLE_ID": - assert fv[1] == test_acl_table_id - elif fv[0] == "SAI_ACL_ENTRY_ATTR_ADMIN_STATE": - assert fv[1] == "true" - elif fv[0] == "SAI_ACL_ENTRY_ATTR_PRIORITY": - assert fv[1] == "1001" - elif fv[0] == "SAI_ACL_ENTRY_ATTR_ACTION_COUNTER": - assert True - elif fv[0] == "SAI_ACL_ENTRY_ATTR_FIELD_ACL_IP_TYPE": - assert fv[1] == "SAI_ACL_IP_TYPE_IPV6ANY&mask:0xffffffffffffffff" - elif fv[0] == "SAI_ACL_ENTRY_ATTR_ACTION_PACKET_ACTION": - assert fv[1] == "SAI_PACKET_ACTION_FORWARD" - else: - assert False + def test_AclRuleInOutPorts(self, asic_db, config_db, state_db): + self.setup_db(asic_db, config_db, state_db) - # remove acl rule - tbl._del("test-aclv6|test_rule1") + config_qualifiers = { + "IN_PORTS": "Ethernet0,Ethernet4", + "OUT_PORTS": "Ethernet8,Ethernet12" + } - time.sleep(1) + expected_sai_qualifiers = { + "SAI_ACL_ENTRY_ATTR_FIELD_IN_PORTS": self.get_port_list_comparator(["Ethernet0", "Ethernet4"]), + "SAI_ACL_ENTRY_ATTR_FIELD_OUT_PORTS": self.get_port_list_comparator(["Ethernet8", "Ethernet12"]) + } - (status, fvs) = atbl.get(acl_entry[0]) - assert status == False + self.create_acl_rule("test", "acl_test_rule", config_qualifiers) + self.verify_acl_rule(expected_sai_qualifiers) - def test_V6AclRuleIPv6AnyDrop(self, dvs, testlog): - """ - hmset ACL_RULE|test-aclv6|test_rule2 priority 1002 PACKET_ACTION DROP IPv6Any - """ + self.remove_acl_rule("test", "acl_test_rule") + self.verify_no_acl_rules() - self.setup_db(dvs) - db = swsscommon.DBConnector(4, dvs.redis_sock, 0) - adb = swsscommon.DBConnector(1, dvs.redis_sock, 0) - - # create acl rule - tbl = swsscommon.Table(db, "ACL_RULE") - fvs = swsscommon.FieldValuePairs([("priority", "1002"), ("PACKET_ACTION", "DROP"), ("IP_TYPE", "IPv6ANY")]) - tbl.set("test-aclv6|test_rule2", fvs) - - time.sleep(1) - - test_acl_table_id = self.get_acl_table_id(dvs) - - # check acl table in asic db - atbl = swsscommon.Table(adb, "ASIC_STATE:SAI_OBJECT_TYPE_ACL_ENTRY") - keys = atbl.getKeys() - - acl_entry = [k for k in keys if k not in dvs.asicdb.default_acl_entries] - assert len(acl_entry) == 1 - - (status, fvs) = atbl.get(acl_entry[0]) - assert status == True - assert len(fvs) == 6 - for fv in fvs: - if fv[0] == "SAI_ACL_ENTRY_ATTR_TABLE_ID": - assert fv[1] == test_acl_table_id - elif fv[0] == "SAI_ACL_ENTRY_ATTR_ADMIN_STATE": - assert fv[1] == "true" - elif fv[0] == "SAI_ACL_ENTRY_ATTR_PRIORITY": - assert fv[1] == "1002" - elif fv[0] == "SAI_ACL_ENTRY_ATTR_ACTION_COUNTER": - assert True - elif fv[0] == "SAI_ACL_ENTRY_ATTR_FIELD_ACL_IP_TYPE": - assert fv[1] == "SAI_ACL_IP_TYPE_IPV6ANY&mask:0xffffffffffffffff" - elif fv[0] == "SAI_ACL_ENTRY_ATTR_ACTION_PACKET_ACTION": - assert fv[1] == "SAI_PACKET_ACTION_DROP" - else: - assert False + def test_AclRuleInPortsNonExistingInterface(self, asic_db, config_db, state_db): + self.setup_db(asic_db, config_db, state_db) - # remove acl rule - tbl._del("test-aclv6|test_rule2") + config_qualifiers = { + "IN_PORTS": "FOO_BAR_BAZ" + } - time.sleep(1) + self.create_acl_rule("test", "acl_test_rule", config_qualifiers) - (status, fvs) = atbl.get(acl_entry[0]) - assert status == False + self.verify_no_acl_rules() + self.remove_acl_rule("test", "acl_test_rule") - def test_V6AclRuleIpProtocol(self, dvs, testlog): - """ - hmset ACL_RULE|test-aclv6|test_rule3 priority 1003 PACKET_ACTION DROP IP_PROTOCOL 6 - """ + def test_AclRuleOutPortsNonExistingInterface(self, asic_db, config_db, state_db): + self.setup_db(asic_db, config_db, state_db) - self.setup_db(dvs) - db = swsscommon.DBConnector(4, dvs.redis_sock, 0) - adb = swsscommon.DBConnector(1, dvs.redis_sock, 0) - - # create acl rule - tbl = swsscommon.Table(db, "ACL_RULE") - fvs = swsscommon.FieldValuePairs([("priority", "1003"), ("PACKET_ACTION", "DROP"), ("IP_PROTOCOL", "6")]) - tbl.set("test-aclv6|test_rule3", fvs) - - time.sleep(1) - - test_acl_table_id = self.get_acl_table_id(dvs) - - # check acl table in asic db - atbl = swsscommon.Table(adb, "ASIC_STATE:SAI_OBJECT_TYPE_ACL_ENTRY") - keys = atbl.getKeys() - - acl_entry = [k for k in keys if k not in dvs.asicdb.default_acl_entries] - assert len(acl_entry) == 1 - - (status, fvs) = atbl.get(acl_entry[0]) - assert status == True - assert len(fvs) == 6 - for fv in fvs: - if fv[0] == "SAI_ACL_ENTRY_ATTR_TABLE_ID": - assert fv[1] == test_acl_table_id - elif fv[0] == "SAI_ACL_ENTRY_ATTR_ADMIN_STATE": - assert fv[1] == "true" - elif fv[0] == "SAI_ACL_ENTRY_ATTR_PRIORITY": - assert fv[1] == "1003" - elif fv[0] == "SAI_ACL_ENTRY_ATTR_ACTION_COUNTER": - assert True - elif fv[0] == "SAI_ACL_ENTRY_ATTR_FIELD_IP_PROTOCOL": - assert fv[1] == "6&mask:0xff" - elif fv[0] == "SAI_ACL_ENTRY_ATTR_ACTION_PACKET_ACTION": - assert fv[1] == "SAI_PACKET_ACTION_DROP" - else: - assert False + config_qualifiers = { + "OUT_PORTS": "FOO_BAR_BAZ" + } - # remove acl rule - tbl._del("test-aclv6|test_rule3") + self.create_acl_rule("test", "acl_test_rule", config_qualifiers) - time.sleep(1) + self.verify_no_acl_rules() + self.remove_acl_rule("test", "acl_test_rule") - (status, fvs) = atbl.get(acl_entry[0]) - assert status == False + def test_AclTableDeletion(self, asic_db, config_db, state_db): + self.setup_db(asic_db, config_db, state_db) - def test_V6AclRuleSrcIPv6(self, dvs, testlog): - """ - hmset ACL_RULE|test-aclv6|test_rule4 priority 1004 PACKET_ACTION DROP SRC_IPV6 2777::0/64 - """ + self.remove_acl_table("test") + self.verify_no_acl_tables() - self.setup_db(dvs) - db = swsscommon.DBConnector(4, dvs.redis_sock, 0) - adb = swsscommon.DBConnector(1, dvs.redis_sock, 0) - - # create acl rule - tbl = swsscommon.Table(db, "ACL_RULE") - fvs = swsscommon.FieldValuePairs([("priority", "1004"), ("PACKET_ACTION", "DROP"), ("SRC_IPV6", "2777::0/64")]) - tbl.set("test-aclv6|test_rule4", fvs) - - time.sleep(1) - - test_acl_table_id = self.get_acl_table_id(dvs) - - # check acl table in asic db - atbl = swsscommon.Table(adb, "ASIC_STATE:SAI_OBJECT_TYPE_ACL_ENTRY") - keys = atbl.getKeys() - - acl_entry = [k for k in keys if k not in dvs.asicdb.default_acl_entries] - assert len(acl_entry) == 1 - - (status, fvs) = atbl.get(acl_entry[0]) - assert status == True - assert len(fvs) == 6 - for fv in fvs: - if fv[0] == "SAI_ACL_ENTRY_ATTR_TABLE_ID": - assert fv[1] == test_acl_table_id - elif fv[0] == "SAI_ACL_ENTRY_ATTR_ADMIN_STATE": - assert fv[1] == "true" - elif fv[0] == "SAI_ACL_ENTRY_ATTR_PRIORITY": - assert fv[1] == "1004" - elif fv[0] == "SAI_ACL_ENTRY_ATTR_ACTION_COUNTER": - assert True - elif fv[0] == "SAI_ACL_ENTRY_ATTR_FIELD_SRC_IPV6": - assert fv[1] == "2777::&mask:ffff:ffff:ffff:ffff::" - elif fv[0] == "SAI_ACL_ENTRY_ATTR_ACTION_PACKET_ACTION": - assert fv[1] == "SAI_PACKET_ACTION_DROP" - else: - assert False + def test_V6AclTableCreation(self, asic_db, config_db, state_db): + self.setup_db(asic_db, config_db, state_db) - # remove acl rule - tbl._del("test-aclv6|test_rule4") + bind_ports = ["Ethernet0", "Ethernet4", "Ethernet8"] + self.create_acl_table("test_aclv6", "L3V6", bind_ports) - time.sleep(1) + self.verify_acl_group_num(len(bind_ports)) + acl_group_ids = self.asic_db.wait_for_n_keys("ASIC_STATE:SAI_OBJECT_TYPE_ACL_TABLE_GROUP", len(bind_ports)) + self.verify_acl_group_member(acl_group_ids, self.get_acl_table_id()) + self.verify_acl_port_binding(bind_ports) - (status, fvs) = atbl.get(acl_entry[0]) - assert status == False + def test_V6AclRuleIPv6Any(self, asic_db, config_db, state_db): + self.setup_db(asic_db, config_db, state_db) - def test_V6AclRuleDstIPv6(self, dvs, testlog): - """ - hmset ACL_RULE|test-aclv6|test_rule5 priority 1005 PACKET_ACTION DROP DST_IPV6 2002::2/128 - """ + config_qualifiers = {"IP_TYPE": "IPv6ANY"} + expected_sai_qualifiers = { + "SAI_ACL_ENTRY_ATTR_FIELD_ACL_IP_TYPE": self.get_simple_qualifier_comparator("SAI_ACL_IP_TYPE_IPV6ANY&mask:0xffffffffffffffff") + } - self.setup_db(dvs) - db = swsscommon.DBConnector(4, dvs.redis_sock, 0) - adb = swsscommon.DBConnector(1, dvs.redis_sock, 0) - - # create acl rule - tbl = swsscommon.Table(db, "ACL_RULE") - fvs = swsscommon.FieldValuePairs([("priority", "1005"), ("PACKET_ACTION", "DROP"), ("DST_IPV6", "2002::2/128")]) - tbl.set("test-aclv6|test_rule5", fvs) - - time.sleep(1) - - test_acl_table_id = self.get_acl_table_id(dvs) - - # check acl table in asic db - atbl = swsscommon.Table(adb, "ASIC_STATE:SAI_OBJECT_TYPE_ACL_ENTRY") - keys = atbl.getKeys() - - acl_entry = [k for k in keys if k not in dvs.asicdb.default_acl_entries] - assert len(acl_entry) == 1 - - (status, fvs) = atbl.get(acl_entry[0]) - assert status == True - assert len(fvs) == 6 - for fv in fvs: - if fv[0] == "SAI_ACL_ENTRY_ATTR_TABLE_ID": - assert fv[1] == test_acl_table_id - elif fv[0] == "SAI_ACL_ENTRY_ATTR_ADMIN_STATE": - assert fv[1] == "true" - elif fv[0] == "SAI_ACL_ENTRY_ATTR_PRIORITY": - assert fv[1] == "1005" - elif fv[0] == "SAI_ACL_ENTRY_ATTR_ACTION_COUNTER": - assert True - elif fv[0] == "SAI_ACL_ENTRY_ATTR_FIELD_DST_IPV6": - assert fv[1] == "2002::2&mask:ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff" - elif fv[0] == "SAI_ACL_ENTRY_ATTR_ACTION_PACKET_ACTION": - assert fv[1] == "SAI_PACKET_ACTION_DROP" - else: - assert False + self.create_acl_rule("test_aclv6", "acl_test_rule", config_qualifiers) + self.verify_acl_rule(expected_sai_qualifiers) - # remove acl rule - tbl._del("test-aclv6|test_rule5") + self.remove_acl_rule("test_aclv6", "acl_test_rule") + self.verify_no_acl_rules() - time.sleep(1) + def test_V6AclRuleIPv6AnyDrop(self, asic_db, config_db, state_db): + self.setup_db(asic_db, config_db, state_db) - (status, fvs) = atbl.get(acl_entry[0]) - assert status == False + config_qualifiers = {"IP_TYPE": "IPv6ANY"} + expected_sai_qualifiers = { + "SAI_ACL_ENTRY_ATTR_FIELD_ACL_IP_TYPE": self.get_simple_qualifier_comparator("SAI_ACL_IP_TYPE_IPV6ANY&mask:0xffffffffffffffff") + } - def test_V6AclRuleL4SrcPort(self, dvs, testlog): - """ - hmset ACL_RULE|test-aclv6|test_rule6 priority 1006 PACKET_ACTION DROP L4_SRC_PORT 65000 - """ + self.create_acl_rule("test_aclv6", "acl_test_rule", config_qualifiers, action="DROP") + self.verify_acl_rule(expected_sai_qualifiers, action="DROP") - self.setup_db(dvs) - db = swsscommon.DBConnector(4, dvs.redis_sock, 0) - adb = swsscommon.DBConnector(1, dvs.redis_sock, 0) - - # create acl rule - tbl = swsscommon.Table(db, "ACL_RULE") - fvs = swsscommon.FieldValuePairs([("priority", "1006"), ("PACKET_ACTION", "DROP"), ("L4_SRC_PORT", "65000")]) - tbl.set("test-aclv6|test_rule6", fvs) - - time.sleep(1) - - test_acl_table_id = self.get_acl_table_id(dvs) - - # check acl table in asic db - atbl = swsscommon.Table(adb, "ASIC_STATE:SAI_OBJECT_TYPE_ACL_ENTRY") - keys = atbl.getKeys() - - acl_entry = [k for k in keys if k not in dvs.asicdb.default_acl_entries] - assert len(acl_entry) == 1 - - (status, fvs) = atbl.get(acl_entry[0]) - assert status == True - assert len(fvs) == 6 - for fv in fvs: - if fv[0] == "SAI_ACL_ENTRY_ATTR_TABLE_ID": - assert fv[1] == test_acl_table_id - elif fv[0] == "SAI_ACL_ENTRY_ATTR_ADMIN_STATE": - assert fv[1] == "true" - elif fv[0] == "SAI_ACL_ENTRY_ATTR_PRIORITY": - assert fv[1] == "1006" - elif fv[0] == "SAI_ACL_ENTRY_ATTR_ACTION_COUNTER": - assert True - elif fv[0] == "SAI_ACL_ENTRY_ATTR_FIELD_L4_SRC_PORT": - assert fv[1] == "65000&mask:0xffff" - elif fv[0] == "SAI_ACL_ENTRY_ATTR_ACTION_PACKET_ACTION": - assert fv[1] == "SAI_PACKET_ACTION_DROP" - else: - assert False + self.remove_acl_rule("test_aclv6", "acl_test_rule") + self.verify_no_acl_rules() - # remove acl rule - tbl._del("test-aclv6|test_rule6") + def test_V6AclRuleIpProtocol(self, asic_db, config_db, state_db): + self.setup_db(asic_db, config_db, state_db) - time.sleep(1) + config_qualifiers = {"IP_PROTOCOL": "6"} + expected_sai_qualifiers = {"SAI_ACL_ENTRY_ATTR_FIELD_IP_PROTOCOL": self.get_simple_qualifier_comparator("6&mask:0xff")} - (status, fvs) = atbl.get(acl_entry[0]) - assert status == False + self.create_acl_rule("test_aclv6", "acl_test_rule", config_qualifiers) + self.verify_acl_rule(expected_sai_qualifiers) - def test_V6AclRuleL4DstPort(self, dvs, testlog): - """ - hmset ACL_RULE|test-aclv6|test_rule7 priority 1007 PACKET_ACTION DROP L4_DST_PORT 65001 - """ + self.remove_acl_rule("test_aclv6", "acl_test_rule") + self.verify_no_acl_rules() - self.setup_db(dvs) - db = swsscommon.DBConnector(4, dvs.redis_sock, 0) - adb = swsscommon.DBConnector(1, dvs.redis_sock, 0) - - # create acl rule - tbl = swsscommon.Table(db, "ACL_RULE") - fvs = swsscommon.FieldValuePairs([("priority", "1007"), ("PACKET_ACTION", "DROP"), ("L4_DST_PORT", "65001")]) - tbl.set("test-aclv6|test_rule7", fvs) - - time.sleep(1) - - test_acl_table_id = self.get_acl_table_id(dvs) - - # check acl table in asic db - atbl = swsscommon.Table(adb, "ASIC_STATE:SAI_OBJECT_TYPE_ACL_ENTRY") - keys = atbl.getKeys() - - acl_entry = [k for k in keys if k not in dvs.asicdb.default_acl_entries] - assert len(acl_entry) == 1 - - (status, fvs) = atbl.get(acl_entry[0]) - assert status == True - assert len(fvs) == 6 - for fv in fvs: - if fv[0] == "SAI_ACL_ENTRY_ATTR_TABLE_ID": - assert fv[1] == test_acl_table_id - elif fv[0] == "SAI_ACL_ENTRY_ATTR_ADMIN_STATE": - assert fv[1] == "true" - elif fv[0] == "SAI_ACL_ENTRY_ATTR_PRIORITY": - assert fv[1] == "1007" - elif fv[0] == "SAI_ACL_ENTRY_ATTR_ACTION_COUNTER": - assert True - elif fv[0] == "SAI_ACL_ENTRY_ATTR_FIELD_L4_DST_PORT": - assert fv[1] == "65001&mask:0xffff" - elif fv[0] == "SAI_ACL_ENTRY_ATTR_ACTION_PACKET_ACTION": - assert fv[1] == "SAI_PACKET_ACTION_DROP" - else: - assert False + def test_V6AclRuleSrcIPv6(self, asic_db, config_db, state_db): + self.setup_db(asic_db, config_db, state_db) - # remove acl rule - tbl._del("test-aclv6|test_rule7") + config_qualifiers = {"SRC_IPV6": "2777::0/64"} + expected_sai_qualifiers = { + "SAI_ACL_ENTRY_ATTR_FIELD_SRC_IPV6": self.get_simple_qualifier_comparator("2777::&mask:ffff:ffff:ffff:ffff::") + } - time.sleep(1) + self.create_acl_rule("test_aclv6", "acl_test_rule", config_qualifiers) + self.verify_acl_rule(expected_sai_qualifiers) - (status, fvs) = atbl.get(acl_entry[0]) - assert status == False + self.remove_acl_rule("test_aclv6", "acl_test_rule") + self.verify_no_acl_rules() - def test_V6AclRuleTCPFlags(self, dvs, testlog): - """ - hmset ACL_RULE|test-aclv6|test_rule8 priority 1008 PACKET_ACTION DROP TCP_FLAGS 0x7/0x3f - """ + def test_V6AclRuleDstIPv6(self, asic_db, config_db, state_db): + self.setup_db(asic_db, config_db, state_db) - self.setup_db(dvs) - db = swsscommon.DBConnector(4, dvs.redis_sock, 0) - adb = swsscommon.DBConnector(1, dvs.redis_sock, 0) - - # create acl rule - tbl = swsscommon.Table(db, "ACL_RULE") - fvs = swsscommon.FieldValuePairs([("priority", "1008"), ("PACKET_ACTION", "DROP"), ("TCP_FLAGS", "0x07/0x3f")]) - tbl.set("test-aclv6|test_rule8", fvs) - - time.sleep(1) - - test_acl_table_id = self.get_acl_table_id(dvs) - - # check acl table in asic db - atbl = swsscommon.Table(adb, "ASIC_STATE:SAI_OBJECT_TYPE_ACL_ENTRY") - keys = atbl.getKeys() - - acl_entry = [k for k in keys if k not in dvs.asicdb.default_acl_entries] - assert len(acl_entry) == 1 - - (status, fvs) = atbl.get(acl_entry[0]) - assert status == True - assert len(fvs) == 6 - for fv in fvs: - if fv[0] == "SAI_ACL_ENTRY_ATTR_TABLE_ID": - assert fv[1] == test_acl_table_id - elif fv[0] == "SAI_ACL_ENTRY_ATTR_ADMIN_STATE": - assert fv[1] == "true" - elif fv[0] == "SAI_ACL_ENTRY_ATTR_PRIORITY": - assert fv[1] == "1008" - elif fv[0] == "SAI_ACL_ENTRY_ATTR_ACTION_COUNTER": - assert True - elif fv[0] == "SAI_ACL_ENTRY_ATTR_FIELD_TCP_FLAGS": - assert fv[1] == "7&mask:0x3f" - elif fv[0] == "SAI_ACL_ENTRY_ATTR_ACTION_PACKET_ACTION": - assert fv[1] == "SAI_PACKET_ACTION_DROP" - else: - assert False + config_qualifiers = {"DST_IPV6": "2002::2/128"} + expected_sai_qualifiers = { + "SAI_ACL_ENTRY_ATTR_FIELD_DST_IPV6": self.get_simple_qualifier_comparator("2002::2&mask:ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff") + } - # remove acl rule - tbl._del("test-aclv6|test_rule8") + self.create_acl_rule("test_aclv6", "acl_test_rule", config_qualifiers) + self.verify_acl_rule(expected_sai_qualifiers) - time.sleep(1) + self.remove_acl_rule("test_aclv6", "acl_test_rule") + self.verify_no_acl_rules() - (status, fvs) = atbl.get(acl_entry[0]) - assert status == False + def test_V6AclRuleL4SrcPort(self, asic_db, config_db, state_db): + self.setup_db(asic_db, config_db, state_db) - def test_V6AclRuleL4SrcPortRange(self, dvs, testlog): - """ - hmset ACL_RULE|test-aclv6|test_rule9 priority 1009 PACKET_ACTION DROP L4_SRC_PORT_RANGE 1-100 - """ + config_qualifiers = {"L4_SRC_PORT": "65000"} + expected_sai_qualifiers = {"SAI_ACL_ENTRY_ATTR_FIELD_L4_SRC_PORT": self.get_simple_qualifier_comparator("65000&mask:0xffff")} - self.setup_db(dvs) - db = swsscommon.DBConnector(4, dvs.redis_sock, 0) - adb = swsscommon.DBConnector(1, dvs.redis_sock, 0) - - # create acl rule - tbl = swsscommon.Table(db, "ACL_RULE") - fvs = swsscommon.FieldValuePairs([("priority", "1009"), ("PACKET_ACTION", "DROP"), ("L4_SRC_PORT_RANGE", "1-100")]) - tbl.set("test-aclv6|test_rule9", fvs) - - time.sleep(1) - - test_acl_table_id = self.get_acl_table_id(dvs) - - # check acl table in asic db - atbl = swsscommon.Table(adb, "ASIC_STATE:SAI_OBJECT_TYPE_ACL_ENTRY") - keys = atbl.getKeys() - - acl_entry = [k for k in keys if k not in dvs.asicdb.default_acl_entries] - assert len(acl_entry) == 1 - - (status, fvs) = atbl.get(acl_entry[0]) - assert status == True - assert len(fvs) == 6 - for fv in fvs: - if fv[0] == "SAI_ACL_ENTRY_ATTR_TABLE_ID": - assert fv[1] == test_acl_table_id - elif fv[0] == "SAI_ACL_ENTRY_ATTR_ADMIN_STATE": - assert fv[1] == "true" - elif fv[0] == "SAI_ACL_ENTRY_ATTR_PRIORITY": - assert fv[1] == "1009" - elif fv[0] == "SAI_ACL_ENTRY_ATTR_ACTION_COUNTER": - assert True - elif fv[0] == "SAI_ACL_ENTRY_ATTR_FIELD_ACL_RANGE_TYPE": - aclrange = fv[1] - elif fv[0] == "SAI_ACL_ENTRY_ATTR_ACTION_PACKET_ACTION": - assert fv[1] == "SAI_PACKET_ACTION_DROP" - else: - assert False + self.create_acl_rule("test_aclv6", "acl_test_rule", config_qualifiers) + self.verify_acl_rule(expected_sai_qualifiers) - atbl = swsscommon.Table(adb, "ASIC_STATE:SAI_OBJECT_TYPE_ACL_RANGE") - aclrange_obj = aclrange.split(":", 1)[1] - - (status, fvs) = atbl.get(aclrange_obj) - assert status == True - assert len(fvs) == 2 - for fv in fvs: - if fv[0] == "SAI_ACL_RANGE_ATTR_TYPE": - assert fv[1] == "SAI_ACL_RANGE_TYPE_L4_SRC_PORT_RANGE" - elif fv[0] == "SAI_ACL_RANGE_ATTR_LIMIT": - assert fv[1] == "1,100" - else: - assert False + self.remove_acl_rule("test_aclv6", "acl_test_rule") + self.verify_no_acl_rules() - # remove acl rule - tbl._del("test-aclv6|test_rule9") + def test_V6AclRuleL4DstPort(self, asic_db, config_db, state_db): + self.setup_db(asic_db, config_db, state_db) - time.sleep(1) + config_qualifiers = {"L4_DST_PORT": "65001"} + expected_sai_qualifiers = {"SAI_ACL_ENTRY_ATTR_FIELD_L4_DST_PORT": self.get_simple_qualifier_comparator("65001&mask:0xffff")} - (status, fvs) = atbl.get(acl_entry[0]) - assert status == False + self.create_acl_rule("test_aclv6", "acl_test_rule", config_qualifiers) + self.verify_acl_rule(expected_sai_qualifiers) - def test_V6AclRuleL4DstPortRange(self, dvs, testlog): - """ - hmset ACL_RULE|test-aclv6|test_rule10 priority 1010 PACKET_ACTION DROP L4_DST_PORT_RANGE 101-200 - """ + self.remove_acl_rule("test_aclv6", "acl_test_rule") + self.verify_no_acl_rules() - self.setup_db(dvs) - db = swsscommon.DBConnector(4, dvs.redis_sock, 0) - adb = swsscommon.DBConnector(1, dvs.redis_sock, 0) - - # create acl rule - tbl = swsscommon.Table(db, "ACL_RULE") - fvs = swsscommon.FieldValuePairs([("priority", "1010"), ("PACKET_ACTION", "DROP"), ("L4_DST_PORT_RANGE", "101-200")]) - tbl.set("test-aclv6|test_rule10", fvs) - - time.sleep(1) - - test_acl_table_id = self.get_acl_table_id(dvs) - - # check acl table in asic db - atbl = swsscommon.Table(adb, "ASIC_STATE:SAI_OBJECT_TYPE_ACL_ENTRY") - keys = atbl.getKeys() - - acl_entry = [k for k in keys if k not in dvs.asicdb.default_acl_entries] - assert len(acl_entry) == 1 - - (status, fvs) = atbl.get(acl_entry[0]) - assert status == True - assert len(fvs) == 6 - for fv in fvs: - if fv[0] == "SAI_ACL_ENTRY_ATTR_TABLE_ID": - assert fv[1] == test_acl_table_id - elif fv[0] == "SAI_ACL_ENTRY_ATTR_ADMIN_STATE": - assert fv[1] == "true" - elif fv[0] == "SAI_ACL_ENTRY_ATTR_PRIORITY": - assert fv[1] == "1010" - elif fv[0] == "SAI_ACL_ENTRY_ATTR_ACTION_COUNTER": - assert True - elif fv[0] == "SAI_ACL_ENTRY_ATTR_FIELD_ACL_RANGE_TYPE": - aclrange = fv[1] - elif fv[0] == "SAI_ACL_ENTRY_ATTR_ACTION_PACKET_ACTION": - assert fv[1] == "SAI_PACKET_ACTION_DROP" - else: - assert False + def test_V6AclRuleTCPFlags(self, asic_db, config_db, state_db): + self.setup_db(asic_db, config_db, state_db) - atbl = swsscommon.Table(adb, "ASIC_STATE:SAI_OBJECT_TYPE_ACL_RANGE") - aclrange_obj = aclrange.split(":", 1)[1] - - (status, fvs) = atbl.get(aclrange_obj) - assert status == True - assert len(fvs) == 2 - for fv in fvs: - if fv[0] == "SAI_ACL_RANGE_ATTR_TYPE": - assert fv[1] == "SAI_ACL_RANGE_TYPE_L4_DST_PORT_RANGE" - elif fv[0] == "SAI_ACL_RANGE_ATTR_LIMIT": - assert fv[1] == "101,200" - else: - assert False + config_qualifiers = {"TCP_FLAGS": "0x07/0x3f"} + expected_sai_qualifiers = {"SAI_ACL_ENTRY_ATTR_FIELD_TCP_FLAGS": self.get_simple_qualifier_comparator("7&mask:0x3f")} - # remove acl rule - tbl._del("test-aclv6|test_rule10") + self.create_acl_rule("test_aclv6", "acl_test_rule", config_qualifiers) + self.verify_acl_rule(expected_sai_qualifiers) - time.sleep(1) + self.remove_acl_rule("test_aclv6", "acl_test_rule") + self.verify_no_acl_rules() - (status, fvs) = atbl.get(acl_entry[0]) - assert status == False + def test_V6AclRuleL4SrcPortRange(self, asic_db, config_db, state_db): + self.setup_db(asic_db, config_db, state_db) - def test_V6AclTableDeletion(self, dvs, testlog): + config_qualifiers = {"L4_SRC_PORT_RANGE": "1-100"} + expected_sai_qualifiers = { + "SAI_ACL_ENTRY_ATTR_FIELD_ACL_RANGE_TYPE": self.get_acl_range_comparator("SAI_ACL_RANGE_TYPE_L4_SRC_PORT_RANGE", "1,100") + } - self.setup_db(dvs) - db = swsscommon.DBConnector(4, dvs.redis_sock, 0) - adb = swsscommon.DBConnector(1, dvs.redis_sock, 0) + self.create_acl_rule("test_aclv6", "acl_test_rule", config_qualifiers) + self.verify_acl_rule(expected_sai_qualifiers) - # get ACL_TABLE in config db - tbl = swsscommon.Table(db, "ACL_TABLE") - tbl._del("test-aclv6") + self.remove_acl_rule("test_aclv6", "acl_test_rule") + self.verify_no_acl_rules() - time.sleep(1) + def test_V6AclRuleL4DstPortRange(self, asic_db, config_db, state_db): + self.setup_db(asic_db, config_db, state_db) - atbl = swsscommon.Table(adb, "ASIC_STATE:SAI_OBJECT_TYPE_ACL_TABLE") - keys = atbl.getKeys() - # only the default table was left - assert len(keys) >= 1 + config_qualifiers = {"L4_DST_PORT_RANGE": "101-200"} + expected_sai_qualifiers = { + "SAI_ACL_ENTRY_ATTR_FIELD_ACL_RANGE_TYPE": self.get_acl_range_comparator("SAI_ACL_RANGE_TYPE_L4_DST_PORT_RANGE", "101,200") + } - def test_InsertAclRuleBetweenPriorities(self, dvs, testlog): - self.setup_db(dvs) - db = swsscommon.DBConnector(4, dvs.redis_sock, 0) - adb = swsscommon.DBConnector(1, dvs.redis_sock, 0) + self.create_acl_rule("test_aclv6", "acl_test_rule", config_qualifiers) + self.verify_acl_rule(expected_sai_qualifiers) - bind_ports = ["Ethernet0", "Ethernet4"] - # create ACL_TABLE in config db - tbl = swsscommon.Table(db, "ACL_TABLE") - fvs = swsscommon.FieldValuePairs([("policy_desc", "test"), ("type", "L3"), ("ports", ",".join(bind_ports))]) - tbl.set("test_insert", fvs) - - time.sleep(2) - - num_rules = 0 - #create ACL rules - tbl = swsscommon.Table(db, "ACL_RULE") - rules = [ [("PRIORITY", "10"), ("PACKET_ACTION", "DROP"), ("SRC_IP", "10.0.0.0/32")], - [("PRIORITY", "20"), ("PACKET_ACTION", "DROP"), ("DST_IP", "104.44.94.0/23")], - [("PRIORITY", "30"), ("PACKET_ACTION", "DROP"), ("DST_IP", "192.168.0.16/32")], - [("PRIORITY", "40"), ("PACKET_ACTION", "FORWARD"), ("DST_IP", "100.64.0.0/10")] ] - #used to verify how ACL rules are programmed in ASICDB - verifs = [ {'SAI_ACL_ENTRY_ATTR_PRIORITY': '10', - 'SAI_ACL_ENTRY_ATTR_FIELD_SRC_IP': '10.0.0.0&mask:255.255.255.255', - 'SAI_ACL_ENTRY_ATTR_ACTION_PACKET_ACTION': 'SAI_PACKET_ACTION_DROP'}, - {'SAI_ACL_ENTRY_ATTR_PRIORITY': '20', - 'SAI_ACL_ENTRY_ATTR_FIELD_DST_IP': '104.44.94.0&mask:255.255.254.0', - 'SAI_ACL_ENTRY_ATTR_ACTION_PACKET_ACTION': 'SAI_PACKET_ACTION_DROP'}, - {'SAI_ACL_ENTRY_ATTR_PRIORITY': '30', - 'SAI_ACL_ENTRY_ATTR_FIELD_DST_IP': '192.168.0.16&mask:255.255.255.255', - 'SAI_ACL_ENTRY_ATTR_ACTION_PACKET_ACTION': 'SAI_PACKET_ACTION_DROP'}, - {'SAI_ACL_ENTRY_ATTR_PRIORITY': '40', - 'SAI_ACL_ENTRY_ATTR_FIELD_DST_IP': '100.64.0.0&mask:255.192.0.0', - 'SAI_ACL_ENTRY_ATTR_ACTION_PACKET_ACTION': 'SAI_PACKET_ACTION_FORWARD'} ] - #insert rules - for rule in rules: - fvs = swsscommon.FieldValuePairs(rule) - num_rules += 1 - tbl.set( "test_insert|acl_test_rule%s" % num_rules, fvs) - - time.sleep(1) - - atbl = swsscommon.Table(adb, "ASIC_STATE:SAI_OBJECT_TYPE_ACL_ENTRY") - keys = atbl.getKeys() - - #assert that first set of rules are programmed - acl_entry = [k for k in keys if k not in dvs.asicdb.default_acl_entries] - assert len(acl_entry) == num_rules - - #insert new rule with odd priority - tbl = swsscommon.Table(db, "ACL_RULE") - insertrule = [("PRIORITY", "21"), ("PACKET_ACTION", "DROP"), ("ETHER_TYPE", "4660")] - #create verification for that rule - verifs.append({'SAI_ACL_ENTRY_ATTR_PRIORITY': '21', - 'SAI_ACL_ENTRY_ATTR_FIELD_ETHER_TYPE': '4660&mask:0xffff', - 'SAI_ACL_ENTRY_ATTR_ACTION_PACKET_ACTION': 'SAI_PACKET_ACTION_DROP'}) - rules.append(insertrule) - fvs = swsscommon.FieldValuePairs(insertrule) - num_rules += 1 - tbl.set("test_insert|acl_test_rule%s" % num_rules, fvs) - - time.sleep(1) - - #assert all rules are programmed - keys = atbl.getKeys() - acl_entry = [k for k in keys if k not in dvs.asicdb.default_acl_entries] - assert len(acl_entry) == num_rules - - #match each entry to its corresponding verification - matched_rules = 0 - for entry in acl_entry: - (status, fvs) = atbl.get(entry) - assert status == True - assert len(fvs) == 6 - #helper function - if self.check_rule_existence(dict(fvs), rules, verifs): - matched_rules += 1 - - assert num_rules == matched_rules - - #cleanup - while num_rules > 0: - tbl._del("test_insert|acl_test_rule%s" % num_rules) - num_rules -= 1 - - time.sleep(1) - - (status, fvs) = atbl.get(acl_entry[0]) - assert status == False - - tbl = swsscommon.Table(db, "ACL_TABLE") - tbl._del("test_insert") - - time.sleep(1) - - atbl = swsscommon.Table(adb, "ASIC_STATE:SAI_OBJECT_TYPE_ACL_TABLE") - keys = atbl.getKeys() - # only the default table was left - assert len(keys) >= 1 - - def test_RulesWithDiffMaskLengths(self, dvs, testlog): - self.setup_db(dvs) - db = swsscommon.DBConnector(4, dvs.redis_sock, 0) - adb = swsscommon.DBConnector(1, dvs.redis_sock, 0) + self.remove_acl_rule("test_aclv6", "acl_test_rule") + self.verify_no_acl_rules() + + def test_V6AclTableDeletion(self, asic_db, config_db, state_db): + self.setup_db(asic_db, config_db, state_db) + + self.remove_acl_table("test_aclv6") + self.verify_no_acl_tables() + + def test_InsertAclRuleBetweenPriorities(self, asic_db, config_db, state_db): + self.setup_db(asic_db, config_db, state_db) bind_ports = ["Ethernet0", "Ethernet4"] - # create ACL_TABLE in config db - tbl = swsscommon.Table(db, "ACL_TABLE") - fvs = swsscommon.FieldValuePairs([("policy_desc", "test"), ("type", "L3"), ("ports", ",".join(bind_ports))]) - tbl.set("test_subnet", fvs) - - time.sleep(2) + self.create_acl_table("test_priorities", "L3", bind_ports) - subnet_mask_rules = 0 - #create ACL rules - tbl = swsscommon.Table(db, "ACL_RULE") - rules = [ [("PRIORITY", "10"), ("PACKET_ACTION", "FORWARD"), ("SRC_IP", "23.103.0.0/18")], - [("PRIORITY", "20"), ("PACKET_ACTION", "FORWARD"), ("SRC_IP", "104.44.94.0/23")], - [("PRIORITY", "30"), ("PACKET_ACTION", "FORWARD"), ("DST_IP", "172.16.0.0/12")], - [("PRIORITY", "40"), ("PACKET_ACTION", "FORWARD"), ("DST_IP", "100.64.0.0/10")], - [("PRIORITY", "50"), ("PACKET_ACTION", "FORWARD"), ("DST_IP", "104.146.32.0/19")], - [("PRIORITY", "60"), ("PACKET_ACTION", "FORWARD"), ("SRC_IP", "21.0.0.0/8")] ] - #used to verify how ACL rules are programmed in ASICDB - #order must match the list of rules - verifs = [ {'SAI_ACL_ENTRY_ATTR_PRIORITY': '10', - 'SAI_ACL_ENTRY_ATTR_FIELD_SRC_IP': '23.103.0.0&mask:255.255.192.0', - 'SAI_ACL_ENTRY_ATTR_ACTION_PACKET_ACTION': 'SAI_PACKET_ACTION_FORWARD'}, - {'SAI_ACL_ENTRY_ATTR_PRIORITY': '20', - 'SAI_ACL_ENTRY_ATTR_FIELD_SRC_IP': '104.44.94.0&mask:255.255.254.0', - 'SAI_ACL_ENTRY_ATTR_ACTION_PACKET_ACTION': 'SAI_PACKET_ACTION_FORWARD'}, - {'SAI_ACL_ENTRY_ATTR_PRIORITY': '30', - 'SAI_ACL_ENTRY_ATTR_FIELD_DST_IP': '172.16.0.0&mask:255.240.0.0', - 'SAI_ACL_ENTRY_ATTR_ACTION_PACKET_ACTION': 'SAI_PACKET_ACTION_FORWARD'}, - {'SAI_ACL_ENTRY_ATTR_PRIORITY': '40', - 'SAI_ACL_ENTRY_ATTR_FIELD_DST_IP': '100.64.0.0&mask:255.192.0.0', - 'SAI_ACL_ENTRY_ATTR_ACTION_PACKET_ACTION': 'SAI_PACKET_ACTION_FORWARD'}, - {'SAI_ACL_ENTRY_ATTR_PRIORITY': '50', - 'SAI_ACL_ENTRY_ATTR_FIELD_DST_IP': '104.146.32.0&mask:255.255.224.0', - 'SAI_ACL_ENTRY_ATTR_ACTION_PACKET_ACTION': 'SAI_PACKET_ACTION_FORWARD'}, - {'SAI_ACL_ENTRY_ATTR_PRIORITY': '60', - 'SAI_ACL_ENTRY_ATTR_FIELD_SRC_IP': '21.0.0.0&mask:255.0.0.0', - 'SAI_ACL_ENTRY_ATTR_ACTION_PACKET_ACTION': 'SAI_PACKET_ACTION_FORWARD'} ] - #insert rules - for rule in rules: - fvs = swsscommon.FieldValuePairs(rule) - subnet_mask_rules += 1 - tbl.set( "test_subnet|acl_test_rule%s" % subnet_mask_rules, fvs ) + rule_priorities = ["10", "20", "30", "40"] - time.sleep(1) + config_qualifiers = { + "10": {"SRC_IP": "10.0.0.0/32"}, + "20": {"DST_IP": "104.44.94.0/23"}, + "30": {"DST_IP": "192.168.0.16/32"}, + "40": {"DST_IP": "100.64.0.0/10"}, + } - atbl = swsscommon.Table(adb, "ASIC_STATE:SAI_OBJECT_TYPE_ACL_ENTRY") - keys = atbl.getKeys() + config_actions = { + "10": "DROP", + "20": "DROP", + "30": "DROP", + "40": "FORWARD", + } - acl_entry = [k for k in keys if k not in dvs.asicdb.default_acl_entries] - assert len(acl_entry) == subnet_mask_rules + expected_sai_qualifiers = { + "10": {"SAI_ACL_ENTRY_ATTR_FIELD_SRC_IP": self.get_simple_qualifier_comparator("10.0.0.0&mask:255.255.255.255")}, + "20": {"SAI_ACL_ENTRY_ATTR_FIELD_DST_IP": self.get_simple_qualifier_comparator("104.44.94.0&mask:255.255.254.0")}, + "30": {"SAI_ACL_ENTRY_ATTR_FIELD_DST_IP": self.get_simple_qualifier_comparator("192.168.0.16&mask:255.255.255.255")}, + "40": {"SAI_ACL_ENTRY_ATTR_FIELD_DST_IP": self.get_simple_qualifier_comparator("100.64.0.0&mask:255.192.0.0")}, + } - #match each entry to its corresponding verification - matched_masks = 0 - for entry in acl_entry: - (status, fvs) = atbl.get(entry) - assert status == True - assert len(fvs) == 6 - #helper function - if self.check_rule_existence(dict(fvs), rules, verifs): - matched_masks += 1 + for rule in rule_priorities: + self.create_acl_rule("test_priorities", "acl_test_rule_{}".format(rule), + config_qualifiers[rule], action=config_actions[rule], + priority=rule) - assert matched_masks == subnet_mask_rules + self.verify_acl_rule_set(rule_priorities, config_actions, expected_sai_qualifiers) - while subnet_mask_rules > 0: - tbl._del("test_subnet|acl_test_rule%s" % subnet_mask_rules) - subnet_mask_rules -= 1 + odd_priority = "21" + odd_rule = {"ETHER_TYPE": "4660"} + odd_sai_qualifier = {"SAI_ACL_ENTRY_ATTR_FIELD_ETHER_TYPE": self.get_simple_qualifier_comparator("4660&mask:0xffff")} - time.sleep(1) + rule_priorities.append(odd_priority) + config_actions[odd_priority] = "DROP" + expected_sai_qualifiers[odd_priority] = odd_sai_qualifier - (status, fvs) = atbl.get(acl_entry[0]) - assert status == False + self.create_acl_rule("test_priorities", "acl_test_rule_{}".format(odd_priority), + odd_rule, action="DROP", priority=odd_priority) + self.verify_acl_rule_set(rule_priorities, config_actions, expected_sai_qualifiers) - tbl = swsscommon.Table(db, "ACL_TABLE") - tbl._del("test_subnet") + for rule in rule_priorities: + self.remove_acl_rule("test_priorities", "acl_test_rule_{}".format(rule)) + self.verify_no_acl_rules() - time.sleep(1) + self.remove_acl_table("test_priorities") + self.verify_no_acl_tables() - atbl = swsscommon.Table(adb, "ASIC_STATE:SAI_OBJECT_TYPE_ACL_TABLE") - keys = atbl.getKeys() - assert len(keys) >= 1 + def test_RulesWithDiffMaskLengths(self, asic_db, config_db, state_db): + self.setup_db(asic_db, config_db, state_db) + bind_ports = ["Ethernet0", "Ethernet4"] + self.create_acl_table("test_masks", "L3", bind_ports) - def test_AclRuleIcmp(self, dvs, testlog): - self.setup_db(dvs) + rule_priorities = ["10", "20", "30", "40", "50", "60"] - acl_table = "TEST_TABLE" - acl_rule = "TEST_RULE" + config_qualifiers = { + "10": {"SRC_IP": "23.103.0.0/18"}, + "20": {"SRC_IP": "104.44.94.0/23"}, + "30": {"DST_IP": "172.16.0.0/12"}, + "40": {"DST_IP": "100.64.0.0/10"}, + "50": {"DST_IP": "104.146.32.0/19"}, + "60": {"SRC_IP": "21.0.0.0/8"}, + } - self.create_acl_table(acl_table, "L3", ["Ethernet0", "Ethernet4"]) + config_actions = { + "10": "FORWARD", + "20": "FORWARD", + "30": "FORWARD", + "40": "FORWARD", + "50": "FORWARD", + "60": "FORWARD", + } - self.create_acl_rule(acl_table, acl_rule, "ICMP_TYPE", "8") + expected_sai_qualifiers = { + "10": {"SAI_ACL_ENTRY_ATTR_FIELD_SRC_IP": self.get_simple_qualifier_comparator("23.103.0.0&mask:255.255.192.0")}, + "20": {"SAI_ACL_ENTRY_ATTR_FIELD_SRC_IP": self.get_simple_qualifier_comparator("104.44.94.0&mask:255.255.254.0")}, + "30": {"SAI_ACL_ENTRY_ATTR_FIELD_DST_IP": self.get_simple_qualifier_comparator("172.16.0.0&mask:255.240.0.0")}, + "40": {"SAI_ACL_ENTRY_ATTR_FIELD_DST_IP": self.get_simple_qualifier_comparator("100.64.0.0&mask:255.192.0.0")}, + "50": {"SAI_ACL_ENTRY_ATTR_FIELD_DST_IP": self.get_simple_qualifier_comparator("104.146.32.0&mask:255.255.224.0")}, + "60": {"SAI_ACL_ENTRY_ATTR_FIELD_SRC_IP": self.get_simple_qualifier_comparator("21.0.0.0&mask:255.0.0.0")}, + } - self.verify_acl_rule(dvs, "SAI_ACL_ENTRY_ATTR_FIELD_ICMP_TYPE", "8&mask:0xff") + for rule in rule_priorities: + self.create_acl_rule("test_masks", "acl_test_rule_{}".format(rule), + config_qualifiers[rule], action=config_actions[rule], + priority=rule) + self.verify_acl_rule_set(rule_priorities, config_actions, expected_sai_qualifiers) - self.remove_acl_rule(acl_table, acl_rule) + for rule in rule_priorities: + self.remove_acl_rule("test_masks", "acl_test_rule_{}".format(rule)) + self.verify_no_acl_rules() - self.create_acl_rule(acl_table, acl_rule, "ICMP_CODE", "9") + self.remove_acl_table("test_masks") + self.verify_no_acl_tables() - self.verify_acl_rule(dvs, "SAI_ACL_ENTRY_ATTR_FIELD_ICMP_CODE", "9&mask:0xff") + def test_AclRuleIcmp(self, asic_db, config_db, state_db): + self.setup_db(asic_db, config_db, state_db) - self.remove_acl_rule(acl_table, acl_rule) + bind_ports = ["Ethernet0", "Ethernet4"] + self.create_acl_table("test_icmp", "L3", bind_ports) - self.remove_acl_table(acl_table) + config_qualifiers = { + "ICMP_TYPE": "8", + "ICMP_CODE": "9" + } - def test_AclRuleIcmpV6(self, dvs, testlog): - self.setup_db(dvs) + expected_sai_qualifiers = { + "SAI_ACL_ENTRY_ATTR_FIELD_ICMP_TYPE": self.get_simple_qualifier_comparator("8&mask:0xff"), + "SAI_ACL_ENTRY_ATTR_FIELD_ICMP_CODE": self.get_simple_qualifier_comparator("9&mask:0xff") + } + + self.create_acl_rule("test_icmp", "test_icmp_fields", config_qualifiers) + self.verify_acl_rule(expected_sai_qualifiers) - acl_table = "TEST_TABLE" - acl_rule = "TEST_RULE" + self.remove_acl_rule("test_icmp", "test_icmp_fields") + self.verify_no_acl_rules() - self.create_acl_table(acl_table, "L3V6", ["Ethernet0", "Ethernet4"]) + self.remove_acl_table("test_icmp") + self.verify_no_acl_tables() - self.create_acl_rule(acl_table, acl_rule, "ICMPV6_TYPE", "8") + def test_AclRuleIcmpV6(self, asic_db, config_db, state_db): + self.setup_db(asic_db, config_db, state_db) - self.verify_acl_rule(dvs, "SAI_ACL_ENTRY_ATTR_FIELD_ICMPV6_TYPE", "8&mask:0xff") + bind_ports = ["Ethernet0", "Ethernet4"] + self.create_acl_table("test_icmpv6", "L3V6", bind_ports) - self.remove_acl_rule(acl_table, acl_rule) + config_qualifiers = { + "ICMPV6_TYPE": "8", + "ICMPV6_CODE": "9" + } - self.create_acl_rule(acl_table, acl_rule, "ICMPV6_CODE", "9") + expected_sai_qualifiers = { + "SAI_ACL_ENTRY_ATTR_FIELD_ICMPV6_TYPE": self.get_simple_qualifier_comparator("8&mask:0xff"), + "SAI_ACL_ENTRY_ATTR_FIELD_ICMPV6_CODE": self.get_simple_qualifier_comparator("9&mask:0xff") + } - self.verify_acl_rule(dvs, "SAI_ACL_ENTRY_ATTR_FIELD_ICMPV6_CODE", "9&mask:0xff") + self.create_acl_rule("test_icmpv6", "test_icmpv6_fields", config_qualifiers) + self.verify_acl_rule(expected_sai_qualifiers) - self.remove_acl_rule(acl_table, acl_rule) + self.remove_acl_rule("test_icmpv6", "test_icmpv6_fields") + self.verify_no_acl_rules() - self.remove_acl_table(acl_table) + self.remove_acl_table("test_icmpv6") + self.verify_no_acl_tables() - def test_AclRuleRedirectToNexthop(self, dvs, testlog): + def test_AclRuleRedirectToNextHop(self, dvs, asic_db, config_db, state_db): dvs.setup_db() - self.setup_db(dvs) + self.setup_db(asic_db, config_db, state_db) - # bring up interface + # Bring up an IP interface with a neighbor dvs.set_interface_status("Ethernet4", "up") - - # assign IP to interface dvs.add_ip_address("Ethernet4", "10.0.0.1/24") - - # add neighbor dvs.add_neighbor("Ethernet4", "10.0.0.2", "00:01:02:03:04:05") - atbl = swsscommon.Table(self.adb, "ASIC_STATE:SAI_OBJECT_TYPE_NEXT_HOP") - keys = atbl.getKeys() - assert len(keys) == 1 - nhi_oid = keys[0] - - # create ACL_TABLE in config db - tbl = swsscommon.Table(self.cdb, "ACL_TABLE") - fvs = swsscommon.FieldValuePairs([("policy_desc", "test"), ("type", "L3"), ("ports", "Ethernet0")]) - tbl.set("test_redirect", fvs) - - time.sleep(2) - - # create acl rule - tbl = swsscommon.Table(self.cdb, "ACL_RULE") - fvs = swsscommon.FieldValuePairs([ - ("priority", "100"), - ("L4_SRC_PORT", "65000"), - ("PACKET_ACTION", "REDIRECT:10.0.0.2@Ethernet4")]) - tbl.set("test_redirect|test_rule1", fvs) - - time.sleep(1) - - test_acl_table_id = self.get_acl_table_id(dvs) - - # check acl table in asic db - atbl = swsscommon.Table(self.adb, "ASIC_STATE:SAI_OBJECT_TYPE_ACL_ENTRY") - keys = atbl.getKeys() - - acl_entry = [k for k in keys if k not in dvs.asicdb.default_acl_entries] - assert len(acl_entry) == 1 - - (status, fvs) = atbl.get(acl_entry[0]) - assert status == True - assert len(fvs) == 6 - for fv in fvs: - if fv[0] == "SAI_ACL_ENTRY_ATTR_TABLE_ID": - assert fv[1] == test_acl_table_id - elif fv[0] == "SAI_ACL_ENTRY_ATTR_ADMIN_STATE": - assert fv[1] == "true" - elif fv[0] == "SAI_ACL_ENTRY_ATTR_PRIORITY": - assert fv[1] == "100" - elif fv[0] == "SAI_ACL_ENTRY_ATTR_ACTION_COUNTER": - assert True - elif fv[0] == "SAI_ACL_ENTRY_ATTR_FIELD_L4_SRC_PORT": - assert fv[1] == "65000&mask:0xffff" - elif fv[0] == "SAI_ACL_ENTRY_ATTR_ACTION_REDIRECT": - assert fv[1] == nhi_oid - else: - assert False + next_hop_id = self.asic_db.wait_for_n_keys("ASIC_STATE:SAI_OBJECT_TYPE_NEXT_HOP", 1)[0] - # remove acl rule - tbl._del("test_redirect|test_rule1") + bind_ports = ["Ethernet0"] + self.create_acl_table("test_redirect", "L3", bind_ports) - time.sleep(1) + config_qualifiers = {"L4_SRC_PORT": "65000"} + expected_sai_qualifiers = {"SAI_ACL_ENTRY_ATTR_FIELD_L4_SRC_PORT": self.get_simple_qualifier_comparator("65000&mask:0xffff")} - (status, fvs) = atbl.get(acl_entry[0]) - assert status == False + self.create_acl_rule("test_redirect", "redirect_rule", config_qualifiers, action="REDIRECT:10.0.0.2@Ethernet4", priority="20") - tbl = swsscommon.Table(self.cdb, "ACL_TABLE") - tbl._del("test_redirect") + acl_rule_id = self.get_acl_rule_id() + entry = self.asic_db.wait_for_entry("ASIC_STATE:SAI_OBJECT_TYPE_ACL_ENTRY", acl_rule_id) + self._check_acl_entry(entry, expected_sai_qualifiers, "REDIRECT:10.0.0.2@Ethernet4", "20") + assert entry.get("SAI_ACL_ENTRY_ATTR_ACTION_REDIRECT", None) == next_hop_id - time.sleep(1) + self.remove_acl_rule("test_redirect", "redirect_rule") + self.verify_no_acl_rules() - atbl = swsscommon.Table(self.adb, "ASIC_STATE:SAI_OBJECT_TYPE_ACL_TABLE") - keys = atbl.getKeys() - assert len(keys) >= 1 + self.remove_acl_table("test_redirect") + self.verify_no_acl_tables() - # remove neighbor + # Clean up the IP interface and neighbor dvs.remove_neighbor("Ethernet4", "10.0.0.2") - - # remove interface ip dvs.remove_ip_address("Ethernet4", "10.0.0.1/24") - - # bring down interface dvs.set_interface_status("Ethernet4", "down") class TestAclRuleValidation(BaseTestAcl): - """ Test class for cases that check if orchagent corectly validates - ACL rules input + """ + Test class for cases that check if orchagent corectly validates + ACL rules input """ SWITCH_CAPABILITY_TABLE = "SWITCH_CAPABILITY" def get_acl_actions_supported(self, stage): - capability_table = swsscommon.Table(self.sdb, self.SWITCH_CAPABILITY_TABLE) - keys = capability_table.getKeys() - # one switch available - assert len(keys) == 1 - status, fvs = capability_table.get(keys[0]) - assert status == True + switch_id = self.state_db.wait_for_n_keys(self.SWITCH_CAPABILITY_TABLE, 1)[0] + switch = self.state_db.wait_for_entry(self.SWITCH_CAPABILITY_TABLE, switch_id) field = "ACL_ACTIONS|{}".format(stage.upper()) - fvs = dict(fvs) - values_list = fvs.get(field, None) + supported_actions = switch.get(field, None) - if values_list is not None: - values_list = values_list.split(",") + if supported_actions: + supported_actions = supported_actions.split(",") + else: + supported_actions = [] - return values_list + return supported_actions - def test_AclActionValidation(self, dvs, testlog): - """ The test overrides R/O SAI_SWITCH_ATTR_ACL_STAGE_INGRESS/EGRESS switch attributes - to check the case when orchagent refuses to process rules with action that is not supported - by the ASIC + def test_AclActionValidation(self, dvs, asic_db, config_db, state_db): + """ + The test overrides R/O SAI_SWITCH_ATTR_ACL_STAGE_INGRESS/EGRESS switch attributes + to check the case when orchagent refuses to process rules with action that is not + supported by the ASIC. """ - self.setup_db(dvs) + self.setup_db(asic_db, config_db, state_db) stage_name_map = { "ingress": "SAI_SWITCH_ATTR_ACL_STAGE_INGRESS", @@ -1414,7 +637,7 @@ def test_AclActionValidation(self, dvs, testlog): action_values = self.get_acl_actions_supported(stage) # virtual switch supports all actions - assert action_values is not None + assert action_values assert "PACKET_ACTION" in action_values sai_acl_stage = stage_name_map[stage] @@ -1429,8 +652,6 @@ def test_AclActionValidation(self, dvs, testlog): # restart SWSS so orchagent will query updated switch attributes dvs.stop_swss() dvs.start_swss() - # wait for daemons to start - time.sleep(2) # reinit ASIC DB validator object dvs.init_asicdb_validator() @@ -1448,20 +669,17 @@ def test_AclActionValidation(self, dvs, testlog): bind_ports = ["Ethernet0", "Ethernet4"] self.create_acl_table(acl_table, "L3", bind_ports, stage=stage) - self.create_acl_rule(acl_table, acl_rule, "ICMP_TYPE", "8") - atbl = swsscommon.Table(self.adb, "ASIC_STATE:SAI_OBJECT_TYPE_ACL_ENTRY") - keys = atbl.getKeys() + config_qualifiers = { + "ICMP_TYPE": "8" + } - # verify there are no non-default ACL rules created - acl_entry = [k for k in keys if k not in dvs.asicdb.default_acl_entries] - assert len(acl_entry) == 0 + self.create_acl_rule(acl_table, acl_rule, config_qualifiers) + self.verify_no_acl_rules() + self.remove_acl_rule(acl_table, acl_rule) self.remove_acl_table(acl_table) - # remove rules from CFG DB - self.remove_acl_rule(acl_table, acl_rule) dvs.runcmd("supervisorctl restart syncd") dvs.stop_swss() dvs.start_swss() - time.sleep(5)