From d29a496dbc63983454c19d9d682df4b2614139d3 Mon Sep 17 00:00:00 2001 From: Stepan Blyshchak <38952541+stepanblyschak@users.noreply.github.com> Date: Fri, 17 May 2019 04:49:00 +0300 Subject: [PATCH] [mlnx] refactor and fix mlnx-sfpd shutdown (#2907) * [mlnx] fix mlnx-sfpd shutdown Signed-off-by: Stepan Blyschak * fix type and handle only EINTR and EAGAIN errors from select Signed-off-by: Stepan Blyschak * handle select.error as well during init/run Signed-off-by: Stepan Blyschak --- platform/mellanox/mlnx-sfpd/scripts/mlnx-sfpd | 339 +++++++++--------- 1 file changed, 177 insertions(+), 162 deletions(-) diff --git a/platform/mellanox/mlnx-sfpd/scripts/mlnx-sfpd b/platform/mellanox/mlnx-sfpd/scripts/mlnx-sfpd index a1d2e6d9c3..764bcc7c5a 100644 --- a/platform/mellanox/mlnx-sfpd/scripts/mlnx-sfpd +++ b/platform/mellanox/mlnx-sfpd/scripts/mlnx-sfpd @@ -10,6 +10,7 @@ import os import time import syslog import signal +import select import json import threading from python_sdk_api.sx_api import * @@ -27,9 +28,12 @@ STATUS_PLUGIN = '1' STATUS_PLUGOUT = '0' STATUS_UNKNOWN = '2' -SFPD_LIVENESS_UPDATE_INTERVAL_SECS = 30 +SFPD_LIVENESS_EXPIRE_SECS = 30 -sfp_value_status_dict = {SDK_SFP_STATE_IN:STATUS_PLUGIN, SDK_SFP_STATE_OUT:STATUS_PLUGOUT} +sfp_value_status_dict = { + SDK_SFP_STATE_IN: STATUS_PLUGIN, + SDK_SFP_STATE_OUT: STATUS_PLUGOUT, +} # ========================== Syslog wrappers ========================== def log_info(msg, also_print_to_console=False): @@ -56,188 +60,199 @@ def log_error(msg, also_print_to_console=False): if also_print_to_console: print(msg) -# ========================== Signal Handling ========================== -def signal_handler(sig, frame): - if sig == signal.SIGHUP: - log_info("Caught SIGHUP - ignoring...") - return - elif sig == signal.SIGINT: - log_info("Caught SIGINT - exiting...") - sys.exit(128 + sig) - elif sig == signal.SIGTERM: - log_info("Caught SIGTERM - exiting...") - sys.exit(128 + sig) - else: - log_warning("Caught unhandled signal '" + sig + "'") +# ========================== MlnxSfpd class ========================== +class MlnxSfpd: + ''' Listen to plugin/plugout cable events ''' + SX_OPEN_RETRIES = 20 + SELECT_TIMEOUT = 1 -def sx_recv(fd_p, handle): - # recv parameters - pkt_size = 2000 - pkt_size_p = new_uint32_t_p() - uint32_t_p_assign(pkt_size_p, pkt_size) - pkt = new_uint8_t_arr(pkt_size) - recv_info_p = new_sx_receive_info_t_p() - pmpe_t = sx_event_pmpe_t() - logical_port_list = new_sx_port_log_id_t_arr(4) - port_attributes_list = new_sx_port_attributes_t_arr(64) - port_cnt_p = new_uint32_t_p() - uint32_t_p_assign(port_cnt_p,64) - label_port_list = [] - status = True - module_state = 0 + def __init__(self): + self.swid = 0 + self.running = False + self.handle = None - rc = sx_lib_host_ifc_recv(fd_p, pkt, pkt_size_p, recv_info_p) - if rc != 0: - log_error("event receive exit with error, rc %d" % rc) - status = False - return status, label_port_list, module_state + # Allocate SDK fd and user channel structures + self.rx_fd_p = new_sx_fd_t_p() + self.user_channel_p = new_sx_user_channel_t_p() - pmpe_t = recv_info_p.event_info.pmpe - port_list_size = pmpe_t.list_size - logical_port_list = pmpe_t.log_port_list - module_state = pmpe_t.module_state + self.state_db = SonicV2Connector(host=REDIS_HOSTIP) - for i in range(0, port_list_size): - logical_port = sx_port_log_id_t_arr_getitem(logical_port_list, i) - rc = sx_api_port_device_get(handle, 1 , 0, port_attributes_list, port_cnt_p) - port_cnt = uint32_t_p_value(port_cnt_p) + # Register our signal handlers + signal.signal(signal.SIGHUP, self.signal_handler) + signal.signal(signal.SIGINT, self.signal_handler) + signal.signal(signal.SIGTERM, self.signal_handler) - for i in range(0, port_cnt): - port_attributes = sx_port_attributes_t_arr_getitem(port_attributes_list,i) - if port_attributes.log_port == logical_port: - lable_port = port_attributes.port_mapping.module_port + def signal_handler(self, signum, frame): + if signum == signal.SIGHUP: + log_info("Caught SIGHUP - ignoring...") + elif signum == signal.SIGINT: + log_info("Caught SIGINT - exiting...") + self.running = False + elif signum == signal.SIGTERM: + log_info("Caught SIGINT - exiting...") + self.running = False + else: + log_warning("Caught unhandled signal '{}'".format(signum)) + + def initialize(self): + self.state_db.connect("STATE_DB") + + # open SDK API handle + # retry at most SX_OPEN_RETRIES times to wait + # until SDK is started during system startup + retry = 1 + while True: + rc, self.handle = sx_api_open(None) + if rc == SX_STATUS_SUCCESS: break - label_port_list.append(lable_port) - return status, label_port_list, module_state, + log_warning("failed to open SDK API handle... retrying {}".format(retry)) -def send_sfp_notification(db, interface, state): - sfp_notify = [interface, state] - msg = json.dumps(sfp_notify, separators=(',', ':')) - db.publish('STATE_DB', 'TRANSCEIVER_NOTIFY', msg) - return + time.sleep(2 ** retry) + retry += 1 -def update_sfpd_liveness_key(db, timeout_secs): - if db.exists('STATE_DB', 'MLNX_SFPD_TASK|LIVENESS'): - db.expire('STATE_DB', 'MLNX_SFPD_TASK|LIVENESS', timeout_secs) - else: - db.set('STATE_DB', 'MLNX_SFPD_TASK|LIVENESS', 'value', 'ok') - db.expire('STATE_DB', 'MLNX_SFPD_TASK|LIVENESS', timeout_secs) + if retry > self.SX_OPEN_RETRIES: + raise RuntimeError("failed to open SDK API handle after {} retries".format(retry)) -# Timer thread wrapper class to update mlnx-sfpd liveness info to DB periodically -class sfpd_liveness_update_task: - def __init__(self, db): - self.task_stopping_event = threading.Event() - self.task_timer = None - self.state_db = db + rc = sx_api_host_ifc_open(self.handle, self.rx_fd_p) + if rc != SX_STATUS_SUCCESS: + raise RuntimeError("sx_api_host_ifc_open exited with error, rc {}".format(rc)) - def task_run(self): - if self.task_stopping_event.isSet(): - log_error("Error: sfpd liveness update thread received stop event, exiting...") + self.user_channel_p.type = SX_USER_CHANNEL_TYPE_FD + self.user_channel_p.fd = self.rx_fd_p + + rc = sx_api_host_ifc_trap_id_register_set(self.handle, + SX_ACCESS_CMD_REGISTER, + self.swid, + SX_TRAP_ID_PMPE, + self.user_channel_p) + if rc != SX_STATUS_SUCCESS: + raise RuntimeError("sx_api_host_ifc_trap_id_register_set exited with error, rc {}".format(c)) + + def deinitialize(self): + # remove mlnx-sfpd liveness key in DB if not expired yet + if self.state_db.exists("STATE_DB", "MLNX_SFPD_TASK|LIVENESS"): + self.state_db.delete("STATE_DB", "MLNX_SFPD_TASK|LIVENESS") + + if self.handle is None: return - update_sfpd_liveness_key(self.state_db, 2*SFPD_LIVENESS_UPDATE_INTERVAL_SECS) + # unregister trap id + rc = sx_api_host_ifc_trap_id_register_set(self.handle, + SX_ACCESS_CMD_DEREGISTER, + self.swid, + SX_TRAP_ID_PMPE, + self.user_channel_p) + if rc != SX_STATUS_SUCCESS: + log_error("sx_api_host_ifc_trap_id_register_set exited with error, rc {}".format(rc)) - self.task_timer = threading.Timer(SFPD_LIVENESS_UPDATE_INTERVAL_SECS, self.task_run) - self.task_timer.start() + rc = sx_api_host_ifc_close(self.handle, self.rx_fd_p) + if rc != SX_STATUS_SUCCESS: + log_error("sx_api_host_ifc_close exited with error, rc {}".format(rc)) - def task_stop(self): - self.task_stopping_event.set() - self.task_timer.join() + rc = sx_api_close(self.handle) + if rc != SX_STATUS_SUCCESS: + log_error("sx_api_close exited with error, rc {}".format(rc)) + + def run(self): + self.running = True + + while self.running: + try: + read, _, _ = select.select([self.rx_fd_p.fd], [], [], self.SELECT_TIMEOUT) + except select.error as err: + rc, msg = err + if rc == errno.EAGAIN or rc == errno.EINTR: + continue + else: + raise + + for fd in read: + if fd == self.rx_fd_p.fd: + rc, port_list, module_state = self.on_pmpe(self.rx_fd_p) + if rc != SX_STATUS_SUCCESS: + raise RuntimeError("failed to read from {}".format(fd)) + + sfp_state = sfp_value_status_dict.get(module_state, STATUS_UNKNOWN) + if sfp_state == STATUS_UNKNOWN: + log_error("unknown module state {} on port {}".format(module_state, port)) + continue + + for port in port_list: + log_info("SFP on port {} state {}".format(port, sfp_state)) + self.send_sfp_notification(port, sfp_state) + + self.update_sfpd_liveness_key(SFPD_LIVENESS_EXPIRE_SECS) + + def send_sfp_notification(self, port, state): + sfp_notify = [port, state] + msg = json.dumps(sfp_notify, seperators=(',', ':')) + self.state_db.publish('STATE_DB', 'TRANSCEIVER_NOTIFY', msg) + + def update_sfpd_liveness_key(self, timeout_secs): + if not self.state_db.exists('STATE_DB', 'MLNX_SFPD_TASK|LIVENESS'): + self.state_db.set('STATE_DB', 'MLNX_SFPD_TASK|LIVENESS', 'value', 'ok') + self.state_db.expire('STATE_DB', 'MLNX_SFPD_TASK|LIVENESS', timeout_secs) + + def on_pmpe(self, fd_p): + ''' on port module plug event handler ''' + + # recv parameters + pkt_size = 2000 + pkt_size_p = new_uint32_t_p() + uint32_t_p_assign(pkt_size_p, pkt_size) + pkt = new_uint8_t_arr(pkt_size) + recv_info_p = new_sx_receive_info_t_p() + pmpe_t = sx_event_pmpe_t() + logical_port_list = new_sx_port_log_id_t_arr(4) + port_attributes_list = new_sx_port_attributes_t_arr(64) + port_cnt_p = new_uint32_t_p() + uint32_t_p_assign(port_cnt_p,64) + label_port_list = [] + status = True + module_state = 0 + + rc = sx_lib_host_ifc_recv(fd_p, pkt, pkt_size_p, recv_info_p) + if rc != 0: + log_error("sx_lib_host_ifc_recv exited with error, rc %d" % rc) + status = False + return status, label_port_list, module_state + + pmpe_t = recv_info_p.event_info.pmpe + port_list_size = pmpe_t.list_size + logical_port_list = pmpe_t.log_port_list + module_state = pmpe_t.module_state + + for i in xrange(port_list_size): + logical_port = sx_port_log_id_t_arr_getitem(logical_port_list, i) + rc = sx_api_port_device_get(self.handle, 1 , 0, port_attributes_list, port_cnt_p) + port_cnt = uint32_t_p_value(port_cnt_p) + + for i in xrange(port_cnt): + port_attributes = sx_port_attributes_t_arr_getitem(port_attributes_list,i) + if port_attributes.log_port == logical_port: + lable_port = port_attributes.port_mapping.module_port + break + label_port_list.append(lable_port) + + return status, label_port_list, module_state, # main start def main(): - # Register our signal handlers - signal.signal(signal.SIGHUP, signal_handler) - signal.signal(signal.SIGINT, signal_handler) - signal.signal(signal.SIGTERM, signal_handler) + log_info("mlnx-sfpd daemon started") - # Connect to state db for notification sending - state_db = SonicV2Connector(host=REDIS_HOSTIP) - state_db.connect(state_db.STATE_DB) + sfpd = MlnxSfpd() + try: + sfpd.initialize() + sfpd.run() + except (RuntimeError, select.error) as err: + log_error("error: {}".format(err)) + finally: + sfpd.deinitialize() - # Open SDK handler - log_info("starting mlnx-sfpd...") - rc, handle = sx_api_open(None) - retry_time = 1 - while rc != SX_STATUS_SUCCESS: - time.sleep(2**retry_time) - retry_time += 1 - rc, handle = sx_api_open(None) - if retry_time > 20: - log_error("Failed to open api handle. Please check that SDK is running.") - sys.exit(errno.EACCES) - - # Open recv fd - rx_fd_p = new_sx_fd_t_p() - rc = sx_api_host_ifc_open(handle, rx_fd_p) - if rc != 0: - log_error("sx_api_host_ifc_open exit with error, rc %d" % rc) - exit(rc) - - # Set up general host ifc parameters - swid = 0 - cmd = SX_ACCESS_CMD_REGISTER - uc_p = new_sx_user_channel_t_p() - uc_p.type = SX_USER_CHANNEL_TYPE_FD - uc_p.channel.fd = rx_fd_p - trap_id = SX_TRAP_ID_PMPE - - rc = sx_api_host_ifc_trap_id_register_set(handle, cmd, swid, trap_id, uc_p) - if rc != 0: - log_error("sx_api_host_ifc_trap_id_register_set exit with error, rc %d" % rc) - exit(rc) - - liveness_info_update = sfpd_liveness_update_task(state_db) - liveness_info_update.task_run() - - # Main loop for sfp event listening - log_info("mlnx-sfpd started") - while True: - sfp_state = STATUS_UNKNOWN - rc, port_list, module_state = sx_recv(rx_fd_p, handle) - if not rc: - log_error("Failed to recv event from SDK, please check that SDK is running.") - break - - if module_state in sfp_value_status_dict: sfp_state = sfp_value_status_dict[module_state] - - if sfp_state != STATUS_UNKNOWN: - for port in port_list: - log_info("SFP on port %d state %s" % (port, sfp_state)) - send_sfp_notification(state_db, str(port), sfp_state) - - log_info("sfp change event handling done") - - # Stop liveness update task - liveness_info_update.task_stop() - - # Remove mlnx-sfpd liveness key in DB if not expired yet. - if state_db.exists('STATE_DB', 'MLNX_SFPD_TASK|LIVENESS'): - state_db.delete(state_db, 'MLNX_SFPD_TASK|LIVENESS') - - # unregister trap id - cmd = SX_ACCESS_CMD_DEREGISTER - rc = sx_api_host_ifc_trap_id_register_set(handle, cmd, swid, trap_id, uc_p) - if rc != 0: - log_error("sx_api_host_ifc_trap_id_register_set exit with error, rc %d" % rc) - exit(rc) - - # Close read fp - rc = sx_api_host_ifc_close(handle, rx_fd_p) - if rc != 0: - log_error("sx_api_host_ifc_close exit with error, rc %d" % rc) - exit(rc) - - # Close sdk handler - rc = sx_api_close(handle) - if rc != 0: - log_error("sx_api_close exit with error, rc %d" % rc) - exit(rc) - - log_info("mlnx-sfpd exited") + log_info("mlnx-sfpd daemon exited") if __name__ == '__main__':