From 4c7f8889e982b825943c8795dcfd98ab7902f944 Mon Sep 17 00:00:00 2001 From: junchao Date: Fri, 17 Sep 2021 10:49:06 +0800 Subject: [PATCH 1/2] Use select in xcvrd tasks --- sonic-xcvrd/tests/test_xcvrd.py | 138 +++++------ sonic-xcvrd/xcvrd/xcvrd.py | 218 +++++------------- .../xcvrd/xcvrd_utilities/port_mapping.py | 89 +++++++ .../xcvrd/xcvrd_utilities/y_cable_helper.py | 11 +- 4 files changed, 220 insertions(+), 236 deletions(-) diff --git a/sonic-xcvrd/tests/test_xcvrd.py b/sonic-xcvrd/tests/test_xcvrd.py index 4035bffb2..75691887d 100644 --- a/sonic-xcvrd/tests/test_xcvrd.py +++ b/sonic-xcvrd/tests/test_xcvrd.py @@ -32,6 +32,8 @@ from xcvrd.xcvrd import * from xcvrd.xcvrd_utilities.y_cable_helper import * from xcvrd.xcvrd_utilities.sfp_status_helper import * +from xcvrd.xcvrd_utilities.port_mapping import * +#from xcvrd.xcvrd_utilities.port_mapping import PortMapping, subscribe_port_config_change, handle_port_config_change class TestXcvrdScript(object): @@ -409,62 +411,61 @@ def test_is_error_sfp_status(self): @patch('swsscommon.swsscommon.Select.addSelectable', MagicMock()) @patch('swsscommon.swsscommon.SubscriberStateTable') @patch('swsscommon.swsscommon.Select.select') - def test_DaemonXcvrd_wait_for_port_config_done(self, mock_select, mock_sub_table): - mock_selectable = MagicMock() - mock_selectable.pop = MagicMock(side_effect=[('Ethernet0', swsscommon.SET_COMMAND, (('index', '1'), )), ('PortConfigDone', None, None)]) - mock_select.return_value = (swsscommon.Select.OBJECT, mock_selectable) - mock_sub_table.return_value = mock_selectable - xcvrd = DaemonXcvrd(SYSLOG_IDENTIFIER) - xcvrd.wait_for_port_config_done('') - assert swsscommon.Select.select.call_count == 2 - - @patch('swsscommon.swsscommon.Select.addSelectable', MagicMock()) - @patch('swsscommon.swsscommon.SubscriberStateTable') - @patch('swsscommon.swsscommon.Select.select') - def test_DaemonXcvrd_handle_port_config_change(self, mock_select, mock_sub_table): + def test_handle_port_config_change(self, mock_select, mock_sub_table): mock_selectable = MagicMock() mock_selectable.pop = MagicMock(side_effect=[('Ethernet0', swsscommon.SET_COMMAND, (('index', '1'), )), (None, None, None)]) mock_select.return_value = (swsscommon.Select.OBJECT, mock_selectable) mock_sub_table.return_value = mock_selectable - xcvrd = DaemonXcvrd(SYSLOG_IDENTIFIER) - xcvrd.stop_event.is_set = MagicMock(side_effect=[False, True]) - mock_observer = MagicMock() - mock_observer.notify_port_change_event = MagicMock() - xcvrd.subscribe_port_change_event(mock_observer) - - xcvrd.handle_port_config_change() - assert mock_observer.notify_port_change_event.call_count == 1 - assert xcvrd.port_mapping.logical_port_list.count('Ethernet0') - assert xcvrd.port_mapping.get_asic_id_for_logical_port('Ethernet0') == 0 - assert xcvrd.port_mapping.get_physical_to_logical(1) == ['Ethernet0'] - assert xcvrd.port_mapping.get_logical_to_physical('Ethernet0') == [1] - - xcvrd.stop_event.is_set = MagicMock(side_effect=[False, True]) + + sel, asic_context = subscribe_port_config_change() + port_mapping = PortMapping() + stop_event = threading.Event() + stop_event.is_set = MagicMock(return_value=False) + logger = MagicMock() + handle_port_config_change(sel, asic_context, stop_event, port_mapping, logger, port_mapping.handle_port_change_event) + + assert port_mapping.logical_port_list.count('Ethernet0') + assert port_mapping.get_asic_id_for_logical_port('Ethernet0') == 0 + assert port_mapping.get_physical_to_logical(1) == ['Ethernet0'] + assert port_mapping.get_logical_to_physical('Ethernet0') == [1] + mock_selectable.pop = MagicMock(side_effect=[('Ethernet0', swsscommon.DEL_COMMAND, (('index', '1'), )), (None, None, None)]) - xcvrd.handle_port_config_change() - assert mock_observer.notify_port_change_event.call_count == 2 - assert not xcvrd.port_mapping.logical_port_list - assert not xcvrd.port_mapping.logical_to_physical - assert not xcvrd.port_mapping.physical_to_logical - assert not xcvrd.port_mapping.logical_to_asic + handle_port_config_change(sel, asic_context, stop_event, port_mapping, logger, port_mapping.handle_port_change_event) + assert not port_mapping.logical_port_list + assert not port_mapping.logical_to_physical + assert not port_mapping.physical_to_logical + assert not port_mapping.logical_to_asic + @patch('swsscommon.swsscommon.Table') - def test_DaemonXcvrd_init_port_mapping(self, mock_swsscommon_table): + def test_get_port_mapping(self, mock_swsscommon_table): mock_table = MagicMock() mock_table.getKeys = MagicMock(return_value=['Ethernet0', 'Ethernet4']) mock_table.get = MagicMock(side_effect=[(True, (('index', 1), )), (True, (('index', 2), ))]) mock_swsscommon_table.return_value = mock_table - xcvrd = DaemonXcvrd(SYSLOG_IDENTIFIER) - xcvrd.init_port_mapping() - assert xcvrd.port_mapping.logical_port_list.count('Ethernet0') - assert xcvrd.port_mapping.get_asic_id_for_logical_port('Ethernet0') == 0 - assert xcvrd.port_mapping.get_physical_to_logical(1) == ['Ethernet0'] - assert xcvrd.port_mapping.get_logical_to_physical('Ethernet0') == [1] + port_mapping = get_port_mapping() + assert port_mapping.logical_port_list.count('Ethernet0') + assert port_mapping.get_asic_id_for_logical_port('Ethernet0') == 0 + assert port_mapping.get_physical_to_logical(1) == ['Ethernet0'] + assert port_mapping.get_logical_to_physical('Ethernet0') == [1] - assert xcvrd.port_mapping.logical_port_list.count('Ethernet4') - assert xcvrd.port_mapping.get_asic_id_for_logical_port('Ethernet4') == 0 - assert xcvrd.port_mapping.get_physical_to_logical(2) == ['Ethernet4'] - assert xcvrd.port_mapping.get_logical_to_physical('Ethernet4') == [2] + assert port_mapping.logical_port_list.count('Ethernet4') + assert port_mapping.get_asic_id_for_logical_port('Ethernet4') == 0 + assert port_mapping.get_physical_to_logical(2) == ['Ethernet4'] + assert port_mapping.get_logical_to_physical('Ethernet4') == [2] + + + @patch('swsscommon.swsscommon.Select.addSelectable', MagicMock()) + @patch('swsscommon.swsscommon.SubscriberStateTable') + @patch('swsscommon.swsscommon.Select.select') + def test_DaemonXcvrd_wait_for_port_config_done(self, mock_select, mock_sub_table): + mock_selectable = MagicMock() + mock_selectable.pop = MagicMock(side_effect=[('Ethernet0', swsscommon.SET_COMMAND, (('index', '1'), )), ('PortConfigDone', None, None)]) + mock_select.return_value = (swsscommon.Select.OBJECT, mock_selectable) + mock_sub_table.return_value = mock_selectable + xcvrd = DaemonXcvrd(SYSLOG_IDENTIFIER) + xcvrd.wait_for_port_config_done('') + assert swsscommon.Select.select.call_count == 2 @patch('xcvrd.xcvrd.DaemonXcvrd.load_platform_util', MagicMock()) @patch('sonic_py_common.device_info.get_paths_to_platform_and_hwsku_dirs', MagicMock(return_value=('/tmp', None))) @@ -479,20 +480,19 @@ def test_DaemonXcvrd_init_deinit(self): @patch('xcvrd.xcvrd.DaemonXcvrd.init') @patch('xcvrd.xcvrd.DaemonXcvrd.deinit') - @patch('xcvrd.xcvrd.DaemonXcvrd.handle_port_config_change') @patch('xcvrd.xcvrd.DomInfoUpdateTask.task_run') @patch('xcvrd.xcvrd.SfpStateUpdateTask.task_run') @patch('xcvrd.xcvrd.DomInfoUpdateTask.task_stop') @patch('xcvrd.xcvrd.SfpStateUpdateTask.task_stop') - def test_DaemonXcvrd_run(self, mock_task_stop1, mock_task_stop2, mock_task_run1, mock_task_run2, mock_handle_port_config_change, mock_deinit, mock_init): + def test_DaemonXcvrd_run(self, mock_task_stop1, mock_task_stop2, mock_task_run1, mock_task_run2, mock_deinit, mock_init): xcvrd = DaemonXcvrd(SYSLOG_IDENTIFIER) + xcvrd.stop_event.wait = MagicMock() xcvrd.run() # TODO: more check assert mock_task_stop1.call_count == 1 assert mock_task_stop2.call_count == 1 assert mock_task_run1.call_count == 1 assert mock_task_run2.call_count == 1 - assert mock_handle_port_config_change.call_count == 1 assert mock_deinit.call_count == 1 assert mock_init.call_count == 1 @@ -501,21 +501,21 @@ def test_DomInfoUpdateTask_handle_port_change_event(self): port_mapping = PortMapping() task = DomInfoUpdateTask(port_mapping) port_change_event = PortChangeEvent('Ethernet0', 1, 0, PortChangeEvent.PORT_ADD) - task.notify_port_change_event(port_change_event) - task.handle_port_change_event() + task.on_port_config_change(port_change_event) assert task.port_mapping.logical_port_list.count('Ethernet0') assert task.port_mapping.get_asic_id_for_logical_port('Ethernet0') == 0 assert task.port_mapping.get_physical_to_logical(1) == ['Ethernet0'] assert task.port_mapping.get_logical_to_physical('Ethernet0') == [1] port_change_event = PortChangeEvent('Ethernet0', 1, 0, PortChangeEvent.PORT_REMOVE) - task.notify_port_change_event(port_change_event) - task.handle_port_change_event() + task.on_port_config_change(port_change_event) assert not task.port_mapping.logical_port_list assert not task.port_mapping.logical_to_physical assert not task.port_mapping.physical_to_logical assert not task.port_mapping.logical_to_asic + @patch('xcvrd.xcvrd_utilities.port_mapping.subscribe_port_config_change', MagicMock(return_value=(None, None))) + @patch('xcvrd.xcvrd_utilities.port_mapping.handle_port_config_change', MagicMock()) def test_DomInfoUpdateTask_task_run_stop(self): port_mapping = PortMapping() task = DomInfoUpdateTask(port_mapping) @@ -527,12 +527,18 @@ def test_DomInfoUpdateTask_task_run_stop(self): @patch('xcvrd.xcvrd_utilities.sfp_status_helper.detect_port_in_error_status') @patch('xcvrd.xcvrd.post_port_dom_info_to_db') @patch('xcvrd.xcvrd.post_port_dom_threshold_info_to_db') - def test_DomInfoUpdateTask_task_worker(self, mock_post_dom_th, mock_post_dom_info, mock_detect_error): + @patch('swsscommon.swsscommon.Select.addSelectable', MagicMock()) + @patch('swsscommon.swsscommon.SubscriberStateTable') + @patch('swsscommon.swsscommon.Select.select') + def test_DomInfoUpdateTask_task_worker(self, mock_select, mock_sub_table, mock_post_dom_th, mock_post_dom_info, mock_detect_error): + mock_selectable = MagicMock() + mock_selectable.pop = MagicMock(side_effect=[('Ethernet0', swsscommon.SET_COMMAND, (('index', '1'), )), (None, None, None), (None, None, None)]) + mock_select.return_value = (swsscommon.Select.OBJECT, mock_selectable) + mock_sub_table.return_value = mock_selectable + port_mapping = PortMapping() task = DomInfoUpdateTask(port_mapping) task.task_stopping_event.wait = MagicMock(side_effect=[False, True]) - port_change_event = PortChangeEvent('Ethernet0', 1, 0, PortChangeEvent.PORT_ADD) - task.notify_port_change_event(port_change_event) mock_detect_error.return_value = True task.task_worker([False]) assert task.port_mapping.logical_port_list.count('Ethernet0') @@ -559,10 +565,9 @@ def test_SfpStateUpdateTask_handle_port_change_event(self, mock_table_helper): port_mapping = PortMapping() task = SfpStateUpdateTask(port_mapping) port_change_event = PortChangeEvent('Ethernet0', 1, 0, PortChangeEvent.PORT_ADD) - task.notify_port_change_event(port_change_event) wait_time = 5 while wait_time > 0: - task.handle_port_change_event(task.event_queue, stopping_event, [False]) + task.on_port_config_change(stopping_event, [False], port_change_event) if task.port_mapping.logical_port_list: break wait_time -= 1 @@ -573,10 +578,9 @@ def test_SfpStateUpdateTask_handle_port_change_event(self, mock_table_helper): assert task.port_mapping.get_logical_to_physical('Ethernet0') == [1] port_change_event = PortChangeEvent('Ethernet0', 1, 0, PortChangeEvent.PORT_REMOVE) - task.notify_port_change_event(port_change_event) wait_time = 5 while wait_time > 0: - task.handle_port_change_event(task.event_queue, stopping_event, [False]) + task.on_port_config_change(stopping_event, [False], port_change_event) if not task.port_mapping.logical_port_list: break wait_time -= 1 @@ -638,6 +642,8 @@ def test_SfpStateUpdateTask_mapping_event_from_change_event(self): @patch('time.sleep', MagicMock()) @patch('xcvrd.xcvrd.xcvr_table_helper', MagicMock()) @patch('xcvrd.xcvrd._wrapper_soak_sfp_insert_event', MagicMock()) + @patch('xcvrd.xcvrd_utilities.port_mapping.subscribe_port_config_change', MagicMock(return_value=(None, None))) + @patch('xcvrd.xcvrd_utilities.port_mapping.handle_port_config_change', MagicMock()) @patch('os.kill') @patch('xcvrd.xcvrd.SfpStateUpdateTask._mapping_event_from_change_event') @patch('xcvrd.xcvrd._wrapper_get_transceiver_change_event') @@ -656,7 +662,7 @@ def test_SfpStateUpdateTask_task_worker(self, mock_updata_status, mock_post_sfp_ mock_mapping_event.return_value = SYSTEM_NOT_READY # Test state machine: STATE_INIT + SYSTEM_NOT_READY event => STATE_INIT + SYSTEM_NOT_READY event ... => STATE_EXIT - task.task_worker(stop_event, sfp_error_event, [False], task.event_queue) + task.task_worker(stop_event, sfp_error_event, [False]) assert mock_os_kill.call_count == 1 assert sfp_error_event.is_set() @@ -664,7 +670,7 @@ def test_SfpStateUpdateTask_task_worker(self, mock_updata_status, mock_post_sfp_ mock_os_kill.reset_mock() sfp_error_event.clear() # Test state machine: STATE_INIT + SYSTEM_FAIL event => STATE_INIT + SYSTEM_FAIL event ... => STATE_EXIT - task.task_worker(stop_event, sfp_error_event, [False], task.event_queue) + task.task_worker(stop_event, sfp_error_event, [False]) assert mock_os_kill.call_count == 1 assert sfp_error_event.is_set() @@ -672,7 +678,7 @@ def test_SfpStateUpdateTask_task_worker(self, mock_updata_status, mock_post_sfp_ mock_os_kill.reset_mock() sfp_error_event.clear() # Test state machine: STATE_INIT + SYSTEM_BECOME_READY event => STATE_NORMAL + SYSTEM_NOT_READY event ... => STATE_EXIT - task.task_worker(stop_event, sfp_error_event, [False], task.event_queue) + task.task_worker(stop_event, sfp_error_event, [False]) assert mock_os_kill.call_count == 1 assert not sfp_error_event.is_set() @@ -681,7 +687,7 @@ def test_SfpStateUpdateTask_task_worker(self, mock_updata_status, mock_post_sfp_ sfp_error_event.clear() # Test state machine: STATE_INIT + SYSTEM_BECOME_READY event => STATE_NORMAL + SYSTEM_FAIL event ... => STATE_INIT # + SYSTEM_FAIL event ... => STATE_EXIT - task.task_worker(stop_event, sfp_error_event, [False], task.event_queue) + task.task_worker(stop_event, sfp_error_event, [False]) assert mock_os_kill.call_count == 1 assert sfp_error_event.is_set() @@ -692,7 +698,7 @@ def test_SfpStateUpdateTask_task_worker(self, mock_updata_status, mock_post_sfp_ mock_post_sfp_info.return_value = SFP_EEPROM_NOT_READY stop_event.is_set = MagicMock(side_effect=[False, True]) # Test state machine: handle SFP insert event, but EEPROM read failure - task.task_worker(stop_event, sfp_error_event, [False], task.event_queue) + task.task_worker(stop_event, sfp_error_event, [False]) assert mock_updata_status.call_count == 1 assert mock_post_sfp_info.call_count == 2 # first call and retry call assert mock_post_dom_info.call_count == 0 @@ -706,7 +712,7 @@ def test_SfpStateUpdateTask_task_worker(self, mock_updata_status, mock_post_sfp_ mock_updata_status.reset_mock() mock_post_sfp_info.reset_mock() # Test state machine: handle SFP insert event, and EEPROM read success - task.task_worker(stop_event, sfp_error_event, [False], task.event_queue) + task.task_worker(stop_event, sfp_error_event, [False]) assert mock_updata_status.call_count == 1 assert mock_post_sfp_info.call_count == 1 assert mock_post_dom_info.call_count == 1 @@ -717,7 +723,7 @@ def test_SfpStateUpdateTask_task_worker(self, mock_updata_status, mock_post_sfp_ mock_change_event.return_value = (True, {1:SFP_STATUS_REMOVED}, {}) mock_updata_status.reset_mock() # Test state machine: handle SFP remove event - task.task_worker(stop_event, sfp_error_event, [False], task.event_queue) + task.task_worker(stop_event, sfp_error_event, [False]) assert mock_updata_status.call_count == 1 assert mock_del_dom.call_count == 1 @@ -727,7 +733,7 @@ def test_SfpStateUpdateTask_task_worker(self, mock_updata_status, mock_post_sfp_ mock_updata_status.reset_mock() mock_del_dom.reset_mock() # Test state machine: handle SFP error event - task.task_worker(stop_event, sfp_error_event, [False], task.event_queue) + task.task_worker(stop_event, sfp_error_event, [False]) assert mock_updata_status.call_count == 1 assert mock_del_dom.call_count == 1 diff --git a/sonic-xcvrd/xcvrd/xcvrd.py b/sonic-xcvrd/xcvrd/xcvrd.py index b9818927a..3e69009f0 100644 --- a/sonic-xcvrd/xcvrd/xcvrd.py +++ b/sonic-xcvrd/xcvrd/xcvrd.py @@ -8,10 +8,10 @@ try: import ast import copy + import functools import json import multiprocessing import os - import queue import signal import sys import threading @@ -23,7 +23,7 @@ from .xcvrd_utilities import sfp_status_helper from .xcvrd_utilities import y_cable_helper - from .xcvrd_utilities.port_mapping import PortMapping, PortChangeEvent + from .xcvrd_utilities import port_mapping except ImportError as e: raise ImportError(str(e) + " - required module not found") @@ -40,8 +40,6 @@ TRANSCEIVER_DOM_SENSOR_TABLE = 'TRANSCEIVER_DOM_SENSOR' TRANSCEIVER_STATUS_TABLE = 'TRANSCEIVER_STATUS' -SELECT_TIMEOUT_MSECS = 1000 - # Mgminit time required as per CMIS spec MGMT_INIT_TIME_DELAY_SECS = 2 @@ -833,7 +831,6 @@ class DomInfoUpdateTask(object): def __init__(self, port_mapping): self.task_thread = None self.task_stopping_event = threading.Event() - self.event_queue = queue.Queue() self.port_mapping = copy.deepcopy(port_mapping) def task_worker(self, y_cable_presence): @@ -841,6 +838,7 @@ def task_worker(self, y_cable_presence): mux_tbl = {} dom_info_cache = {} dom_th_info_cache = {} + sel, asic_context = port_mapping.subscribe_port_config_change() # Start loop to update dom info in DB periodically while not self.task_stopping_event.wait(DOM_INFO_UPDATE_PERIOD_SECS): @@ -849,7 +847,7 @@ def task_worker(self, y_cable_presence): dom_th_info_cache.clear() # Handle port change event from main thread - self.handle_port_change_event() + port_mapping.handle_port_config_change(sel, asic_context, self.task_stopping_event, self.port_mapping, helper_logger, self.on_port_config_change) logical_port_list = self.port_mapping.logical_port_list for logical_port_name in logical_port_list: # Get the asic to which this port belongs @@ -877,26 +875,10 @@ def task_stop(self): self.task_stopping_event.set() self.task_thread.join() - def notify_port_change_event(self, port_change_event): - """Called by main thread. When main thread detects a port configuration add/remove, it puts an event to - the event queue - - Args: - port_change_event (object): port change event - """ - self.event_queue.put_nowait(port_change_event) - - def handle_port_change_event(self): - """Handle port change event until the queue is empty, update local port mapping and DB data accordingly. - """ - while True: - try: - port_change_event = self.event_queue.get_nowait() - if port_change_event.event_type == PortChangeEvent.PORT_REMOVE: - self.on_remove_logical_port(port_change_event) - self.port_mapping.handle_port_change_event(port_change_event) - except queue.Empty: - break + def on_port_config_change(self, port_change_event): + if port_change_event.event_type == port_mapping.PortChangeEvent.PORT_REMOVE: + self.on_remove_logical_port(port_change_event) + self.port_mapping.handle_port_change_event(port_change_event) def on_remove_logical_port(self, port_change_event): """Called when a logical port is removed from CONFIG_DB @@ -921,7 +903,6 @@ class SfpStateUpdateTask(object): def __init__(self, port_mapping): self.task_process = None self.task_stopping_event = multiprocessing.Event() - self.event_queue = multiprocessing.Queue() self.port_mapping = copy.deepcopy(port_mapping) # A set to hold those SFPs who fail to read EEPROM self.retry_eeprom_set = set() @@ -956,7 +937,7 @@ def _mapping_event_from_change_event(self, status, port_dict): helper_logger.log_debug("mapping from {} {} to {}".format(status, port_dict, event)) return event - def task_worker(self, stopping_event, sfp_error_event, y_cable_presence, event_queue): + def task_worker(self, stopping_event, sfp_error_event, y_cable_presence): helper_logger.log_info("Start SFP monitoring loop") transceiver_dict = {} @@ -1029,8 +1010,10 @@ def task_worker(self, stopping_event, sfp_error_event, y_cable_presence, event_q retry = 0 timeout = RETRY_PERIOD_FOR_SYSTEM_READY_MSECS state = STATE_INIT + sel, asic_context = port_mapping.subscribe_port_config_change() + port_change_event_handler = functools.partial(self.on_port_config_change, stopping_event=stopping_event, y_cable_presence=y_cable_presence) while not stopping_event.is_set(): - self.handle_port_change_event(event_queue, stopping_event, y_cable_presence) + port_mapping.handle_port_config_change(sel, asic_context, stopping_event, self.port_mapping, helper_logger, port_change_event_handler) # Retry those logical ports whose EEPROM reading failed or timeout when the SFP is inserted self.retry_eeprom_reading() @@ -1207,48 +1190,32 @@ def task_run(self, sfp_error_event, y_cable_presence): return self.task_process = multiprocessing.Process(target=self.task_worker, args=( - self.task_stopping_event, sfp_error_event, y_cable_presence, self.event_queue)) + self.task_stopping_event, sfp_error_event, y_cable_presence)) self.task_process.start() def task_stop(self): self.task_stopping_event.set() os.kill(self.task_process.pid, signal.SIGKILL) - def notify_port_change_event(self, port_change_event): - """Called by main thread. When main thread detects a port configuration add/remove, it puts an event to - the event queue - - Args: - port_change_event (object): port change event - """ - self.event_queue.put_nowait(port_change_event) - - def handle_port_change_event(self, event_queue, stopping_event, y_cable_presence): - """Handle port change event until the queue is empty, update local port mapping and DB data accordingly. - """ - while True: - try: - port_change_event = event_queue.get_nowait() - if port_change_event.event_type == PortChangeEvent.PORT_REMOVE: - self.on_remove_logical_port(port_change_event) - # Update y_cable related database once a logical port is removed - y_cable_helper.change_ports_status_for_y_cable_change_event( - {port_change_event.port_name:sfp_status_helper.SFP_STATUS_REMOVED}, - self.port_mapping, - y_cable_presence, - stopping_event) - self.port_mapping.handle_port_change_event(port_change_event) - elif port_change_event.event_type == PortChangeEvent.PORT_ADD: - self.port_mapping.handle_port_change_event(port_change_event) - logical_port_event_dict = self.on_add_logical_port(port_change_event) - # Update y_cable related database once a logical port is added - y_cable_helper.change_ports_status_for_y_cable_change_event( - logical_port_event_dict, - self.port_mapping, - y_cable_presence, - stopping_event) - except queue.Empty: - break + def on_port_config_change(self, stopping_event, y_cable_presence, port_change_event): + if port_change_event.event_type == port_mapping.PortChangeEvent.PORT_REMOVE: + self.on_remove_logical_port(port_change_event) + # Update y_cable related database once a logical port is removed + y_cable_helper.change_ports_status_for_y_cable_change_event( + {port_change_event.port_name:sfp_status_helper.SFP_STATUS_REMOVED}, + self.port_mapping, + y_cable_presence, + stopping_event) + self.port_mapping.handle_port_change_event(port_change_event) + elif port_change_event.event_type == port_mapping.PortChangeEvent.PORT_ADD: + self.port_mapping.handle_port_change_event(port_change_event) + logical_port_event_dict = self.on_add_logical_port(port_change_event) + # Update y_cable related database once a logical port is added + y_cable_helper.change_ports_status_for_y_cable_change_event( + logical_port_event_dict, + self.port_mapping, + y_cable_presence, + stopping_event) def on_remove_logical_port(self, port_change_event): """Called when a logical port is removed from CONFIG_DB. @@ -1401,8 +1368,6 @@ def __init__(self, log_identifier): self.stop_event = threading.Event() self.sfp_error_event = multiprocessing.Event() self.y_cable_presence = [False] - self.port_change_event_observers = [] - self.port_mapping = PortMapping() # Signal handler def signal_handler(self, sig, frame): @@ -1423,87 +1388,22 @@ def wait_for_port_config_done(self, namespace): appl_db = daemon_base.db_connect("APPL_DB", namespace=namespace) sel = swsscommon.Select() - sst = swsscommon.SubscriberStateTable(appl_db, swsscommon.APP_PORT_TABLE_NAME) - sel.addSelectable(sst) + port_tbl = swsscommon.SubscriberStateTable(appl_db, swsscommon.APP_PORT_TABLE_NAME) + sel.addSelectable(port_tbl) # Make sure this daemon started after all port configured while not self.stop_event.is_set(): - (state, c) = sel.select(SELECT_TIMEOUT_MSECS) + (state, c) = sel.select(port_mapping.SELECT_TIMEOUT_MSECS) if state == swsscommon.Select.TIMEOUT: continue if state != swsscommon.Select.OBJECT: self.log_warning("sel.select() did not return swsscommon.Select.OBJECT") continue - (key, op, fvp) = sst.pop() + (key, op, fvp) = port_tbl.pop() if key in ["PortConfigDone", "PortInitDone"]: break - def handle_port_config_change(self): - """Select CONFIG_DB PORT table changes, once there is a port configuration add/remove, notify observers - """ - sel = swsscommon.Select() - asic_context = {} - namespaces = multi_asic.get_front_end_namespaces() - for namespace in namespaces: - config_db = daemon_base.db_connect("CONFIG_DB", namespace=namespace) - asic_id = multi_asic.get_asic_index_from_namespace(namespace) - sst = swsscommon.SubscriberStateTable(config_db, swsscommon.CFG_PORT_TABLE_NAME) - asic_context[sst] = asic_id - sel.addSelectable(sst) - - while not self.stop_event.is_set(): - (state, _) = sel.select(SELECT_TIMEOUT_MSECS) - if state == swsscommon.Select.TIMEOUT: - continue - if state != swsscommon.Select.OBJECT: - self.log_warning("sel.select() did not return swsscommon.Select.OBJECT") - continue - - for sst in asic_context.keys(): - while True: - (key, op, fvp) = sst.pop() - if not key: - break - if op == swsscommon.SET_COMMAND: - # Only process new logical port, here we assume that the physical index for a logical port should - # never change, if user configure a new value for the "index" field for exsitng locial port, it is - # an error - if not self.port_mapping.is_logical_port(key): - fvp = dict(fvp) - if 'index' in fvp: - port_change_event = PortChangeEvent(key, fvp['index'], asic_context[sst], PortChangeEvent.PORT_ADD) - self.notify_port_change_event(port_change_event) - elif op == swsscommon.DEL_COMMAND: - if self.port_mapping.is_logical_port(key): - port_change_event = PortChangeEvent(key, - self.port_mapping.get_logical_to_physical(key)[0], - asic_context[sst], - PortChangeEvent.PORT_REMOVE) - self.notify_port_change_event(port_change_event) - else: - self.log_warning("Invalid DB operation: {}".format(op)) - - def notify_port_change_event(self, port_change_event): - """Notify observers that there is a port change event - - Args: - port_change_event (object): port change event - """ - - self.log_notice('Sending port change event {} to observers'.format(port_change_event)) - for observer in self.port_change_event_observers: - observer.notify_port_change_event(port_change_event) - self.port_mapping.handle_port_change_event(port_change_event) - - def subscribe_port_change_event(self, observer): - """Subscribe port change event - - Args: - observer (object): observer who listen to port change event - """ - self.port_change_event_observers.append(observer) - def load_media_settings(self): global g_dict (platform_path, _) = device_info.get_paths_to_platform_and_hwsku_dirs() @@ -1515,20 +1415,6 @@ def load_media_settings(self): with open(media_settings_file_path, "r") as media_file: g_dict = json.load(media_file) - - def init_port_mapping(self): - """Initialize port mapping from CONFIG_DB for the first run - """ - namespaces = multi_asic.get_front_end_namespaces() - for namespace in namespaces: - asic_id = multi_asic.get_asic_index_from_namespace(namespace) - config_db = daemon_base.db_connect("CONFIG_DB", namespace=namespace) - port_table = swsscommon.Table(config_db, swsscommon.CFG_PORT_TABLE_NAME) - for key in port_table.getKeys(): - _, port_config = port_table.get(key) - port_config_dict = dict(port_config) - port_change_event = PortChangeEvent(key, port_config_dict['index'], asic_id, PortChangeEvent.PORT_ADD) - self.port_mapping.handle_port_change_event(port_change_event) # Initialize daemon def init(self): @@ -1579,38 +1465,42 @@ def init(self): for namespace in xcvr_table_helper.namespaces: self.wait_for_port_config_done(namespace) - self.init_port_mapping() + + port_mapping_data = port_mapping.get_port_mapping() # Post all the current interface dom/sfp info to STATE_DB self.log_info("Post all port DOM/SFP info to DB") - post_port_sfp_dom_info_to_db(is_warm_start, self.port_mapping, self.stop_event) + post_port_sfp_dom_info_to_db(is_warm_start, port_mapping_data, self.stop_event) # Init port sfp status table self.log_info("Init port sfp status table") - init_port_sfp_status_tbl(self.port_mapping, self.stop_event) + init_port_sfp_status_tbl(port_mapping_data, self.stop_event) # Init port y_cable status table y_cable_helper.init_ports_status_for_y_cable( - platform_sfputil, platform_chassis, self.y_cable_presence, self.port_mapping, self.stop_event) + platform_sfputil, platform_chassis, self.y_cable_presence, port_mapping_data, self.stop_event) + + return port_mapping_data # Deinitialize daemon def deinit(self): self.log_info("Start daemon deinit...") # Delete all the information from DB and then exit - logical_port_list = self.port_mapping.logical_port_list + port_mapping_data = port_mapping.get_port_mapping() + logical_port_list = port_mapping_data.logical_port_list for logical_port_name in logical_port_list: # Get the asic to which this port belongs - asic_index = self.port_mapping.get_asic_id_for_logical_port(logical_port_name) + asic_index = port_mapping_data.get_asic_id_for_logical_port(logical_port_name) if asic_index is None: helper_logger.log_warning("Got invalid asic index for {}, ignored".format(logical_port_name)) continue - del_port_sfp_dom_info_from_db(logical_port_name, self.port_mapping, xcvr_table_helper.get_int_tbl(asic_index), xcvr_table_helper.get_dom_tbl(asic_index)) + del_port_sfp_dom_info_from_db(logical_port_name, port_mapping_data, xcvr_table_helper.get_int_tbl(asic_index), xcvr_table_helper.get_dom_tbl(asic_index)) delete_port_from_status_table(logical_port_name, xcvr_table_helper.get_status_tbl(asic_index)) if self.y_cable_presence[0] is True: - y_cable_helper.delete_ports_status_for_y_cable(self.port_mapping) + y_cable_helper.delete_ports_status_for_y_cable(port_mapping_data) del globals()['platform_chassis'] @@ -1620,28 +1510,26 @@ def run(self): self.log_info("Starting up...") # Start daemon initialization sequence - self.init() + port_mapping_data = self.init() # Start the dom sensor info update thread - dom_info_update = DomInfoUpdateTask(self.port_mapping) - self.subscribe_port_change_event(dom_info_update) + dom_info_update = DomInfoUpdateTask(port_mapping_data) dom_info_update.task_run(self.y_cable_presence) # Start the sfp state info update process - sfp_state_update = SfpStateUpdateTask(self.port_mapping) - self.subscribe_port_change_event(sfp_state_update) + sfp_state_update = SfpStateUpdateTask(port_mapping_data) sfp_state_update.task_run(self.sfp_error_event, self.y_cable_presence) # Start the Y-cable state info update process if Y cable presence established y_cable_state_update = None if self.y_cable_presence[0] is True: - y_cable_state_update = y_cable_helper.YCableTableUpdateTask(self.port_mapping) + y_cable_state_update = y_cable_helper.YCableTableUpdateTask(port_mapping_data) y_cable_state_update.task_run() # Start main loop self.log_info("Start daemon main loop") - self.handle_port_config_change() + self.stop_event.wait() self.log_info("Stop daemon main loop") diff --git a/sonic-xcvrd/xcvrd/xcvrd_utilities/port_mapping.py b/sonic-xcvrd/xcvrd/xcvrd_utilities/port_mapping.py index d2ecd7314..bf44fb70b 100644 --- a/sonic-xcvrd/xcvrd/xcvrd_utilities/port_mapping.py +++ b/sonic-xcvrd/xcvrd/xcvrd_utilities/port_mapping.py @@ -1,3 +1,10 @@ +from sonic_py_common import daemon_base +from sonic_py_common import multi_asic +from swsscommon import swsscommon + +SELECT_TIMEOUT_MSECS = 1000 + + class PortChangeEvent: PORT_ADD = 0 PORT_REMOVE = 1 @@ -75,3 +82,85 @@ def logical_port_name_to_physical_port_list(self, port_name): return self.get_logical_to_physical(port_name) else: return None + + +def subscribe_port_config_change(): + sel = swsscommon.Select() + asic_context = {} + namespaces = multi_asic.get_front_end_namespaces() + for namespace in namespaces: + config_db = daemon_base.db_connect("CONFIG_DB", namespace=namespace) + asic_id = multi_asic.get_asic_index_from_namespace(namespace) + port_tbl = swsscommon.SubscriberStateTable(config_db, swsscommon.CFG_PORT_TABLE_NAME) + asic_context[port_tbl] = asic_id + sel.addSelectable(port_tbl) + return sel, asic_context + + +def handle_port_config_change(sel, asic_context, stop_event, port_mapping, logger, port_change_event_handler): + """Select CONFIG_DB PORT table changes, once there is a port configuration add/remove, notify observers + """ + if not stop_event.is_set(): + (state, _) = sel.select(SELECT_TIMEOUT_MSECS) + if state == swsscommon.Select.TIMEOUT: + return + if state != swsscommon.Select.OBJECT: + logger.log_warning('sel.select() did not return swsscommon.Select.OBJECT') + return + + read_port_config_change(asic_context, port_mapping, logger, port_change_event_handler) + + +def read_port_config_change(asic_context, port_mapping, logger, port_change_event_handler): + for port_tbl in asic_context.keys(): + while True: + (key, op, fvp) = port_tbl.pop() + if not key: + break + if op == swsscommon.SET_COMMAND: + fvp = dict(fvp) + if 'index' not in fvp: + continue + + new_physical_index = int(fvp['index']) + if not port_mapping.is_logical_port(key): + # New logical port created + port_change_event = PortChangeEvent(key, new_physical_index, asic_context[port_tbl], PortChangeEvent.PORT_ADD) + port_change_event_handler(port_change_event) + else: + current_physical_index = port_mapping.get_logical_to_physical(key)[0] + if current_physical_index != new_physical_index: + port_change_event = PortChangeEvent(key, + current_physical_index, + asic_context[port_tbl], + PortChangeEvent.PORT_REMOVE) + port_change_event_handler(port_change_event) + + port_change_event = PortChangeEvent(key, new_physical_index, asic_context[port_tbl], PortChangeEvent.PORT_ADD) + port_change_event_handler(port_change_event) + elif op == swsscommon.DEL_COMMAND: + if port_mapping.is_logical_port(key): + port_change_event = PortChangeEvent(key, + port_mapping.get_logical_to_physical(key)[0], + asic_context[port_tbl], + PortChangeEvent.PORT_REMOVE) + port_change_event_handler(port_change_event) + else: + logger.log_warning('Invalid DB operation: {}'.format(op)) + + +def get_port_mapping(): + """Get port mapping from CONFIG_DB + """ + port_mapping = PortMapping() + namespaces = multi_asic.get_front_end_namespaces() + for namespace in namespaces: + asic_id = multi_asic.get_asic_index_from_namespace(namespace) + config_db = daemon_base.db_connect("CONFIG_DB", namespace=namespace) + port_table = swsscommon.Table(config_db, swsscommon.CFG_PORT_TABLE_NAME) + for key in port_table.getKeys(): + _, port_config = port_table.get(key) + port_config_dict = dict(port_config) + port_change_event = PortChangeEvent(key, port_config_dict['index'], asic_id, PortChangeEvent.PORT_ADD) + port_mapping.handle_port_change_event(port_change_event) + return port_mapping \ No newline at end of file diff --git a/sonic-xcvrd/xcvrd/xcvrd_utilities/y_cable_helper.py b/sonic-xcvrd/xcvrd/xcvrd_utilities/y_cable_helper.py index 1e070909e..46f4ab3ae 100644 --- a/sonic-xcvrd/xcvrd/xcvrd_utilities/y_cable_helper.py +++ b/sonic-xcvrd/xcvrd/xcvrd_utilities/y_cable_helper.py @@ -17,6 +17,7 @@ from sonic_y_cable import y_cable_vendor_mapping from swsscommon import swsscommon from . import sfp_status_helper +from .port_mapping import read_port_config_change SELECT_TIMEOUT = 1000 @@ -1436,16 +1437,12 @@ def __init__(self, port_mapping): self.task_cli_thread = None self.task_download_firmware_thread = {} self.task_stopping_event = threading.Event() - self.event_queue = queue.Queue() self.port_mapping = copy.deepcopy(port_mapping) if multi_asic.is_multi_asic(): # Load the namespace details first from the database_global.json file. swsscommon.SonicDBConfig.initializeGlobalConfig() - def notify_port_change_event(self, port_change_event): - self.event_queue.put_nowait(port_change_event) - def handle_port_change_event(self): while True: try: @@ -1461,6 +1458,7 @@ def task_worker(self): y_cable_tbl_keys = {} mux_cable_command_tbl, y_cable_command_tbl = {}, {} mux_metrics_tbl = {} + asic_context = {} sel = swsscommon.Select() @@ -1483,8 +1481,11 @@ def task_worker(self): mux_metrics_tbl[asic_id] = swsscommon.Table( state_db[asic_id], swsscommon.STATE_MUX_METRICS_TABLE_NAME) y_cable_tbl_keys[asic_id] = y_cable_tbl[asic_id].getKeys() + port_tbl = swsscommon.SubscriberStateTable(config_db[asic_id], swsscommon.CFG_PORT_TABLE_NAME) + asic_context[port_tbl] = asic_id sel.addSelectable(status_tbl[asic_id]) sel.addSelectable(mux_cable_command_tbl[asic_id]) + sel.addSelectable(port_tbl) # Listen indefinitely for changes to the HW_MUX_CABLE_TABLE in the Application DB's while True: @@ -1510,7 +1511,7 @@ def task_worker(self): # Get the corresponding namespace from redisselect db connector object namespace = redisSelectObj.getDbConnector().getNamespace() asic_index = multi_asic.get_asic_index_from_namespace(namespace) - self.handle_port_change_event() + read_port_config_change(asic_context, self.port_mapping, helper_logger, self.port_mapping.handle_port_change_event) while True: (port, op, fvp) = status_tbl[asic_index].pop() From da0761c1aeffc1afffe417deca9e819638912d67 Mon Sep 17 00:00:00 2001 From: junchao Date: Fri, 17 Sep 2021 10:49:06 +0800 Subject: [PATCH 2/2] Use select in xcvrd tasks --- sonic-xcvrd/tests/test_xcvrd.py | 157 ++++++----- sonic-xcvrd/xcvrd/xcvrd.py | 246 +++++------------- .../xcvrd/xcvrd_utilities/port_mapping.py | 89 +++++++ .../xcvrd/xcvrd_utilities/y_cable_helper.py | 11 +- 4 files changed, 250 insertions(+), 253 deletions(-) diff --git a/sonic-xcvrd/tests/test_xcvrd.py b/sonic-xcvrd/tests/test_xcvrd.py index 4035bffb2..d974a6699 100644 --- a/sonic-xcvrd/tests/test_xcvrd.py +++ b/sonic-xcvrd/tests/test_xcvrd.py @@ -32,6 +32,8 @@ from xcvrd.xcvrd import * from xcvrd.xcvrd_utilities.y_cable_helper import * from xcvrd.xcvrd_utilities.sfp_status_helper import * +from xcvrd.xcvrd_utilities.port_mapping import * +#from xcvrd.xcvrd_utilities.port_mapping import PortMapping, subscribe_port_config_change, handle_port_config_change class TestXcvrdScript(object): @@ -409,62 +411,61 @@ def test_is_error_sfp_status(self): @patch('swsscommon.swsscommon.Select.addSelectable', MagicMock()) @patch('swsscommon.swsscommon.SubscriberStateTable') @patch('swsscommon.swsscommon.Select.select') - def test_DaemonXcvrd_wait_for_port_config_done(self, mock_select, mock_sub_table): - mock_selectable = MagicMock() - mock_selectable.pop = MagicMock(side_effect=[('Ethernet0', swsscommon.SET_COMMAND, (('index', '1'), )), ('PortConfigDone', None, None)]) - mock_select.return_value = (swsscommon.Select.OBJECT, mock_selectable) - mock_sub_table.return_value = mock_selectable - xcvrd = DaemonXcvrd(SYSLOG_IDENTIFIER) - xcvrd.wait_for_port_config_done('') - assert swsscommon.Select.select.call_count == 2 - - @patch('swsscommon.swsscommon.Select.addSelectable', MagicMock()) - @patch('swsscommon.swsscommon.SubscriberStateTable') - @patch('swsscommon.swsscommon.Select.select') - def test_DaemonXcvrd_handle_port_config_change(self, mock_select, mock_sub_table): + def test_handle_port_config_change(self, mock_select, mock_sub_table): mock_selectable = MagicMock() mock_selectable.pop = MagicMock(side_effect=[('Ethernet0', swsscommon.SET_COMMAND, (('index', '1'), )), (None, None, None)]) mock_select.return_value = (swsscommon.Select.OBJECT, mock_selectable) mock_sub_table.return_value = mock_selectable - xcvrd = DaemonXcvrd(SYSLOG_IDENTIFIER) - xcvrd.stop_event.is_set = MagicMock(side_effect=[False, True]) - mock_observer = MagicMock() - mock_observer.notify_port_change_event = MagicMock() - xcvrd.subscribe_port_change_event(mock_observer) - - xcvrd.handle_port_config_change() - assert mock_observer.notify_port_change_event.call_count == 1 - assert xcvrd.port_mapping.logical_port_list.count('Ethernet0') - assert xcvrd.port_mapping.get_asic_id_for_logical_port('Ethernet0') == 0 - assert xcvrd.port_mapping.get_physical_to_logical(1) == ['Ethernet0'] - assert xcvrd.port_mapping.get_logical_to_physical('Ethernet0') == [1] - - xcvrd.stop_event.is_set = MagicMock(side_effect=[False, True]) + + sel, asic_context = subscribe_port_config_change() + port_mapping = PortMapping() + stop_event = threading.Event() + stop_event.is_set = MagicMock(return_value=False) + logger = MagicMock() + handle_port_config_change(sel, asic_context, stop_event, port_mapping, logger, port_mapping.handle_port_change_event) + + assert port_mapping.logical_port_list.count('Ethernet0') + assert port_mapping.get_asic_id_for_logical_port('Ethernet0') == 0 + assert port_mapping.get_physical_to_logical(1) == ['Ethernet0'] + assert port_mapping.get_logical_to_physical('Ethernet0') == [1] + mock_selectable.pop = MagicMock(side_effect=[('Ethernet0', swsscommon.DEL_COMMAND, (('index', '1'), )), (None, None, None)]) - xcvrd.handle_port_config_change() - assert mock_observer.notify_port_change_event.call_count == 2 - assert not xcvrd.port_mapping.logical_port_list - assert not xcvrd.port_mapping.logical_to_physical - assert not xcvrd.port_mapping.physical_to_logical - assert not xcvrd.port_mapping.logical_to_asic + handle_port_config_change(sel, asic_context, stop_event, port_mapping, logger, port_mapping.handle_port_change_event) + assert not port_mapping.logical_port_list + assert not port_mapping.logical_to_physical + assert not port_mapping.physical_to_logical + assert not port_mapping.logical_to_asic + @patch('swsscommon.swsscommon.Table') - def test_DaemonXcvrd_init_port_mapping(self, mock_swsscommon_table): + def test_get_port_mapping(self, mock_swsscommon_table): mock_table = MagicMock() mock_table.getKeys = MagicMock(return_value=['Ethernet0', 'Ethernet4']) mock_table.get = MagicMock(side_effect=[(True, (('index', 1), )), (True, (('index', 2), ))]) mock_swsscommon_table.return_value = mock_table - xcvrd = DaemonXcvrd(SYSLOG_IDENTIFIER) - xcvrd.init_port_mapping() - assert xcvrd.port_mapping.logical_port_list.count('Ethernet0') - assert xcvrd.port_mapping.get_asic_id_for_logical_port('Ethernet0') == 0 - assert xcvrd.port_mapping.get_physical_to_logical(1) == ['Ethernet0'] - assert xcvrd.port_mapping.get_logical_to_physical('Ethernet0') == [1] + port_mapping = get_port_mapping() + assert port_mapping.logical_port_list.count('Ethernet0') + assert port_mapping.get_asic_id_for_logical_port('Ethernet0') == 0 + assert port_mapping.get_physical_to_logical(1) == ['Ethernet0'] + assert port_mapping.get_logical_to_physical('Ethernet0') == [1] - assert xcvrd.port_mapping.logical_port_list.count('Ethernet4') - assert xcvrd.port_mapping.get_asic_id_for_logical_port('Ethernet4') == 0 - assert xcvrd.port_mapping.get_physical_to_logical(2) == ['Ethernet4'] - assert xcvrd.port_mapping.get_logical_to_physical('Ethernet4') == [2] + assert port_mapping.logical_port_list.count('Ethernet4') + assert port_mapping.get_asic_id_for_logical_port('Ethernet4') == 0 + assert port_mapping.get_physical_to_logical(2) == ['Ethernet4'] + assert port_mapping.get_logical_to_physical('Ethernet4') == [2] + + + @patch('swsscommon.swsscommon.Select.addSelectable', MagicMock()) + @patch('swsscommon.swsscommon.SubscriberStateTable') + @patch('swsscommon.swsscommon.Select.select') + def test_DaemonXcvrd_wait_for_port_config_done(self, mock_select, mock_sub_table): + mock_selectable = MagicMock() + mock_selectable.pop = MagicMock(side_effect=[('Ethernet0', swsscommon.SET_COMMAND, (('index', '1'), )), ('PortConfigDone', None, None)]) + mock_select.return_value = (swsscommon.Select.OBJECT, mock_selectable) + mock_sub_table.return_value = mock_selectable + xcvrd = DaemonXcvrd(SYSLOG_IDENTIFIER) + xcvrd.wait_for_port_config_done('') + assert swsscommon.Select.select.call_count == 2 @patch('xcvrd.xcvrd.DaemonXcvrd.load_platform_util', MagicMock()) @patch('sonic_py_common.device_info.get_paths_to_platform_and_hwsku_dirs', MagicMock(return_value=('/tmp', None))) @@ -479,20 +480,20 @@ def test_DaemonXcvrd_init_deinit(self): @patch('xcvrd.xcvrd.DaemonXcvrd.init') @patch('xcvrd.xcvrd.DaemonXcvrd.deinit') - @patch('xcvrd.xcvrd.DaemonXcvrd.handle_port_config_change') @patch('xcvrd.xcvrd.DomInfoUpdateTask.task_run') @patch('xcvrd.xcvrd.SfpStateUpdateTask.task_run') @patch('xcvrd.xcvrd.DomInfoUpdateTask.task_stop') @patch('xcvrd.xcvrd.SfpStateUpdateTask.task_stop') - def test_DaemonXcvrd_run(self, mock_task_stop1, mock_task_stop2, mock_task_run1, mock_task_run2, mock_handle_port_config_change, mock_deinit, mock_init): + def test_DaemonXcvrd_run(self, mock_task_stop1, mock_task_stop2, mock_task_run1, mock_task_run2, mock_deinit, mock_init): + mock_init.return_value = (PortMapping(), set()) xcvrd = DaemonXcvrd(SYSLOG_IDENTIFIER) + xcvrd.stop_event.wait = MagicMock() xcvrd.run() # TODO: more check assert mock_task_stop1.call_count == 1 assert mock_task_stop2.call_count == 1 assert mock_task_run1.call_count == 1 assert mock_task_run2.call_count == 1 - assert mock_handle_port_config_change.call_count == 1 assert mock_deinit.call_count == 1 assert mock_init.call_count == 1 @@ -501,21 +502,21 @@ def test_DomInfoUpdateTask_handle_port_change_event(self): port_mapping = PortMapping() task = DomInfoUpdateTask(port_mapping) port_change_event = PortChangeEvent('Ethernet0', 1, 0, PortChangeEvent.PORT_ADD) - task.notify_port_change_event(port_change_event) - task.handle_port_change_event() + task.on_port_config_change(port_change_event) assert task.port_mapping.logical_port_list.count('Ethernet0') assert task.port_mapping.get_asic_id_for_logical_port('Ethernet0') == 0 assert task.port_mapping.get_physical_to_logical(1) == ['Ethernet0'] assert task.port_mapping.get_logical_to_physical('Ethernet0') == [1] port_change_event = PortChangeEvent('Ethernet0', 1, 0, PortChangeEvent.PORT_REMOVE) - task.notify_port_change_event(port_change_event) - task.handle_port_change_event() + task.on_port_config_change(port_change_event) assert not task.port_mapping.logical_port_list assert not task.port_mapping.logical_to_physical assert not task.port_mapping.physical_to_logical assert not task.port_mapping.logical_to_asic + @patch('xcvrd.xcvrd_utilities.port_mapping.subscribe_port_config_change', MagicMock(return_value=(None, None))) + @patch('xcvrd.xcvrd_utilities.port_mapping.handle_port_config_change', MagicMock()) def test_DomInfoUpdateTask_task_run_stop(self): port_mapping = PortMapping() task = DomInfoUpdateTask(port_mapping) @@ -527,12 +528,18 @@ def test_DomInfoUpdateTask_task_run_stop(self): @patch('xcvrd.xcvrd_utilities.sfp_status_helper.detect_port_in_error_status') @patch('xcvrd.xcvrd.post_port_dom_info_to_db') @patch('xcvrd.xcvrd.post_port_dom_threshold_info_to_db') - def test_DomInfoUpdateTask_task_worker(self, mock_post_dom_th, mock_post_dom_info, mock_detect_error): + @patch('swsscommon.swsscommon.Select.addSelectable', MagicMock()) + @patch('swsscommon.swsscommon.SubscriberStateTable') + @patch('swsscommon.swsscommon.Select.select') + def test_DomInfoUpdateTask_task_worker(self, mock_select, mock_sub_table, mock_post_dom_th, mock_post_dom_info, mock_detect_error): + mock_selectable = MagicMock() + mock_selectable.pop = MagicMock(side_effect=[('Ethernet0', swsscommon.SET_COMMAND, (('index', '1'), )), (None, None, None), (None, None, None)]) + mock_select.return_value = (swsscommon.Select.OBJECT, mock_selectable) + mock_sub_table.return_value = mock_selectable + port_mapping = PortMapping() task = DomInfoUpdateTask(port_mapping) task.task_stopping_event.wait = MagicMock(side_effect=[False, True]) - port_change_event = PortChangeEvent('Ethernet0', 1, 0, PortChangeEvent.PORT_ADD) - task.notify_port_change_event(port_change_event) mock_detect_error.return_value = True task.task_worker([False]) assert task.port_mapping.logical_port_list.count('Ethernet0') @@ -557,12 +564,12 @@ def test_SfpStateUpdateTask_handle_port_change_event(self, mock_table_helper): mock_table_helper.get_dom_tbl = MagicMock(return_value=mock_table) stopping_event = multiprocessing.Event() port_mapping = PortMapping() - task = SfpStateUpdateTask(port_mapping) + retry_eeprom_set = set() + task = SfpStateUpdateTask(port_mapping, retry_eeprom_set) port_change_event = PortChangeEvent('Ethernet0', 1, 0, PortChangeEvent.PORT_ADD) - task.notify_port_change_event(port_change_event) wait_time = 5 while wait_time > 0: - task.handle_port_change_event(task.event_queue, stopping_event, [False]) + task.on_port_config_change(stopping_event, [False], port_change_event) if task.port_mapping.logical_port_list: break wait_time -= 1 @@ -573,10 +580,9 @@ def test_SfpStateUpdateTask_handle_port_change_event(self, mock_table_helper): assert task.port_mapping.get_logical_to_physical('Ethernet0') == [1] port_change_event = PortChangeEvent('Ethernet0', 1, 0, PortChangeEvent.PORT_REMOVE) - task.notify_port_change_event(port_change_event) wait_time = 5 while wait_time > 0: - task.handle_port_change_event(task.event_queue, stopping_event, [False]) + task.on_port_config_change(stopping_event, [False], port_change_event) if not task.port_mapping.logical_port_list: break wait_time -= 1 @@ -588,7 +594,8 @@ def test_SfpStateUpdateTask_handle_port_change_event(self, mock_table_helper): def test_SfpStateUpdateTask_task_run_stop(self): port_mapping = PortMapping() - task = SfpStateUpdateTask(port_mapping) + retry_eeprom_set = set() + task = SfpStateUpdateTask(port_mapping, retry_eeprom_set) sfp_error_event = multiprocessing.Event() task.task_run(sfp_error_event, [False]) assert wait_until(5, 1, task.task_process.is_alive) @@ -599,7 +606,8 @@ def test_SfpStateUpdateTask_task_run_stop(self): @patch('xcvrd.xcvrd.post_port_sfp_info_to_db') def test_SfpStateUpdateTask_retry_eeprom_reading(self, mock_post_sfp_info): port_mapping = PortMapping() - task = SfpStateUpdateTask(port_mapping) + retry_eeprom_set = set() + task = SfpStateUpdateTask(port_mapping, retry_eeprom_set) task.retry_eeprom_reading() assert mock_post_sfp_info.call_count == 0 @@ -620,7 +628,8 @@ def test_SfpStateUpdateTask_retry_eeprom_reading(self, mock_post_sfp_info): def test_SfpStateUpdateTask_mapping_event_from_change_event(self): port_mapping = PortMapping() - task = SfpStateUpdateTask(port_mapping) + retry_eeprom_set = set() + task = SfpStateUpdateTask(port_mapping, retry_eeprom_set) port_dict = {} assert task._mapping_event_from_change_event(False, port_dict) == SYSTEM_FAIL assert port_dict[EVENT_ON_ALL_SFP] == SYSTEM_FAIL @@ -638,6 +647,8 @@ def test_SfpStateUpdateTask_mapping_event_from_change_event(self): @patch('time.sleep', MagicMock()) @patch('xcvrd.xcvrd.xcvr_table_helper', MagicMock()) @patch('xcvrd.xcvrd._wrapper_soak_sfp_insert_event', MagicMock()) + @patch('xcvrd.xcvrd_utilities.port_mapping.subscribe_port_config_change', MagicMock(return_value=(None, None))) + @patch('xcvrd.xcvrd_utilities.port_mapping.handle_port_config_change', MagicMock()) @patch('os.kill') @patch('xcvrd.xcvrd.SfpStateUpdateTask._mapping_event_from_change_event') @patch('xcvrd.xcvrd._wrapper_get_transceiver_change_event') @@ -649,14 +660,15 @@ def test_SfpStateUpdateTask_mapping_event_from_change_event(self): @patch('xcvrd.xcvrd.update_port_transceiver_status_table') def test_SfpStateUpdateTask_task_worker(self, mock_updata_status, mock_post_sfp_info, mock_post_dom_info, mock_post_dom_th, mock_update_media_setting, mock_del_dom, mock_change_event, mock_mapping_event, mock_os_kill): port_mapping = PortMapping() - task = SfpStateUpdateTask(port_mapping) + retry_eeprom_set = set() + task = SfpStateUpdateTask(port_mapping, retry_eeprom_set) stop_event = multiprocessing.Event() sfp_error_event = multiprocessing.Event() mock_change_event.return_value = (True, {0:0}, {}) mock_mapping_event.return_value = SYSTEM_NOT_READY # Test state machine: STATE_INIT + SYSTEM_NOT_READY event => STATE_INIT + SYSTEM_NOT_READY event ... => STATE_EXIT - task.task_worker(stop_event, sfp_error_event, [False], task.event_queue) + task.task_worker(stop_event, sfp_error_event, [False]) assert mock_os_kill.call_count == 1 assert sfp_error_event.is_set() @@ -664,7 +676,7 @@ def test_SfpStateUpdateTask_task_worker(self, mock_updata_status, mock_post_sfp_ mock_os_kill.reset_mock() sfp_error_event.clear() # Test state machine: STATE_INIT + SYSTEM_FAIL event => STATE_INIT + SYSTEM_FAIL event ... => STATE_EXIT - task.task_worker(stop_event, sfp_error_event, [False], task.event_queue) + task.task_worker(stop_event, sfp_error_event, [False]) assert mock_os_kill.call_count == 1 assert sfp_error_event.is_set() @@ -672,7 +684,7 @@ def test_SfpStateUpdateTask_task_worker(self, mock_updata_status, mock_post_sfp_ mock_os_kill.reset_mock() sfp_error_event.clear() # Test state machine: STATE_INIT + SYSTEM_BECOME_READY event => STATE_NORMAL + SYSTEM_NOT_READY event ... => STATE_EXIT - task.task_worker(stop_event, sfp_error_event, [False], task.event_queue) + task.task_worker(stop_event, sfp_error_event, [False]) assert mock_os_kill.call_count == 1 assert not sfp_error_event.is_set() @@ -681,7 +693,7 @@ def test_SfpStateUpdateTask_task_worker(self, mock_updata_status, mock_post_sfp_ sfp_error_event.clear() # Test state machine: STATE_INIT + SYSTEM_BECOME_READY event => STATE_NORMAL + SYSTEM_FAIL event ... => STATE_INIT # + SYSTEM_FAIL event ... => STATE_EXIT - task.task_worker(stop_event, sfp_error_event, [False], task.event_queue) + task.task_worker(stop_event, sfp_error_event, [False]) assert mock_os_kill.call_count == 1 assert sfp_error_event.is_set() @@ -692,7 +704,7 @@ def test_SfpStateUpdateTask_task_worker(self, mock_updata_status, mock_post_sfp_ mock_post_sfp_info.return_value = SFP_EEPROM_NOT_READY stop_event.is_set = MagicMock(side_effect=[False, True]) # Test state machine: handle SFP insert event, but EEPROM read failure - task.task_worker(stop_event, sfp_error_event, [False], task.event_queue) + task.task_worker(stop_event, sfp_error_event, [False]) assert mock_updata_status.call_count == 1 assert mock_post_sfp_info.call_count == 2 # first call and retry call assert mock_post_dom_info.call_count == 0 @@ -706,7 +718,7 @@ def test_SfpStateUpdateTask_task_worker(self, mock_updata_status, mock_post_sfp_ mock_updata_status.reset_mock() mock_post_sfp_info.reset_mock() # Test state machine: handle SFP insert event, and EEPROM read success - task.task_worker(stop_event, sfp_error_event, [False], task.event_queue) + task.task_worker(stop_event, sfp_error_event, [False]) assert mock_updata_status.call_count == 1 assert mock_post_sfp_info.call_count == 1 assert mock_post_dom_info.call_count == 1 @@ -717,7 +729,7 @@ def test_SfpStateUpdateTask_task_worker(self, mock_updata_status, mock_post_sfp_ mock_change_event.return_value = (True, {1:SFP_STATUS_REMOVED}, {}) mock_updata_status.reset_mock() # Test state machine: handle SFP remove event - task.task_worker(stop_event, sfp_error_event, [False], task.event_queue) + task.task_worker(stop_event, sfp_error_event, [False]) assert mock_updata_status.call_count == 1 assert mock_del_dom.call_count == 1 @@ -727,7 +739,7 @@ def test_SfpStateUpdateTask_task_worker(self, mock_updata_status, mock_post_sfp_ mock_updata_status.reset_mock() mock_del_dom.reset_mock() # Test state machine: handle SFP error event - task.task_worker(stop_event, sfp_error_event, [False], task.event_queue) + task.task_worker(stop_event, sfp_error_event, [False]) assert mock_updata_status.call_count == 1 assert mock_del_dom.call_count == 1 @@ -756,7 +768,8 @@ class MockTable: mock_table_helper.get_dom_tbl = MagicMock(return_value=dom_tbl) port_mapping = PortMapping() - task = SfpStateUpdateTask(port_mapping) + retry_eeprom_set = set() + task = SfpStateUpdateTask(port_mapping, retry_eeprom_set) port_change_event = PortChangeEvent('Ethernet0', 1, 0, PortChangeEvent.PORT_ADD) task.port_mapping.handle_port_change_event(port_change_event) # SFP information is in the DB, copy the SFP information for the newly added logical port diff --git a/sonic-xcvrd/xcvrd/xcvrd.py b/sonic-xcvrd/xcvrd/xcvrd.py index b9818927a..a07aa1a08 100644 --- a/sonic-xcvrd/xcvrd/xcvrd.py +++ b/sonic-xcvrd/xcvrd/xcvrd.py @@ -8,10 +8,10 @@ try: import ast import copy + import functools import json import multiprocessing import os - import queue import signal import sys import threading @@ -23,7 +23,7 @@ from .xcvrd_utilities import sfp_status_helper from .xcvrd_utilities import y_cable_helper - from .xcvrd_utilities.port_mapping import PortMapping, PortChangeEvent + from .xcvrd_utilities import port_mapping except ImportError as e: raise ImportError(str(e) + " - required module not found") @@ -40,8 +40,6 @@ TRANSCEIVER_DOM_SENSOR_TABLE = 'TRANSCEIVER_DOM_SENSOR' TRANSCEIVER_STATUS_TABLE = 'TRANSCEIVER_STATUS' -SELECT_TIMEOUT_MSECS = 1000 - # Mgminit time required as per CMIS spec MGMT_INIT_TIME_DELAY_SECS = 2 @@ -49,7 +47,7 @@ SFP_INSERT_EVENT_POLL_PERIOD_MSECS = 1000 DOM_INFO_UPDATE_PERIOD_SECS = 60 -STATE_MACHINE_UPDATE_PERIOD_MSECS = 6000 +STATE_MACHINE_UPDATE_PERIOD_MSECS = 60000 TIME_FOR_SFP_READY_SECS = 1 EVENT_ON_ALL_SFP = '-1' @@ -496,6 +494,7 @@ def post_port_dom_info_to_db(logical_port_name, port_mapping, table, stop_event= def post_port_sfp_dom_info_to_db(is_warm_start, port_mapping, stop_event=threading.Event()): # Connect to STATE_DB and create transceiver dom/sfp info tables transceiver_dict = {} + retry_eeprom_set = set() # Post all the current interface dom/sfp info to STATE_DB logical_port_list = port_mapping.logical_port_list @@ -508,14 +507,19 @@ def post_port_sfp_dom_info_to_db(is_warm_start, port_mapping, stop_event=threadi if asic_index is None: helper_logger.log_warning("Got invalid asic index for {}, ignored".format(logical_port_name)) continue - post_port_sfp_info_to_db(logical_port_name, port_mapping, xcvr_table_helper.get_int_tbl(asic_index), transceiver_dict, stop_event) - post_port_dom_info_to_db(logical_port_name, port_mapping, xcvr_table_helper.get_dom_tbl(asic_index), stop_event) - post_port_dom_threshold_info_to_db(logical_port_name, port_mapping, xcvr_table_helper.get_dom_tbl(asic_index), stop_event) - - # Do not notify media settings during warm reboot to avoid dataplane traffic impact - if is_warm_start == False: - notify_media_setting(logical_port_name, transceiver_dict, xcvr_table_helper.get_app_port_tbl(asic_index), port_mapping) - transceiver_dict.clear() + rc = post_port_sfp_info_to_db(logical_port_name, port_mapping, xcvr_table_helper.get_int_tbl(asic_index), transceiver_dict, stop_event) + if rc != SFP_EEPROM_NOT_READY: + post_port_dom_info_to_db(logical_port_name, port_mapping, xcvr_table_helper.get_dom_tbl(asic_index), stop_event) + post_port_dom_threshold_info_to_db(logical_port_name, port_mapping, xcvr_table_helper.get_dom_tbl(asic_index), stop_event) + + # Do not notify media settings during warm reboot to avoid dataplane traffic impact + if is_warm_start == False: + notify_media_setting(logical_port_name, transceiver_dict, xcvr_table_helper.get_app_port_tbl(asic_index), port_mapping) + transceiver_dict.clear() + else: + retry_eeprom_set.add(logical_port_name) + + return retry_eeprom_set # Delete port dom/sfp info from db @@ -833,7 +837,6 @@ class DomInfoUpdateTask(object): def __init__(self, port_mapping): self.task_thread = None self.task_stopping_event = threading.Event() - self.event_queue = queue.Queue() self.port_mapping = copy.deepcopy(port_mapping) def task_worker(self, y_cable_presence): @@ -841,6 +844,7 @@ def task_worker(self, y_cable_presence): mux_tbl = {} dom_info_cache = {} dom_th_info_cache = {} + sel, asic_context = port_mapping.subscribe_port_config_change() # Start loop to update dom info in DB periodically while not self.task_stopping_event.wait(DOM_INFO_UPDATE_PERIOD_SECS): @@ -849,7 +853,7 @@ def task_worker(self, y_cable_presence): dom_th_info_cache.clear() # Handle port change event from main thread - self.handle_port_change_event() + port_mapping.handle_port_config_change(sel, asic_context, self.task_stopping_event, self.port_mapping, helper_logger, self.on_port_config_change) logical_port_list = self.port_mapping.logical_port_list for logical_port_name in logical_port_list: # Get the asic to which this port belongs @@ -877,26 +881,10 @@ def task_stop(self): self.task_stopping_event.set() self.task_thread.join() - def notify_port_change_event(self, port_change_event): - """Called by main thread. When main thread detects a port configuration add/remove, it puts an event to - the event queue - - Args: - port_change_event (object): port change event - """ - self.event_queue.put_nowait(port_change_event) - - def handle_port_change_event(self): - """Handle port change event until the queue is empty, update local port mapping and DB data accordingly. - """ - while True: - try: - port_change_event = self.event_queue.get_nowait() - if port_change_event.event_type == PortChangeEvent.PORT_REMOVE: - self.on_remove_logical_port(port_change_event) - self.port_mapping.handle_port_change_event(port_change_event) - except queue.Empty: - break + def on_port_config_change(self, port_change_event): + if port_change_event.event_type == port_mapping.PortChangeEvent.PORT_REMOVE: + self.on_remove_logical_port(port_change_event) + self.port_mapping.handle_port_change_event(port_change_event) def on_remove_logical_port(self, port_change_event): """Called when a logical port is removed from CONFIG_DB @@ -918,13 +906,12 @@ def on_remove_logical_port(self, port_change_event): class SfpStateUpdateTask(object): RETRY_EEPROM_READING_INTERVAL = 60 - def __init__(self, port_mapping): + def __init__(self, port_mapping, retry_eeprom_set): self.task_process = None self.task_stopping_event = multiprocessing.Event() - self.event_queue = multiprocessing.Queue() self.port_mapping = copy.deepcopy(port_mapping) # A set to hold those SFPs who fail to read EEPROM - self.retry_eeprom_set = set() + self.retry_eeprom_set = retry_eeprom_set # To avoid retry EEPROM read too fast, record the last EEPROM read timestamp in this member self.last_retry_eeprom_time = 0 # A dict to hold SFP error event, for SFP insert/remove event, it is not necessary to cache them @@ -956,7 +943,7 @@ def _mapping_event_from_change_event(self, status, port_dict): helper_logger.log_debug("mapping from {} {} to {}".format(status, port_dict, event)) return event - def task_worker(self, stopping_event, sfp_error_event, y_cable_presence, event_queue): + def task_worker(self, stopping_event, sfp_error_event, y_cable_presence): helper_logger.log_info("Start SFP monitoring loop") transceiver_dict = {} @@ -1029,8 +1016,10 @@ def task_worker(self, stopping_event, sfp_error_event, y_cable_presence, event_q retry = 0 timeout = RETRY_PERIOD_FOR_SYSTEM_READY_MSECS state = STATE_INIT + sel, asic_context = port_mapping.subscribe_port_config_change() + port_change_event_handler = functools.partial(self.on_port_config_change, stopping_event, y_cable_presence) while not stopping_event.is_set(): - self.handle_port_change_event(event_queue, stopping_event, y_cable_presence) + port_mapping.handle_port_config_change(sel, asic_context, stopping_event, self.port_mapping, helper_logger, port_change_event_handler) # Retry those logical ports whose EEPROM reading failed or timeout when the SFP is inserted self.retry_eeprom_reading() @@ -1207,48 +1196,32 @@ def task_run(self, sfp_error_event, y_cable_presence): return self.task_process = multiprocessing.Process(target=self.task_worker, args=( - self.task_stopping_event, sfp_error_event, y_cable_presence, self.event_queue)) + self.task_stopping_event, sfp_error_event, y_cable_presence)) self.task_process.start() def task_stop(self): self.task_stopping_event.set() os.kill(self.task_process.pid, signal.SIGKILL) - def notify_port_change_event(self, port_change_event): - """Called by main thread. When main thread detects a port configuration add/remove, it puts an event to - the event queue - - Args: - port_change_event (object): port change event - """ - self.event_queue.put_nowait(port_change_event) - - def handle_port_change_event(self, event_queue, stopping_event, y_cable_presence): - """Handle port change event until the queue is empty, update local port mapping and DB data accordingly. - """ - while True: - try: - port_change_event = event_queue.get_nowait() - if port_change_event.event_type == PortChangeEvent.PORT_REMOVE: - self.on_remove_logical_port(port_change_event) - # Update y_cable related database once a logical port is removed - y_cable_helper.change_ports_status_for_y_cable_change_event( - {port_change_event.port_name:sfp_status_helper.SFP_STATUS_REMOVED}, - self.port_mapping, - y_cable_presence, - stopping_event) - self.port_mapping.handle_port_change_event(port_change_event) - elif port_change_event.event_type == PortChangeEvent.PORT_ADD: - self.port_mapping.handle_port_change_event(port_change_event) - logical_port_event_dict = self.on_add_logical_port(port_change_event) - # Update y_cable related database once a logical port is added - y_cable_helper.change_ports_status_for_y_cable_change_event( - logical_port_event_dict, - self.port_mapping, - y_cable_presence, - stopping_event) - except queue.Empty: - break + def on_port_config_change(self, stopping_event, y_cable_presence, port_change_event): + if port_change_event.event_type == port_mapping.PortChangeEvent.PORT_REMOVE: + self.on_remove_logical_port(port_change_event) + # Update y_cable related database once a logical port is removed + y_cable_helper.change_ports_status_for_y_cable_change_event( + {port_change_event.port_name:sfp_status_helper.SFP_STATUS_REMOVED}, + self.port_mapping, + y_cable_presence, + stopping_event) + self.port_mapping.handle_port_change_event(port_change_event) + elif port_change_event.event_type == port_mapping.PortChangeEvent.PORT_ADD: + self.port_mapping.handle_port_change_event(port_change_event) + logical_port_event_dict = self.on_add_logical_port(port_change_event) + # Update y_cable related database once a logical port is added + y_cable_helper.change_ports_status_for_y_cable_change_event( + logical_port_event_dict, + self.port_mapping, + y_cable_presence, + stopping_event) def on_remove_logical_port(self, port_change_event): """Called when a logical port is removed from CONFIG_DB. @@ -1401,8 +1374,6 @@ def __init__(self, log_identifier): self.stop_event = threading.Event() self.sfp_error_event = multiprocessing.Event() self.y_cable_presence = [False] - self.port_change_event_observers = [] - self.port_mapping = PortMapping() # Signal handler def signal_handler(self, sig, frame): @@ -1423,87 +1394,22 @@ def wait_for_port_config_done(self, namespace): appl_db = daemon_base.db_connect("APPL_DB", namespace=namespace) sel = swsscommon.Select() - sst = swsscommon.SubscriberStateTable(appl_db, swsscommon.APP_PORT_TABLE_NAME) - sel.addSelectable(sst) + port_tbl = swsscommon.SubscriberStateTable(appl_db, swsscommon.APP_PORT_TABLE_NAME) + sel.addSelectable(port_tbl) # Make sure this daemon started after all port configured while not self.stop_event.is_set(): - (state, c) = sel.select(SELECT_TIMEOUT_MSECS) + (state, c) = sel.select(port_mapping.SELECT_TIMEOUT_MSECS) if state == swsscommon.Select.TIMEOUT: continue if state != swsscommon.Select.OBJECT: self.log_warning("sel.select() did not return swsscommon.Select.OBJECT") continue - (key, op, fvp) = sst.pop() + (key, op, fvp) = port_tbl.pop() if key in ["PortConfigDone", "PortInitDone"]: break - def handle_port_config_change(self): - """Select CONFIG_DB PORT table changes, once there is a port configuration add/remove, notify observers - """ - sel = swsscommon.Select() - asic_context = {} - namespaces = multi_asic.get_front_end_namespaces() - for namespace in namespaces: - config_db = daemon_base.db_connect("CONFIG_DB", namespace=namespace) - asic_id = multi_asic.get_asic_index_from_namespace(namespace) - sst = swsscommon.SubscriberStateTable(config_db, swsscommon.CFG_PORT_TABLE_NAME) - asic_context[sst] = asic_id - sel.addSelectable(sst) - - while not self.stop_event.is_set(): - (state, _) = sel.select(SELECT_TIMEOUT_MSECS) - if state == swsscommon.Select.TIMEOUT: - continue - if state != swsscommon.Select.OBJECT: - self.log_warning("sel.select() did not return swsscommon.Select.OBJECT") - continue - - for sst in asic_context.keys(): - while True: - (key, op, fvp) = sst.pop() - if not key: - break - if op == swsscommon.SET_COMMAND: - # Only process new logical port, here we assume that the physical index for a logical port should - # never change, if user configure a new value for the "index" field for exsitng locial port, it is - # an error - if not self.port_mapping.is_logical_port(key): - fvp = dict(fvp) - if 'index' in fvp: - port_change_event = PortChangeEvent(key, fvp['index'], asic_context[sst], PortChangeEvent.PORT_ADD) - self.notify_port_change_event(port_change_event) - elif op == swsscommon.DEL_COMMAND: - if self.port_mapping.is_logical_port(key): - port_change_event = PortChangeEvent(key, - self.port_mapping.get_logical_to_physical(key)[0], - asic_context[sst], - PortChangeEvent.PORT_REMOVE) - self.notify_port_change_event(port_change_event) - else: - self.log_warning("Invalid DB operation: {}".format(op)) - - def notify_port_change_event(self, port_change_event): - """Notify observers that there is a port change event - - Args: - port_change_event (object): port change event - """ - - self.log_notice('Sending port change event {} to observers'.format(port_change_event)) - for observer in self.port_change_event_observers: - observer.notify_port_change_event(port_change_event) - self.port_mapping.handle_port_change_event(port_change_event) - - def subscribe_port_change_event(self, observer): - """Subscribe port change event - - Args: - observer (object): observer who listen to port change event - """ - self.port_change_event_observers.append(observer) - def load_media_settings(self): global g_dict (platform_path, _) = device_info.get_paths_to_platform_and_hwsku_dirs() @@ -1515,20 +1421,6 @@ def load_media_settings(self): with open(media_settings_file_path, "r") as media_file: g_dict = json.load(media_file) - - def init_port_mapping(self): - """Initialize port mapping from CONFIG_DB for the first run - """ - namespaces = multi_asic.get_front_end_namespaces() - for namespace in namespaces: - asic_id = multi_asic.get_asic_index_from_namespace(namespace) - config_db = daemon_base.db_connect("CONFIG_DB", namespace=namespace) - port_table = swsscommon.Table(config_db, swsscommon.CFG_PORT_TABLE_NAME) - for key in port_table.getKeys(): - _, port_config = port_table.get(key) - port_config_dict = dict(port_config) - port_change_event = PortChangeEvent(key, port_config_dict['index'], asic_id, PortChangeEvent.PORT_ADD) - self.port_mapping.handle_port_change_event(port_change_event) # Initialize daemon def init(self): @@ -1579,38 +1471,42 @@ def init(self): for namespace in xcvr_table_helper.namespaces: self.wait_for_port_config_done(namespace) - self.init_port_mapping() + + port_mapping_data = port_mapping.get_port_mapping() # Post all the current interface dom/sfp info to STATE_DB self.log_info("Post all port DOM/SFP info to DB") - post_port_sfp_dom_info_to_db(is_warm_start, self.port_mapping, self.stop_event) + retry_eeprom_set = post_port_sfp_dom_info_to_db(is_warm_start, port_mapping_data, self.stop_event) # Init port sfp status table self.log_info("Init port sfp status table") - init_port_sfp_status_tbl(self.port_mapping, self.stop_event) + init_port_sfp_status_tbl(port_mapping_data, self.stop_event) # Init port y_cable status table y_cable_helper.init_ports_status_for_y_cable( - platform_sfputil, platform_chassis, self.y_cable_presence, self.port_mapping, self.stop_event) + platform_sfputil, platform_chassis, self.y_cable_presence, port_mapping_data, self.stop_event) + + return port_mapping_data, retry_eeprom_set # Deinitialize daemon def deinit(self): self.log_info("Start daemon deinit...") # Delete all the information from DB and then exit - logical_port_list = self.port_mapping.logical_port_list + port_mapping_data = port_mapping.get_port_mapping() + logical_port_list = port_mapping_data.logical_port_list for logical_port_name in logical_port_list: # Get the asic to which this port belongs - asic_index = self.port_mapping.get_asic_id_for_logical_port(logical_port_name) + asic_index = port_mapping_data.get_asic_id_for_logical_port(logical_port_name) if asic_index is None: helper_logger.log_warning("Got invalid asic index for {}, ignored".format(logical_port_name)) continue - del_port_sfp_dom_info_from_db(logical_port_name, self.port_mapping, xcvr_table_helper.get_int_tbl(asic_index), xcvr_table_helper.get_dom_tbl(asic_index)) + del_port_sfp_dom_info_from_db(logical_port_name, port_mapping_data, xcvr_table_helper.get_int_tbl(asic_index), xcvr_table_helper.get_dom_tbl(asic_index)) delete_port_from_status_table(logical_port_name, xcvr_table_helper.get_status_tbl(asic_index)) if self.y_cable_presence[0] is True: - y_cable_helper.delete_ports_status_for_y_cable(self.port_mapping) + y_cable_helper.delete_ports_status_for_y_cable(port_mapping_data) del globals()['platform_chassis'] @@ -1620,28 +1516,26 @@ def run(self): self.log_info("Starting up...") # Start daemon initialization sequence - self.init() + port_mapping_data, retry_eeprom_set = self.init() # Start the dom sensor info update thread - dom_info_update = DomInfoUpdateTask(self.port_mapping) - self.subscribe_port_change_event(dom_info_update) + dom_info_update = DomInfoUpdateTask(port_mapping_data) dom_info_update.task_run(self.y_cable_presence) # Start the sfp state info update process - sfp_state_update = SfpStateUpdateTask(self.port_mapping) - self.subscribe_port_change_event(sfp_state_update) + sfp_state_update = SfpStateUpdateTask(port_mapping_data, retry_eeprom_set) sfp_state_update.task_run(self.sfp_error_event, self.y_cable_presence) # Start the Y-cable state info update process if Y cable presence established y_cable_state_update = None if self.y_cable_presence[0] is True: - y_cable_state_update = y_cable_helper.YCableTableUpdateTask(self.port_mapping) + y_cable_state_update = y_cable_helper.YCableTableUpdateTask(port_mapping_data) y_cable_state_update.task_run() # Start main loop self.log_info("Start daemon main loop") - self.handle_port_config_change() + self.stop_event.wait() self.log_info("Stop daemon main loop") diff --git a/sonic-xcvrd/xcvrd/xcvrd_utilities/port_mapping.py b/sonic-xcvrd/xcvrd/xcvrd_utilities/port_mapping.py index d2ecd7314..bf44fb70b 100644 --- a/sonic-xcvrd/xcvrd/xcvrd_utilities/port_mapping.py +++ b/sonic-xcvrd/xcvrd/xcvrd_utilities/port_mapping.py @@ -1,3 +1,10 @@ +from sonic_py_common import daemon_base +from sonic_py_common import multi_asic +from swsscommon import swsscommon + +SELECT_TIMEOUT_MSECS = 1000 + + class PortChangeEvent: PORT_ADD = 0 PORT_REMOVE = 1 @@ -75,3 +82,85 @@ def logical_port_name_to_physical_port_list(self, port_name): return self.get_logical_to_physical(port_name) else: return None + + +def subscribe_port_config_change(): + sel = swsscommon.Select() + asic_context = {} + namespaces = multi_asic.get_front_end_namespaces() + for namespace in namespaces: + config_db = daemon_base.db_connect("CONFIG_DB", namespace=namespace) + asic_id = multi_asic.get_asic_index_from_namespace(namespace) + port_tbl = swsscommon.SubscriberStateTable(config_db, swsscommon.CFG_PORT_TABLE_NAME) + asic_context[port_tbl] = asic_id + sel.addSelectable(port_tbl) + return sel, asic_context + + +def handle_port_config_change(sel, asic_context, stop_event, port_mapping, logger, port_change_event_handler): + """Select CONFIG_DB PORT table changes, once there is a port configuration add/remove, notify observers + """ + if not stop_event.is_set(): + (state, _) = sel.select(SELECT_TIMEOUT_MSECS) + if state == swsscommon.Select.TIMEOUT: + return + if state != swsscommon.Select.OBJECT: + logger.log_warning('sel.select() did not return swsscommon.Select.OBJECT') + return + + read_port_config_change(asic_context, port_mapping, logger, port_change_event_handler) + + +def read_port_config_change(asic_context, port_mapping, logger, port_change_event_handler): + for port_tbl in asic_context.keys(): + while True: + (key, op, fvp) = port_tbl.pop() + if not key: + break + if op == swsscommon.SET_COMMAND: + fvp = dict(fvp) + if 'index' not in fvp: + continue + + new_physical_index = int(fvp['index']) + if not port_mapping.is_logical_port(key): + # New logical port created + port_change_event = PortChangeEvent(key, new_physical_index, asic_context[port_tbl], PortChangeEvent.PORT_ADD) + port_change_event_handler(port_change_event) + else: + current_physical_index = port_mapping.get_logical_to_physical(key)[0] + if current_physical_index != new_physical_index: + port_change_event = PortChangeEvent(key, + current_physical_index, + asic_context[port_tbl], + PortChangeEvent.PORT_REMOVE) + port_change_event_handler(port_change_event) + + port_change_event = PortChangeEvent(key, new_physical_index, asic_context[port_tbl], PortChangeEvent.PORT_ADD) + port_change_event_handler(port_change_event) + elif op == swsscommon.DEL_COMMAND: + if port_mapping.is_logical_port(key): + port_change_event = PortChangeEvent(key, + port_mapping.get_logical_to_physical(key)[0], + asic_context[port_tbl], + PortChangeEvent.PORT_REMOVE) + port_change_event_handler(port_change_event) + else: + logger.log_warning('Invalid DB operation: {}'.format(op)) + + +def get_port_mapping(): + """Get port mapping from CONFIG_DB + """ + port_mapping = PortMapping() + namespaces = multi_asic.get_front_end_namespaces() + for namespace in namespaces: + asic_id = multi_asic.get_asic_index_from_namespace(namespace) + config_db = daemon_base.db_connect("CONFIG_DB", namespace=namespace) + port_table = swsscommon.Table(config_db, swsscommon.CFG_PORT_TABLE_NAME) + for key in port_table.getKeys(): + _, port_config = port_table.get(key) + port_config_dict = dict(port_config) + port_change_event = PortChangeEvent(key, port_config_dict['index'], asic_id, PortChangeEvent.PORT_ADD) + port_mapping.handle_port_change_event(port_change_event) + return port_mapping \ No newline at end of file diff --git a/sonic-xcvrd/xcvrd/xcvrd_utilities/y_cable_helper.py b/sonic-xcvrd/xcvrd/xcvrd_utilities/y_cable_helper.py index 1e070909e..46f4ab3ae 100644 --- a/sonic-xcvrd/xcvrd/xcvrd_utilities/y_cable_helper.py +++ b/sonic-xcvrd/xcvrd/xcvrd_utilities/y_cable_helper.py @@ -17,6 +17,7 @@ from sonic_y_cable import y_cable_vendor_mapping from swsscommon import swsscommon from . import sfp_status_helper +from .port_mapping import read_port_config_change SELECT_TIMEOUT = 1000 @@ -1436,16 +1437,12 @@ def __init__(self, port_mapping): self.task_cli_thread = None self.task_download_firmware_thread = {} self.task_stopping_event = threading.Event() - self.event_queue = queue.Queue() self.port_mapping = copy.deepcopy(port_mapping) if multi_asic.is_multi_asic(): # Load the namespace details first from the database_global.json file. swsscommon.SonicDBConfig.initializeGlobalConfig() - def notify_port_change_event(self, port_change_event): - self.event_queue.put_nowait(port_change_event) - def handle_port_change_event(self): while True: try: @@ -1461,6 +1458,7 @@ def task_worker(self): y_cable_tbl_keys = {} mux_cable_command_tbl, y_cable_command_tbl = {}, {} mux_metrics_tbl = {} + asic_context = {} sel = swsscommon.Select() @@ -1483,8 +1481,11 @@ def task_worker(self): mux_metrics_tbl[asic_id] = swsscommon.Table( state_db[asic_id], swsscommon.STATE_MUX_METRICS_TABLE_NAME) y_cable_tbl_keys[asic_id] = y_cable_tbl[asic_id].getKeys() + port_tbl = swsscommon.SubscriberStateTable(config_db[asic_id], swsscommon.CFG_PORT_TABLE_NAME) + asic_context[port_tbl] = asic_id sel.addSelectable(status_tbl[asic_id]) sel.addSelectable(mux_cable_command_tbl[asic_id]) + sel.addSelectable(port_tbl) # Listen indefinitely for changes to the HW_MUX_CABLE_TABLE in the Application DB's while True: @@ -1510,7 +1511,7 @@ def task_worker(self): # Get the corresponding namespace from redisselect db connector object namespace = redisSelectObj.getDbConnector().getNamespace() asic_index = multi_asic.get_asic_index_from_namespace(namespace) - self.handle_port_change_event() + read_port_config_change(asic_context, self.port_mapping, helper_logger, self.port_mapping.handle_port_change_event) while True: (port, op, fvp) = status_tbl[asic_index].pop()