Skip to content

Commit

Permalink
Fix review comment and adjust unit test
Browse files Browse the repository at this point in the history
  • Loading branch information
Junchao-Mellanox committed Sep 17, 2021
1 parent 1aa2ced commit da1ca2c
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 57 deletions.
25 changes: 14 additions & 11 deletions sonic-xcvrd/tests/test_xcvrd.py
Original file line number Diff line number Diff line change
Expand Up @@ -429,20 +429,21 @@ def test_DaemonXcvrd_handle_port_config_change(self, mock_select, mock_sub_table
xcvrd = DaemonXcvrd(SYSLOG_IDENTIFIER)
xcvrd.stop_event.is_set = MagicMock(side_effect=[False, True])
mock_observer = MagicMock()
mock_observer.notify_port_change_event = MagicMock()
mock_observer.enque_port_change_event = MagicMock()
xcvrd.subscribe_port_change_event(mock_observer)

xcvrd.handle_port_config_change()
assert mock_observer.notify_port_change_event.call_count == 1
sel, asic_context = xcvrd.subscribe_port_config_change()
xcvrd.handle_port_config_change(sel, asic_context)
assert mock_observer.enque_port_change_event.call_count == 1
assert xcvrd.port_mapping.logical_port_list.count('Ethernet0')
assert xcvrd.port_mapping.get_asic_id_for_logical_port('Ethernet0') == 0
assert xcvrd.port_mapping.get_physical_to_logical(1) == ['Ethernet0']
assert xcvrd.port_mapping.get_logical_to_physical('Ethernet0') == [1]

xcvrd.stop_event.is_set = MagicMock(side_effect=[False, True])
mock_selectable.pop = MagicMock(side_effect=[('Ethernet0', swsscommon.DEL_COMMAND, (('index', '1'), )), (None, None, None)])
xcvrd.handle_port_config_change()
assert mock_observer.notify_port_change_event.call_count == 2
xcvrd.handle_port_config_change(sel, asic_context)
assert mock_observer.enque_port_change_event.call_count == 2
assert not xcvrd.port_mapping.logical_port_list
assert not xcvrd.port_mapping.logical_to_physical
assert not xcvrd.port_mapping.physical_to_logical
Expand Down Expand Up @@ -479,13 +480,15 @@ def test_DaemonXcvrd_init_deinit(self):

@patch('xcvrd.xcvrd.DaemonXcvrd.init')
@patch('xcvrd.xcvrd.DaemonXcvrd.deinit')
@patch('xcvrd.xcvrd.DaemonXcvrd.subscribe_port_config_change', MagicMock(return_value=(0, 0)))
@patch('xcvrd.xcvrd.DaemonXcvrd.handle_port_config_change')
@patch('xcvrd.xcvrd.DomInfoUpdateTask.task_run')
@patch('xcvrd.xcvrd.SfpStateUpdateTask.task_run')
@patch('xcvrd.xcvrd.DomInfoUpdateTask.task_stop')
@patch('xcvrd.xcvrd.SfpStateUpdateTask.task_stop')
def test_DaemonXcvrd_run(self, mock_task_stop1, mock_task_stop2, mock_task_run1, mock_task_run2, mock_handle_port_config_change, mock_deinit, mock_init):
xcvrd = DaemonXcvrd(SYSLOG_IDENTIFIER)
xcvrd.stop_event.is_set = MagicMock(side_effect=[False, True])
xcvrd.run()
# TODO: more check
assert mock_task_stop1.call_count == 1
Expand All @@ -501,15 +504,15 @@ def test_DomInfoUpdateTask_handle_port_change_event(self):
port_mapping = PortMapping()
task = DomInfoUpdateTask(port_mapping)
port_change_event = PortChangeEvent('Ethernet0', 1, 0, PortChangeEvent.PORT_ADD)
task.notify_port_change_event(port_change_event)
task.enque_port_change_event(port_change_event)
task.handle_port_change_event()
assert task.port_mapping.logical_port_list.count('Ethernet0')
assert task.port_mapping.get_asic_id_for_logical_port('Ethernet0') == 0
assert task.port_mapping.get_physical_to_logical(1) == ['Ethernet0']
assert task.port_mapping.get_logical_to_physical('Ethernet0') == [1]

port_change_event = PortChangeEvent('Ethernet0', 1, 0, PortChangeEvent.PORT_REMOVE)
task.notify_port_change_event(port_change_event)
task.enque_port_change_event(port_change_event)
task.handle_port_change_event()
assert not task.port_mapping.logical_port_list
assert not task.port_mapping.logical_to_physical
Expand All @@ -532,7 +535,7 @@ def test_DomInfoUpdateTask_task_worker(self, mock_post_dom_th, mock_post_dom_inf
task = DomInfoUpdateTask(port_mapping)
task.task_stopping_event.wait = MagicMock(side_effect=[False, True])
port_change_event = PortChangeEvent('Ethernet0', 1, 0, PortChangeEvent.PORT_ADD)
task.notify_port_change_event(port_change_event)
task.enque_port_change_event(port_change_event)
mock_detect_error.return_value = True
task.task_worker([False])
assert task.port_mapping.logical_port_list.count('Ethernet0')
Expand All @@ -559,7 +562,7 @@ def test_SfpStateUpdateTask_handle_port_change_event(self, mock_table_helper):
port_mapping = PortMapping()
task = SfpStateUpdateTask(port_mapping)
port_change_event = PortChangeEvent('Ethernet0', 1, 0, PortChangeEvent.PORT_ADD)
task.notify_port_change_event(port_change_event)
task.enque_port_change_event(port_change_event)
wait_time = 5
while wait_time > 0:
task.handle_port_change_event(task.event_queue, stopping_event, [False])
Expand All @@ -573,7 +576,7 @@ def test_SfpStateUpdateTask_handle_port_change_event(self, mock_table_helper):
assert task.port_mapping.get_logical_to_physical('Ethernet0') == [1]

port_change_event = PortChangeEvent('Ethernet0', 1, 0, PortChangeEvent.PORT_REMOVE)
task.notify_port_change_event(port_change_event)
task.enque_port_change_event(port_change_event)
wait_time = 5
while wait_time > 0:
task.handle_port_change_event(task.event_queue, stopping_event, [False])
Expand Down Expand Up @@ -752,7 +755,7 @@ class MockTable:
dom_tbl.get = MagicMock(return_value=(True, (('key3', 'value3'),)))
dom_tbl.set = MagicMock()
mock_table_helper.get_status_tbl = MagicMock(return_value=status_tbl)
mock_table_helper.get_int_tbl = MagicMock(return_value=int_tbl)
mock_table_helper.get_intf_tbl = MagicMock(return_value=int_tbl)
mock_table_helper.get_dom_tbl = MagicMock(return_value=dom_tbl)

port_mapping = PortMapping()
Expand Down
96 changes: 50 additions & 46 deletions sonic-xcvrd/xcvrd/xcvrd.py
Original file line number Diff line number Diff line change
Expand Up @@ -508,7 +508,7 @@ def post_port_sfp_dom_info_to_db(is_warm_start, port_mapping, stop_event=threadi
if asic_index is None:
helper_logger.log_warning("Got invalid asic index for {}, ignored".format(logical_port_name))
continue
post_port_sfp_info_to_db(logical_port_name, port_mapping, xcvr_table_helper.get_int_tbl(asic_index), transceiver_dict, stop_event)
post_port_sfp_info_to_db(logical_port_name, port_mapping, xcvr_table_helper.get_intf_tbl(asic_index), transceiver_dict, stop_event)
post_port_dom_info_to_db(logical_port_name, port_mapping, xcvr_table_helper.get_dom_tbl(asic_index), stop_event)
post_port_dom_threshold_info_to_db(logical_port_name, port_mapping, xcvr_table_helper.get_dom_tbl(asic_index), stop_event)

Expand Down Expand Up @@ -877,7 +877,7 @@ def task_stop(self):
self.task_stopping_event.set()
self.task_thread.join()

def notify_port_change_event(self, port_change_event):
def enque_port_change_event(self, port_change_event):
"""Called by main thread. When main thread detects a port configuration add/remove, it puts an event to
the event queue
Expand Down Expand Up @@ -923,7 +923,7 @@ def __init__(self, port_mapping):
self.task_stopping_event = multiprocessing.Event()
self.event_queue = multiprocessing.Queue()
self.port_mapping = copy.deepcopy(port_mapping)
# A set to hold those SFPs who fail to read EEPROM
# A set to hold those logical port names who fail to read EEPROM
self.retry_eeprom_set = set()
# To avoid retry EEPROM read too fast, record the last EEPROM read timestamp in this member
self.last_retry_eeprom_time = 0
Expand Down Expand Up @@ -1115,12 +1115,12 @@ def task_worker(self, stopping_event, sfp_error_event, y_cable_presence, event_q
update_port_transceiver_status_table(
logical_port, xcvr_table_helper.get_status_tbl(asic_index), sfp_status_helper.SFP_STATUS_INSERTED)
helper_logger.log_info("receive plug in and update port sfp status table.")
rc = post_port_sfp_info_to_db(logical_port, self.port_mapping, xcvr_table_helper.get_int_tbl(asic_index), transceiver_dict)
rc = post_port_sfp_info_to_db(logical_port, self.port_mapping, xcvr_table_helper.get_intf_tbl(asic_index), transceiver_dict)
# If we didn't get the sfp info, assuming the eeprom is not ready, give a try again.
if rc == SFP_EEPROM_NOT_READY:
helper_logger.log_warning("SFP EEPROM is not ready. One more try...")
time.sleep(TIME_FOR_SFP_READY_SECS)
rc = post_port_sfp_info_to_db(logical_port, self.port_mapping, xcvr_table_helper.get_int_tbl(asic_index), transceiver_dict)
rc = post_port_sfp_info_to_db(logical_port, self.port_mapping, xcvr_table_helper.get_intf_tbl(asic_index), transceiver_dict)
if rc == SFP_EEPROM_NOT_READY:
# If still failed to read EEPROM, put it to retry set
self.retry_eeprom_set.add(logical_port)
Expand All @@ -1135,7 +1135,7 @@ def task_worker(self, stopping_event, sfp_error_event, y_cable_presence, event_q
update_port_transceiver_status_table(
logical_port, xcvr_table_helper.get_status_tbl(asic_index), sfp_status_helper.SFP_STATUS_REMOVED)
helper_logger.log_info("receive plug out and pdate port sfp status table.")
del_port_sfp_dom_info_from_db(logical_port, self.port_mapping, xcvr_table_helper.get_int_tbl(asic_index), xcvr_table_helper.get_dom_tbl(asic_index))
del_port_sfp_dom_info_from_db(logical_port, self.port_mapping, xcvr_table_helper.get_intf_tbl(asic_index), xcvr_table_helper.get_dom_tbl(asic_index))
else:
try:
error_bits = int(value)
Expand Down Expand Up @@ -1214,7 +1214,7 @@ def task_stop(self):
self.task_stopping_event.set()
os.kill(self.task_process.pid, signal.SIGKILL)

def notify_port_change_event(self, port_change_event):
def enque_port_change_event(self, port_change_event):
"""Called by main thread. When main thread detects a port configuration add/remove, it puts an event to
the event queue
Expand Down Expand Up @@ -1261,7 +1261,7 @@ def on_remove_logical_port(self, port_change_event):
# but it is necessary because TRANSCEIVER_DOM_INFO is also updated in this sub process when a new SFP is inserted.
del_port_sfp_dom_info_from_db(port_change_event.port_name,
self.port_mapping,
xcvr_table_helper.get_int_tbl(port_change_event.asic_id),
xcvr_table_helper.get_intf_tbl(port_change_event.asic_id),
xcvr_table_helper.get_dom_tbl(port_change_event.asic_id))
delete_port_from_status_table(port_change_event.port_name, xcvr_table_helper.get_status_tbl(port_change_event.asic_id))

Expand Down Expand Up @@ -1289,7 +1289,7 @@ def on_add_logical_port(self, port_change_event):
sfp_status = None
sibling_port = None
status_tbl = xcvr_table_helper.get_status_tbl(port_change_event.asic_id)
int_tbl = xcvr_table_helper.get_int_tbl(port_change_event.asic_id)
int_tbl = xcvr_table_helper.get_intf_tbl(port_change_event.asic_id)
dom_tbl = xcvr_table_helper.get_dom_tbl(port_change_event.asic_id)
physical_port_list = self.port_mapping.logical_port_name_to_physical_port_list(port_change_event.port_name)

Expand Down Expand Up @@ -1379,7 +1379,7 @@ def retry_eeprom_reading(self):
retry_success_set = set()
for logical_port in self.retry_eeprom_set:
asic_index = self.port_mapping.get_asic_id_for_logical_port(logical_port)
rc = post_port_sfp_info_to_db(logical_port, self.port_mapping, xcvr_table_helper.get_int_tbl(asic_index), transceiver_dict)
rc = post_port_sfp_info_to_db(logical_port, self.port_mapping, xcvr_table_helper.get_intf_tbl(asic_index), transceiver_dict)
if rc != SFP_EEPROM_NOT_READY:
post_port_dom_info_to_db(logical_port, self.port_mapping, xcvr_table_helper.get_dom_tbl(asic_index))
post_port_dom_threshold_info_to_db(logical_port, self.port_mapping, xcvr_table_helper.get_dom_tbl(asic_index))
Expand Down Expand Up @@ -1439,9 +1439,7 @@ def wait_for_port_config_done(self, namespace):
if key in ["PortConfigDone", "PortInitDone"]:
break

def handle_port_config_change(self):
"""Select CONFIG_DB PORT table changes, once there is a port configuration add/remove, notify observers
"""
def subscribe_port_config_change(self):
sel = swsscommon.Select()
asic_context = {}
namespaces = multi_asic.get_front_end_namespaces()
Expand All @@ -1452,37 +1450,41 @@ def handle_port_config_change(self):
asic_context[sst] = asic_id
sel.addSelectable(sst)

while not self.stop_event.is_set():
(state, _) = sel.select(SELECT_TIMEOUT_MSECS)
if state == swsscommon.Select.TIMEOUT:
continue
if state != swsscommon.Select.OBJECT:
self.log_warning("sel.select() did not return swsscommon.Select.OBJECT")
continue

for sst in asic_context.keys():
while True:
(key, op, fvp) = sst.pop()
if not key:
break
if op == swsscommon.SET_COMMAND:
# Only process new logical port, here we assume that the physical index for a logical port should
# never change, if user configure a new value for the "index" field for exsitng locial port, it is
# an error
if not self.port_mapping.is_logical_port(key):
fvp = dict(fvp)
if 'index' in fvp:
port_change_event = PortChangeEvent(key, fvp['index'], asic_context[sst], PortChangeEvent.PORT_ADD)
self.notify_port_change_event(port_change_event)
elif op == swsscommon.DEL_COMMAND:
if self.port_mapping.is_logical_port(key):
port_change_event = PortChangeEvent(key,
self.port_mapping.get_logical_to_physical(key)[0],
asic_context[sst],
PortChangeEvent.PORT_REMOVE)
return sel, asic_context

def handle_port_config_change(self, sel, asic_context):
"""Select CONFIG_DB PORT table changes, once there is a port configuration add/remove, notify observers
"""
(state, _) = sel.select(SELECT_TIMEOUT_MSECS)
if state == swsscommon.Select.TIMEOUT:
return
if state != swsscommon.Select.OBJECT:
self.log_warning("sel.select() did not return swsscommon.Select.OBJECT")
return

for sst in asic_context.keys():
while True:
(key, op, fvp) = sst.pop()
if not key:
break
if op == swsscommon.SET_COMMAND:
# Only process new logical port, here we assume that the physical index for a logical port should
# never change, if user configure a new value for the "index" field for exsitng locial port, it is
# an error
if not self.port_mapping.is_logical_port(key):
fvp = dict(fvp)
if 'index' in fvp:
port_change_event = PortChangeEvent(key, fvp['index'], asic_context[sst], PortChangeEvent.PORT_ADD)
self.notify_port_change_event(port_change_event)
else:
self.log_warning("Invalid DB operation: {}".format(op))
elif op == swsscommon.DEL_COMMAND:
if self.port_mapping.is_logical_port(key):
port_change_event = PortChangeEvent(key,
self.port_mapping.get_logical_to_physical(key)[0],
asic_context[sst],
PortChangeEvent.PORT_REMOVE)
self.notify_port_change_event(port_change_event)
else:
self.log_warning("Invalid DB operation: {}".format(op))

def notify_port_change_event(self, port_change_event):
"""Notify observers that there is a port change event
Expand All @@ -1493,7 +1495,7 @@ def notify_port_change_event(self, port_change_event):

self.log_notice('Sending port change event {} to observers'.format(port_change_event))
for observer in self.port_change_event_observers:
observer.notify_port_change_event(port_change_event)
observer.enque_port_change_event(port_change_event)
self.port_mapping.handle_port_change_event(port_change_event)

def subscribe_port_change_event(self, observer):
Expand Down Expand Up @@ -1642,7 +1644,9 @@ def run(self):
# Start main loop
self.log_info("Start daemon main loop")

self.handle_port_config_change()
sel, asic_context = self.subscribe_port_config_change()
while not self.stop_event.is_set():
self.handle_port_config_change(sel, asic_context)

self.log_info("Stop daemon main loop")

Expand Down Expand Up @@ -1679,7 +1683,7 @@ def __init__(self):
appl_db = daemon_base.db_connect("APPL_DB", namespace)
self.app_port_tbl[asic_id] = swsscommon.ProducerStateTable(appl_db, swsscommon.APP_PORT_TABLE_NAME)

def get_int_tbl(self, asic_id):
def get_intf_tbl(self, asic_id):
return self.int_tbl[asic_id]

def get_dom_tbl(self, asic_id):
Expand Down

0 comments on commit da1ca2c

Please sign in to comment.