sonic-buildimage/platform/mellanox/mlnx-platform-api/sonic_platform/sfp.py
Andriy Yurkiv 70d71f99f5
[Mellanox] Credo Y-cable | add more log info, checks, fix exception message (#10779)
- Why I did it
Script fails when there is an exception while reading

- How I did it
Add more logs and checks. Fix wrong variable naming and messages.

- How to verify it
Provoke exception while read_eeprom() and check that it is handled properly
2022-05-19 17:36:02 +03:00

730 lines
30 KiB
Python

#
# Copyright (c) 2019-2021 NVIDIA CORPORATION & AFFILIATES.
# Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#############################################################################
# Mellanox
#
# Module contains an implementation of SONiC Platform Base API and
# provides the FANs status which are available in the platform
#
#############################################################################
try:
import subprocess
import os
from sonic_platform_base.sonic_eeprom import eeprom_dts
from sonic_py_common.logger import Logger
from . import utils
from .device_data import DeviceDataManager
from sonic_platform_base.sonic_xcvr.sfp_optoe_base import SfpOptoeBase
except ImportError as e:
raise ImportError (str(e) + "- required module not found")
try:
# python_sdk_api does not support python3 for now. Daemons like thermalctld or psud
# also import this file without actually use the sdk lib. So we catch the ImportError
# and ignore it here. Meanwhile, we have to trigger xcvrd using python2 now because it
# uses the sdk lib.
from python_sdk_api.sxd_api import *
from python_sdk_api.sx_api import *
except ImportError as e:
pass
try:
if os.environ["PLATFORM_API_UNIT_TESTING"] == "1":
# Unable to import SDK constants under unit test
# Define them here
SX_PORT_MODULE_STATUS_INITIALIZING = 0
SX_PORT_MODULE_STATUS_PLUGGED = 1
SX_PORT_MODULE_STATUS_UNPLUGGED = 2
SX_PORT_MODULE_STATUS_PLUGGED_WITH_ERROR = 3
SX_PORT_MODULE_STATUS_PLUGGED_DISABLED = 4
except KeyError:
pass
# identifier value of xSFP module which is in the first byte of the EEPROM
# if the identifier value falls into SFP_TYPE_CODE_LIST the module is treated as a SFP module and parsed according to 8472
# for QSFP_TYPE_CODE_LIST the module is treated as a QSFP module and parsed according to 8436/8636
# Originally the type (SFP/QSFP) of each module is determined according to the SKU dictionary
# where the type of each FP port is defined. The content of EEPROM is parsed according to its type.
# However, sometimes the SFP module can be fit in an adapter and then pluged into a QSFP port.
# In this case the EEPROM content is in format of SFP but parsed as QSFP, causing failure.
# To resolve that issue the type field of the xSFP module is also fetched so that we can know exectly what type the
# module is. Currently only the following types are recognized as SFP/QSFP module.
# Meanwhile, if the a module's identifier value can't be recognized, it will be parsed according to the SKU dictionary.
# This is because in the future it's possible that some new identifier value which is not regonized but backward compatible
# with the current format and by doing so it can be parsed as much as possible.
SFP_TYPE_CODE_LIST = [
'03' # SFP/SFP+/SFP28
]
QSFP_TYPE_CODE_LIST = [
'0d', # QSFP+ or later
'11' # QSFP28 or later
]
QSFP_DD_TYPE_CODE_LIST = [
'18' # QSFP-DD Double Density 8X Pluggable Transceiver
]
#variables for sdk
REGISTER_NUM = 1
DEVICE_ID = 1
SWITCH_ID = 0
SX_PORT_ATTR_ARR_SIZE = 64
PMAOS_ASE = 1
PMAOS_EE = 1
PMAOS_E = 2
PMAOS_RST = 0
PMAOS_ENABLE = 1
PMAOS_DISABLE = 2
PMMP_LPMODE_BIT = 8
MCION_TX_DISABLE_BIT = 1
#on page 0
#i2c address 0x50
MCIA_ADDR_TX_CHANNEL_DISABLE = 86
MCIA_ADDR_POWER_OVERRIDE = 93
#power set bit
MCIA_ADDR_POWER_OVERRIDE_PS_BIT = 1
#power override bit
MCIA_ADDR_POWER_OVERRIDE_POR_BIT = 0
#on page 0
#i2c address 0x51
MCIA_ADDR_TX_DISABLE = 110
MCIA_ADDR_TX_DISABLE_BIT = 6
PORT_TYPE_NVE = 8
PORT_TYPE_CPU = 4
PORT_TYPE_OFFSET = 28
PORT_TYPE_MASK = 0xF0000000
NVE_MASK = PORT_TYPE_MASK & (PORT_TYPE_NVE << PORT_TYPE_OFFSET)
CPU_MASK = PORT_TYPE_MASK & (PORT_TYPE_CPU << PORT_TYPE_OFFSET)
# parameters for SFP presence
SFP_STATUS_INSERTED = '1'
# SFP constants
SFP_PAGE_SIZE = 256
SFP_UPPER_PAGE_OFFSET = 128
SFP_VENDOR_PAGE_START = 640
BYTES_IN_DWORD = 4
# Global logger class instance
logger = Logger()
# SDK initializing stuff, called from chassis
def initialize_sdk_handle():
rc, sdk_handle = sx_api_open(None)
if (rc != SX_STATUS_SUCCESS):
logger.log_warning("Failed to open api handle, please check whether SDK is running.")
sdk_handle = None
return sdk_handle
def deinitialize_sdk_handle(sdk_handle):
if sdk_handle is not None:
rc = sx_api_close(sdk_handle)
if (rc != SX_STATUS_SUCCESS):
logger.log_warning("Failed to close api handle.")
return rc == SXD_STATUS_SUCCESS
else:
logger.log_warning("Sdk handle is none")
return False
class MlxregManager:
def __init__(self, mst_pci_device, slot_id, sdk_index):
self.mst_pci_device = mst_pci_device
self.slot_id = slot_id
self.sdk_index = sdk_index
def construct_dword(self, write_buffer):
if len(write_buffer) == 0:
return None
used_bytes_in_dword = len(write_buffer) % BYTES_IN_DWORD
res = "dword[0]=0x"
for idx, x in enumerate(write_buffer):
word = hex(x)[2:]
if (idx > 0) and (idx % BYTES_IN_DWORD) == 0:
res += ",dword[{}]=0x".format(str((idx + 1)//BYTES_IN_DWORD))
res += word.zfill(2)
if used_bytes_in_dword > 0:
res += (BYTES_IN_DWORD - used_bytes_in_dword) * "00"
return res
def write_mlxreg_eeprom(self, num_bytes, dword, device_address, page):
if not dword:
return False
try:
cmd = "mlxreg -d /dev/mst/{} --reg_name MCIA --indexes \
slot_index={},module={},device_address={},page_number={},i2c_device_address=0x50,size={},bank_number=0 \
--set {} -y".format(self.mst_pci_device, self.slot_id, self.sdk_index, device_address, page, num_bytes, dword)
subprocess.check_call(cmd, shell=True, universal_newlines=True, stdout=subprocess.DEVNULL)
except subprocess.CalledProcessError as e:
logger.log_error("Error! Unable to write data dword={} for {} port, page {} offset {}, rc = {}, err msg: {}".format(dword, self.sdk_index, page, device_address, e.returncode, e.output))
return False
return True
def read_mlxred_eeprom(self, offset, page, num_bytes):
try:
cmd = "mlxreg -d /dev/mst/{} --reg_name MCIA --indexes \
slot_index={},module={},device_address={},page_number={},i2c_device_address=0x50,size={},bank_number=0 \
--get".format(self.mst_pci_device, self.slot_id, self.sdk_index, offset, page, num_bytes)
result = subprocess.check_output(cmd, universal_newlines=True, shell=True)
except subprocess.CalledProcessError as e:
logger.log_error("Error! Unable to read data for {} port, page {} offset {}, rc = {}, err msg: {}".format(self.sdk_index, page, offset, e.returncode, e.output))
return None
return result
def parse_mlxreg_read_output(self, read_output, num_bytes):
if not read_output:
return None
res = ""
dword_num = num_bytes // BYTES_IN_DWORD
used_bytes_in_dword = num_bytes % BYTES_IN_DWORD
arr = [value for value in read_output.split('\n') if value[0:5] == "dword"]
for i in range(dword_num):
dword = arr[i].split()[2]
res += dword[2:]
if used_bytes_in_dword > 0:
# Cut needed info and insert into final hex string
# Example: 3 bytes : 0x12345600
# ^ ^
dword = arr[dword_num].split()[2]
res += dword[2 : 2 + used_bytes_in_dword * 2]
return bytearray.fromhex(res) if res else None
class SdkHandleContext(object):
def __init__(self):
self.sdk_handle = None
def __enter__(self):
self.sdk_handle = initialize_sdk_handle()
return self.sdk_handle
def __exit__(self, exc_type, exc_val, exc_tb):
deinitialize_sdk_handle(self.sdk_handle)
class SFP(SfpOptoeBase):
"""Platform-specific SFP class"""
shared_sdk_handle = None
SFP_MLNX_ERROR_DESCRIPTION_LONGRANGE_NON_MLNX_CABLE = 'Long range for non-Mellanox cable or module'
SFP_MLNX_ERROR_DESCRIPTION_ENFORCE_PART_NUMBER_LIST = 'Enforce part number list'
SFP_MLNX_ERROR_DESCRIPTION_PMD_TYPE_NOT_ENABLED = 'PMD type not enabled'
SFP_MLNX_ERROR_DESCRIPTION_PCIE_POWER_SLOT_EXCEEDED = 'PCIE system power slot exceeded'
SFP_MLNX_ERROR_DESCRIPTION_RESERVED = 'Reserved'
SFP_MLNX_ERROR_BIT_LONGRANGE_NON_MLNX_CABLE = 0x00010000
SFP_MLNX_ERROR_BIT_ENFORCE_PART_NUMBER_LIST = 0x00020000
SFP_MLNX_ERROR_BIT_PMD_TYPE_NOT_ENABLED = 0x00040000
SFP_MLNX_ERROR_BIT_PCIE_POWER_SLOT_EXCEEDED = 0x00080000
SFP_MLNX_ERROR_BIT_RESERVED = 0x80000000
def __init__(self, sfp_index, slot_id=0, linecard_port_count=0, lc_name=None):
super(SFP, self).__init__()
if slot_id == 0: # For non-modular chassis
self.index = sfp_index + 1
self.sdk_index = sfp_index
from .thermal import initialize_sfp_thermal
self._thermal_list = initialize_sfp_thermal(sfp_index)
else: # For modular chassis
# (slot_id % MAX_LC_CONUNT - 1) * MAX_PORT_COUNT + (sfp_index + 1) * (MAX_PORT_COUNT / LC_PORT_COUNT)
max_linecard_count = DeviceDataManager.get_linecard_count()
max_linecard_port_count = DeviceDataManager.get_linecard_max_port_count()
self.index = (slot_id % max_linecard_count - 1) * max_linecard_port_count + sfp_index * (max_linecard_port_count / linecard_port_count) + 1
self.sdk_index = sfp_index
from .thermal import initialize_linecard_sfp_thermal
self._thermal_list = initialize_linecard_sfp_thermal(lc_name, slot_id, sfp_index)
self.slot_id = slot_id
self.mst_pci_device = self.get_mst_pci_device()
# get MST PCI device name
def get_mst_pci_device(self):
device_name = None
try:
device_name = subprocess.check_output("ls /dev/mst/ | grep pciconf", universal_newlines=True, shell=True).strip()
except subprocess.CalledProcessError as e:
logger.log_error("Failed to find mst PCI device rc={} err.msg={}".format(e.returncode, e.output))
return device_name
@property
def sdk_handle(self):
if not SFP.shared_sdk_handle:
SFP.shared_sdk_handle = initialize_sdk_handle()
if not SFP.shared_sdk_handle:
logger.log_error('Failed to open SDK handle')
return SFP.shared_sdk_handle
def reinit(self):
"""
Re-initialize this SFP object when a new SFP inserted
:return:
"""
self.refresh_xcvr_api()
def get_presence(self):
"""
Retrieves the presence of the device
Returns:
bool: True if device is present, False if not
"""
eeprom_raw = self.read_eeprom(0, 1)
return eeprom_raw is not None
# Read out any bytes from any offset
def _read_eeprom_specific_bytes(self, offset, num_bytes):
if offset + num_bytes > SFP_VENDOR_PAGE_START:
logger.log_error("Error mismatch between page size and bytes to read (offset: {} num_bytes: {}) ".format(offset, num_bytes))
return None
eeprom_raw = []
ethtool_cmd = "ethtool -m sfp{} hex on offset {} length {}".format(self.index, offset, num_bytes)
try:
output = subprocess.check_output(ethtool_cmd,
shell=True,
universal_newlines=True)
output_lines = output.splitlines()
first_line_raw = output_lines[0]
if "Offset" in first_line_raw:
for line in output_lines[2:]:
line_split = line.split()
eeprom_raw = eeprom_raw + line_split[1:]
except subprocess.CalledProcessError as e:
return None
eeprom_raw = list(map(lambda h: int(h, base=16), eeprom_raw))
return bytearray(eeprom_raw)
# read eeprom specfic bytes beginning from offset with size as num_bytes
def read_eeprom(self, offset, num_bytes):
"""
Read eeprom specfic bytes beginning from a random offset with size as num_bytes
Returns:
bytearray, if raw sequence of bytes are read correctly from the offset of size num_bytes
None, if the read_eeprom fails
Example:
mlxreg -d /dev/mst/mt52100_pciconf0 --reg_name MCIA --indexes slot_index=0,module=1,device_address=148,page_number=0,i2c_device_address=0x50,size=16,bank_number=0 -g
Sending access register...
Field Name | Data
===================================
status | 0x00000000
slot_index | 0x00000000
module | 0x00000001
l | 0x00000000
device_address | 0x00000094
page_number | 0x00000000
i2c_device_address | 0x00000050
size | 0x00000010
bank_number | 0x00000000
dword[0] | 0x43726564
dword[1] | 0x6f202020
dword[2] | 0x20202020
dword[3] | 0x20202020
dword[4] | 0x00000000
dword[5] | 0x00000000
....
16 bytes to read from dword -> 0x437265646f2020202020202020202020 -> Credo
"""
# recalculate offset and page. Use 'ethtool' if there is no need to read vendor pages
if offset < SFP_VENDOR_PAGE_START:
return self._read_eeprom_specific_bytes(offset, num_bytes)
else:
page = (offset - SFP_PAGE_SIZE) // SFP_UPPER_PAGE_OFFSET + 1
# calculate offset per page
device_address = (offset - SFP_PAGE_SIZE) % SFP_UPPER_PAGE_OFFSET + SFP_UPPER_PAGE_OFFSET
if not self.mst_pci_device:
return None
mlxreg_mngr = MlxregManager(self.mst_pci_device, self.slot_id, self.sdk_index)
read_output = mlxreg_mngr.read_mlxred_eeprom(device_address, page, num_bytes)
return mlxreg_mngr.parse_mlxreg_read_output(read_output, num_bytes)
# write eeprom specfic bytes beginning from offset with size as num_bytes
def write_eeprom(self, offset, num_bytes, write_buffer):
"""
write eeprom specfic bytes beginning from a random offset with size as num_bytes
and write_buffer as the required bytes
Returns:
Boolean, true if the write succeeded and false if it did not succeed.
Example:
mlxreg -d /dev/mst/mt52100_pciconf0 --reg_name MCIA --indexes slot_index=0,module=1,device_address=154,page_number=5,i2c_device_address=0x50,size=1,bank_number=0 --set dword[0]=0x01000000 -y
"""
if num_bytes != len(write_buffer):
logger.log_error("Error mismatch between buffer length and number of bytes to be written")
return False
# recalculate offset and page
if offset < SFP_PAGE_SIZE:
page = 0
device_address = offset
else:
page = (offset - SFP_PAGE_SIZE) // SFP_UPPER_PAGE_OFFSET + 1
# calculate offset per page
device_address = (offset - SFP_PAGE_SIZE) % SFP_UPPER_PAGE_OFFSET + SFP_UPPER_PAGE_OFFSET
if not self.mst_pci_device:
return False
mlxreg_mngr = MlxregManager(self.mst_pci_device, self.slot_id, self.sdk_index)
dword = mlxreg_mngr.construct_dword(write_buffer)
return mlxreg_mngr.write_mlxreg_eeprom(num_bytes, dword, device_address, page)
@classmethod
def mgmt_phy_mod_pwr_attr_get(cls, power_attr_type, sdk_handle, sdk_index, slot_id):
sx_mgmt_phy_mod_pwr_attr_p = new_sx_mgmt_phy_mod_pwr_attr_t_p()
sx_mgmt_phy_mod_pwr_attr = sx_mgmt_phy_mod_pwr_attr_t()
sx_mgmt_phy_mod_pwr_attr.power_attr_type = power_attr_type
sx_mgmt_phy_mod_pwr_attr_t_p_assign(sx_mgmt_phy_mod_pwr_attr_p, sx_mgmt_phy_mod_pwr_attr)
module_id_info = sx_mgmt_module_id_info_t()
module_id_info.slot_id = slot_id
module_id_info.module_id = sdk_index
try:
rc = sx_mgmt_phy_module_pwr_attr_get(sdk_handle, module_id_info, sx_mgmt_phy_mod_pwr_attr_p)
assert SX_STATUS_SUCCESS == rc, "sx_mgmt_phy_module_pwr_attr_get failed {}".format(rc)
sx_mgmt_phy_mod_pwr_attr = sx_mgmt_phy_mod_pwr_attr_t_p_value(sx_mgmt_phy_mod_pwr_attr_p)
pwr_mode_attr = sx_mgmt_phy_mod_pwr_attr.pwr_mode_attr
return pwr_mode_attr.admin_pwr_mode_e, pwr_mode_attr.oper_pwr_mode_e
finally:
delete_sx_mgmt_phy_mod_pwr_attr_t_p(sx_mgmt_phy_mod_pwr_attr_p)
def get_lpmode(self):
"""
Retrieves the lpmode (low power mode) status of this SFP
Returns:
A Boolean, True if lpmode is enabled, False if disabled
"""
if utils.is_host():
# To avoid performance issue,
# call class level method to avoid initialize the whole sonic platform API
get_lpmode_code = 'from sonic_platform import sfp;\n' \
'with sfp.SdkHandleContext() as sdk_handle:' \
'print(sfp.SFP._get_lpmode(sdk_handle, {}, {}))'.format(self.sdk_index, self.slot_id)
lpm_cmd = "docker exec pmon python3 -c \"{}\"".format(get_lpmode_code)
try:
output = subprocess.check_output(lpm_cmd, shell=True, universal_newlines=True)
return 'True' in output
except subprocess.CalledProcessError as e:
print("Error! Unable to get LPM for {}, rc = {}, err msg: {}".format(self.sdk_index, e.returncode, e.output))
return False
else:
return self._get_lpmode(self.sdk_handle, self.sdk_index, self.slot_id)
@classmethod
def _get_lpmode(cls, sdk_handle, sdk_index, slot_id):
"""Class level method to get low power mode.
Args:
sdk_handle: SDK handle
sdk_index (integer): SDK port index
slot_id (integer): Slot ID
Returns:
[boolean]: True if low power mode is on else off
"""
_, oper_pwr_mode = cls.mgmt_phy_mod_pwr_attr_get(SX_MGMT_PHY_MOD_PWR_ATTR_PWR_MODE_E, sdk_handle, sdk_index, slot_id)
return oper_pwr_mode == SX_MGMT_PHY_MOD_PWR_MODE_LOW_E
def reset(self):
"""
Reset SFP and return all user module settings to their default state.
Returns:
A boolean, True if successful, False if not
refer plugins/sfpreset.py
"""
if utils.is_host():
# To avoid performance issue,
# call class level method to avoid initialize the whole sonic platform API
reset_code = 'from sonic_platform import sfp;\n' \
'with sfp.SdkHandleContext() as sdk_handle:' \
'print(sfp.SFP._reset(sdk_handle, {}, {}))' \
.format(self.sdk_index, self.slot_id)
reset_cmd = "docker exec pmon python3 -c \"{}\"".format(reset_code)
try:
output = subprocess.check_output(reset_cmd, shell=True, universal_newlines=True)
return 'True' in output
except subprocess.CalledProcessError as e:
print("Error! Unable to set LPM for {}, rc = {}, err msg: {}".format(self.sdk_index, e.returncode, e.output))
return False
else:
return self._reset(self.sdk_handle, self.sdk_index, self.slot_id)
@classmethod
def _reset(cls, sdk_handle, sdk_index, slot_id):
module_id_info = sx_mgmt_module_id_info_t()
module_id_info.slot_id = slot_id
module_id_info.module_id = sdk_index
rc = sx_mgmt_phy_module_reset(sdk_handle, module_id_info)
if rc != SX_STATUS_SUCCESS:
logger.log_error("Error occurred when resetting SFP module {}, slot {}, error code {}".format(sdk_index, slot_id, rc))
return rc == SX_STATUS_SUCCESS
@classmethod
def is_nve(cls, port):
return (port & NVE_MASK) != 0
@classmethod
def is_cpu(cls, port):
return (port & CPU_MASK) != 0
@classmethod
def is_port_admin_status_up(cls, sdk_handle, log_port):
oper_state_p = new_sx_port_oper_state_t_p()
admin_state_p = new_sx_port_admin_state_t_p()
module_state_p = new_sx_port_module_state_t_p()
rc = sx_api_port_state_get(sdk_handle, log_port, oper_state_p, admin_state_p, module_state_p)
assert rc == SXD_STATUS_SUCCESS, "sx_api_port_state_get failed, rc = %d" % rc
admin_state = sx_port_admin_state_t_p_value(admin_state_p)
delete_sx_port_oper_state_t_p(oper_state_p)
delete_sx_port_admin_state_t_p(admin_state_p)
delete_sx_port_module_state_t_p(module_state_p)
return admin_state == SX_PORT_ADMIN_STATUS_UP
@classmethod
def set_port_admin_status_by_log_port(cls, sdk_handle, log_port, admin_status):
rc = sx_api_port_state_set(sdk_handle, log_port, admin_status)
if SX_STATUS_SUCCESS != rc:
logger.log_error("sx_api_port_state_set failed, rc = %d" % rc)
return SX_STATUS_SUCCESS == rc
@classmethod
def get_logical_ports(cls, sdk_handle, sdk_index, slot_id):
# Get all the ports related to the sfp, if port admin status is up, put it to list
port_attributes_list = new_sx_port_attributes_t_arr(SX_PORT_ATTR_ARR_SIZE)
port_cnt_p = new_uint32_t_p()
uint32_t_p_assign(port_cnt_p, SX_PORT_ATTR_ARR_SIZE)
rc = sx_api_port_device_get(sdk_handle, DEVICE_ID , SWITCH_ID, port_attributes_list, port_cnt_p)
assert rc == SX_STATUS_SUCCESS, "sx_api_port_device_get failed, rc = %d" % rc
port_cnt = uint32_t_p_value(port_cnt_p)
log_port_list = []
for i in range(0, port_cnt):
port_attributes = sx_port_attributes_t_arr_getitem(port_attributes_list, i)
if not cls.is_nve(int(port_attributes.log_port)) \
and not cls.is_cpu(int(port_attributes.log_port)) \
and port_attributes.port_mapping.module_port == sdk_index \
and port_attributes.port_mapping.slot == slot_id \
and cls.is_port_admin_status_up(sdk_handle, port_attributes.log_port):
log_port_list.append(port_attributes.log_port)
delete_sx_port_attributes_t_arr(port_attributes_list)
delete_uint32_t_p(port_cnt_p)
return log_port_list
@classmethod
def mgmt_phy_mod_pwr_attr_set(cls, sdk_handle, sdk_index, slot_id, power_attr_type, admin_pwr_mode):
result = False
sx_mgmt_phy_mod_pwr_attr = sx_mgmt_phy_mod_pwr_attr_t()
sx_mgmt_phy_mod_pwr_mode_attr = sx_mgmt_phy_mod_pwr_mode_attr_t()
sx_mgmt_phy_mod_pwr_attr.power_attr_type = power_attr_type
sx_mgmt_phy_mod_pwr_mode_attr.admin_pwr_mode_e = admin_pwr_mode
sx_mgmt_phy_mod_pwr_attr.pwr_mode_attr = sx_mgmt_phy_mod_pwr_mode_attr
sx_mgmt_phy_mod_pwr_attr_p = new_sx_mgmt_phy_mod_pwr_attr_t_p()
sx_mgmt_phy_mod_pwr_attr_t_p_assign(sx_mgmt_phy_mod_pwr_attr_p, sx_mgmt_phy_mod_pwr_attr)
module_id_info = sx_mgmt_module_id_info_t()
module_id_info.slot_id = slot_id
module_id_info.module_id = sdk_index
try:
rc = sx_mgmt_phy_module_pwr_attr_set(sdk_handle, SX_ACCESS_CMD_SET, module_id_info, sx_mgmt_phy_mod_pwr_attr_p)
if SX_STATUS_SUCCESS != rc:
logger.log_error("Error occurred when setting power mode for SFP module {}, slot {}, error code {}".format(sdk_index, slot_id, rc))
result = False
else:
result = True
finally:
delete_sx_mgmt_phy_mod_pwr_attr_t_p(sx_mgmt_phy_mod_pwr_attr_p)
return result
@classmethod
def _set_lpmode_raw(cls, sdk_handle, sdk_index, slot_id, ports, attr_type, power_mode):
result = False
# Check if the module already works in the same mode
admin_pwr_mode, oper_pwr_mode = cls.mgmt_phy_mod_pwr_attr_get(attr_type, sdk_handle, sdk_index, slot_id)
if (power_mode == SX_MGMT_PHY_MOD_PWR_MODE_LOW_E and oper_pwr_mode == SX_MGMT_PHY_MOD_PWR_MODE_LOW_E) \
or (power_mode == SX_MGMT_PHY_MOD_PWR_MODE_AUTO_E and admin_pwr_mode == SX_MGMT_PHY_MOD_PWR_MODE_AUTO_E):
return True
try:
# Bring the port down
for port in ports:
cls.set_port_admin_status_by_log_port(sdk_handle, port, SX_PORT_ADMIN_STATUS_DOWN)
# Set the desired power mode
result = cls.mgmt_phy_mod_pwr_attr_set(sdk_handle, sdk_index, slot_id, attr_type, power_mode)
finally:
# Bring the port up
for port in ports:
cls.set_port_admin_status_by_log_port(sdk_handle, port, SX_PORT_ADMIN_STATUS_UP)
return result
def set_lpmode(self, lpmode):
"""
Sets the lpmode (low power mode) of SFP
Args:
lpmode: A Boolean, True to enable lpmode, False to disable it
Note : lpmode can be overridden by set_power_override
Returns:
A boolean, True if lpmode is set successfully, False if not
"""
if utils.is_host():
# To avoid performance issue,
# call class level method to avoid initialize the whole sonic platform API
set_lpmode_code = 'from sonic_platform import sfp;\n' \
'with sfp.SdkHandleContext() as sdk_handle:' \
'print(sfp.SFP._set_lpmode({}, sdk_handle, {}, {}))' \
.format('True' if lpmode else 'False', self.sdk_index, self.slot_id)
lpm_cmd = "docker exec pmon python3 -c \"{}\"".format(set_lpmode_code)
# Set LPM
try:
output = subprocess.check_output(lpm_cmd, shell=True, universal_newlines=True)
return 'True' in output
except subprocess.CalledProcessError as e:
print("Error! Unable to set LPM for {}, rc = {}, err msg: {}".format(self.sdk_index, e.returncode, e.output))
return False
else:
return self._set_lpmode(lpmode, self.sdk_handle, self.sdk_index, self.slot_id)
@classmethod
def _set_lpmode(cls, lpmode, sdk_handle, sdk_index, slot_id):
log_port_list = cls.get_logical_ports(sdk_handle, sdk_index, slot_id)
sdk_lpmode = SX_MGMT_PHY_MOD_PWR_MODE_LOW_E if lpmode else SX_MGMT_PHY_MOD_PWR_MODE_AUTO_E
cls._set_lpmode_raw(sdk_handle,
sdk_index,
slot_id,
log_port_list,
SX_MGMT_PHY_MOD_PWR_ATTR_PWR_MODE_E,
sdk_lpmode)
logger.log_info("{} low power mode for module {}, slot {}".format("Enabled" if lpmode else "Disabled", sdk_index, slot_id))
return True
def is_replaceable(self):
"""
Indicate whether this device is replaceable.
Returns:
bool: True if it is replaceable.
"""
return True
def _get_error_code(self):
"""
Get error code of the SFP module
Returns:
The error code fetch from SDK API
"""
module_id_info_list = new_sx_mgmt_module_id_info_t_arr(1)
module_info_list = new_sx_mgmt_phy_module_info_t_arr(1)
module_id_info = sx_mgmt_module_id_info_t()
module_id_info.slot_id = 0
module_id_info.module_id = self.sdk_index
sx_mgmt_module_id_info_t_arr_setitem(module_id_info_list, 0, module_id_info)
rc = sx_mgmt_phy_module_info_get(self.sdk_handle, module_id_info_list, 1, module_info_list)
assert SX_STATUS_SUCCESS == rc, "sx_mgmt_phy_module_info_get failed, error code {}".format(rc)
mod_info = sx_mgmt_phy_module_info_t_arr_getitem(module_info_list, 0)
return mod_info.module_state.oper_state, mod_info.module_state.error_type
@classmethod
def _get_error_description_dict(cls):
return {0: cls.SFP_ERROR_DESCRIPTION_POWER_BUDGET_EXCEEDED,
1: cls.SFP_MLNX_ERROR_DESCRIPTION_LONGRANGE_NON_MLNX_CABLE,
2: cls.SFP_ERROR_DESCRIPTION_I2C_STUCK,
3: cls.SFP_ERROR_DESCRIPTION_BAD_EEPROM,
4: cls.SFP_MLNX_ERROR_DESCRIPTION_ENFORCE_PART_NUMBER_LIST,
5: cls.SFP_ERROR_DESCRIPTION_UNSUPPORTED_CABLE,
6: cls.SFP_ERROR_DESCRIPTION_HIGH_TEMP,
7: cls.SFP_ERROR_DESCRIPTION_BAD_CABLE,
8: cls.SFP_MLNX_ERROR_DESCRIPTION_PMD_TYPE_NOT_ENABLED,
12: cls.SFP_MLNX_ERROR_DESCRIPTION_PCIE_POWER_SLOT_EXCEEDED,
255: cls.SFP_MLNX_ERROR_DESCRIPTION_RESERVED
}
def get_error_description(self):
"""
Get error description
Args:
error_code: The error code returned by _get_error_code
Returns:
The error description
"""
oper_status, error_code = self._get_error_code()
if oper_status == SX_PORT_MODULE_STATUS_INITIALIZING:
error_description = self.SFP_STATUS_INITIALIZING
elif oper_status == SX_PORT_MODULE_STATUS_PLUGGED:
error_description = self.SFP_STATUS_OK
elif oper_status == SX_PORT_MODULE_STATUS_UNPLUGGED:
error_description = self.SFP_STATUS_UNPLUGGED
elif oper_status == SX_PORT_MODULE_STATUS_PLUGGED_DISABLED:
error_description = self.SFP_STATUS_DISABLED
elif oper_status == SX_PORT_MODULE_STATUS_PLUGGED_WITH_ERROR:
error_description_dict = self._get_error_description_dict()
if error_code in error_description_dict:
error_description = error_description_dict[error_code]
else:
error_description = "Unknown error ({})".format(error_code)
else:
error_description = "Unknow SFP module status ({})".format(oper_status)
return error_description